#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import io
import json
import shutil
import urllib.parse
import smtplib
from email.mime.text import MIMEText
import logging
import datetime
import subprocess
import multiprocessing

import requests
from lxml import etree

from tornado.ioloop import IOLoop
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.tornado import TornadoScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import tornado.httpserver
import tornado.ioloop
import tornado.options
from tornado.options import define, options
import tornado.web
from tornado import gen, escape

JOBS_STORE = 'sqlite:///conf/jobs.db'
API_VERSION = '1.0'
SCHEDULES_FILE = 'conf/schedules.json'
DEFAULT_CONF = 'conf/diffido.conf'
EMAIL_FROM = 'diffido@localhost'
GIT_CMD = 'git'

re_insertion = re.compile(r'(\d+) insertion')
re_deletion = re.compile(r'(\d+) deletion')

logger = logging.getLogger()
logger.setLevel(logging.INFO)
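

# A minimal sketch of what conf/schedules.json may contain; the field names
# ('title', 'url', 'trigger', ...) are the ones consulted by the code below,
# while the values are purely illustrative:
# {
#   "schedules": {
#     "1": {"title": "example", "url": "https://www.example.com/",
#           "trigger": "interval", "interval_minutes": "30",
#           "xpath": "//div[@id='content']", "email": "user@example.com",
#           "minimum_change": "0.05"}
#   }
# }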
def read_schedules():
    """Return the schedules configuration, or an empty one on errors."""
    if not os.path.isfile(SCHEDULES_FILE):
        return {'schedules': {}}
    try:
        with open(SCHEDULES_FILE, 'r') as fd:
            return json.loads(fd.read())
    except Exception as e:
        logger.error('unable to read %s: %s' % (SCHEDULES_FILE, e))
        return {'schedules': {}}


def write_schedules(schedules):
    """Write the schedules configuration to disk."""
    with open(SCHEDULES_FILE, 'w') as fd:
        fd.write(json.dumps(schedules, indent=2))


def next_id(schedules):
    """Return the next available schedule ID as a string."""
    ids = schedules.get('schedules', {}).keys()
    if not ids:
        return '1'
    return str(max([int(i) for i in ids]) + 1)


def get_schedule(id_, add_id=True):
    """Return the schedule with the given ID, optionally adding the ID to the returned data."""
    schedules = read_schedules()
    data = schedules.get('schedules', {}).get(id_, {})
    if add_id:
        data['id'] = str(id_)
    return data


def select_xpath(content, xpath):
    """Return the portion of an XML/XHTML document matched by an XPath selector."""
    fd = io.StringIO(content)
    tree = etree.parse(fd)
    elems = tree.xpath(xpath)
    if not elems:
        return content
    selected_content = []
    for elem in elems:
        # elem.text is None for elements without direct text content.
        selected_content.append(''.join([elem.text or ''] +
                                        [etree.tostring(e).decode('utf-8', 'replace')
                                         for e in elem]))
    content = ''.join(selected_content)
    return content
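
# Usage sketch (names illustrative): select_xpath(page, '//div[@id="content"]')
# keeps only the matched element's text and serialized children; if the XPath
# matches nothing, the original content is returned unchanged. The content
# must be well-formed XML/XHTML for etree.parse to accept it.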


def run_job(id_=None, *args, **kwargs):
    """Fetch the URL of a schedule, extract the relevant content and commit it to git."""
    schedule = get_schedule(id_, add_id=False)
    url = schedule.get('url')
    if not url:
        return
    logger.debug('Running job id:%s title:%s url: %s' % (id_, schedule.get('title', ''), url))
    # (connect timeout, read timeout) in seconds.
    req = requests.get(url, allow_redirects=True, timeout=(30.10, 240))
    content = req.text
    xpath = schedule.get('xpath')
    if xpath:
        content = select_xpath(content, xpath)
    req_path = urllib.parse.urlparse(req.url).path
    base_name = os.path.basename(req_path) or 'index.html'

    def _commit(id_, filename, content, queue):
        # Run in a child process: os.chdir is process-wide and must not leak
        # into the scheduler process or other concurrent jobs.
        os.chdir('storage/%s' % id_)
        current_lines = 0
        if os.path.isfile(filename):
            with open(filename, 'r') as fd:
                for line in fd:
                    current_lines += 1
        with open(filename, 'w') as fd:
            fd.write(content)
        p = subprocess.Popen([GIT_CMD, 'add', filename])
        p.communicate()
        p = subprocess.Popen([GIT_CMD, 'commit', '-m', '%s' % datetime.datetime.utcnow(),
                              '--allow-empty'], stdout=subprocess.PIPE)
        stdout, _ = p.communicate()
        stdout = stdout.decode('utf-8')
        # Parse the insertions/deletions counts from the commit summary.
        insert = re_insertion.findall(stdout)
        insert = int(insert[0]) if insert else 0
        delete = re_deletion.findall(stdout)
        delete = int(delete[0]) if delete else 0
        queue.put({'insertions': insert, 'deletions': delete, 'previous_lines': current_lines,
                   'changes': max(insert, delete)})

    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=_commit, args=(id_, base_name, content, queue))
    p.start()
    res = queue.get()
    p.join()
    email = schedule.get('email')
    if not email:
        return
    changes = res.get('changes')
    if not changes:
        return
    min_change = schedule.get('minimum_change')
    previous_lines = res.get('previous_lines')
    if min_change and previous_lines:
        # Skip the notification if the fraction of changed lines is below the threshold.
        min_change = float(min_change)
        change_fraction = res.get('changes') / previous_lines
        if change_fraction < min_change:
            return
    # Send a notification with the latest changes.
    diff = get_diff(id_).get('diff')
    if not diff:
        return
    send_email(to=email, subject='%s page changed' % schedule.get('title'),
               body='changes:\n\n%s' % diff)


def safe_run_job(id_=None, *args, **kwargs):
    """Run a job, reporting unhandled exceptions by email."""
    try:
        run_job(id_, *args, **kwargs)
    except Exception as e:
        # Report the failure to the configured admin/sender address.
        send_email(to=EMAIL_FROM, subject='diffido error',
                   body='error executing job %s: %s' % (id_, e))


def send_email(to, subject='diffido', body='', from_=None):
    """Send an email via the local SMTP server."""
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = from_ or EMAIL_FROM
    msg['To'] = to
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()


def get_history(id_):
    """Return the commit history of the repository of a schedule."""
    def _history(id_, queue):
        os.chdir('storage/%s' % id_)
        p = subprocess.Popen([GIT_CMD, 'log', '--pretty=oneline', '--shortstat'],
                             stdout=subprocess.PIPE)
        stdout, _ = p.communicate()
        queue.put(stdout)

    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=_history, args=(id_, queue))
    p.start()
    res = queue.get().decode('utf-8')
    p.join()
    history = []
    res_io = io.StringIO(res)
    while True:
        commit_line = res_io.readline().strip()
        if not commit_line:
            break
        commit_id, message = commit_line.split(' ', 1)
        if len(commit_id) != 40:
            continue
        # Each commit line is followed by a --shortstat line with the counts.
        changes_line = res_io.readline().strip()
        insert = re_insertion.findall(changes_line)
        insert = int(insert[0]) if insert else 0
        delete = re_deletion.findall(changes_line)
        delete = int(delete[0]) if delete else 0
        history.append({'id': commit_id, 'message': message, 'insertions': insert,
                        'deletions': delete, 'changes': max(insert, delete)})
    lastid = None
    if history and 'id' in history[0]:
        lastid = history[0]['id']
    for idx, item in enumerate(history):
        item['seq'] = idx + 1
    return {'history': history, 'lastid': lastid}
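
# For reference, _history above parses `git log --pretty=oneline --shortstat`
# output, which pairs each commit line with a stats line, e.g. (illustrative):
#   <40-hex commit id> 2018-01-21 10:00:00.000000
#    1 file changed, 3 insertions(+), 1 deletion(-)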


def get_diff(id_, commit_id='HEAD', old_commit_id=None):
    """Return the diff of a commit against another one (by default, its parent)."""
    def _diff(id_, commit_id, old_commit_id, queue):
        os.chdir('storage/%s' % id_)
        p = subprocess.Popen([GIT_CMD, 'diff', old_commit_id or '%s~' % commit_id, commit_id],
                             stdout=subprocess.PIPE)
        stdout, _ = p.communicate()
        queue.put(stdout)

    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=_diff, args=(id_, commit_id, old_commit_id, queue))
    p.start()
    res = queue.get().decode('utf-8')
    p.join()
    return {'diff': res}
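
# Note: `git diff <commit>~ <commit>` compares a commit with its parent, so
# get_diff(id_) with the default 'HEAD' yields the most recent change.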


def scheduler_update(scheduler, id_):
    """Create or update the scheduler job of the given schedule."""
    schedule = get_schedule(id_, add_id=False)
    if not schedule:
        return
    trigger = schedule.get('trigger')
    if trigger not in ('interval', 'cron'):
        return
    args = {}
    if trigger == 'interval':
        args['trigger'] = 'interval'
        for unit in 'weeks', 'days', 'hours', 'minutes', 'seconds':
            if 'interval_%s' % unit not in schedule:
                continue
            args[unit] = int(schedule['interval_%s' % unit])
    elif trigger == 'cron':
        cron_trigger = CronTrigger.from_crontab(schedule.get('cron_crontab'))
        args['trigger'] = cron_trigger
    # Make sure the repository exists before the first run of the job.
    git_create_repo(id_)
    scheduler.add_job(safe_run_job, id=id_, replace_existing=True, kwargs={'id_': id_}, **args)
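
# Sketch of the resulting job: a schedule with trigger 'interval' and
# interval_minutes '30' becomes
# scheduler.add_job(safe_run_job, id=id_, replace_existing=True,
#                   kwargs={'id_': id_}, trigger='interval', minutes=30)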


def scheduler_delete(scheduler, id_):
    """Remove the job and the repository of the given schedule."""
    scheduler.remove_job(job_id=id_)
    git_delete_repo(id_)


def reset_from_schedules(scheduler):
    """Rebuild every scheduler job from the schedules file."""
    scheduler.remove_all_jobs()
    for key in read_schedules().get('schedules', {}).keys():
        scheduler_update(scheduler, id_=key)


def git_create_repo(id_):
    """Initialize the git repository of a schedule, if not already present."""
    repo_dir = 'storage/%s' % id_
    if os.path.isdir(repo_dir):
        return
    p = subprocess.Popen([GIT_CMD, 'init', repo_dir])
    p.communicate()


def git_delete_repo(id_):
    """Delete the git repository of a schedule."""
    repo_dir = 'storage/%s' % id_
    if not os.path.isdir(repo_dir):
        return
    shutil.rmtree(repo_dir)


class DiffidoBaseException(Exception):
    """Base class for diffido custom exceptions.

    :param message: text message
    :type message: str
    :param status: numeric http status code
    :type status: int"""
    def __init__(self, message, status=400):
        super(DiffidoBaseException, self).__init__(message)
        self.message = message
        self.status = status


class BaseHandler(tornado.web.RequestHandler):
    """Base class for request handlers."""
    # Cache of currently connected users.
    _users_cache = {}
    # Placeholders for the set of documents we are managing (unused in diffido).
    document = None
    collection = None

    # A property to access the first value of each argument.
    arguments = property(lambda self: dict([(k, v[0].decode('utf-8'))
                                            for k, v in self.request.arguments.items()]))

    @property
    def clean_body(self):
        """Return a clean dictionary from the JSON body of the request.

        :returns: a clean copy of the body arguments
        :rtype: dict"""
        return escape.json_decode(self.request.body or '{}')

    def write_error(self, status_code, **kwargs):
        """Default error handler."""
        if isinstance(kwargs.get('exc_info', (None, None))[1], DiffidoBaseException):
            exc = kwargs['exc_info'][1]
            status_code = exc.status
            message = exc.message
        else:
            message = 'internal error'
        self.build_error(message, status=status_code)

    def is_api(self):
        """Return True if the path is from an API call."""
        return self.request.path.startswith('/v%s' % API_VERSION)

    def initialize(self, **kwargs):
        """Add every passed (key, value) as attributes of the instance."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def build_error(self, message='', status=400):
        """Build and write an error message.

        :param message: textual message
        :type message: str
        :param status: HTTP status code
        :type status: int
        """
        self.set_status(status)
        self.write({'error': True, 'message': message})

    def build_success(self, message='', status=200):
        """Build and write a success message.

        :param message: textual message
        :type message: str
        :param status: HTTP status code
        :type status: int
        """
        self.set_status(status)
        self.write({'error': False, 'message': message})


class SchedulesHandler(BaseHandler):
    """Create, read, update and delete schedules."""
    @gen.coroutine
    def get(self, id_=None, *args, **kwargs):
        if id_ is not None:
            self.write({'schedule': get_schedule(id_)})
            return
        schedules = read_schedules()
        self.write(schedules)

    @gen.coroutine
    def put(self, id_=None, *args, **kwargs):
        if id_ is None:
            return self.build_error(message='update action requires an ID')
        data = self.clean_body
        schedules = read_schedules()
        if id_ not in schedules.get('schedules', {}):
            return self.build_error(message='schedule %s not found' % id_)
        schedules['schedules'][id_] = data
        write_schedules(schedules)
        scheduler_update(scheduler=self.scheduler, id_=id_)
        self.write(get_schedule(id_=id_))

    @gen.coroutine
    def post(self, *args, **kwargs):
        data = self.clean_body
        schedules = read_schedules()
        id_ = next_id(schedules)
        schedules['schedules'][id_] = data
        write_schedules(schedules)
        scheduler_update(scheduler=self.scheduler, id_=id_)
        self.write(get_schedule(id_=id_))

    @gen.coroutine
    def delete(self, id_=None, *args, **kwargs):
        if id_ is None:
            return self.build_error(message='an ID must be specified')
        schedules = read_schedules()
        if id_ in schedules.get('schedules', {}):
            del schedules['schedules'][id_]
            write_schedules(schedules)
            scheduler_delete(scheduler=self.scheduler, id_=id_)
        self.build_success(message='removed schedule %s' % id_)


class ResetSchedulesHandler(BaseHandler):
    """Reset all scheduler jobs from the schedules file."""
    @gen.coroutine
    def post(self, *args, **kwargs):
        reset_from_schedules(self.scheduler)


class HistoryHandler(BaseHandler):
    """Return the commit history of a schedule."""
    @gen.coroutine
    def get(self, id_, *args, **kwargs):
        self.write(get_history(id_))


class DiffHandler(BaseHandler):
    """Return the diff between two commits of a schedule."""
    @gen.coroutine
    def get(self, id_, commit_id, old_commit_id=None, *args, **kwargs):
        self.write(get_diff(id_, commit_id, old_commit_id))


class TemplateHandler(BaseHandler):
    """Handler for the / path."""
    app_path = os.path.join(os.path.dirname(__file__), "dist")

    @gen.coroutine
    def get(self, *args, **kwargs):
        page = 'index.html'
        if args and args[0]:
            page = args[0].strip('/')
        arguments = self.arguments
        self.render(page, **arguments)


def serve():
    """Read configuration and start the server."""
    global EMAIL_FROM
    jobstores = {'default': SQLAlchemyJobStore(url=JOBS_STORE)}
    scheduler = TornadoScheduler(jobstores=jobstores)
    scheduler.start()

    define('port', default=3210, help='run on the given port', type=int)
    define('address', default='', help='bind the server at the given address', type=str)
    define('ssl_cert', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_cert.pem'),
           help='specify the SSL certificate to use for secure connections')
    define('ssl_key', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_key.pem'),
           help='specify the SSL private key to use for secure connections')
    # Tornado normalizes dashes, so this option is read back as options.admin_email.
    define('admin-email', default='', help='email address of the site administrator', type=str)
    define('debug', default=False, help='run in debug mode')
    define('config', help='read configuration file',
           callback=lambda path: tornado.options.parse_config_file(path, final=False))
    if not options.config and os.path.isfile(DEFAULT_CONF):
        tornado.options.parse_config_file(DEFAULT_CONF, final=False)
    tornado.options.parse_command_line()

    if options.admin_email:
        EMAIL_FROM = options.admin_email
    if options.debug:
        logger.setLevel(logging.DEBUG)

    ssl_options = {}
    if os.path.isfile(options.ssl_key) and os.path.isfile(options.ssl_cert):
        ssl_options = dict(certfile=options.ssl_cert, keyfile=options.ssl_key)

    init_params = dict(listen_port=options.port, logger=logger, ssl_options=ssl_options,
                       scheduler=scheduler)

    _reset_schedules_path = r'schedules/reset'
    _schedules_path = r'schedules/?(?P<id_>\d+)?'
    _history_path = r'history/?(?P<id_>\d+)'
    _diff_path = r'diff/(?P<id_>\d+)/(?P<commit_id>[0-9a-f]+)/?(?P<old_commit_id>[0-9a-f]+)?/?'
    application = tornado.web.Application([
            (r'/api/%s' % _reset_schedules_path, ResetSchedulesHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _reset_schedules_path), ResetSchedulesHandler, init_params),
            (r'/api/%s' % _schedules_path, SchedulesHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _schedules_path), SchedulesHandler, init_params),
            (r'/api/%s' % _history_path, HistoryHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _history_path), HistoryHandler, init_params),
            (r'/api/%s' % _diff_path, DiffHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _diff_path), DiffHandler, init_params),
            (r'/?(.*)', TemplateHandler, init_params),
        ],
        static_path=os.path.join(os.path.dirname(__file__), 'dist/static'),
        template_path=os.path.join(os.path.dirname(__file__), 'dist/'),
        debug=options.debug)
    http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None)
    logger.info('Start serving on %s://%s:%d', 'https' if ssl_options else 'http',
                options.address if options.address else '127.0.0.1',
                options.port)
    http_server.listen(options.port, options.address)
    try:
        IOLoop.instance().start()
    except (KeyboardInterrupt, SystemExit):
        pass


if __name__ == '__main__':
    serve()
if __name__ == '__main__':
serve()