From 0c2ac7bcf2b81aca785dc14afc1734101ec6f89a Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Thu, 16 Apr 2015 00:06:01 +0200 Subject: [PATCH 01/10] draft of async runner of commands --- eventman_server.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/eventman_server.py b/eventman_server.py index af6fe60..b5a0749 100755 --- a/eventman_server.py +++ b/eventman_server.py @@ -18,6 +18,7 @@ limitations under the License. """ import os +import subprocess import tornado.httpserver import tornado.ioloop @@ -72,6 +73,24 @@ class CollectionHandler(BaseHandler): # set of documents we're managing (a collection in MongoDB or a table in a SQL database) collection = None + def run_command(self, cmd, callback=None): + self.ioloop = tornado.ioloop.IOLoop.instance() + p = subprocess.Popen(cmd, close_fds=True, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + import datetime + self.tm = self.ioloop.add_timeout(datetime.timedelta(seconds=3), lambda: self.timeout(callback)) + self.ioloop.add_handler(p.stdout.fileno(), + self.async_callback(self.on_response, callback, p), self.ioloop.READ) + + def timeout(self, callback, *args): + callback((None, 'aaaaazzzz')) + + def on_response(self, callback, pipe, fd, events): + self.ioloop.remove_timeout(self.tm) + stdoutdata, stderrdata = pipe.communicate() + callback((pipe.returncode, stdoutdata)) + self.ioloop.remove_handler(fd) + def _filter_results(self, results, params): """Filter a list using keys and values from a dictionary. @@ -147,6 +166,17 @@ class CollectionHandler(BaseHandler): self.write({'success': True}) + +class TestHandler(CollectionHandler): + + @tornado.web.asynchronous + @gen.engine + def get(self): + #ret, resp = yield gen.Task(self.run_command, ['echo', str(self.arguments)]) + ret, resp = yield gen.Task(self.run_command, ['sleep', '10']) + self.write('ok: %s: %s\n' % (ret, resp)) + self.finish() + class PersonsHandler(CollectionHandler): """Handle requests for Persons.""" collection = 'persons' @@ -320,6 +350,8 @@ def run(): (r"/events/?(?P\w+)?/?(?P\w+)?/?(?P\w+)?", EventsHandler, init_params), (r"/(?:index.html)?", RootHandler, init_params), (r"/ebcsvpersons", EbCSVImportPersonsHandler, init_params), + + (r"/test", TestHandler, init_params), (r'/(.*)', tornado.web.StaticFileHandler, {"path": "angular_app"}) ], template_path=os.path.join(os.path.dirname(__file__), "templates"), From ef7732758a6a81def27fc8d14628847a9f9d632e Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Fri, 17 Apr 2015 00:17:36 +0200 Subject: [PATCH 02/10] basic timeout and some test --- eventman_server.py | 40 ++++++++++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/eventman_server.py b/eventman_server.py index b5a0749..d6b5af7 100755 --- a/eventman_server.py +++ b/eventman_server.py @@ -18,6 +18,7 @@ limitations under the License. """ import os +import glob import subprocess import tornado.httpserver @@ -78,12 +79,13 @@ class CollectionHandler(BaseHandler): p = subprocess.Popen(cmd, close_fds=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) import datetime - self.tm = self.ioloop.add_timeout(datetime.timedelta(seconds=3), lambda: self.timeout(callback)) + self.tm = self.ioloop.add_timeout(datetime.timedelta(seconds=3), lambda: self.timeout(callback, p)) self.ioloop.add_handler(p.stdout.fileno(), self.async_callback(self.on_response, callback, p), self.ioloop.READ) - def timeout(self, callback, *args): - callback((None, 'aaaaazzzz')) + def timeout(self, callback, pipe, *args): + pipe.kill() + callback((pipe, 'killed process')) def on_response(self, callback, pipe, fd, events): self.ioloop.remove_timeout(self.tm) @@ -165,12 +167,37 @@ class CollectionHandler(BaseHandler): self.db.delete(self.collection, id_) self.write({'success': True}) + def run(self, cmd): + p = subprocess.Popen([cmd],) + + @gen.coroutine + def run_triggers(self, action): + for script in glob.glob(os.path.join(self.data_dir, 'triggers', '%s.d' % action, '*')): + if not (os.path.isfile(script) and os.access(script, os.X_OK)): + continue + print script + #self.run_command(script) + ret, resp = yield gen.Task(self.run_command, [script]) + print ret, resp + +class TestHandler2(CollectionHandler): + def get(self): + self.run_triggers('attends') + +def action(action): + def action_decorator(funct): + def funct_wrapper(*args, **kwrds): + result = funct(*args, **kwrds) + print 'aaaa', args, kwrds + return result + return funct_wrapper + return action_decorator class TestHandler(CollectionHandler): - @tornado.web.asynchronous - @gen.engine + @gen.coroutine + @action('salta') def get(self): #ret, resp = yield gen.Task(self.run_command, ['echo', str(self.arguments)]) ret, resp = yield gen.Task(self.run_command, ['sleep', '10']) @@ -343,7 +370,7 @@ def run(): # database backend connector db_connector = backend.EventManDB(url=options.mongodbURL, dbName=options.dbName) - init_params = dict(db=db_connector) + init_params = dict(db=db_connector, data_dir=options.data) application = tornado.web.Application([ (r"/persons/?(?P\w+)?/?(?P\w+)?/?(?P\w+)?", PersonsHandler, init_params), @@ -352,6 +379,7 @@ def run(): (r"/ebcsvpersons", EbCSVImportPersonsHandler, init_params), (r"/test", TestHandler, init_params), + (r"/test2", TestHandler2, init_params), (r'/(.*)', tornado.web.StaticFileHandler, {"path": "angular_app"}) ], template_path=os.path.join(os.path.dirname(__file__), "templates"), From b3aa4c79d515312a913abec09db040671d0fbce7 Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Fri, 17 Apr 2015 20:31:50 +0200 Subject: [PATCH 03/10] closes #20: run a series of script reacting to a trigger --- data/triggers/README.txt | 7 ++++ eventman_server.py | 84 ++++++++++++++++------------------------ 2 files changed, 41 insertions(+), 50 deletions(-) create mode 100644 data/triggers/README.txt diff --git a/data/triggers/README.txt b/data/triggers/README.txt new file mode 100644 index 0000000..5232a7b --- /dev/null +++ b/data/triggers/README.txt @@ -0,0 +1,7 @@ +Directory for scripts that will be executed when a given action is encoutered. + +You have to put your scripts into {action}.d subdirectories; the scripts must be executable. + +Valid actions: +- attends (attends.d): a person is attending at an event + diff --git a/eventman_server.py b/eventman_server.py index d6b5af7..dd5c02c 100755 --- a/eventman_server.py +++ b/eventman_server.py @@ -19,6 +19,7 @@ limitations under the License. import os import glob +import datetime import subprocess import tornado.httpserver @@ -31,6 +32,8 @@ from tornado import gen, escape import utils import backend +PROCESS_TIMEOUT = 60 + class BaseHandler(tornado.web.RequestHandler): """Base class for request handlers.""" @@ -74,25 +77,6 @@ class CollectionHandler(BaseHandler): # set of documents we're managing (a collection in MongoDB or a table in a SQL database) collection = None - def run_command(self, cmd, callback=None): - self.ioloop = tornado.ioloop.IOLoop.instance() - p = subprocess.Popen(cmd, close_fds=True, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - import datetime - self.tm = self.ioloop.add_timeout(datetime.timedelta(seconds=3), lambda: self.timeout(callback, p)) - self.ioloop.add_handler(p.stdout.fileno(), - self.async_callback(self.on_response, callback, p), self.ioloop.READ) - - def timeout(self, callback, pipe, *args): - pipe.kill() - callback((pipe, 'killed process')) - - def on_response(self, callback, pipe, fd, events): - self.ioloop.remove_timeout(self.tm) - stdoutdata, stderrdata = pipe.communicate() - callback((pipe.returncode, stdoutdata)) - self.ioloop.remove_handler(fd) - def _filter_results(self, results, params): """Filter a list using keys and values from a dictionary. @@ -167,42 +151,45 @@ class CollectionHandler(BaseHandler): self.db.delete(self.collection, id_) self.write({'success': True}) - def run(self, cmd): - p = subprocess.Popen([cmd],) + def run_command(self, cmd, callback=None): + """Execute the given action. + + :param cmd: the command to be run with its command line arguments + :type cmd: list + """ + self.ioloop = tornado.ioloop.IOLoop.instance() + p = subprocess.Popen(cmd, close_fds=True, + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + self.ioloop.add_handler(p.stdout.fileno(), + self.async_callback(self.on_response, callback, p), self.ioloop.READ) + # also set a timeout + self.timeout = self.ioloop.add_timeout(datetime.timedelta(seconds=PROCESS_TIMEOUT), + lambda: self.on_timeout(callback, p)) + + def on_timeout(self, callback, pipe, *args): + """Kill a process that is taking too long to complete.""" + pipe.kill() + callback((None, 'killed process pid:%d' % pipe.pid)) + + def on_response(self, callback, pipe, fd, events): + """Handle the execution of a script.""" + self.ioloop.remove_timeout(self.timeout) + stdoutdata, stderrdata = pipe.communicate() + callback((pipe.returncode, stdoutdata)) + self.ioloop.remove_handler(fd) @gen.coroutine def run_triggers(self, action): + """Asynchronously execute triggers for the given action. + + :param action: action name; scripts in directory ./data/triggers/{action}.d will be run + :type action: str + """ for script in glob.glob(os.path.join(self.data_dir, 'triggers', '%s.d' % action, '*')): if not (os.path.isfile(script) and os.access(script, os.X_OK)): continue - print script - #self.run_command(script) ret, resp = yield gen.Task(self.run_command, [script]) - print ret, resp -class TestHandler2(CollectionHandler): - def get(self): - self.run_triggers('attends') - -def action(action): - def action_decorator(funct): - def funct_wrapper(*args, **kwrds): - result = funct(*args, **kwrds) - print 'aaaa', args, kwrds - return result - return funct_wrapper - return action_decorator - - -class TestHandler(CollectionHandler): - - @gen.coroutine - @action('salta') - def get(self): - #ret, resp = yield gen.Task(self.run_command, ['echo', str(self.arguments)]) - ret, resp = yield gen.Task(self.run_command, ['sleep', '10']) - self.write('ok: %s: %s\n' % (ret, resp)) - self.finish() class PersonsHandler(CollectionHandler): """Handle requests for Persons.""" @@ -377,9 +364,6 @@ def run(): (r"/events/?(?P\w+)?/?(?P\w+)?/?(?P\w+)?", EventsHandler, init_params), (r"/(?:index.html)?", RootHandler, init_params), (r"/ebcsvpersons", EbCSVImportPersonsHandler, init_params), - - (r"/test", TestHandler, init_params), - (r"/test2", TestHandler2, init_params), (r'/(.*)', tornado.web.StaticFileHandler, {"path": "angular_app"}) ], template_path=os.path.join(os.path.dirname(__file__), "templates"), From 7903230ca1b707bc10573870e9b69fa5019aefb0 Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Fri, 17 Apr 2015 20:36:01 +0200 Subject: [PATCH 04/10] ignore trigger directories --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index ba74660..8c99907 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +data/triggers/*.d + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] From c71e996c385ff218a3e1368e19ccd804c4f53ad7 Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Sat, 18 Apr 2015 12:53:08 +0200 Subject: [PATCH 05/10] switch to tornado.process.Subprocess call for subproprocess execution --- eventman_server.py | 85 +++++++++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/eventman_server.py b/eventman_server.py index dd5c02c..15fc530 100755 --- a/eventman_server.py +++ b/eventman_server.py @@ -19,6 +19,7 @@ limitations under the License. import os import glob +import json import datetime import subprocess @@ -27,7 +28,7 @@ import tornado.ioloop import tornado.options from tornado.options import define, options import tornado.web -from tornado import gen, escape +from tornado import gen, escape, process import utils import backend @@ -151,44 +152,58 @@ class CollectionHandler(BaseHandler): self.db.delete(self.collection, id_) self.write({'success': True}) - def run_command(self, cmd, callback=None): + def on_timeout(self, pipe): + """Kill a process that is taking too long to complete.""" + try: + pipe.proc.kill() + except: + pass + + def on_exit(self, returncode, cmd, pipe): + """Callback executed when a subprocess execution is over.""" + self.ioloop.remove_timeout(self.timeout) + + @gen.coroutine + def run_subprocess(self, cmd, stdin_data=None): """Execute the given action. :param cmd: the command to be run with its command line arguments :type cmd: list + + :param stdin_data: data to be sent over stdin + :type stdin_data: str """ self.ioloop = tornado.ioloop.IOLoop.instance() - p = subprocess.Popen(cmd, close_fds=True, - stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - self.ioloop.add_handler(p.stdout.fileno(), - self.async_callback(self.on_response, callback, p), self.ioloop.READ) - # also set a timeout + p = process.Subprocess(cmd, close_fds=True, stdin=process.Subprocess.STREAM, + stdout=process.Subprocess.STREAM, stderr=process.Subprocess.STREAM) + p.set_exit_callback(lambda returncode: self.on_exit(returncode, cmd, p)) self.timeout = self.ioloop.add_timeout(datetime.timedelta(seconds=PROCESS_TIMEOUT), - lambda: self.on_timeout(callback, p)) - - def on_timeout(self, callback, pipe, *args): - """Kill a process that is taking too long to complete.""" - pipe.kill() - callback((None, 'killed process pid:%d' % pipe.pid)) - - def on_response(self, callback, pipe, fd, events): - """Handle the execution of a script.""" - self.ioloop.remove_timeout(self.timeout) - stdoutdata, stderrdata = pipe.communicate() - callback((pipe.returncode, stdoutdata)) - self.ioloop.remove_handler(fd) + lambda: self.on_timeout(p)) + yield gen.Task(p.stdin.write, stdin_data or '') + p.stdin.close() + out, err = yield [gen.Task(p.stdout.read_until_close), + gen.Task(p.stderr.read_until_close)] + raise gen.Return((out, err)) @gen.coroutine - def run_triggers(self, action): + def run_triggers(self, action, stdin_data=None): """Asynchronously execute triggers for the given action. :param action: action name; scripts in directory ./data/triggers/{action}.d will be run :type action: str + + :param stdin_data: a python dictionary that will be serialized in JSON and sent to the process over stdin + :type stdin_data: dict """ + stdin_data = stdin_data or {} + try: + stdin_data = json.dumps(stdin_data) + except: + stdin_data = '{}' for script in glob.glob(os.path.join(self.data_dir, 'triggers', '%s.d' % action, '*')): if not (os.path.isfile(script) and os.access(script, os.X_OK)): continue - ret, resp = yield gen.Task(self.run_command, [script]) + out, err = yield gen.Task(self.run_subprocess, [script], stdin_data) class PersonsHandler(CollectionHandler): @@ -230,17 +245,19 @@ class EventsHandler(CollectionHandler): collection = 'events' object_id = 'event_id' + def _get_person_data(self, person_id, persons): + for person in persons: + if str(person.get('person_id')) == person_id: + return person + return {} + def handle_get_persons(self, id_, resource_id=None): # Return every person registered at this event, or the information # about a specific person. query = {'_id': id_} event = self.db.query('events', query)[0] if resource_id: - for person in event.get('persons', []): - if str(person.get('person_id')) == resource_id: - return {'person': person} - if resource_id: - return {'person': {}} + return {'person': self._get_person_data(resource_id, event.get('persons') or [])} persons = self._filter_results(event.get('persons') or [], self.arguments) return {'persons': persons} @@ -264,8 +281,22 @@ class EventsHandler(CollectionHandler): query['_id'] = id_ if person_id is not None: query['persons.person_id'] = person_id + old_person_data = {} + current_event = self.db.query(self.collection, query) + if current_event: + current_event = current_event[0] + old_person_data = self._get_person_data(person_id, current_event.get('persons') or []) merged, doc = self.db.update('events', query, data, updateList='persons', create=False) + new_person_data = self._get_person_data(person_id, doc.get('persons') or []) + #if old_person_data and old_person_data.get('attended') != new_person_data.get('attended') \ + # and new_person_data.get('attended'): + self.run_triggers('update_person_in_event', { + 'old': old_person_data, + 'new': new_person_data, + 'event': doc, + 'merged': merged + }) return {'event': doc} def handle_delete_persons(self, id_, person_id): From c966587821d86747daccf8b41a89e293ab026937 Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Sat, 18 Apr 2015 14:27:02 +0200 Subject: [PATCH 06/10] environment variables for triggers --- eventman_server.py | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/eventman_server.py b/eventman_server.py index 15fc530..9d2f4f5 100755 --- a/eventman_server.py +++ b/eventman_server.py @@ -21,7 +21,6 @@ import os import glob import json import datetime -import subprocess import tornado.httpserver import tornado.ioloop @@ -33,6 +32,7 @@ from tornado import gen, escape, process import utils import backend +ENCODING = 'utf8' PROCESS_TIMEOUT = 60 @@ -104,6 +104,18 @@ class CollectionHandler(BaseHandler): arguments = property(lambda self: dict([(k, v[0]) for k, v in self.request.arguments.iteritems()])) + def _dict2env(self, data): + """Convert a dictionary into a form suitable to be passed as environment variables.""" + ret = {} + for key, value in data.iteritems(): + if isinstance(value, (list, tuple, dict)): + continue + try: + ret[key.upper()] = unicode(value).encode(ENCODING) + except: + continue + return ret + @gen.coroutine def get(self, id_=None, resource=None, resource_id=None, **kwargs): if resource: @@ -164,7 +176,7 @@ class CollectionHandler(BaseHandler): self.ioloop.remove_timeout(self.timeout) @gen.coroutine - def run_subprocess(self, cmd, stdin_data=None): + def run_subprocess(self, cmd, stdin_data=None, env=None): """Execute the given action. :param cmd: the command to be run with its command line arguments @@ -172,10 +184,12 @@ class CollectionHandler(BaseHandler): :param stdin_data: data to be sent over stdin :type stdin_data: str + :param env: environment of the process + :type env: dict """ self.ioloop = tornado.ioloop.IOLoop.instance() p = process.Subprocess(cmd, close_fds=True, stdin=process.Subprocess.STREAM, - stdout=process.Subprocess.STREAM, stderr=process.Subprocess.STREAM) + stdout=process.Subprocess.STREAM, stderr=process.Subprocess.STREAM, env=env) p.set_exit_callback(lambda returncode: self.on_exit(returncode, cmd, p)) self.timeout = self.ioloop.add_timeout(datetime.timedelta(seconds=PROCESS_TIMEOUT), lambda: self.on_timeout(p)) @@ -183,17 +197,19 @@ class CollectionHandler(BaseHandler): p.stdin.close() out, err = yield [gen.Task(p.stdout.read_until_close), gen.Task(p.stderr.read_until_close)] + print 'out', out raise gen.Return((out, err)) @gen.coroutine - def run_triggers(self, action, stdin_data=None): + def run_triggers(self, action, stdin_data=None, env=None): """Asynchronously execute triggers for the given action. :param action: action name; scripts in directory ./data/triggers/{action}.d will be run :type action: str - :param stdin_data: a python dictionary that will be serialized in JSON and sent to the process over stdin :type stdin_data: dict + :param env: environment of the process + :type stdin_data: dict """ stdin_data = stdin_data or {} try: @@ -203,7 +219,7 @@ class CollectionHandler(BaseHandler): for script in glob.glob(os.path.join(self.data_dir, 'triggers', '%s.d' % action, '*')): if not (os.path.isfile(script) and os.access(script, os.X_OK)): continue - out, err = yield gen.Task(self.run_subprocess, [script], stdin_data) + out, err = yield gen.Task(self.run_subprocess, [script], stdin_data, env) class PersonsHandler(CollectionHandler): @@ -289,14 +305,17 @@ class EventsHandler(CollectionHandler): merged, doc = self.db.update('events', query, data, updateList='persons', create=False) new_person_data = self._get_person_data(person_id, doc.get('persons') or []) - #if old_person_data and old_person_data.get('attended') != new_person_data.get('attended') \ - # and new_person_data.get('attended'): - self.run_triggers('update_person_in_event', { - 'old': old_person_data, + env = self._dict2env(new_person_data) + env.update({'PERSON_ID': person_id, 'EVENT_ID': id_, 'EVENT_TITLE': doc.get('title', '')}) + stdin_data = {'old': old_person_data, 'new': new_person_data, 'event': doc, 'merged': merged - }) + } + self.run_triggers('update_person_in_event', stdin_data=stdin_data, env=env) + if old_person_data and old_person_data.get('attended') != new_person_data.get('attended') \ + and new_person_data.get('attended'): + self.run_triggers('attends', stdin_data=stdin_data, env=env) return {'event': doc} def handle_delete_persons(self, id_, person_id): From 9f6e1a4b3780bd40e3a5609f0e7a41a5ea63753f Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Sat, 18 Apr 2015 14:34:31 +0200 Subject: [PATCH 07/10] collection of triggers --- data/triggers-available/README.txt | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 data/triggers-available/README.txt diff --git a/data/triggers-available/README.txt b/data/triggers-available/README.txt new file mode 100644 index 0000000..be9312f --- /dev/null +++ b/data/triggers-available/README.txt @@ -0,0 +1,4 @@ +Collection of useful triggers. + +Link them in the appropriate directory, if neeed. + From fa411d04ba9f989b55346d227621b84dc6e899e5 Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Sat, 18 Apr 2015 15:01:30 +0200 Subject: [PATCH 08/10] debugging --- eventman_server.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/eventman_server.py b/eventman_server.py index 9d2f4f5..dc12ea7 100755 --- a/eventman_server.py +++ b/eventman_server.py @@ -20,6 +20,7 @@ limitations under the License. import os import glob import json +import logging import datetime import tornado.httpserver @@ -164,8 +165,9 @@ class CollectionHandler(BaseHandler): self.db.delete(self.collection, id_) self.write({'success': True}) - def on_timeout(self, pipe): + def on_timeout(self, cmd, pipe): """Kill a process that is taking too long to complete.""" + logging.debug('cmd %s is taking too long: killing it' % ' '.join(cmd)) try: pipe.proc.kill() except: @@ -174,6 +176,7 @@ class CollectionHandler(BaseHandler): def on_exit(self, returncode, cmd, pipe): """Callback executed when a subprocess execution is over.""" self.ioloop.remove_timeout(self.timeout) + logging.debug('cmd: %s returncode: %d' % (' '.join(cmd), returncode)) @gen.coroutine def run_subprocess(self, cmd, stdin_data=None, env=None): @@ -192,12 +195,14 @@ class CollectionHandler(BaseHandler): stdout=process.Subprocess.STREAM, stderr=process.Subprocess.STREAM, env=env) p.set_exit_callback(lambda returncode: self.on_exit(returncode, cmd, p)) self.timeout = self.ioloop.add_timeout(datetime.timedelta(seconds=PROCESS_TIMEOUT), - lambda: self.on_timeout(p)) + lambda: self.on_timeout(cmd, p)) yield gen.Task(p.stdin.write, stdin_data or '') p.stdin.close() out, err = yield [gen.Task(p.stdout.read_until_close), gen.Task(p.stderr.read_until_close)] - print 'out', out + logging.debug('cmd: %s' % ' '.join(cmd)) + logging.debug('cmd stdout: %s' % out) + logging.debug('cmd strerr: %s' % err) raise gen.Return((out, err)) @gen.coroutine @@ -211,6 +216,7 @@ class CollectionHandler(BaseHandler): :param env: environment of the process :type stdin_data: dict """ + logging.debug('running triggers for action "%s"' % action) stdin_data = stdin_data or {} try: stdin_data = json.dumps(stdin_data) @@ -405,6 +411,10 @@ def run(): callback=lambda path: tornado.options.parse_config_file(path, final=False)) tornado.options.parse_command_line() + if options.debug: + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + # database backend connector db_connector = backend.EventManDB(url=options.mongodbURL, dbName=options.dbName) init_params = dict(db=db_connector, data_dir=options.data) From 331d01dab1f3d121cb1da187d55752fc5283b8ea Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Sat, 18 Apr 2015 15:01:40 +0200 Subject: [PATCH 09/10] documentation --- data/triggers/README.txt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/data/triggers/README.txt b/data/triggers/README.txt index 5232a7b..8f7dabc 100644 --- a/data/triggers/README.txt +++ b/data/triggers/README.txt @@ -3,5 +3,9 @@ Directory for scripts that will be executed when a given action is encoutered. You have to put your scripts into {action}.d subdirectories; the scripts must be executable. Valid actions: -- attends (attends.d): a person is attending at an event + +- PUT /events/:event_id/persons/:person_id + - update_person_in_event (update_person_in_event.d): called for each update + - attends (attends.d): only called when a person is attending at an event + From 75c83067d63708759f5f27fc65bd94a343f94b53 Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Sat, 18 Apr 2015 15:02:20 +0200 Subject: [PATCH 10/10] trigger example --- data/triggers-available/echo.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100755 data/triggers-available/echo.py diff --git a/data/triggers-available/echo.py b/data/triggers-available/echo.py new file mode 100755 index 0000000..454f6ae --- /dev/null +++ b/data/triggers-available/echo.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +"""echo.py - Simply echo the environment and the stdin.""" + +import os +import sys +import json + +def main(): + print 'STDIN JSON:', json.loads(sys.stdin.read()) + print '' + print 'ENV:', os.environ + + +if __name__ == '__main__': + try: + main() + except Exception, e: + print 'echo.py error: %s' % e + pass