From 8da8c38a8c161403c93e46f4b9baca46b6f1fddb Mon Sep 17 00:00:00 2001 From: boyska Date: Sun, 26 Oct 2014 17:48:24 +0100 Subject: [PATCH] some basic pof --- .gitignore | 2 ++ celeryconfig.py | 2 ++ eventutils.py | 51 +++++++++++++++++++++++++++++ mpc.py | 83 ++++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 12 +++++++ task.py | 41 ++++++++++++++++++++++++ test_parented.py | 4 +++ 7 files changed, 195 insertions(+) create mode 100644 .gitignore create mode 100644 celeryconfig.py create mode 100644 eventutils.py create mode 100644 mpc.py create mode 100644 requirements.txt create mode 100644 task.py create mode 100644 test_parented.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..034ed13 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.pyc +.*.sw? diff --git a/celeryconfig.py b/celeryconfig.py new file mode 100644 index 0000000..78a7d0b --- /dev/null +++ b/celeryconfig.py @@ -0,0 +1,2 @@ +CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' +CELERYD_POOL_RESTARTS = True diff --git a/eventutils.py b/eventutils.py new file mode 100644 index 0000000..605d519 --- /dev/null +++ b/eventutils.py @@ -0,0 +1,51 @@ +import gevent + + +class ParentedLet(gevent.Greenlet): + ''' + ParentedLet is just a helper subclass that will help you when your + greenlet main duty is to "signal" things to a parent_queue. + + It won't save you much code, but "standardize" messages and make explicit + the role of that greenlet + ''' + def __init__(self, queue): + gevent.Greenlet.__init__(self) + self.parent_queue = queue + + def parent_msg(self, kind, *args): + return { + 'emitter': self, + 'class': self.__class__.__name__, + 'kind': kind, + 'args': args + } + + def send_to_parent(self, kind, *args): + self.parent_queue.put(self.parent_msg(kind, *args)) + + def _run(self): + if not hasattr(self, 'do_business'): + raise Exception("do_business method not implemented by %s" % + self.__class__.__name__) + for msg in self.do_business(): + self.send_to_parent(*msg) + + +class CeleryTask(ParentedLet): + def __init__(self, task, queue, task_args=tuple(), **kwargs): + ParentedLet.__init__(self, queue) + self.task = task + self.task_args = task_args + self.apply_async_args = kwargs + + def parent_msg(self, kind, *args): + msg = ParentedLet.parent_msg(self, kind, *args) + msg['task'] = self.task + return msg + + def do_business(self): + asyncres = self.task.apply_async(self.task_args, + **self.apply_async_args) + val = asyncres.get() + yield ('celery', val) diff --git a/mpc.py b/mpc.py new file mode 100644 index 0000000..5dfeeb2 --- /dev/null +++ b/mpc.py @@ -0,0 +1,83 @@ +from __future__ import print_function +from gevent import monkey +monkey.patch_all(subprocess=True) + +import logging +logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(message)s', + datefmt='%H:%M:%S') +from subprocess import check_output + +import gevent +from gevent.queue import Queue + + +from eventutils import ParentedLet, CeleryTask +from task import create as create_continous + + +class MpcWatcher(ParentedLet): + def __init__(self, queue): + ParentedLet.__init__(self, queue) + + def do_business(self): + while True: + status = check_output(['mpc', 'idle']).decode('utf-8').strip() + yield ('mpc', status) + + +class Timer(ParentedLet): + def __init__(self, milliseconds, queue): + ParentedLet.__init__(self, queue) + self.ms = milliseconds + + def parent_msg(self, kind, *args): + msg = ParentedLet.parent_msg(self, kind, *args) + msg['period'] = self.ms + return msg + + def do_business(self): + while True: + gevent.sleep(self.ms / 1000.0) + yield ('timer', ) + + +class Player(gevent.Greenlet): + def __init__(self): + gevent.Greenlet.__init__(self) + self.min_playlist_length = 10 + self.q = Queue() + + def check_playlist(self): + out = check_output(['mpc', 'playlist']).decode('utf-8').strip() + songs = out.split('\n') + if(len(songs) >= self.min_playlist_length): + + return + logging.info('need to add new songs') + CeleryTask(create_continous, self.q).start() + CeleryTask(create_continous, self.q).start() + + def _run(self): + MpcWatcher(self.q).start() + Timer(6000, self.q).start() + while True: + value = self.q.get() + # emitter = value['emitter'] + kind = value['kind'] + args = value['args'] + if kind == 'timer': + logging.info('CLOCK') + if kind == 'timer' or (kind == 'mpc' and args[0] == 'playlist'): + gevent.Greenlet.spawn(self.check_playlist) + elif kind == 'celery': + logging.info("celery: %s" % str(args)) + else: + logging.warning("Unknown message: %s" % str(value)) + logging.info(str(value)) + + +if __name__ == '__main__': + p = Player() + p.start() + gevent.wait() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..08b74e5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +amqp==1.4.6 +anyjson==0.3.3 +billiard==3.3.0.18 +celery==3.1.16 +eventlet==0.15.2 +gevent==1.0.1 +greenlet==0.4.5 +kombu==3.0.23 +pyinotify==0.9.4 +pytz==2014.7 +redis==2.10.3 +wsgiref==0.1.2 diff --git a/task.py b/task.py new file mode 100644 index 0000000..456b0e7 --- /dev/null +++ b/task.py @@ -0,0 +1,41 @@ +import time +import logging +import random +logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s %(message)s', + datefmt='%H:%M:%S') + +from celery import Celery + +celery = Celery('hello', backend='redis://localhost', + broker='redis://localhost:6379/0') + + +@celery.task(name='create_continous') +def create(): + sec = random.uniform(2, 5) + time.sleep(sec) + logging.info('hello world') + return 'slept! %.2f' % sec + +if __name__ == '__main__': + celery.control.broadcast('pool_restart', + arguments={'reload': True}) + res = [] + N = 14 + + def callback(*args, **kwargs): + print(args) + print(kwargs) + print('---') + + for i in xrange(N): + print('append', i) + res.append(create.apply_async(expires=2)) + + for i in xrange(N): + logging.info('wait %d' % i) + val = res[i].get() + logging.info('got %s' % str(val)) + + time.sleep(30) diff --git a/test_parented.py b/test_parented.py new file mode 100644 index 0000000..a199c63 --- /dev/null +++ b/test_parented.py @@ -0,0 +1,4 @@ +from mpc import ParentedLet + +# TODO: implement simple children and check that we will receive the expected +# messages on the queue