event.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. from __future__ import print_function
  2. import logging
  3. from datetime import datetime, timedelta
  4. import gevent
  5. from gevent import monkey
  6. from gevent.queue import Queue
  7. from .audiogen import audiogenerate
  8. from .db import EventModel
  9. from .eventutils import ParentedLet, Timer
  10. from .timegen import timegenerate
# Monkey-patch the standard library (subprocess included) so that blocking
# calls cooperate with gevent greenlets.
monkey.patch_all(subprocess=True)
# Only let WARNING and above through for the "mpd" logger.
logging.getLogger("mpd").setLevel(logging.WARNING)
  13. class Monitor(ParentedLet):
  14. """
  15. Manages timegenerators and audiogenerators for DB events
  16. The mechanism is partially based on ticks, partially on scheduled actions.
  17. Ticks are emitted periodically; at every tick, :func:`on_tick
  18. <larigira.event.Monitor.on_tick>` checks if any event is "near enough". If
  19. an event is near enough, it is ":func:`scheduled
  20. <larigira.event.Monitor.schedule>`": a greenlet is run which will wait for
  21. the right time, then generate the audio, then submit to Controller.
  22. The tick mechanism allows for events to be changed on disk: if everything
  23. was scheduled immediately, no further changes would be possible.
  24. The scheduling mechanism allows for more precision, catching exactly the
  25. right time. Being accurate only with ticks would have required very
  26. frequent ticks, which is cpu-intensive.
  27. """
  28. def __init__(self, parent_queue, conf):
  29. ParentedLet.__init__(self, parent_queue)
  30. self.log = logging.getLogger(self.__class__.__name__)
  31. self.running = {}
  32. self.conf = conf
  33. self.q = Queue()
  34. self.model = EventModel(self.conf["DB_URI"])
  35. self.ticker = Timer(int(self.conf["EVENT_TICK_SECS"]) * 1000, self.q)
  36. def _alarm_missing_time(self, timespec):
  37. now = datetime.now() + timedelta(seconds=self.conf["CACHING_TIME"])
  38. try:
  39. when = next(timegenerate(timespec, now=now))
  40. except:
  41. logging.exception(
  42. "Could not generate " "an alarm from timespec %s", timespec
  43. )
  44. return None
  45. if when is None:
  46. # expired
  47. return None
  48. delta = (when - now).total_seconds()
  49. assert delta > 0
  50. return delta
  51. def on_tick(self):
  52. """
  53. this is called every EVENT_TICK_SECS.
  54. Checks every event in the DB (which might be slightly CPU-intensive, so
  55. it is advisable to run it in its own greenlet); if the event is "near
  56. enough", schedule it; if it is too far, or already expired, ignore it.
  57. """
  58. self.model.reload()
  59. for alarm in self.model.get_all_alarms():
  60. actions = list(self.model.get_actions_by_alarm(alarm))
  61. if alarm.eid in self.running:
  62. continue
  63. delta = self._alarm_missing_time(alarm)
  64. # why this 2*EVENT_TICK_SECS? EVENT_TICK_SECS would be enough,
  65. # but it is "tricky"; any small delay would cause the event to be
  66. # missed
  67. if delta is None:
  68. # this is way too much logging! we need more levels!
  69. # self.log.debug(
  70. # "Skipping event %s: will never ring", alarm.get("nick", alarm.eid)
  71. # )
  72. pass
  73. elif delta <= 2 * self.conf["EVENT_TICK_SECS"]:
  74. self.log.debug(
  75. "Scheduling event %s (%ds) => %s",
  76. alarm.get("nick", alarm.eid),
  77. delta,
  78. [a.get("nick", a.eid) for a in actions],
  79. )
  80. self.schedule(alarm, actions, delta)
  81. else:
  82. self.log.debugv(
  83. "Skipping event %s too far (%ds)",
  84. alarm.get("nick", alarm.eid),
  85. delta,
  86. )
  87. def schedule(self, timespec, audiospecs, delta=None):
  88. """
  89. prepare an event to be run at a specified time with the specified
  90. actions; the DB won't be read anymore after this call.
  91. This means that this call should not be done too early, or any update
  92. to the DB will be ignored.
  93. """
  94. if delta is None:
  95. delta = self._alarm_missing_time(timespec)
  96. audiogen = gevent.spawn_later(
  97. delta, self.process_action, timespec, audiospecs
  98. )
  99. audiogen.parent_greenlet = self
  100. audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format(
  101. delta, ",".join(aspec.get("nick", "") for aspec in audiospecs)
  102. )
  103. self.running[timespec.eid] = {
  104. "greenlet": audiogen,
  105. "running_time": datetime.now() + timedelta(seconds=delta),
  106. "timespec": timespec,
  107. "audiospecs": audiospecs,
  108. }
  109. def process_action(self, timespec, audiospecs):
  110. """Generate audio and submit it to Controller"""
  111. if timespec.eid in self.running:
  112. del self.running[timespec.eid]
  113. else:
  114. self.log.warning(
  115. "Timespec %s completed but not in running "
  116. "registry; this is most likely a bug",
  117. timespec.get("nick", timespec.eid),
  118. )
  119. uris = []
  120. for audiospec in audiospecs:
  121. try:
  122. uris.extend(audiogenerate(audiospec))
  123. except Exception as exc:
  124. self.log.error(
  125. "audiogenerate for <%s> failed; reason: %s",
  126. str(audiospec),
  127. str(exc),
  128. )
  129. self.send_to_parent(
  130. "uris_enqueue",
  131. dict(
  132. uris=uris,
  133. timespec=timespec,
  134. audiospecs=audiospecs,
  135. aids=[a.eid for a in audiospecs],
  136. ),
  137. )
  138. def _run(self):
  139. self.ticker.start()
  140. gevent.spawn(self.on_tick)
  141. while True:
  142. value = self.q.get()
  143. kind = value["kind"]
  144. if kind in ("forcetick", "timer"):
  145. gevent.spawn(self.on_tick)
  146. else:
  147. self.log.warning("Unknown message: %s", str(value))