eventutils.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import gevent
  2. class ParentedLet(gevent.Greenlet):
  3. '''
  4. ParentedLet is just a helper subclass that will help you when your
  5. greenlet main duty is to "signal" things to a parent_queue.
  6. It won't save you much code, but "standardize" messages and make explicit
  7. the role of that greenlet
  8. '''
  9. def __init__(self, queue):
  10. gevent.Greenlet.__init__(self)
  11. self.parent_queue = queue
  12. def parent_msg(self, kind, *args):
  13. return {
  14. 'emitter': self,
  15. 'class': self.__class__.__name__,
  16. 'kind': kind,
  17. 'args': args
  18. }
  19. def send_to_parent(self, kind, *args):
  20. self.parent_queue.put(self.parent_msg(kind, *args))
  21. def _run(self):
  22. if not hasattr(self, 'do_business'):
  23. raise Exception("do_business method not implemented by %s" %
  24. self.__class__.__name__)
  25. for msg in self.do_business():
  26. self.send_to_parent(*msg)
  27. class CeleryTask(ParentedLet):
  28. def __init__(self, task, queue, task_args=tuple(), **kwargs):
  29. ParentedLet.__init__(self, queue)
  30. self.task = task
  31. self.task_args = task_args
  32. self.apply_async_args = kwargs
  33. def parent_msg(self, kind, *args):
  34. msg = ParentedLet.parent_msg(self, kind, *args)
  35. msg['task'] = self.task
  36. return msg
  37. def do_business(self):
  38. asyncres = self.task.apply_async(self.task_args,
  39. **self.apply_async_args)
  40. val = asyncres.get()
  41. yield ('celery', val)
  42. class Timer(ParentedLet):
  43. def __init__(self, milliseconds, queue):
  44. ParentedLet.__init__(self, queue)
  45. self.ms = milliseconds
  46. def parent_msg(self, kind, *args):
  47. msg = ParentedLet.parent_msg(self, kind, *args)
  48. msg['period'] = self.ms
  49. return msg
  50. def do_business(self):
  51. while True:
  52. gevent.sleep(self.ms / 1000.0)
  53. yield ('timer', )