Browse Source

some basic pof

boyska 9 years ago
commit
8da8c38a8c
7 changed files with 195 additions and 0 deletions
  1. 2 0
      .gitignore
  2. 2 0
      celeryconfig.py
  3. 51 0
      eventutils.py
  4. 83 0
      mpc.py
  5. 12 0
      requirements.txt
  6. 41 0
      task.py
  7. 4 0
      test_parented.py

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+*.pyc
+.*.sw?

+ 2 - 0
celeryconfig.py

@@ -0,0 +1,2 @@
+CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
+CELERYD_POOL_RESTARTS = True

+ 51 - 0
eventutils.py

@@ -0,0 +1,51 @@
+import gevent
+
+
+class ParentedLet(gevent.Greenlet):
+    '''
+    ParentedLet is just a helper subclass that will help you when your
+    greenlet main duty is to "signal" things to a parent_queue.
+
+    It won't save you much code, but "standardize" messages and make explicit
+    the role of that greenlet
+    '''
+    def __init__(self, queue):
+        gevent.Greenlet.__init__(self)
+        self.parent_queue = queue
+
+    def parent_msg(self, kind, *args):
+        return {
+            'emitter': self,
+            'class': self.__class__.__name__,
+            'kind': kind,
+            'args': args
+        }
+
+    def send_to_parent(self, kind, *args):
+        self.parent_queue.put(self.parent_msg(kind, *args))
+
+    def _run(self):
+        if not hasattr(self, 'do_business'):
+            raise Exception("do_business method not implemented by %s" %
+                            self.__class__.__name__)
+        for msg in self.do_business():
+            self.send_to_parent(*msg)
+
+
+class CeleryTask(ParentedLet):
+    def __init__(self, task, queue, task_args=tuple(), **kwargs):
+        ParentedLet.__init__(self, queue)
+        self.task = task
+        self.task_args = task_args
+        self.apply_async_args = kwargs
+
+    def parent_msg(self, kind, *args):
+        msg = ParentedLet.parent_msg(self, kind, *args)
+        msg['task'] = self.task
+        return msg
+
+    def do_business(self):
+        asyncres = self.task.apply_async(self.task_args,
+                                         **self.apply_async_args)
+        val = asyncres.get()
+        yield ('celery', val)

+ 83 - 0
mpc.py

@@ -0,0 +1,83 @@
+from __future__ import print_function
+from gevent import monkey
+monkey.patch_all(subprocess=True)
+
+import logging
+logging.basicConfig(level=logging.INFO,
+                    format='%(asctime)s %(message)s',
+                    datefmt='%H:%M:%S')
+from subprocess import check_output
+
+import gevent
+from gevent.queue import Queue
+
+
+from eventutils import ParentedLet, CeleryTask
+from task import create as create_continous
+
+
+class MpcWatcher(ParentedLet):
+    def __init__(self, queue):
+        ParentedLet.__init__(self, queue)
+
+    def do_business(self):
+        while True:
+            status = check_output(['mpc', 'idle']).decode('utf-8').strip()
+            yield ('mpc', status)
+
+
+class Timer(ParentedLet):
+    def __init__(self, milliseconds, queue):
+        ParentedLet.__init__(self, queue)
+        self.ms = milliseconds
+
+    def parent_msg(self, kind, *args):
+        msg = ParentedLet.parent_msg(self, kind, *args)
+        msg['period'] = self.ms
+        return msg
+
+    def do_business(self):
+        while True:
+            gevent.sleep(self.ms / 1000.0)
+            yield ('timer', )
+
+
+class Player(gevent.Greenlet):
+    def __init__(self):
+        gevent.Greenlet.__init__(self)
+        self.min_playlist_length = 10
+        self.q = Queue()
+
+    def check_playlist(self):
+        out = check_output(['mpc', 'playlist']).decode('utf-8').strip()
+        songs = out.split('\n')
+        if(len(songs) >= self.min_playlist_length):
+
+            return
+        logging.info('need to add new songs')
+        CeleryTask(create_continous, self.q).start()
+        CeleryTask(create_continous, self.q).start()
+
+    def _run(self):
+        MpcWatcher(self.q).start()
+        Timer(6000, self.q).start()
+        while True:
+            value = self.q.get()
+            # emitter = value['emitter']
+            kind = value['kind']
+            args = value['args']
+            if kind == 'timer':
+                logging.info('CLOCK')
+            if kind == 'timer' or (kind == 'mpc' and args[0] == 'playlist'):
+                gevent.Greenlet.spawn(self.check_playlist)
+            elif kind == 'celery':
+                logging.info("celery: %s" % str(args))
+            else:
+                logging.warning("Unknown message: %s" % str(value))
+            logging.info(str(value))
+
+
+if __name__ == '__main__':
+    p = Player()
+    p.start()
+    gevent.wait()

+ 12 - 0
requirements.txt

@@ -0,0 +1,12 @@
+amqp==1.4.6
+anyjson==0.3.3
+billiard==3.3.0.18
+celery==3.1.16
+eventlet==0.15.2
+gevent==1.0.1
+greenlet==0.4.5
+kombu==3.0.23
+pyinotify==0.9.4
+pytz==2014.7
+redis==2.10.3
+wsgiref==0.1.2

+ 41 - 0
task.py

@@ -0,0 +1,41 @@
+import time
+import logging
+import random
+logging.basicConfig(level=logging.DEBUG,
+                    format='%(asctime)s %(message)s',
+                    datefmt='%H:%M:%S')
+
+from celery import Celery
+
+celery = Celery('hello', backend='redis://localhost',
+                broker='redis://localhost:6379/0')
+
+
+@celery.task(name='create_continous')
+def create():
+    sec = random.uniform(2, 5)
+    time.sleep(sec)
+    logging.info('hello world')
+    return 'slept! %.2f' % sec
+
+if __name__ == '__main__':
+    celery.control.broadcast('pool_restart',
+                             arguments={'reload': True})
+    res = []
+    N = 14
+
+    def callback(*args, **kwargs):
+        print(args)
+        print(kwargs)
+        print('---')
+
+    for i in xrange(N):
+        print('append', i)
+        res.append(create.apply_async(expires=2))
+
+    for i in xrange(N):
+        logging.info('wait %d' % i)
+        val = res[i].get()
+        logging.info('got %s' % str(val))
+
+    time.sleep(30)

+ 4 - 0
test_parented.py

@@ -0,0 +1,4 @@
+from mpc import ParentedLet
+
+# TODO: implement simple children and check that we will receive the expected
+# messages on the queue