From 1008ca1e42f04ef27715160ff86ee8e7af93a047 Mon Sep 17 00:00:00 2001 From: boyska Date: Tue, 2 Dec 2014 01:53:43 +0100 Subject: [PATCH] event db integrated with player --- .gitignore | 2 + larigira/event.py | 146 ++++++++++++++++++++++++++++++++++++++++++++++ larigira/mpc.py | 49 +++++++++++----- 3 files changed, 184 insertions(+), 13 deletions(-) create mode 100644 larigira/event.py diff --git a/.gitignore b/.gitignore index 1a12488..f4275f5 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ build /*.egg/ /*.egg +larigira.db +larigira.db_* diff --git a/larigira/event.py b/larigira/event.py new file mode 100644 index 0000000..baddf9b --- /dev/null +++ b/larigira/event.py @@ -0,0 +1,146 @@ +from __future__ import print_function +from gevent import monkey +monkey.patch_all(subprocess=True) +import logging +FORMAT = '%(asctime)s|%(levelname)s[%(name)s:%(lineno)d] %(message)s' +logging.basicConfig(level=logging.INFO, + format=FORMAT, + datefmt='%H:%M:%S') +logging.getLogger('mpd').setLevel(logging.WARNING) +import signal +from datetime import datetime, timedelta + +import gevent +from gevent.queue import Queue + +from eventutils import ParentedLet +from timegen import timegenerate +from audiogen import audiogenerate + + +class EventSource(ParentedLet): + def __init__(self, queue, uri): + ParentedLet.__init__(self, queue) + import pyejdb + self.log = logging.getLogger(self.__class__.__name__) + self.log.debug('uri is %s' % uri) + self.ejdb = pyejdb.EJDB(uri, + pyejdb.JBOREADER | pyejdb.JBOLCKNB | + pyejdb.JBOTRUNC) + self.log.debug('opened %s' % uri) + + def parent_msg(self, kind, *args): + if kind == 'add': + msg = ParentedLet.parent_msg(self, kind, *args[2:]) + msg['timespec'] = args[0] + msg['audiospec'] = args[1] + else: + msg = ParentedLet.parent_msg(self, kind, *args) + return msg + + def _get_actions_by_alarm(self, alarm): + if 'actions' not in alarm: + return + for action_id in alarm['actions']: + with self.ejdb.find('actions', + {'_id': action_id}) as subcur: + for action in subcur: + yield action + + def _get_by_alarmid(self, alarmid): + with self.ejdb.find('alarms', {'_id': alarmid}) as cur: + if len(cur) > 1: + self.log.warn("Found more than one alarm with given id") + for alarm in cur: + for action in self._get_actions_by_alarm(alarm): + yield alarm, action + + def reload(self): + with self.ejdb.find('alarms', {}) as cur: + for alarm in cur: + self.log.info('%s\t%s' % (alarm['kind'], + ', '.join(alarm.keys()))) + for action in self._get_actions_by_alarm(alarm): + yield alarm, action + + def reload_id(self, event_id): + ''' + Check if the event is still valid, and put "add" messages on queue + ''' + for alarm, action in self._get_by_alarmid(event_id): + self.send_to_parent('add', alarm, action) + + def do_business(self): + for alarm, action in self.reload(): + yield ('add', alarm, action) + + +class Monitor(ParentedLet): + def __init__(self, parent_queue, conf): + ParentedLet.__init__(self, parent_queue) + self.log = logging.getLogger(self.__class__.__name__) + self.q = Queue() + self.running = {} + self.conf = conf + self.source = EventSource(self.q, uri=conf['DB_URI']) + + def add(self, timespec, audiospec): + ''' + this is somewhat recursive: after completion calls reload_id, which + could call this method again + ''' + now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME']) + when = next(timegenerate(timespec, now=now)) + delta = when - now + assert delta.total_seconds() > 0 + self.log.info('Will run after %d seconds' % delta.total_seconds()) + + audiogen = gevent.spawn_later(delta.total_seconds(), audiogenerate, + audiospec) + self.running[timespec['_id']] = audiogen + gevent.spawn_later(delta.total_seconds(), + self.source.reload_id, + timespec['_id']) + # FIXME: audiogen is ready in a moment between + # exact_time - CACHING_TIME and the exact_time + # atm we are just ignoring this "window", saying that any moment is + # fine + # the more correct thing will be to wait till that exact moment + # adding another spawn_later + audiogen.link(lambda g: self.log.info('should play %s' % str(g.value))) + audiogen.link(lambda g: self.send_to_parent('add', g.value)) + + def _run(self): + self.source.start() + while True: + value = self.q.get() + self.log.debug('<- %s' % str(value)) + kind = value['kind'] + if kind == 'add': + self.add(value['timespec'], value['audiospec']) + elif kind == 'remove': + raise NotImplementedError() + else: + self.log.warning("Unknown message: %s" % str(value)) + + +def on_player_crash(*args, **kwargs): + print('A crash occurred in "main" greenlet. Aborting...') + import sys + sys.exit(1) + + +def main(): + conf = dict(CACHING_TIME=10, DB_URI='larigira.db') + monitor = Monitor(Queue(), conf) + monitor.start() + monitor.link_exception(on_player_crash) + + def sig(*args): + print('invoked sig', args) + monitor.q.put('signal', *args) + gevent.signal(signal.SIGHUP, sig, signal.SIGHUP) + gevent.wait() + +if __name__ == '__main__': + main() diff --git a/larigira/mpc.py b/larigira/mpc.py index 0b3e04f..c2ab36c 100644 --- a/larigira/mpc.py +++ b/larigira/mpc.py @@ -18,20 +18,24 @@ from mpd import MPDClient from eventutils import ParentedLet, Timer import rpc from audiogen import audiogenerate +from event import Monitor -CONTINOUS_AUDIODESC = dict(kind='mpd', howmany=1) -MPD_HOST = os.getenv('MPD_HOST', 'localhost') -MPD_PORT = int(os.getenv('MPD_PORT', '6600')) +conf = {} +conf['CONTINOUS_AUDIODESC'] = dict(kind='mpd', howmany=1) +conf['MPD_HOST'] = os.getenv('MPD_HOST', 'localhost') +conf['MPD_PORT'] = int(os.getenv('MPD_PORT', '6600')) +conf['CACHING_TIME'] = 10 class MpcWatcher(ParentedLet): - def __init__(self, queue, client=None): + def __init__(self, queue, conf, client=None): ParentedLet.__init__(self, queue) self.log = logging.getLogger(self.__class__.__name__) + self.conf = conf if client is None: self.client = MPDClient() # TODO: use config values - self.client.connect(MPD_HOST, MPD_PORT) + self.client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT']) else: self.client = client # assume it is already connected @@ -43,26 +47,37 @@ class MpcWatcher(ParentedLet): class Player(gevent.Greenlet): - def __init__(self): + def __init__(self, conf): gevent.Greenlet.__init__(self) self.min_playlist_length = 10 self.log = logging.getLogger(self.__class__.__name__) self.q = Queue() + self.conf = conf - def check_playlist(self): + def _get_mpd(self): mpd_client = MPDClient() # TODO: use config values - mpd_client.connect("localhost", 6600) + mpd_client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT']) + return mpd_client + + def check_playlist(self): + mpd_client = self._get_mpd() songs = mpd_client.playlist() if(len(songs) >= self.min_playlist_length): return self.log.info('need to add new songs') - picker = gevent.Greenlet(audiogenerate, CONTINOUS_AUDIODESC) + picker = gevent.Greenlet(audiogenerate, + self.conf['CONTINOUS_AUDIODESC']) picker.link_value(lambda g: mpd_client.add(g.value[0].strip())) picker.start() + def enqueue(self, songs): + mpd_client = self._get_mpd() + for song in songs: + mpd_client.addid(song, 1) + def _run(self): - MpcWatcher(self.q, client=None).start() + MpcWatcher(self.q, self.conf, client=None).start() Timer(60 * 1000, self.q).start() http_server = WSGIServer(('', 5000), rpc.create_app(self.q)) http_server.start() @@ -72,10 +87,12 @@ class Player(gevent.Greenlet): # emitter = value['emitter'] kind = value['kind'] args = value['args'] - if kind == 'timer': - self.log.info('CLOCK') if kind == 'timer' or (kind == 'mpc' and args[0] == 'playlist'): gevent.Greenlet.spawn(self.check_playlist) + elif kind == 'mpc': + pass + elif kind == 'add': + self.enqueue(args[0]) else: self.log.warning("Unknown message: %s" % str(value)) @@ -87,8 +104,14 @@ def on_player_crash(*args, **kwargs): def main(): - p = Player() + # TODO: update conf from somewhere + conf['DB_URI'] = 'larigira.db' + p = Player(conf) p.start() + # TODO: if create Monitor(p.q) + if 'DB_URI' in conf: + m = Monitor(p.q, conf) + m.start() p.link_exception(on_player_crash) def sig(*args):