event.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. from __future__ import print_function
  2. from gevent import monkey
  3. monkey.patch_all(subprocess=True)
  4. import logging
  5. FORMAT = '%(asctime)s|%(levelname)s[%(name)s:%(lineno)d] %(message)s'
  6. logging.basicConfig(level=logging.INFO,
  7. format=FORMAT,
  8. datefmt='%H:%M:%S')
  9. logging.getLogger('mpd').setLevel(logging.WARNING)
  10. from datetime import datetime, timedelta
  11. import gevent
  12. from gevent.queue import Queue
  13. from tinydb import TinyDB
  14. from eventutils import ParentedLet
  15. from timegen import timegenerate
  16. from audiogen import audiogenerate
  17. class EventModel(object):
  18. def __init__(self, uri):
  19. self.uri = uri
  20. self.db = TinyDB(uri)
  21. self.actions = self.db.table('actions')
  22. self.alarms = self.db.table('alarms')
  23. def get_action_by_id(self, action_id):
  24. return self.actions.get(eid=action_id)
  25. def get_alarm_by_id(self, alarm_id):
  26. return self.alarms.get(eid=alarm_id)
  27. def get_actions_by_alarm(self, alarm):
  28. for action_id in alarm.get('actions', []):
  29. yield self.get_action_by_id(action_id)
  30. def get_all_alarms(self):
  31. return self.alarms.all()
  32. def get_all_actions(self):
  33. return self.actions.all()
  34. def get_all_alarms_expanded(self):
  35. for alarm in self.get_all_alarms():
  36. for action in self.get_actions_by_alarm(alarm):
  37. yield alarm, action
  38. def add_event(self, alarm, actions):
  39. action_ids = [self.add_action(a) for a in actions]
  40. alarm['actions'] = action_ids
  41. return self.alarms.insert(alarm)
  42. def add_action(self, action):
  43. return self.actions.insert(action)
  44. def add_alarm(self, alarm):
  45. return self.add_event(alarm, [])
  46. def update_alarm(self, alarmid, new_fields={}):
  47. return self.alarms.update(new_fields, eids=[alarmid])
  48. class EventSource(ParentedLet):
  49. def __init__(self, queue, uri):
  50. ParentedLet.__init__(self, queue)
  51. self.log = logging.getLogger(self.__class__.__name__)
  52. self.log.debug('uri is %s' % uri)
  53. self.model = EventModel(uri)
  54. self.log.debug('opened %s' % uri)
  55. def parent_msg(self, kind, *args):
  56. if kind == 'add':
  57. msg = ParentedLet.parent_msg(self, kind, *args[2:])
  58. msg['timespec'] = args[0]
  59. msg['audiospec'] = args[1]
  60. else:
  61. msg = ParentedLet.parent_msg(self, kind, *args)
  62. return msg
  63. def reload_id(self, alarm_id):
  64. '''
  65. Check if the event is still valid, and put "add" messages on queue
  66. '''
  67. alarm = self.model.get_alarm_by_id(alarm_id)
  68. for action in self.model.get_actions_by_alarm(alarm):
  69. self.send_to_parent('add', alarm, action)
  70. def do_business(self):
  71. for alarm, action in self.model.get_all_alarms_expanded():
  72. self.log.debug('scheduling {}'.format(alarm))
  73. yield ('add', alarm, action)
  74. class Monitor(ParentedLet):
  75. def __init__(self, parent_queue, conf):
  76. ParentedLet.__init__(self, parent_queue)
  77. self.log = logging.getLogger(self.__class__.__name__)
  78. self.q = Queue()
  79. self.running = {}
  80. self.conf = conf
  81. self.source = EventSource(self.q, uri=conf['DB_URI'])
  82. def add(self, timespec, audiospec):
  83. '''
  84. this is somewhat recursive: after completion calls reload_id, which
  85. could call this method again
  86. '''
  87. now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME'])
  88. try:
  89. when = next(timegenerate(timespec, now=now))
  90. except:
  91. logging.exception("Could not generate "
  92. "an alarm from timespec {}".format(timespec))
  93. if when is None:
  94. # expired
  95. return
  96. delta = when - now
  97. assert delta.total_seconds() > 0
  98. self.log.info('Timer<{}> will run after {} seconds, triggering <{}>'.format(
  99. timespec.get('nick', timespec.eid),
  100. int(delta.total_seconds()),
  101. audiospec.get('nick', audiospec.eid)
  102. ))
  103. audiogen = gevent.spawn_later(delta.total_seconds(), audiogenerate,
  104. audiospec)
  105. self.running[timespec.eid] = {
  106. 'greenlet': audiogen,
  107. 'running_time': datetime.now() + timedelta(
  108. seconds=delta.total_seconds()),
  109. 'audiospec': audiospec
  110. }
  111. gevent.spawn_later(delta.total_seconds(),
  112. self.source.reload_id,
  113. timespec.eid)
  114. # FIXME: audiogen is ready in a moment between
  115. # exact_time - CACHING_TIME and the exact_time
  116. # atm we are just ignoring this "window", saying that any moment is
  117. # fine
  118. # the more correct thing will be to wait till that exact moment
  119. # adding another spawn_later
  120. audiogen.link_value(lambda g: self.log.info(
  121. 'should play %s' % str(g.value)))
  122. audiogen.link_exception(lambda g: self.log.exception(
  123. 'Failure in audiogen {}: {}'.format(audiospec, audiogen.exception)))
  124. audiogen.link_value(lambda g: self.send_to_parent('add', g.value))
  125. def _run(self):
  126. self.source.start()
  127. while True:
  128. value = self.q.get()
  129. self.log.debug('<- %s' % str(value))
  130. kind = value['kind']
  131. if kind == 'add':
  132. self.add(value['timespec'], value['audiospec'])
  133. elif kind == 'remove':
  134. raise NotImplementedError()
  135. else:
  136. self.log.warning("Unknown message: %s" % str(value))