event.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. from __future__ import print_function
  2. from gevent import monkey
  3. monkey.patch_all(subprocess=True)
  4. import logging
  5. FORMAT = '%(asctime)s|%(levelname)s[%(name)s:%(lineno)d] %(message)s'
  6. logging.basicConfig(level=logging.INFO,
  7. format=FORMAT,
  8. datefmt='%H:%M:%S')
  9. logging.getLogger('mpd').setLevel(logging.WARNING)
  10. from datetime import datetime, timedelta
  11. import gevent
  12. from gevent.queue import Queue
  13. from tinydb import TinyDB
  14. from .eventutils import ParentedLet
  15. from .timegen import timegenerate
  16. from .audiogen import audiogenerate
  17. class EventModel(object):
  18. def __init__(self, uri):
  19. self.uri = uri
  20. self.db = TinyDB(uri)
  21. self.actions = self.db.table('actions')
  22. self.alarms = self.db.table('alarms')
  23. def get_action_by_id(self, action_id):
  24. return self.actions.get(eid=action_id)
  25. def get_alarm_by_id(self, alarm_id):
  26. return self.alarms.get(eid=alarm_id)
  27. def get_actions_by_alarm(self, alarm):
  28. for action_id in alarm.get('actions', []):
  29. yield self.get_action_by_id(action_id)
  30. def get_all_alarms(self):
  31. return self.alarms.all()
  32. def get_all_actions(self):
  33. return self.actions.all()
  34. def get_all_alarms_expanded(self):
  35. for alarm in self.get_all_alarms():
  36. for action in self.get_actions_by_alarm(alarm):
  37. yield alarm, action
  38. def add_event(self, alarm, actions):
  39. action_ids = [self.add_action(a) for a in actions]
  40. alarm['actions'] = action_ids
  41. return self.alarms.insert(alarm)
  42. def add_action(self, action):
  43. return self.actions.insert(action)
  44. def add_alarm(self, alarm):
  45. return self.add_event(alarm, [])
  46. def update_alarm(self, alarmid, new_fields={}):
  47. return self.alarms.update(new_fields, eids=[alarmid])
  48. class EventSource(ParentedLet):
  49. def __init__(self, queue, uri):
  50. ParentedLet.__init__(self, queue)
  51. self.log = logging.getLogger(self.__class__.__name__)
  52. self.log.debug('uri is %s' % uri)
  53. self.model = EventModel(uri)
  54. self.log.debug('opened %s' % uri)
  55. def parent_msg(self, kind, *args):
  56. if kind == 'add':
  57. msg = ParentedLet.parent_msg(self, kind, *args[2:])
  58. msg['timespec'] = args[0]
  59. msg['audiospec'] = args[1]
  60. else:
  61. msg = ParentedLet.parent_msg(self, kind, *args)
  62. return msg
  63. def reload_id(self, alarm_id):
  64. '''
  65. Check if the event is still valid, and put "add" messages on queue
  66. '''
  67. alarm = self.model.get_alarm_by_id(alarm_id)
  68. for action in self.model.get_actions_by_alarm(alarm):
  69. self.send_to_parent('add', alarm, action)
  70. def do_business(self):
  71. for alarm, action in self.model.get_all_alarms_expanded():
  72. self.log.debug('scheduling {}'.format(alarm))
  73. yield ('add', alarm, action)
  74. class Monitor(ParentedLet):
  75. '''Manages timegenerators and audiogenerators'''
  76. def __init__(self, parent_queue, conf):
  77. ParentedLet.__init__(self, parent_queue)
  78. self.log = logging.getLogger(self.__class__.__name__)
  79. self.q = Queue()
  80. self.running = {}
  81. self.conf = conf
  82. self.source = EventSource(self.q, uri=conf['DB_URI'])
  83. self.source.parent_greenlet = self
  84. def add(self, timespec, audiospec):
  85. '''
  86. this is somewhat recursive: after completion calls reload_id, which
  87. could call this method again
  88. '''
  89. now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME'])
  90. try:
  91. when = next(timegenerate(timespec, now=now))
  92. except:
  93. logging.exception("Could not generate "
  94. "an alarm from timespec {}".format(timespec))
  95. if when is None:
  96. # expired
  97. return
  98. delta = when - now
  99. assert delta.total_seconds() > 0
  100. self.log.info('Timer<{}> will run after {} seconds, triggering <{}>'.format(
  101. timespec.get('nick', timespec.eid),
  102. int(delta.total_seconds()),
  103. audiospec.get('nick', audiospec.eid)
  104. ))
  105. audiogen = gevent.spawn_later(delta.total_seconds(), audiogenerate,
  106. audiospec)
  107. audiogen.parent_greenlet = self
  108. audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format(
  109. delta.total_seconds(),
  110. audiospec.get('nick', ''))
  111. self.running[timespec.eid] = {
  112. 'greenlet': audiogen,
  113. 'running_time': datetime.now() + timedelta(
  114. seconds=delta.total_seconds()),
  115. 'audiospec': audiospec
  116. }
  117. gl = gevent.spawn_later(delta.total_seconds(),
  118. self.source.reload_id,
  119. timespec.eid)
  120. gl.parent_greenlet = self
  121. gl.doc = 'Will wait, then reload id {}'.format(timespec.eid)
  122. # FIXME: audiogen is ready in a moment between
  123. # exact_time - CACHING_TIME and the exact_time
  124. # atm we are just ignoring this "window", saying that any moment is
  125. # fine
  126. # the more correct thing will be to wait till that exact moment
  127. # adding another spawn_later
  128. audiogen.link_value(lambda g: self.log.info(
  129. 'should play %s' % str(g.value)))
  130. audiogen.link_exception(lambda g: self.log.exception(
  131. 'Failure in audiogen {}: {}'.format(audiospec, audiogen.exception)))
  132. audiogen.link_value(lambda g:
  133. self.send_to_parent('add',
  134. dict(uris=g.value,
  135. audiospec=audiospec,
  136. aid=audiospec.eid
  137. ))
  138. )
  139. def _run(self):
  140. self.source.start()
  141. while True:
  142. value = self.q.get()
  143. self.log.debug('<- %s' % str(value))
  144. kind = value['kind']
  145. if kind == 'add':
  146. self.add(value['timespec'], value['audiospec'])
  147. elif kind == 'remove':
  148. raise NotImplementedError()
  149. else:
  150. self.log.warning("Unknown message: %s" % str(value))