rewrite event system to be tick-based

FIX #16
This commit is contained in:
boyska 2016-09-08 22:43:00 +02:00
parent 6a1f69109d
commit 9b9e0b72a4
No known key found for this signature in database
GPG key ID: 7395DCAE58289CA9
4 changed files with 94 additions and 88 deletions

View file

@ -57,8 +57,13 @@ Alarm system
~~~~~~~~~~~~
There is a DB. The lowest level is handled by TinyDB. :class:`larigira.event.EventModel` is a thin layer on
it, providing more abstract functions. The real deal is :class:`larigira.event.EventSource`, which is a
greenlet that sends notifications about alarms in the DB. Those notifications are received by
:class:`larigira.event.Monitor`, which "runs" them; it executes the time specification, make an appropriate
"sleep" and after that runs the audiogenerator.
it, providing more abstract functions.
There is a :class:`Monitor <larigira.event.Monitor>`, which is something that, well, "monitors" the DB and
schedule events appropriately. It will check alarms every ``EVENT_TICK_SECS`` seconds, or when larigira received
a ``SIGALRM`` (so ``pkill -ALRM larigira`` might be useful for you).
You can view scheduled events using the web interface, at ``/view/status/running``. Please note that you will
only see *scheduled* events, which are events that will soon be added to playlist. That page will not give you
information about events that will run in more than ``2 * EVENT_TICK_SECS`` seconds (by default, this amounts
to 1 minute).

View file

@ -23,7 +23,8 @@ def get_conf(prefix='LARIGIRA_'):
conf['SECRET_KEY'] = 'Please replace me!'
conf['MPD_WAIT_START'] = True
conf['MPD_WAIT_START_RETRYSECS'] = 5
conf['CHECK_SECS'] = 20
conf['CHECK_SECS'] = 20 # period for checking playlist length
conf['EVENT_TICK_SECS'] = 30 # period for scheduling events
conf['DEBUG'] = False
conf.update(from_envvars(prefix=prefix))
return conf

View file

@ -9,7 +9,7 @@ import gevent
from gevent.queue import Queue
from tinydb import TinyDB
from .eventutils import ParentedLet
from .eventutils import ParentedLet, Timer
from .timegen import timegenerate
from .audiogen import audiogenerate
@ -57,53 +57,33 @@ class EventModel(object):
return self.alarms.update(new_fields, eids=[alarmid])
class EventSource(ParentedLet):
def __init__(self, queue, uri):
ParentedLet.__init__(self, queue)
self.log = logging.getLogger(self.__class__.__name__)
self.log.debug('uri is %s' % uri)
self.model = EventModel(uri)
self.log.debug('opened %s' % uri)
def parent_msg(self, kind, *args):
if kind == 'add':
msg = ParentedLet.parent_msg(self, kind, *args[2:])
msg['timespec'] = args[0]
msg['audiospec'] = args[1]
else:
msg = ParentedLet.parent_msg(self, kind, *args)
return msg
def reload_id(self, alarm_id):
'''
Check if the event is still valid, and put "add" messages on queue
'''
alarm = self.model.get_alarm_by_id(alarm_id)
for action in self.model.get_actions_by_alarm(alarm):
self.send_to_parent('add', alarm, action)
def do_business(self):
for alarm, action in self.model.get_all_alarms_expanded():
self.log.debug('scheduling {}'.format(alarm))
yield ('add', alarm, action)
class Monitor(ParentedLet):
'''Manages timegenerators and audiogenerators'''
'''
Manages timegenerators and audiogenerators.
The mechanism is partially based on ticks, partially on scheduled actions.
Ticks are emitted periodically; at every tick, :func:`on_tick
<larigira.event.Monitor.on_tick>` checks if any event is "near enough". If
an event is near enough, it is ":func:`scheduled
<larigira.event.Monitor.schedule>`": a greenlet is run which will wait for
the right time, then generate the audio, then submit to Controller.
The tick mechanism allows for events to be changed on disk: if everything
was scheduled immediately, no further changes would be possible.
The scheduling mechanism allows for more precision, catching exactly the
right time. Being accurate only with ticks would have required very
frequent ticks, which is cpu-intensive.
'''
def __init__(self, parent_queue, conf):
ParentedLet.__init__(self, parent_queue)
self.log = logging.getLogger(self.__class__.__name__)
self.q = Queue()
self.running = {}
self.conf = conf
self.source = EventSource(self.q, uri=conf['DB_URI'])
self.source.parent_greenlet = self
self.q = Queue()
self.model = EventModel(self.conf['DB_URI'])
self.ticker = Timer(int(self.conf['EVENT_TICK_SECS']) * 1000, self.q)
def add(self, timespec, audiospec):
'''
this is somewhat recursive: after completion calls reload_id, which
could call this method again
'''
def _alarm_missing_time(self, timespec):
now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME'])
try:
when = next(timegenerate(timespec, now=now))
@ -112,59 +92,79 @@ class Monitor(ParentedLet):
"an alarm from timespec {}".format(timespec))
if when is None:
# expired
return
delta = when - now
assert delta.total_seconds() > 0
self.log.info('Timer<{}> will run after {} seconds, triggering <{}>'.format(
timespec.get('nick', timespec.eid),
int(delta.total_seconds()),
audiospec.get('nick', audiospec.eid)
))
return None
delta = (when - now).total_seconds()
assert delta > 0
return delta
audiogen = gevent.spawn_later(delta.total_seconds(), audiogenerate,
audiospec)
def on_tick(self):
'''
this is called every EVENT_TICK_SECS.
Checks every event in the DB (which might be slightly CPU-intensive, so
it is advisable to run it in its own greenlet); if the event is "near
enough", schedule it; if it is too far, or already expired, ignore it.
'''
for alarm, action in self.model.get_all_alarms_expanded():
if alarm.eid in self.running:
continue
delta = self._alarm_missing_time(alarm)
# why this 2*EVENT_TICK_SECS? EVENT_TICK_SECS would be enough,
# but it is "tricky"; any small delay would cause the event to be
# missed
if delta is not None and delta <= 2*self.conf['EVENT_TICK_SECS']:
self.log.debug('Scheduling event {} ({}s)'
.format(alarm.get('nick', alarm.eid),
delta))
self.schedule(alarm, action, delta)
else:
self.log.debug('Skipping event {}, too far ({}s)'
.format(alarm.get('nick', alarm.eid),
delta))
def schedule(self, timespec, audiospec, delta=None):
'''
prepare an event to be run at a specified time with a specified action;
the DB won't be read anymore after this call.
This means that this call should not be done too early, or any update
to the DB will be ignored.
'''
if delta is None:
delta = self._alarm_missing_time(timespec)
audiogen = gevent.spawn_later(delta,
self.process_action,
timespec, audiospec)
audiogen.parent_greenlet = self
audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format(
delta.total_seconds(),
delta,
audiospec.get('nick', ''))
self.running[timespec.eid] = {
'greenlet': audiogen,
'running_time': datetime.now() + timedelta(
seconds=delta.total_seconds()),
'running_time': datetime.now() + timedelta(seconds=delta),
'audiospec': audiospec
}
gl = gevent.spawn_later(delta.total_seconds(),
self.source.reload_id,
timespec.eid)
gl.parent_greenlet = self
gl.doc = 'Will wait, then reload id {}'.format(timespec.eid)
# FIXME: audiogen is ready in a moment between
# exact_time - CACHING_TIME and the exact_time
# atm we are just ignoring this "window", saying that any moment is
# fine
# the more correct thing will be to wait till that exact moment
# adding another spawn_later
audiogen.link_value(lambda g: self.log.info(
'should play %s' % str(g.value)))
audiogen.link_exception(lambda g: self.log.exception(
'Failure in audiogen {}: {}'.format(audiospec, audiogen.exception)))
audiogen.link_value(lambda g:
self.send_to_parent('add',
dict(uris=g.value,
audiospec=audiospec,
aid=audiospec.eid
))
)
def process_action(self, timespec, audiospec):
'''Generate audio and submit it to Controller'''
if timespec.eid in self.running:
del self.running[timespec.eid]
else:
self.log.warn('Timespec {} completed but not in running '
'registry; this is most likely a bug'.
format(timespec.get('nick', timespec.eid)))
uris = audiogenerate(audiospec)
self.send_to_parent('add', dict(uris=uris,
audiospec=audiospec,
aid=audiospec.eid))
def _run(self):
self.source.start()
self.ticker.start()
gevent.spawn(self.on_tick)
while True:
value = self.q.get()
self.log.debug('<- %s' % str(value))
kind = value['kind']
if kind == 'add':
self.add(value['timespec'], value['audiospec'])
elif kind == 'remove':
raise NotImplementedError()
if kind == 'timer':
gevent.spawn(self.on_tick)
else:
self.log.warning("Unknown message: %s" % str(value))

View file

@ -36,7 +36,7 @@ class ParentedLet(gevent.Greenlet):
class Timer(ParentedLet):
'''wait some time, then send a "timer" message to parent'''
'''continously sleeps some time, then send a "timer" message to parent'''
def __init__(self, milliseconds, queue):
ParentedLet.__init__(self, queue)
self.ms = milliseconds