event.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. from __future__ import print_function
  2. from gevent import monkey
  3. monkey.patch_all(subprocess=True)
  4. import logging
  5. logging.getLogger('mpd').setLevel(logging.WARNING)
  6. from datetime import datetime, timedelta
  7. import gevent
  8. from gevent.queue import Queue
  9. from tinydb import TinyDB
  10. from .eventutils import ParentedLet, Timer
  11. from .timegen import timegenerate
  12. from .audiogen import audiogenerate
  13. class EventModel(object):
  14. def __init__(self, uri):
  15. self.uri = uri
  16. self.db = None
  17. self.reload()
  18. def reload(self):
  19. if self.db is not None:
  20. self.db.close()
  21. self.db = TinyDB(self.uri)
  22. self.actions = self.db.table('actions')
  23. self.alarms = self.db.table('alarms')
  24. def get_action_by_id(self, action_id):
  25. return self.actions.get(eid=action_id)
  26. def get_alarm_by_id(self, alarm_id):
  27. return self.alarms.get(eid=alarm_id)
  28. def get_actions_by_alarm(self, alarm):
  29. for action_id in alarm.get('actions', []):
  30. yield self.get_action_by_id(action_id)
  31. def get_all_alarms(self):
  32. return self.alarms.all()
  33. def get_all_actions(self):
  34. return self.actions.all()
  35. def get_all_alarms_expanded(self):
  36. for alarm in self.get_all_alarms():
  37. for action in self.get_actions_by_alarm(alarm):
  38. yield alarm, action
  39. def add_event(self, alarm, actions):
  40. action_ids = [self.add_action(a) for a in actions]
  41. alarm['actions'] = action_ids
  42. return self.alarms.insert(alarm)
  43. def add_action(self, action):
  44. return self.actions.insert(action)
  45. def add_alarm(self, alarm):
  46. return self.add_event(alarm, [])
  47. def update_alarm(self, alarmid, new_fields={}):
  48. return self.alarms.update(new_fields, eids=[alarmid])
  49. def update_action(self, actionid, new_fields={}):
  50. return self.actions.update(new_fields, eids=[actionid])
  51. class Monitor(ParentedLet):
  52. '''
  53. Manages timegenerators and audiogenerators.
  54. The mechanism is partially based on ticks, partially on scheduled actions.
  55. Ticks are emitted periodically; at every tick, :func:`on_tick
  56. <larigira.event.Monitor.on_tick>` checks if any event is "near enough". If
  57. an event is near enough, it is ":func:`scheduled
  58. <larigira.event.Monitor.schedule>`": a greenlet is run which will wait for
  59. the right time, then generate the audio, then submit to Controller.
  60. The tick mechanism allows for events to be changed on disk: if everything
  61. was scheduled immediately, no further changes would be possible.
  62. The scheduling mechanism allows for more precision, catching exactly the
  63. right time. Being accurate only with ticks would have required very
  64. frequent ticks, which is cpu-intensive.
  65. '''
  66. def __init__(self, parent_queue, conf):
  67. ParentedLet.__init__(self, parent_queue)
  68. self.log = logging.getLogger(self.__class__.__name__)
  69. self.running = {}
  70. self.conf = conf
  71. self.q = Queue()
  72. self.model = EventModel(self.conf['DB_URI'])
  73. self.ticker = Timer(int(self.conf['EVENT_TICK_SECS']) * 1000, self.q)
  74. def _alarm_missing_time(self, timespec):
  75. now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME'])
  76. try:
  77. when = next(timegenerate(timespec, now=now))
  78. except:
  79. logging.exception("Could not generate "
  80. "an alarm from timespec {}".format(timespec))
  81. if when is None:
  82. # expired
  83. return None
  84. delta = (when - now).total_seconds()
  85. assert delta > 0
  86. return delta
  87. def on_tick(self):
  88. '''
  89. this is called every EVENT_TICK_SECS.
  90. Checks every event in the DB (which might be slightly CPU-intensive, so
  91. it is advisable to run it in its own greenlet); if the event is "near
  92. enough", schedule it; if it is too far, or already expired, ignore it.
  93. '''
  94. self.model.reload()
  95. for alarm in self.model.get_all_alarms():
  96. actions = list(self.model.get_actions_by_alarm(alarm))
  97. if alarm.eid in self.running:
  98. continue
  99. delta = self._alarm_missing_time(alarm)
  100. # why this 2*EVENT_TICK_SECS? EVENT_TICK_SECS would be enough,
  101. # but it is "tricky"; any small delay would cause the event to be
  102. # missed
  103. if delta is not None and delta <= 2*self.conf['EVENT_TICK_SECS']:
  104. self.log.debug('Scheduling event {} ({}s) => {}'
  105. .format(alarm.get('nick', alarm.eid),
  106. delta,
  107. [a.get('nick', a.eid) for a in actions]
  108. ))
  109. self.schedule(alarm, actions, delta)
  110. else:
  111. self.log.debug('Skipping event {}, too far ({}s)'
  112. .format(alarm.get('nick', alarm.eid),
  113. delta))
  114. def schedule(self, timespec, audiospecs, delta=None):
  115. '''
  116. prepare an event to be run at a specified time with the specified
  117. actions; the DB won't be read anymore after this call.
  118. This means that this call should not be done too early, or any update
  119. to the DB will be ignored.
  120. '''
  121. if delta is None:
  122. delta = self._alarm_missing_time(timespec)
  123. audiogen = gevent.spawn_later(delta,
  124. self.process_action,
  125. timespec, audiospecs)
  126. audiogen.parent_greenlet = self
  127. audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format(
  128. delta,
  129. ','.join(aspec.get('nick', '') for aspec in audiospecs),
  130. )
  131. self.running[timespec.eid] = {
  132. 'greenlet': audiogen,
  133. 'running_time': datetime.now() + timedelta(seconds=delta),
  134. 'timespec': timespec,
  135. 'audiospecs': audiospecs
  136. }
  137. def process_action(self, timespec, audiospecs):
  138. '''Generate audio and submit it to Controller'''
  139. if timespec.eid in self.running:
  140. del self.running[timespec.eid]
  141. else:
  142. self.log.warn('Timespec {} completed but not in running '
  143. 'registry; this is most likely a bug'.
  144. format(timespec.get('nick', timespec.eid)))
  145. uris = []
  146. for audiospec in audiospecs:
  147. try:
  148. uris.extend(audiogenerate(audiospec))
  149. except:
  150. self.log.error('audiogenerate for {} failed'
  151. .format(audiospec))
  152. self.send_to_parent('uris_enqueue',
  153. dict(uris=uris,
  154. timespec=timespec,
  155. audiospecs=audiospecs,
  156. aids=[a.eid for a in audiospecs]))
  157. def _run(self):
  158. self.ticker.start()
  159. gevent.spawn(self.on_tick)
  160. while True:
  161. value = self.q.get()
  162. kind = value['kind']
  163. if kind in ('forcetick', 'timer'):
  164. gevent.spawn(self.on_tick)
  165. else:
  166. self.log.warning("Unknown message: %s" % str(value))