eventutils.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import gevent
  2. class ParentedLet(gevent.Greenlet):
  3. '''
  4. ParentedLet is just a helper subclass that will help you when your
  5. greenlet main duty is to "signal" things to a parent_queue.
  6. It won't save you much code, but "standardize" messages and make explicit
  7. the role of that greenlet
  8. '''
  9. def __init__(self, queue):
  10. gevent.Greenlet.__init__(self)
  11. self.parent_queue = queue
  12. self.tracker = None # set this to recognize easily
  13. def parent_msg(self, kind, *args):
  14. return {
  15. 'emitter': self,
  16. 'class': self.__class__.__name__,
  17. 'tracker': self.tracker,
  18. 'kind': kind,
  19. 'args': args
  20. }
  21. def send_to_parent(self, kind, *args):
  22. self.parent_queue.put(self.parent_msg(kind, *args))
  23. def _run(self):
  24. if not hasattr(self, 'do_business'):
  25. raise Exception("do_business method not implemented by %s" %
  26. self.__class__.__name__)
  27. for msg in self.do_business():
  28. self.send_to_parent(*msg)
  29. class CeleryTask(ParentedLet):
  30. def __init__(self, task, queue, task_args=tuple(), **kwargs):
  31. ParentedLet.__init__(self, queue)
  32. self.task = task
  33. self.task_args = task_args
  34. self.apply_async_args = kwargs
  35. def parent_msg(self, kind, *args):
  36. msg = ParentedLet.parent_msg(self, kind, *args)
  37. msg['task'] = self.task
  38. return msg
  39. def do_business(self):
  40. asyncres = self.task.apply_async(self.task_args,
  41. **self.apply_async_args)
  42. val = asyncres.get()
  43. yield ('celery', val)
  44. class Timer(ParentedLet):
  45. def __init__(self, milliseconds, queue):
  46. ParentedLet.__init__(self, queue)
  47. self.ms = milliseconds
  48. def parent_msg(self, kind, *args):
  49. msg = ParentedLet.parent_msg(self, kind, *args)
  50. msg['period'] = self.ms
  51. return msg
  52. def do_business(self):
  53. while True:
  54. gevent.sleep(self.ms / 1000.0)
  55. yield ('timer', )