diff --git a/doc/source/about.rst b/doc/source/about.rst index 161d682..3c34a5d 100644 --- a/doc/source/about.rst +++ b/doc/source/about.rst @@ -57,8 +57,13 @@ Alarm system ~~~~~~~~~~~~ There is a DB. The lowest level is handled by TinyDB. :class:`larigira.event.EventModel` is a thin layer on -it, providing more abstract functions. The real deal is :class:`larigira.event.EventSource`, which is a -greenlet that sends notifications about alarms in the DB. Those notifications are received by -:class:`larigira.event.Monitor`, which "runs" them; it executes the time specification, make an appropriate -"sleep" and after that runs the audiogenerator. +it, providing more abstract functions. +There is a :class:`Monitor `, which is something that, well, "monitors" the DB and +schedule events appropriately. It will check alarms every ``EVENT_TICK_SECS`` seconds, or when larigira received +a ``SIGALRM`` (so ``pkill -ALRM larigira`` might be useful for you). + +You can view scheduled events using the web interface, at ``/view/status/running``. Please note that you will +only see *scheduled* events, which are events that will soon be added to playlist. That page will not give you +information about events that will run in more than ``2 * EVENT_TICK_SECS`` seconds (by default, this amounts +to 1 minute). diff --git a/larigira/config.py b/larigira/config.py index 6ee6ef5..7c7c944 100644 --- a/larigira/config.py +++ b/larigira/config.py @@ -23,7 +23,8 @@ def get_conf(prefix='LARIGIRA_'): conf['SECRET_KEY'] = 'Please replace me!' conf['MPD_WAIT_START'] = True conf['MPD_WAIT_START_RETRYSECS'] = 5 - conf['CHECK_SECS'] = 20 + conf['CHECK_SECS'] = 20 # period for checking playlist length + conf['EVENT_TICK_SECS'] = 30 # period for scheduling events conf['DEBUG'] = False conf.update(from_envvars(prefix=prefix)) return conf diff --git a/larigira/event.py b/larigira/event.py index 3a5c014..94d1369 100644 --- a/larigira/event.py +++ b/larigira/event.py @@ -9,7 +9,7 @@ import gevent from gevent.queue import Queue from tinydb import TinyDB -from .eventutils import ParentedLet +from .eventutils import ParentedLet, Timer from .timegen import timegenerate from .audiogen import audiogenerate @@ -57,53 +57,33 @@ class EventModel(object): return self.alarms.update(new_fields, eids=[alarmid]) -class EventSource(ParentedLet): - def __init__(self, queue, uri): - ParentedLet.__init__(self, queue) - self.log = logging.getLogger(self.__class__.__name__) - self.log.debug('uri is %s' % uri) - self.model = EventModel(uri) - self.log.debug('opened %s' % uri) - - def parent_msg(self, kind, *args): - if kind == 'add': - msg = ParentedLet.parent_msg(self, kind, *args[2:]) - msg['timespec'] = args[0] - msg['audiospec'] = args[1] - else: - msg = ParentedLet.parent_msg(self, kind, *args) - return msg - - def reload_id(self, alarm_id): - ''' - Check if the event is still valid, and put "add" messages on queue - ''' - alarm = self.model.get_alarm_by_id(alarm_id) - for action in self.model.get_actions_by_alarm(alarm): - self.send_to_parent('add', alarm, action) - - def do_business(self): - for alarm, action in self.model.get_all_alarms_expanded(): - self.log.debug('scheduling {}'.format(alarm)) - yield ('add', alarm, action) - - class Monitor(ParentedLet): - '''Manages timegenerators and audiogenerators''' + ''' + Manages timegenerators and audiogenerators. + + The mechanism is partially based on ticks, partially on scheduled actions. + Ticks are emitted periodically; at every tick, :func:`on_tick + ` checks if any event is "near enough". If + an event is near enough, it is ":func:`scheduled + `": a greenlet is run which will wait for + the right time, then generate the audio, then submit to Controller. + + The tick mechanism allows for events to be changed on disk: if everything + was scheduled immediately, no further changes would be possible. + The scheduling mechanism allows for more precision, catching exactly the + right time. Being accurate only with ticks would have required very + frequent ticks, which is cpu-intensive. + ''' def __init__(self, parent_queue, conf): ParentedLet.__init__(self, parent_queue) self.log = logging.getLogger(self.__class__.__name__) - self.q = Queue() self.running = {} self.conf = conf - self.source = EventSource(self.q, uri=conf['DB_URI']) - self.source.parent_greenlet = self + self.q = Queue() + self.model = EventModel(self.conf['DB_URI']) + self.ticker = Timer(int(self.conf['EVENT_TICK_SECS']) * 1000, self.q) - def add(self, timespec, audiospec): - ''' - this is somewhat recursive: after completion calls reload_id, which - could call this method again - ''' + def _alarm_missing_time(self, timespec): now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME']) try: when = next(timegenerate(timespec, now=now)) @@ -112,59 +92,79 @@ class Monitor(ParentedLet): "an alarm from timespec {}".format(timespec)) if when is None: # expired - return - delta = when - now - assert delta.total_seconds() > 0 - self.log.info('Timer<{}> will run after {} seconds, triggering <{}>'.format( - timespec.get('nick', timespec.eid), - int(delta.total_seconds()), - audiospec.get('nick', audiospec.eid) - )) + return None + delta = (when - now).total_seconds() + assert delta > 0 + return delta - audiogen = gevent.spawn_later(delta.total_seconds(), audiogenerate, - audiospec) + def on_tick(self): + ''' + this is called every EVENT_TICK_SECS. + Checks every event in the DB (which might be slightly CPU-intensive, so + it is advisable to run it in its own greenlet); if the event is "near + enough", schedule it; if it is too far, or already expired, ignore it. + ''' + for alarm, action in self.model.get_all_alarms_expanded(): + if alarm.eid in self.running: + continue + delta = self._alarm_missing_time(alarm) + # why this 2*EVENT_TICK_SECS? EVENT_TICK_SECS would be enough, + # but it is "tricky"; any small delay would cause the event to be + # missed + if delta is not None and delta <= 2*self.conf['EVENT_TICK_SECS']: + self.log.debug('Scheduling event {} ({}s)' + .format(alarm.get('nick', alarm.eid), + delta)) + self.schedule(alarm, action, delta) + else: + self.log.debug('Skipping event {}, too far ({}s)' + .format(alarm.get('nick', alarm.eid), + delta)) + + def schedule(self, timespec, audiospec, delta=None): + ''' + prepare an event to be run at a specified time with a specified action; + the DB won't be read anymore after this call. + + This means that this call should not be done too early, or any update + to the DB will be ignored. + ''' + if delta is None: + delta = self._alarm_missing_time(timespec) + + audiogen = gevent.spawn_later(delta, + self.process_action, + timespec, audiospec) audiogen.parent_greenlet = self audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format( - delta.total_seconds(), + delta, audiospec.get('nick', '')) self.running[timespec.eid] = { 'greenlet': audiogen, - 'running_time': datetime.now() + timedelta( - seconds=delta.total_seconds()), + 'running_time': datetime.now() + timedelta(seconds=delta), 'audiospec': audiospec } - gl = gevent.spawn_later(delta.total_seconds(), - self.source.reload_id, - timespec.eid) - gl.parent_greenlet = self - gl.doc = 'Will wait, then reload id {}'.format(timespec.eid) - # FIXME: audiogen is ready in a moment between - # exact_time - CACHING_TIME and the exact_time - # atm we are just ignoring this "window", saying that any moment is - # fine - # the more correct thing will be to wait till that exact moment - # adding another spawn_later - audiogen.link_value(lambda g: self.log.info( - 'should play %s' % str(g.value))) - audiogen.link_exception(lambda g: self.log.exception( - 'Failure in audiogen {}: {}'.format(audiospec, audiogen.exception))) - audiogen.link_value(lambda g: - self.send_to_parent('add', - dict(uris=g.value, - audiospec=audiospec, - aid=audiospec.eid - )) - ) + + def process_action(self, timespec, audiospec): + '''Generate audio and submit it to Controller''' + if timespec.eid in self.running: + del self.running[timespec.eid] + else: + self.log.warn('Timespec {} completed but not in running ' + 'registry; this is most likely a bug'. + format(timespec.get('nick', timespec.eid))) + uris = audiogenerate(audiospec) + self.send_to_parent('add', dict(uris=uris, + audiospec=audiospec, + aid=audiospec.eid)) def _run(self): - self.source.start() + self.ticker.start() + gevent.spawn(self.on_tick) while True: value = self.q.get() - self.log.debug('<- %s' % str(value)) kind = value['kind'] - if kind == 'add': - self.add(value['timespec'], value['audiospec']) - elif kind == 'remove': - raise NotImplementedError() + if kind == 'timer': + gevent.spawn(self.on_tick) else: self.log.warning("Unknown message: %s" % str(value)) diff --git a/larigira/eventutils.py b/larigira/eventutils.py index afc0777..1f7d472 100644 --- a/larigira/eventutils.py +++ b/larigira/eventutils.py @@ -36,7 +36,7 @@ class ParentedLet(gevent.Greenlet): class Timer(ParentedLet): - '''wait some time, then send a "timer" message to parent''' + '''continously sleeps some time, then send a "timer" message to parent''' def __init__(self, milliseconds, queue): ParentedLet.__init__(self, queue) self.ms = milliseconds