Browse Source

event db integrated with player

boyska 9 years ago
3 changed files with 184 additions and 13 deletions
  1. 2 0
  2. 146 0
  3. 36 13

+ 2 - 0

@@ -5,3 +5,5 @@

+ 146 - 0

@@ -0,0 +1,146 @@
+from __future__ import print_function
+from gevent import monkey
+import logging
+FORMAT = '%(asctime)s|%(levelname)s[%(name)s:%(lineno)d] %(message)s'
+                    format=FORMAT,
+                    datefmt='%H:%M:%S')
+import signal
+from datetime import datetime, timedelta
+import gevent
+from gevent.queue import Queue
+from eventutils import ParentedLet
+from timegen import timegenerate
+from audiogen import audiogenerate
+class EventSource(ParentedLet):
+    def __init__(self, queue, uri):
+        ParentedLet.__init__(self, queue)
+        import pyejdb
+        self.log = logging.getLogger(self.__class__.__name__)
+        self.log.debug('uri is %s' % uri)
+        self.ejdb = pyejdb.EJDB(uri,
+                                pyejdb.JBOREADER | pyejdb.JBOLCKNB |
+                                pyejdb.JBOTRUNC)
+        self.log.debug('opened %s' % uri)
+    def parent_msg(self, kind, *args):
+        if kind == 'add':
+            msg = ParentedLet.parent_msg(self, kind, *args[2:])
+            msg['timespec'] = args[0]
+            msg['audiospec'] = args[1]
+        else:
+            msg = ParentedLet.parent_msg(self, kind, *args)
+        return msg
+    def _get_actions_by_alarm(self, alarm):
+        if 'actions' not in alarm:
+            return
+        for action_id in alarm['actions']:
+            with self.ejdb.find('actions',
+                                {'_id': action_id}) as subcur:
+                for action in subcur:
+                    yield action
+    def _get_by_alarmid(self, alarmid):
+        with self.ejdb.find('alarms', {'_id': alarmid}) as cur:
+            if len(cur) > 1:
+                self.log.warn("Found more than one alarm with given id")
+            for alarm in cur:
+                for action in self._get_actions_by_alarm(alarm):
+                    yield alarm, action
+    def reload(self):
+        with self.ejdb.find('alarms', {}) as cur:
+            for alarm in cur:
+      '%s\t%s' % (alarm['kind'],
+                                          ', '.join(alarm.keys())))
+                for action in self._get_actions_by_alarm(alarm):
+                    yield alarm, action
+    def reload_id(self, event_id):
+        '''
+        Check if the event is still valid, and put "add" messages on queue
+        '''
+        for alarm, action in self._get_by_alarmid(event_id):
+            self.send_to_parent('add', alarm, action)
+    def do_business(self):
+        for alarm, action in self.reload():
+            yield ('add', alarm, action)
+class Monitor(ParentedLet):
+    def __init__(self, parent_queue, conf):
+        ParentedLet.__init__(self, parent_queue)
+        self.log = logging.getLogger(self.__class__.__name__)
+        self.q = Queue()
+        self.running = {}
+        self.conf = conf
+        self.source = EventSource(self.q, uri=conf['DB_URI'])
+    def add(self, timespec, audiospec):
+        '''
+        this is somewhat recursive: after completion calls reload_id, which
+        could call this method again
+        '''
+        now = + timedelta(seconds=self.conf['CACHING_TIME'])
+        when = next(timegenerate(timespec, now=now))
+        delta = when - now
+        assert delta.total_seconds() > 0
+'Will run after %d seconds' % delta.total_seconds())
+        audiogen = gevent.spawn_later(delta.total_seconds(), audiogenerate,
+                                      audiospec)
+        self.running[timespec['_id']] = audiogen
+        gevent.spawn_later(delta.total_seconds(),
+                           self.source.reload_id,
+                           timespec['_id'])
+        # FIXME: audiogen is ready in a moment between
+        # exact_time - CACHING_TIME and the exact_time
+        # atm we are just ignoring this "window", saying that any moment is
+        # fine
+        # the more correct thing will be to wait till that exact moment
+        # adding another spawn_later
+ g:'should play %s' % str(g.value)))
+ g: self.send_to_parent('add', g.value))
+    def _run(self):
+        self.source.start()
+        while True:
+            value = self.q.get()
+            self.log.debug('<- %s' % str(value))
+            kind = value['kind']
+            if kind == 'add':
+                self.add(value['timespec'], value['audiospec'])
+            elif kind == 'remove':
+                raise NotImplementedError()
+            else:
+                self.log.warning("Unknown message: %s" % str(value))
+def on_player_crash(*args, **kwargs):
+    print('A crash occurred in "main" greenlet. Aborting...')
+    import sys
+    sys.exit(1)
+def main():
+    conf = dict(CACHING_TIME=10, DB_URI='larigira.db')
+    monitor = Monitor(Queue(), conf)
+    monitor.start()
+    monitor.link_exception(on_player_crash)
+    def sig(*args):
+        print('invoked sig', args)
+        monitor.q.put('signal', *args)
+    gevent.signal(signal.SIGHUP, sig, signal.SIGHUP)
+    gevent.wait()
+if __name__ == '__main__':
+    main()

+ 36 - 13

@@ -18,20 +18,24 @@ from mpd import MPDClient
 from eventutils import ParentedLet, Timer
 import rpc
 from audiogen import audiogenerate
+from event import Monitor
-CONTINOUS_AUDIODESC = dict(kind='mpd', howmany=1)
-MPD_HOST = os.getenv('MPD_HOST', 'localhost')
-MPD_PORT = int(os.getenv('MPD_PORT', '6600'))
+conf = {}
+conf['CONTINOUS_AUDIODESC'] = dict(kind='mpd', howmany=1)
+conf['MPD_HOST'] = os.getenv('MPD_HOST', 'localhost')
+conf['MPD_PORT'] = int(os.getenv('MPD_PORT', '6600'))
+conf['CACHING_TIME'] = 10
 class MpcWatcher(ParentedLet):
-    def __init__(self, queue, client=None):
+    def __init__(self, queue, conf, client=None):
         ParentedLet.__init__(self, queue)
         self.log = logging.getLogger(self.__class__.__name__)
+        self.conf = conf
         if client is None:
             self.client = MPDClient()
             # TODO: use config values
-            self.client.connect(MPD_HOST, MPD_PORT)
+            self.client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT'])
             self.client = client  # assume it is already connected
@@ -43,26 +47,37 @@ class MpcWatcher(ParentedLet):
 class Player(gevent.Greenlet):
-    def __init__(self):
+    def __init__(self, conf):
         self.min_playlist_length = 10
         self.log = logging.getLogger(self.__class__.__name__)
         self.q = Queue()
+        self.conf = conf
-    def check_playlist(self):
+    def _get_mpd(self):
         mpd_client = MPDClient()
         # TODO: use config values
-        mpd_client.connect("localhost", 6600)
+        mpd_client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT'])
+        return mpd_client
+    def check_playlist(self):
+        mpd_client = self._get_mpd()
         songs = mpd_client.playlist()
         if(len(songs) >= self.min_playlist_length):
             return'need to add new songs')
-        picker = gevent.Greenlet(audiogenerate, CONTINOUS_AUDIODESC)
+        picker = gevent.Greenlet(audiogenerate,
+                                 self.conf['CONTINOUS_AUDIODESC'])
         picker.link_value(lambda g: mpd_client.add(g.value[0].strip()))
+    def enqueue(self, songs):
+        mpd_client = self._get_mpd()
+        for song in songs:
+            mpd_client.addid(song, 1)
     def _run(self):
-        MpcWatcher(self.q, client=None).start()
+        MpcWatcher(self.q, self.conf, client=None).start()
         Timer(60 * 1000, self.q).start()
         http_server = WSGIServer(('', 5000), rpc.create_app(self.q))
@@ -72,10 +87,12 @@ class Player(gevent.Greenlet):
             # emitter = value['emitter']
             kind = value['kind']
             args = value['args']
-            if kind == 'timer':
-      'CLOCK')
             if kind == 'timer' or (kind == 'mpc' and args[0] == 'playlist'):
+            elif kind == 'mpc':
+                pass
+            elif kind == 'add':
+                self.enqueue(args[0])
                 self.log.warning("Unknown message: %s" % str(value))
@@ -87,8 +104,14 @@ def on_player_crash(*args, **kwargs):
 def main():
-    p = Player()
+    # TODO: update conf from somewhere
+    conf['DB_URI'] = 'larigira.db'
+    p = Player(conf)
+    # TODO: if <someoption> create Monitor(p.q)
+    if 'DB_URI' in conf:
+        m = Monitor(p.q, conf)
+        m.start()
     def sig(*args):