#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Diffido - because the F5 key is a terrible thing to waste. Copyright 2018 Davide Alberani Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import os import re import json import pytz import shutil import urllib import smtplib from email.mime.text import MIMEText import logging import datetime import requests import subprocess import multiprocessing from lxml import etree from xml.etree import ElementTree from tornado.ioloop import IOLoop from apscheduler.triggers.cron import CronTrigger from apscheduler.schedulers.tornado import TornadoScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore import tornado.httpserver import tornado.ioloop import tornado.options from tornado.options import define, options import tornado.web from tornado import gen, escape JOBS_STORE = 'sqlite:///conf/jobs.db' API_VERSION = '1.0' SCHEDULES_FILE = 'conf/schedules.json' DEFAULT_CONF = 'conf/diffido.conf' EMAIL_FROM = 'diffido@localhost' SMTP_SETTINGS = {} GIT_CMD = 'git' re_commit = re.compile(r'^(?P[0-9a-f]{40}) (?P.*)\n(?: .* ' '(?P\d+) insertion.* (?P\d+) deletion.*$)?', re.M) re_insertion = re.compile(r'(\d+) insertion') re_deletion = re.compile(r'(\d+) deletion') logger = logging.getLogger() logger.setLevel(logging.INFO) def read_schedules(): """Return the schedules configuration. :returns: dictionary from the JSON object in conf/schedules.json :rtype: dict""" if not os.path.isfile(SCHEDULES_FILE): return {'schedules': {}} try: with open(SCHEDULES_FILE, 'r') as fd: schedules = json.loads(fd.read()) for id_ in schedules.get('schedules', {}).keys(): schedule = schedules['schedules'][id_] try: schedule['last_history'] = get_last_history(id_) except: schedule['last_history'] = {} continue return schedules except Exception as e: logger.error('unable to read %s: %s' % (SCHEDULES_FILE, e)) return {'schedules': {}} def write_schedules(schedules): """Write the schedules configuration. :param schedules: the schedules to save :type schedules: dict :returns: True in case of success :rtype: bool""" try: with open(SCHEDULES_FILE, 'w') as fd: fd.write(json.dumps(schedules, indent=2)) except Exception as e: logger.error('unable to write %s: %s' % (SCHEDULES_FILE, e)) return False return True def next_id(schedules): """Return the next available integer (as a string) in the list of schedules keys (do not fills holes) :param schedules: the schedules :type schedules: dict :returns: the ID of the next schedule :rtype: str""" ids = schedules.get('schedules', {}).keys() if not ids: return '1' return str(max([int(i) for i in ids]) + 1) def get_schedule(id_, add_id=True, add_history=False): """Return information about a single schedule :param id_: ID of the schedule :type id_: str :param add_id: if True, add the ID in the dictionary :type add_id: bool :returns: the schedule :rtype: dict""" try: schedules = read_schedules() except Exception: return {} data = schedules.get('schedules', {}).get(id_, {}) if add_history and data: data['last_history'] = get_last_history(id_) if add_id: data['id'] = str(id_) return data def select_xpath(content, xpath): """Select a portion of a HTML document :param content: the content of the document :type content: str :param xpath: the XPath selector :type xpath: str :returns: the selected document :rtype: str""" tree = etree.HTML(content) elems = tree.xpath(xpath) if not elems: return content selected_content = [] for elem in elems: pieces = [] if elem.text: pieces.append(elem.text) for sub_el in elem.getchildren(): try: sub_el_text = ElementTree.tostring(sub_el, method='html').decode('utf-8', 'replace') except: continue if sub_el_text: pieces.append(sub_el_text) selected_content.append(''.join(pieces)) content = ''.join(selected_content).strip() return content def run_job(id_=None, force=False, *args, **kwargs): """Run a job :param id_: ID of the schedule to run :type id_: str :param force: run even if disabled :type force: bool :param args: positional arguments :type args: tuple :param kwargs: named arguments :type kwargs: dict :returns: True in case of success :rtype: bool""" schedule = get_schedule(id_, add_id=False) url = schedule.get('url') if not url: return False logger.debug('running job id:%s title:%s url: %s' % (id_, schedule.get('title', ''), url)) if not schedule.get('enabled') and not force: logger.info('not running job %s: disabled' % id_) return True req = requests.get(url, allow_redirects=True, timeout=(30.10, 240)) content = req.text xpath = schedule.get('xpath') if xpath: try: content = select_xpath(content, xpath) except Exception as e: logger.warn('unable to extract XPath %s: %s' % (xpath, e)) req_path = urllib.parse.urlparse(req.url).path base_name = os.path.basename(req_path) or 'index.html' def _commit(id_, filename, content, queue): try: os.chdir('storage/%s' % id_) except Exception as e: logger.info('unable to move to storage/%s directory: %s; trying to create it...' % (id_, e)) _created = False try: _created = git_create_repo(id_) except Exception as e: logger.info('unable to move to storage/%s directory: %s; unable to create it' % (id_, e)) if not _created: return False current_lines = 0 if os.path.isfile(filename): with open(filename, 'r') as fd: for line in fd: current_lines += 1 with open(filename, 'w') as fd: fd.write(content) p = subprocess.Popen([GIT_CMD, 'add', filename]) p.communicate() p = subprocess.Popen([GIT_CMD, 'commit', '-m', '%s' % datetime.datetime.utcnow(), '--allow-empty'], stdout=subprocess.PIPE) stdout, _ = p.communicate() stdout = stdout.decode('utf-8') insert = re_insertion.findall(stdout) if insert: insert = int(insert[0]) else: insert = 0 delete = re_deletion.findall(stdout) if delete: delete = int(delete[0]) else: delete = 0 queue.put({'insertions': insert, 'deletions': delete, 'previous_lines': current_lines, 'changes': max(insert, delete)}) queue = multiprocessing.Queue() p = multiprocessing.Process(target=_commit, args=(id_, base_name, content, queue)) p.start() res = queue.get() p.join() email = schedule.get('email') if not email: return True changes = res.get('changes') if not changes: return True min_change = schedule.get('minimum_change') previous_lines = res.get('previous_lines') if min_change and previous_lines: min_change = float(min_change) change_fraction = res.get('changes') / previous_lines if change_fraction < min_change: return True # send notification diff = get_diff(id_).get('diff') if not diff: return True send_email(to=email, subject='%s page changed' % schedule.get('title'), body='changes:\n\n%s' % diff) return True def safe_run_job(id_=None, *args, **kwargs): """Safely run a job, catching all the exceptions :param id_: ID of the schedule to run :type id_: str :param args: positional arguments :type args: tuple :param kwargs: named arguments :type kwargs: dict :returns: True in case of success :rtype: bool""" try: run_job(id_, *args, **kwargs) except Exception as e: send_email('error executing job %s: %s' % (id_, e)) def send_email(to, subject='diffido', body='', from_=None): """Send an email :param to: destination address :type to: str :param subject: email subject :type subject: str :param body: body of the email :type body: str :param from_: sender address :type from_: str :returns: True in case of success :rtype: bool""" msg = MIMEText(body) msg['Subject'] = subject msg['From'] = from_ or EMAIL_FROM msg['To'] = to starttls = SMTP_SETTINGS.get('smtp-starttls') use_ssl = SMTP_SETTINGS.get('smtp-use-ssl') args = {} for key, value in SMTP_SETTINGS.items(): if key in ('smtp-starttls', 'smtp-use-ssl'): continue if key in ('smtp-port'): value = int(value) key = key.replace('smtp-', '', 1).replace('-', '_') args[key] = value try: if use_ssl: with smtplib.SMTP_SSL(**args) as s: s.send_message(msg) else: tls_args = {} for key in ('ssl_keyfile', 'ssl_certfile', 'ssl_context'): if key in args: tls_args = args[key] del args[key] with smtplib.SMTP(**args) as s: if starttls: s.starttls(**tls_args) s.ehlo_or_helo_if_needed() s.send_message(msg) except Exception as e: logger.error('unable to send email to %s: %s' % (to, e)) return False return True def get_history(id_, limit=None, add_info=False): """Read the history of a schedule :param id_: ID of the schedule :type id_: str :param limit: number of entries to fetch :type limit: int :param add_info: add information about the schedule itself :type add_info: int :returns: information about the schedule and its history :rtype: dict""" def _history(id_, limit, queue): try: os.chdir('storage/%s' % id_) except Exception as e: logger.info('unable to move to storage/%s directory: %s' % (id_, e)) return queue.put(b'') cmd = [GIT_CMD, 'log', '--pretty=oneline', '--shortstat'] if limit is not None: cmd.append('-%s' % limit) p = subprocess.Popen(cmd, stdout=subprocess.PIPE) stdout, _ = p.communicate() queue.put(stdout) queue = multiprocessing.Queue() p = multiprocessing.Process(target=_history, args=(id_, limit, queue)) p.start() res = queue.get().decode('utf-8') p.join() history = [] for match in re_commit.finditer(res): info = match.groupdict() info['insertions'] = int(info['insertions'] or 0) info['deletions'] = int(info['deletions'] or 0) info['changes'] = max(info['insertions'], info['deletions']) history.append(info) last_id = None if history and 'id' in history[0]: last_id = history[0]['id'] for idx, item in enumerate(history): item['seq'] = idx + 1 data = {'history': history, 'last_id': last_id} if add_info: data['schedule'] = get_schedule(id_) return data def get_last_history(id_): """Read the last history entry of a schedule :param id_: ID of the schedule :type id_: str :returns: information about the schedule and its history :rtype: dict""" history = get_history(id_, limit=1) hist = history.get('history') or [{}] return hist[0] def get_diff(id_, commit_id='HEAD', old_commit_id=None): """Return the diff between commits of a schedule :param id_: ID of the schedule :type id_: str :param commit_id: the most recent commit ID; HEAD by default :type commit_id: str :param old_commit_id: the older commit ID; if None, the previous commit is used :type old_commit_id: str :returns: information about the schedule and the diff between commits :rtype: dict""" def _history(id_, commit_id, old_commit_id, queue): try: os.chdir('storage/%s' % id_) except Exception as e: logger.info('unable to move to storage/%s directory: %s' % (id_, e)) return queue.put(b'') p = subprocess.Popen([GIT_CMD, 'diff', old_commit_id or '%s~' % commit_id, commit_id], stdout=subprocess.PIPE) stdout, _ = p.communicate() queue.put(stdout) queue = multiprocessing.Queue() p = multiprocessing.Process(target=_history, args=(id_, commit_id, old_commit_id, queue)) p.start() res = queue.get().decode('utf-8') p.join() schedule = get_schedule(id_) return {'diff': res, 'schedule': schedule} def scheduler_update(scheduler, id_): """Update a scheduler job, using information from the JSON object :param scheduler: the TornadoScheduler instance to modify :type scheduler: TornadoScheduler :param id_: ID of the schedule that must be updated :type id_: str :returns: True in case of success :rtype: bool""" schedule = get_schedule(id_, add_id=False) if not schedule: logger.warn('unable to update empty schedule %s' % id_) return False trigger = schedule.get('trigger') if trigger not in ('interval', 'cron'): logger.warn('unable to update empty schedule %s: trigger not in ("cron", "interval")' % id_) return False args = {} if trigger == 'interval': args['trigger'] = 'interval' for unit in 'weeks', 'days', 'hours', 'minutes', 'seconds': if 'interval_%s' % unit not in schedule: continue try: val = schedule['interval_%s' % unit] if not val: continue args[unit] = int(val) except Exception: logger.warn('invalid argument on schedule %s: %s parameter %s is not an integer' % (id_, 'interval_%s' % unit, schedule['interval_%s' % unit])) if len(args) == 1: logger.error('no valid interval specified, skipping schedule %s' % id_) return False elif trigger == 'cron': try: cron_trigger = CronTrigger.from_crontab(schedule['cron_crontab']) args['trigger'] = cron_trigger except Exception: logger.warn('invalid argument on schedule %s: cron_tab parameter %s is not a valid crontab' % (id_, schedule.get('cron_crontab'))) return False git_create_repo(id_) try: scheduler.add_job(safe_run_job, id=id_, replace_existing=True, kwargs={'id_': id_}, **args) except Exception as e: logger.warn('unable to update job %s: %s' % (id_, e)) return False return True def scheduler_delete(scheduler, id_): """Update a scheduler job, using information from the JSON object :param scheduler: the TornadoScheduler instance to modify :type scheduler: TornadoScheduler :param id_: ID of the schedule :type id_: str :returns: True in case of success :rtype: bool""" try: scheduler.remove_job(job_id=id_) except Exception as e: logger.warn('unable to delete job %s: %s' % (id_, e)) return False return git_delete_repo(id_) def reset_from_schedules(scheduler): """"Reset all scheduler jobs, using information from the JSON object :param scheduler: the TornadoScheduler instance to modify :type scheduler: TornadoScheduler :returns: True in case of success :rtype: bool""" ret = False try: scheduler.remove_all_jobs() for key in read_schedules().get('schedules', {}).keys(): ret |= scheduler_update(scheduler, id_=key) except Exception as e: logger.warn('unable to reset all jobs: %s' % e) return False return ret def git_init(): """Initialize Git global settings""" p = subprocess.Popen([GIT_CMD, 'config', '--global', 'user.email', '"%s"' % EMAIL_FROM]) p.communicate() p = subprocess.Popen([GIT_CMD, 'config', '--global', 'user.name', '"Diffido"']) p.communicate() def git_create_repo(id_): """Create a Git repository :param id_: ID of the schedule :type id_: str :returns: True in case of success :rtype: bool""" repo_dir = 'storage/%s' % id_ if os.path.isdir(repo_dir): return True p = subprocess.Popen([GIT_CMD, 'init', repo_dir]) p.communicate() return p.returncode == 0 def git_delete_repo(id_): """Delete a Git repository :param id_: ID of the schedule :type id_: str :returns: True in case of success :rtype: bool""" repo_dir = 'storage/%s' % id_ if not os.path.isdir(repo_dir): return False try: shutil.rmtree(repo_dir) except Exception as e: logger.warn('unable to delete Git repository %s: %s' % (id_, e)) return False return True class DiffidoBaseException(Exception): """Base class for diffido custom exceptions. :param message: text message :type message: str :param status: numeric http status code :type status: int""" def __init__(self, message, status=400): super(DiffidoBaseException, self).__init__(message) self.message = message self.status = status class BaseHandler(tornado.web.RequestHandler): """Base class for request handlers.""" # A property to access the first value of each argument. arguments = property(lambda self: dict([(k, v[0].decode('utf-8')) for k, v in self.request.arguments.items()])) @property def clean_body(self): """Return a clean dictionary from a JSON body, suitable for a query on MongoDB. :returns: a clean copy of the body arguments :rtype: dict""" return escape.json_decode(self.request.body or '{}') def write_error(self, status_code, **kwargs): """Default error handler.""" if isinstance(kwargs.get('exc_info', (None, None))[1], DiffidoBaseException): exc = kwargs['exc_info'][1] status_code = exc.status message = exc.message else: message = 'internal error' self.build_error(message, status=status_code) def initialize(self, **kwargs): """Add every passed (key, value) as attributes of the instance.""" for key, value in kwargs.items(): setattr(self, key, value) def build_error(self, message='', status=400): """Build and write an error message. :param message: textual message :type message: str :param status: HTTP status code :type status: int """ self.set_status(status) self.write({'error': True, 'message': message}) def build_success(self, message='', status=200): """Build and write a success message. :param message: textual message :type message: str :param status: HTTP status code :type status: int """ self.set_status(status) self.write({'error': False, 'message': message}) class SchedulesHandler(BaseHandler): """Schedules handler.""" @gen.coroutine def get(self, id_=None, *args, **kwargs): """Get a schedule.""" if id_ is not None: return self.write({'schedule': get_schedule(id_, add_history=True)}) schedules = read_schedules() self.write(schedules) @gen.coroutine def put(self, id_=None, *args, **kwargs): """Update a schedule.""" if id_ is None: return self.build_error(message='update action requires an ID') data = self.clean_body schedules = read_schedules() if id_ not in schedules.get('schedules', {}): return self.build_error(message='schedule %s not found' % id_) schedules['schedules'][id_] = data write_schedules(schedules) scheduler_update(scheduler=self.scheduler, id_=id_) self.write(get_schedule(id_=id_)) @gen.coroutine def post(self, *args, **kwargs): """Add a schedule.""" data = self.clean_body schedules = read_schedules() id_ = next_id(schedules) schedules['schedules'][id_] = data write_schedules(schedules) scheduler_update(scheduler=self.scheduler, id_=id_) self.write(get_schedule(id_=id_)) @gen.coroutine def delete(self, id_=None, *args, **kwargs): """Delete a schedule.""" if id_ is None: return self.build_error(message='an ID must be specified') schedules = read_schedules() if id_ in schedules.get('schedules', {}): del schedules['schedules'][id_] write_schedules(schedules) scheduler_delete(scheduler=self.scheduler, id_=id_) self.build_success(message='removed schedule %s' % id_) class RunScheduleHandler(BaseHandler): """Reset schedules handler.""" @gen.coroutine def post(self, id_, *args, **kwargs): if run_job(id_, force=True): return self.build_success('job run') self.build_error('job not run') class ResetSchedulesHandler(BaseHandler): """Reset schedules handler.""" @gen.coroutine def post(self, *args, **kwargs): reset_from_schedules(self.scheduler) class HistoryHandler(BaseHandler): """History handler.""" @gen.coroutine def get(self, id_, *args, **kwargs): self.write(get_history(id_, add_info=True)) class DiffHandler(BaseHandler): """Diff handler.""" @gen.coroutine def get(self, id_, commit_id, old_commit_id=None, *args, **kwargs): self.write(get_diff(id_, commit_id, old_commit_id)) class TemplateHandler(BaseHandler): """Handler for the template files in the / path.""" @gen.coroutine def get(self, *args, **kwargs): """Get a template file.""" page = 'index.html' if args and args[0]: page = args[0].strip('/') arguments = self.arguments self.render(page, **arguments) def serve(): """Read configuration and start the server.""" global EMAIL_FROM, SMTP_SETTINGS jobstores = {'default': SQLAlchemyJobStore(url=JOBS_STORE)} scheduler = TornadoScheduler(jobstores=jobstores, timezone=pytz.utc) scheduler.start() define('port', default=3210, help='run on the given port', type=int) define('address', default='', help='bind the server at the given address', type=str) define('ssl_cert', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_cert.pem'), help='specify the SSL certificate to use for secure connections') define('ssl_key', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_key.pem'), help='specify the SSL private key to use for secure connections') define('admin-email', default='', help='email address of the site administrator', type=str) define('smtp-host', default='localhost', help='SMTP server address', type=str) define('smtp-port', default=0, help='SMTP server port', type=int) define('smtp-local-hostname', default=None, help='SMTP local hostname', type=str) define('smtp-use-ssl', default=False, help='Use SSL to connect to the SMTP server', type=bool) define('smtp-starttls', default=False, help='Use STARTTLS to connect to the SMTP server', type=bool) define('smtp-ssl-keyfile', default=None, help='SSL key file', type=str) define('smtp-ssl-certfile', default=None, help='SSL cert file', type=str) define('smtp-ssl-context', default=None, help='SSL context', type=str) define('debug', default=False, help='run in debug mode', type=bool) define('config', help='read configuration file', callback=lambda path: tornado.options.parse_config_file(path, final=False)) if not options.config and os.path.isfile(DEFAULT_CONF): tornado.options.parse_config_file(DEFAULT_CONF, final=False) tornado.options.parse_command_line() if options.admin_email: EMAIL_FROM = options.admin_email for key, value in options.as_dict().items(): if key.startswith('smtp-'): SMTP_SETTINGS[key] = value if options.debug: logger.setLevel(logging.DEBUG) ssl_options = {} if os.path.isfile(options.ssl_key) and os.path.isfile(options.ssl_cert): ssl_options = dict(certfile=options.ssl_cert, keyfile=options.ssl_key) init_params = dict(listen_port=options.port, logger=logger, ssl_options=ssl_options, scheduler=scheduler) git_init() _reset_schedules_path = r'schedules/reset' _schedule_run_path = r'schedules/(?P\d+)/run' _schedules_path = r'schedules/?(?P\d+)?' _history_path = r'schedules/?(?P\d+)/history' _diff_path = r'schedules/(?P\d+)/diff/(?P[0-9a-f]+)/?(?P[0-9a-f]+)?/?' application = tornado.web.Application([ (r'/api/%s' % _reset_schedules_path, ResetSchedulesHandler, init_params), (r'/api/v%s/%s' % (API_VERSION, _reset_schedules_path), ResetSchedulesHandler, init_params), (r'/api/%s' % _schedule_run_path, RunScheduleHandler, init_params), (r'/api/v%s/%s' % (API_VERSION, _schedule_run_path), RunScheduleHandler, init_params), (r'/api/%s' % _history_path, HistoryHandler, init_params), (r'/api/v%s/%s' % (API_VERSION, _history_path), HistoryHandler, init_params), (r'/api/%s' % _diff_path, DiffHandler, init_params), (r'/api/v%s/%s' % (API_VERSION, _diff_path), DiffHandler, init_params), (r'/api/%s' % _schedules_path, SchedulesHandler, init_params), (r'/api/v%s/%s' % (API_VERSION, _schedules_path), SchedulesHandler, init_params), (r'/?(.*)', TemplateHandler, init_params), ], static_path=os.path.join(os.path.dirname(__file__), 'dist/static'), template_path=os.path.join(os.path.dirname(__file__), 'dist/'), debug=options.debug) http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None) logger.info('Start serving on %s://%s:%d', 'https' if ssl_options else 'http', options.address if options.address else '127.0.0.1', options.port) http_server.listen(options.port, options.address) try: IOLoop.instance().start() except (KeyboardInterrupt, SystemExit): pass if __name__ == '__main__': serve()