event.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from __future__ import print_function
  2. from gevent import monkey
  3. monkey.patch_all(subprocess=True)
  4. import logging
  5. logging.getLogger('mpd').setLevel(logging.WARNING)
  6. from datetime import datetime, timedelta
  7. import gevent
  8. from gevent.queue import Queue
  9. from tinydb import TinyDB
  10. from .eventutils import ParentedLet, Timer
  11. from .timegen import timegenerate
  12. from .audiogen import audiogenerate
  13. class EventModel(object):
  14. def __init__(self, uri):
  15. self.uri = uri
  16. self.db = None
  17. self.reload()
  18. def reload(self):
  19. if self.db is not None:
  20. self.db.close()
  21. self.db = TinyDB(self.uri, indent=2)
  22. self.actions = self.db.table('actions')
  23. self.alarms = self.db.table('alarms')
  24. def get_action_by_id(self, action_id):
  25. return self.actions.get(eid=action_id)
  26. def get_alarm_by_id(self, alarm_id):
  27. return self.alarms.get(eid=alarm_id)
  28. def get_actions_by_alarm(self, alarm):
  29. for action_id in alarm.get('actions', []):
  30. action = self.get_action_by_id(action_id)
  31. if action is None: continue
  32. yield action
  33. def get_all_alarms(self):
  34. return self.alarms.all()
  35. def get_all_actions(self):
  36. return self.actions.all()
  37. def get_all_alarms_expanded(self):
  38. for alarm in self.get_all_alarms():
  39. for action in self.get_actions_by_alarm(alarm):
  40. yield alarm, action
  41. def add_event(self, alarm, actions):
  42. action_ids = [self.add_action(a) for a in actions]
  43. alarm['actions'] = action_ids
  44. return self.alarms.insert(alarm)
  45. def add_action(self, action):
  46. return self.actions.insert(action)
  47. def add_alarm(self, alarm):
  48. return self.add_event(alarm, [])
  49. def update_alarm(self, alarmid, new_fields={}):
  50. return self.alarms.update(new_fields, eids=[alarmid])
  51. def update_action(self, actionid, new_fields={}):
  52. return self.actions.update(new_fields, eids=[actionid])
  53. def delete_alarm(self, alarmid):
  54. return self.alarms.remove(eids=[alarmid])
  55. def delete_action(self, actionid):
  56. return self.actions.remove(eids=[actionid])
  57. class Monitor(ParentedLet):
  58. '''
  59. Manages timegenerators and audiogenerators.
  60. The mechanism is partially based on ticks, partially on scheduled actions.
  61. Ticks are emitted periodically; at every tick, :func:`on_tick
  62. <larigira.event.Monitor.on_tick>` checks if any event is "near enough". If
  63. an event is near enough, it is ":func:`scheduled
  64. <larigira.event.Monitor.schedule>`": a greenlet is run which will wait for
  65. the right time, then generate the audio, then submit to Controller.
  66. The tick mechanism allows for events to be changed on disk: if everything
  67. was scheduled immediately, no further changes would be possible.
  68. The scheduling mechanism allows for more precision, catching exactly the
  69. right time. Being accurate only with ticks would have required very
  70. frequent ticks, which is cpu-intensive.
  71. '''
  72. def __init__(self, parent_queue, conf):
  73. ParentedLet.__init__(self, parent_queue)
  74. self.log = logging.getLogger(self.__class__.__name__)
  75. self.running = {}
  76. self.conf = conf
  77. self.q = Queue()
  78. self.model = EventModel(self.conf['DB_URI'])
  79. self.ticker = Timer(int(self.conf['EVENT_TICK_SECS']) * 1000, self.q)
  80. def _alarm_missing_time(self, timespec):
  81. now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME'])
  82. try:
  83. when = next(timegenerate(timespec, now=now))
  84. except:
  85. logging.exception("Could not generate "
  86. "an alarm from timespec %s", timespec)
  87. if when is None:
  88. # expired
  89. return None
  90. delta = (when - now).total_seconds()
  91. assert delta > 0
  92. return delta
  93. def on_tick(self):
  94. '''
  95. this is called every EVENT_TICK_SECS.
  96. Checks every event in the DB (which might be slightly CPU-intensive, so
  97. it is advisable to run it in its own greenlet); if the event is "near
  98. enough", schedule it; if it is too far, or already expired, ignore it.
  99. '''
  100. self.model.reload()
  101. for alarm in self.model.get_all_alarms():
  102. actions = list(self.model.get_actions_by_alarm(alarm))
  103. if alarm.eid in self.running:
  104. continue
  105. delta = self._alarm_missing_time(alarm)
  106. # why this 2*EVENT_TICK_SECS? EVENT_TICK_SECS would be enough,
  107. # but it is "tricky"; any small delay would cause the event to be
  108. # missed
  109. if delta is None:
  110. self.log.debug('Skipping event %s: will never ring',
  111. alarm.get('nick', alarm.eid))
  112. elif delta <= 2*self.conf['EVENT_TICK_SECS']:
  113. self.log.debug('Scheduling event %s (%ds) => %s',
  114. alarm.get('nick', alarm.eid),
  115. delta,
  116. [a.get('nick', a.eid) for a in actions]
  117. )
  118. self.schedule(alarm, actions, delta)
  119. else:
  120. self.log.debug('Skipping event %s too far (%ds)',
  121. alarm.get('nick', alarm.eid),
  122. delta,
  123. )
  124. def schedule(self, timespec, audiospecs, delta=None):
  125. '''
  126. prepare an event to be run at a specified time with the specified
  127. actions; the DB won't be read anymore after this call.
  128. This means that this call should not be done too early, or any update
  129. to the DB will be ignored.
  130. '''
  131. if delta is None:
  132. delta = self._alarm_missing_time(timespec)
  133. audiogen = gevent.spawn_later(delta,
  134. self.process_action,
  135. timespec, audiospecs)
  136. audiogen.parent_greenlet = self
  137. audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format(
  138. delta,
  139. ','.join(aspec.get('nick', '') for aspec in audiospecs),
  140. )
  141. self.running[timespec.eid] = {
  142. 'greenlet': audiogen,
  143. 'running_time': datetime.now() + timedelta(seconds=delta),
  144. 'timespec': timespec,
  145. 'audiospecs': audiospecs
  146. }
  147. def process_action(self, timespec, audiospecs):
  148. '''Generate audio and submit it to Controller'''
  149. if timespec.eid in self.running:
  150. del self.running[timespec.eid]
  151. else:
  152. self.log.warning('Timespec %s completed but not in running '
  153. 'registry; this is most likely a bug',
  154. timespec.get('nick', timespec.eid))
  155. uris = []
  156. for audiospec in audiospecs:
  157. try:
  158. uris.extend(audiogenerate(audiospec))
  159. except:
  160. self.log.error('audiogenerate for %s failed',
  161. str(audiospec))
  162. self.send_to_parent('uris_enqueue',
  163. dict(uris=uris,
  164. timespec=timespec,
  165. audiospecs=audiospecs,
  166. aids=[a.eid for a in audiospecs]))
  167. def _run(self):
  168. self.ticker.start()
  169. gevent.spawn(self.on_tick)
  170. while True:
  171. value = self.q.get()
  172. kind = value['kind']
  173. if kind in ('forcetick', 'timer'):
  174. gevent.spawn(self.on_tick)
  175. else:
  176. self.log.warning("Unknown message: %s", str(value))