Merge pull request #42 from alberanid/master

basic triggers
This commit is contained in:
Davide Alberani 2015-04-18 15:03:30 +02:00
commit d8742c8c63
5 changed files with 147 additions and 7 deletions

2
.gitignore vendored
View file

@ -1,3 +1,5 @@
data/triggers/*.d
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

View file

@ -0,0 +1,4 @@
Collection of useful triggers.
Link them in the appropriate directory, if neeed.

19
data/triggers-available/echo.py Executable file
View file

@ -0,0 +1,19 @@
#!/usr/bin/env python
"""echo.py - Simply echo the environment and the stdin."""
import os
import sys
import json
def main():
print 'STDIN JSON:', json.loads(sys.stdin.read())
print ''
print 'ENV:', os.environ
if __name__ == '__main__':
try:
main()
except Exception, e:
print 'echo.py error: %s' % e
pass

11
data/triggers/README.txt Normal file
View file

@ -0,0 +1,11 @@
Directory for scripts that will be executed when a given action is encoutered.
You have to put your scripts into {action}.d subdirectories; the scripts must be executable.
Valid actions:
- PUT /events/:event_id/persons/:person_id
- update_person_in_event (update_person_in_event.d): called for each update
- attends (attends.d): only called when a person is attending at an event

View file

@ -18,17 +18,24 @@ limitations under the License.
""" """
import os import os
import glob
import json
import logging
import datetime
import tornado.httpserver import tornado.httpserver
import tornado.ioloop import tornado.ioloop
import tornado.options import tornado.options
from tornado.options import define, options from tornado.options import define, options
import tornado.web import tornado.web
from tornado import gen, escape from tornado import gen, escape, process
import utils import utils
import backend import backend
ENCODING = 'utf8'
PROCESS_TIMEOUT = 60
class BaseHandler(tornado.web.RequestHandler): class BaseHandler(tornado.web.RequestHandler):
"""Base class for request handlers.""" """Base class for request handlers."""
@ -98,6 +105,18 @@ class CollectionHandler(BaseHandler):
arguments = property(lambda self: dict([(k, v[0]) arguments = property(lambda self: dict([(k, v[0])
for k, v in self.request.arguments.iteritems()])) for k, v in self.request.arguments.iteritems()]))
def _dict2env(self, data):
"""Convert a dictionary into a form suitable to be passed as environment variables."""
ret = {}
for key, value in data.iteritems():
if isinstance(value, (list, tuple, dict)):
continue
try:
ret[key.upper()] = unicode(value).encode(ENCODING)
except:
continue
return ret
@gen.coroutine @gen.coroutine
def get(self, id_=None, resource=None, resource_id=None, **kwargs): def get(self, id_=None, resource=None, resource_id=None, **kwargs):
if resource: if resource:
@ -146,6 +165,68 @@ class CollectionHandler(BaseHandler):
self.db.delete(self.collection, id_) self.db.delete(self.collection, id_)
self.write({'success': True}) self.write({'success': True})
def on_timeout(self, cmd, pipe):
"""Kill a process that is taking too long to complete."""
logging.debug('cmd %s is taking too long: killing it' % ' '.join(cmd))
try:
pipe.proc.kill()
except:
pass
def on_exit(self, returncode, cmd, pipe):
"""Callback executed when a subprocess execution is over."""
self.ioloop.remove_timeout(self.timeout)
logging.debug('cmd: %s returncode: %d' % (' '.join(cmd), returncode))
@gen.coroutine
def run_subprocess(self, cmd, stdin_data=None, env=None):
"""Execute the given action.
:param cmd: the command to be run with its command line arguments
:type cmd: list
:param stdin_data: data to be sent over stdin
:type stdin_data: str
:param env: environment of the process
:type env: dict
"""
self.ioloop = tornado.ioloop.IOLoop.instance()
p = process.Subprocess(cmd, close_fds=True, stdin=process.Subprocess.STREAM,
stdout=process.Subprocess.STREAM, stderr=process.Subprocess.STREAM, env=env)
p.set_exit_callback(lambda returncode: self.on_exit(returncode, cmd, p))
self.timeout = self.ioloop.add_timeout(datetime.timedelta(seconds=PROCESS_TIMEOUT),
lambda: self.on_timeout(cmd, p))
yield gen.Task(p.stdin.write, stdin_data or '')
p.stdin.close()
out, err = yield [gen.Task(p.stdout.read_until_close),
gen.Task(p.stderr.read_until_close)]
logging.debug('cmd: %s' % ' '.join(cmd))
logging.debug('cmd stdout: %s' % out)
logging.debug('cmd strerr: %s' % err)
raise gen.Return((out, err))
@gen.coroutine
def run_triggers(self, action, stdin_data=None, env=None):
"""Asynchronously execute triggers for the given action.
:param action: action name; scripts in directory ./data/triggers/{action}.d will be run
:type action: str
:param stdin_data: a python dictionary that will be serialized in JSON and sent to the process over stdin
:type stdin_data: dict
:param env: environment of the process
:type stdin_data: dict
"""
logging.debug('running triggers for action "%s"' % action)
stdin_data = stdin_data or {}
try:
stdin_data = json.dumps(stdin_data)
except:
stdin_data = '{}'
for script in glob.glob(os.path.join(self.data_dir, 'triggers', '%s.d' % action, '*')):
if not (os.path.isfile(script) and os.access(script, os.X_OK)):
continue
out, err = yield gen.Task(self.run_subprocess, [script], stdin_data, env)
class PersonsHandler(CollectionHandler): class PersonsHandler(CollectionHandler):
"""Handle requests for Persons.""" """Handle requests for Persons."""
@ -186,17 +267,19 @@ class EventsHandler(CollectionHandler):
collection = 'events' collection = 'events'
object_id = 'event_id' object_id = 'event_id'
def _get_person_data(self, person_id, persons):
for person in persons:
if str(person.get('person_id')) == person_id:
return person
return {}
def handle_get_persons(self, id_, resource_id=None): def handle_get_persons(self, id_, resource_id=None):
# Return every person registered at this event, or the information # Return every person registered at this event, or the information
# about a specific person. # about a specific person.
query = {'_id': id_} query = {'_id': id_}
event = self.db.query('events', query)[0] event = self.db.query('events', query)[0]
if resource_id: if resource_id:
for person in event.get('persons', []): return {'person': self._get_person_data(resource_id, event.get('persons') or [])}
if str(person.get('person_id')) == resource_id:
return {'person': person}
if resource_id:
return {'person': {}}
persons = self._filter_results(event.get('persons') or [], self.arguments) persons = self._filter_results(event.get('persons') or [], self.arguments)
return {'persons': persons} return {'persons': persons}
@ -220,8 +303,25 @@ class EventsHandler(CollectionHandler):
query['_id'] = id_ query['_id'] = id_
if person_id is not None: if person_id is not None:
query['persons.person_id'] = person_id query['persons.person_id'] = person_id
old_person_data = {}
current_event = self.db.query(self.collection, query)
if current_event:
current_event = current_event[0]
old_person_data = self._get_person_data(person_id, current_event.get('persons') or [])
merged, doc = self.db.update('events', query, merged, doc = self.db.update('events', query,
data, updateList='persons', create=False) data, updateList='persons', create=False)
new_person_data = self._get_person_data(person_id, doc.get('persons') or [])
env = self._dict2env(new_person_data)
env.update({'PERSON_ID': person_id, 'EVENT_ID': id_, 'EVENT_TITLE': doc.get('title', '')})
stdin_data = {'old': old_person_data,
'new': new_person_data,
'event': doc,
'merged': merged
}
self.run_triggers('update_person_in_event', stdin_data=stdin_data, env=env)
if old_person_data and old_person_data.get('attended') != new_person_data.get('attended') \
and new_person_data.get('attended'):
self.run_triggers('attends', stdin_data=stdin_data, env=env)
return {'event': doc} return {'event': doc}
def handle_delete_persons(self, id_, person_id): def handle_delete_persons(self, id_, person_id):
@ -311,9 +411,13 @@ def run():
callback=lambda path: tornado.options.parse_config_file(path, final=False)) callback=lambda path: tornado.options.parse_config_file(path, final=False))
tornado.options.parse_command_line() tornado.options.parse_command_line()
if options.debug:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# database backend connector # database backend connector
db_connector = backend.EventManDB(url=options.mongodbURL, dbName=options.dbName) db_connector = backend.EventManDB(url=options.mongodbURL, dbName=options.dbName)
init_params = dict(db=db_connector) init_params = dict(db=db_connector, data_dir=options.data)
application = tornado.web.Application([ application = tornado.web.Application([
(r"/persons/?(?P<id_>\w+)?/?(?P<resource>\w+)?/?(?P<resource_id>\w+)?", PersonsHandler, init_params), (r"/persons/?(?P<id_>\w+)?/?(?P<resource>\w+)?/?(?P<resource_id>\w+)?", PersonsHandler, init_params),