#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import io
import json
import shutil
import urllib.parse
import smtplib
from email.mime.text import MIMEText
import logging
import datetime
import requests
import subprocess
import multiprocessing
from lxml import etree
from xml.etree import ElementTree
from tornado.ioloop import IOLoop
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.tornado import TornadoScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import tornado.httpserver
import tornado.ioloop
import tornado.options
from tornado.options import define, options
import tornado.web
from tornado import gen, escape

JOBS_STORE = 'sqlite:///conf/jobs.db'
API_VERSION = '1.0'
SCHEDULES_FILE = 'conf/schedules.json'
DEFAULT_CONF = 'conf/diffido.conf'
EMAIL_FROM = 'diffido@localhost'
GIT_CMD = 'git'

re_commit = re.compile(r'[0-9a-f]{40} ')
re_insertion = re.compile(r'(\d+) insertion')
re_deletion = re.compile(r'(\d+) deletion')

logger = logging.getLogger()
logger.setLevel(logging.INFO)
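

# The schedules configuration lives in SCHEDULES_FILE; an illustrative (not shipped)
# example of the JSON shape read and written by the helpers below:
# {
#   "schedules": {
#     "1": {"title": "example", "url": "https://www.example.com/",
#           "trigger": "interval", "interval_minutes": "30",
#           "email": "admin@example.com", "minimum_change": "0.05"}
#   }
# }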


def read_schedules():
    if not os.path.isfile(SCHEDULES_FILE):
        return {'schedules': {}}
    try:
        with open(SCHEDULES_FILE, 'r') as fd:
            return json.loads(fd.read())
    except Exception as e:
        logger.error('unable to read %s: %s' % (SCHEDULES_FILE, e))
        return {'schedules': {}}


def write_schedules(schedules):
    with open(SCHEDULES_FILE, 'w') as fd:
        fd.write(json.dumps(schedules, indent=2))


def next_id(schedules):
    ids = schedules.get('schedules', {}).keys()
    if not ids:
        return '1'
    return str(max([int(i) for i in ids]) + 1)
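# For example: next_id({'schedules': {'1': {}, '3': {}}}) returns '4',
# and next_id({'schedules': {}}) returns '1'.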


def get_schedule(id_, add_id=True):
    schedules = read_schedules()
    data = schedules.get('schedules', {}).get(id_, {})
    if add_id:
        data['id'] = str(id_)
    return data


def select_xpath(content, xpath):
    fd = io.StringIO(content)
    tree = etree.parse(fd)
    elems = tree.xpath(xpath)
    if not elems:
        return content
    selected_content = []
    for elem in elems:
        # elem.text is None for elements without direct text; fall back to ''
        # so the join below does not fail.
        selected_content.append(''.join([elem.text or ''] +
                                        [ElementTree.tostring(e).decode('utf-8', 'replace')
                                         for e in elem.getchildren()]))
    content = ''.join(selected_content)
    return content
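# Illustrative call: select_xpath('<html><body><div id="m"><p>hi</p></div></body></html>',
# '//div[@id="m"]') keeps only the matching element's text and serialized children,
# i.e. '<p>hi</p>' in this case.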


def run_job(id_=None, *args, **kwargs):
    schedule = get_schedule(id_, add_id=False)
    url = schedule.get('url')
    if not url:
        return
    logger.debug('Running job id:%s title:%s url: %s' % (id_, schedule.get('title', ''), url))
    req = requests.get(url, allow_redirects=True, timeout=(30.10, 240))
    content = req.text
    xpath = schedule.get('xpath')
    if xpath:
        content = select_xpath(content, xpath)
    req_path = urllib.parse.urlparse(req.url).path
    base_name = os.path.basename(req_path) or 'index.html'

    def _commit(id_, filename, content, queue):
        # Runs in a child process: the os.chdir below must not affect the
        # working directory of the scheduler process.
        os.chdir('storage/%s' % id_)
        current_lines = 0
        if os.path.isfile(filename):
            with open(filename, 'r') as fd:
                for line in fd:
                    current_lines += 1
        with open(filename, 'w') as fd:
            fd.write(content)
        p = subprocess.Popen([GIT_CMD, 'add', filename])
        p.communicate()
        p = subprocess.Popen([GIT_CMD, 'commit', '-m', '%s' % datetime.datetime.utcnow(), '--allow-empty'],
                             stdout=subprocess.PIPE)
        stdout, _ = p.communicate()
        stdout = stdout.decode('utf-8')
        insert = re_insertion.findall(stdout)
        if insert:
            insert = int(insert[0])
        else:
            insert = 0
        delete = re_deletion.findall(stdout)
        if delete:
            delete = int(delete[0])
        else:
            delete = 0
        queue.put({'insertions': insert, 'deletions': delete, 'previous_lines': current_lines,
                   'changes': max(insert, delete)})

    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=_commit, args=(id_, base_name, content, queue))
    p.start()
    res = queue.get()
    p.join()
    email = schedule.get('email')
    if not email:
        return
    changes = res.get('changes')
    if not changes:
        return
    min_change = schedule.get('minimum_change')
    previous_lines = res.get('previous_lines')
    if min_change and previous_lines:
        min_change = float(min_change)
        change_fraction = res.get('changes') / previous_lines
        if change_fraction < min_change:
            return
    # send notification
    diff = get_diff(id_).get('diff')
    if not diff:
        return
    send_email(to=email, subject='%s page changed' % schedule.get('title'),
               body='changes:\n\n%s' % diff)
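# minimum_change is compared against the fraction of changed lines: for example,
# with minimum_change set to "0.1" and a previously stored file of 200 lines, a
# notification is only sent when at least 20 lines were inserted or deleted.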


def safe_run_job(id_=None, *args, **kwargs):
    try:
        run_job(id_, *args, **kwargs)
    except Exception as e:
        # send_email() expects the recipient as its first argument, so report the
        # failure to the configured admin/from address instead of passing the
        # error text as the address.
        send_email(to=EMAIL_FROM, subject='error executing job %s' % id_,
                   body='error executing job %s: %s' % (id_, e))


def send_email(to, subject='diffido', body='', from_=None):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = from_ or EMAIL_FROM
    msg['To'] = to
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()
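# Note: delivery relies on an SMTP server reachable on localhost (port 25);
# no authentication or TLS is attempted here.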


def get_history(id_):
    def _history(id_, queue):
        os.chdir('storage/%s' % id_)
        p = subprocess.Popen([GIT_CMD, 'log', '--pretty=oneline', '--shortstat'], stdout=subprocess.PIPE)
        stdout, _ = p.communicate()
        queue.put(stdout)

    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=_history, args=(id_, queue))
    p.start()
    res = queue.get().decode('utf-8')
    p.join()
    history = []
    res_io = io.StringIO(res)
    while True:
        commit_line = res_io.readline().strip()
        if not commit_line:
            break
        if re_commit.match(commit_line):
            commit_id, message = commit_line.split(' ', 1)
            if len(commit_id) != 40:
                continue
            changes_line = res_io.readline().strip()
            if re_commit.match(changes_line):
                # The next line is itself a commit line (no --shortstat output,
                # e.g. an empty commit): treat it as the current commit and
                # record no changes.
                commit_id, message = changes_line.split(' ', 1)
                insert = 0
                delete = 0
            else:
                insert = re_insertion.findall(changes_line)
                if insert:
                    insert = int(insert[0])
                else:
                    insert = 0
                delete = re_deletion.findall(changes_line)
                if delete:
                    delete = int(delete[0])
                else:
                    delete = 0
            history.append({'id': commit_id, 'message': message, 'insertions': insert, 'deletions': delete,
                            'changes': max(insert, delete)})
    lastid = None
    if history and 'id' in history[0]:
        lastid = history[0]['id']
    for idx, item in enumerate(history):
        item['seq'] = idx + 1
    schedule = get_schedule(id_)
    return {'history': history, 'lastid': lastid, 'schedule': schedule}
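# The loop above parses `git log --pretty=oneline --shortstat` output of the form:
#   3f786850e387550fdab836ed7e6dc881de23001b 2018-01-01 10:00:00.000000
#    1 file changed, 12 insertions(+), 3 deletions(-)
# (commit messages are the UTC timestamps written by run_job's _commit helper).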


def get_diff(id_, commit_id='HEAD', old_commit_id=None):
    def _history(id_, commit_id, old_commit_id, queue):
        os.chdir('storage/%s' % id_)
        p = subprocess.Popen([GIT_CMD, 'diff', old_commit_id or '%s~' % commit_id, commit_id],
                             stdout=subprocess.PIPE)
        stdout, _ = p.communicate()
        queue.put(stdout)

    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=_history, args=(id_, commit_id, old_commit_id, queue))
    p.start()
    res = queue.get().decode('utf-8')
    p.join()
    schedule = get_schedule(id_)
    return {'diff': res, 'schedule': schedule}
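# With old_commit_id unset, the comparison defaults to the commit's parent,
# e.g. `git diff HEAD~ HEAD` for the latest snapshot.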


def scheduler_update(scheduler, id_):
    schedule = get_schedule(id_, add_id=False)
    if not schedule:
        return
    trigger = schedule.get('trigger')
    if trigger not in ('interval', 'cron'):
        return
    args = {}
    if trigger == 'interval':
        args['trigger'] = 'interval'
        for unit in 'weeks', 'days', 'hours', 'minutes', 'seconds':
            if 'interval_%s' % unit not in schedule:
                continue
            args[unit] = int(schedule['interval_%s' % unit])
    elif trigger == 'cron':
        cron_trigger = CronTrigger.from_crontab(schedule.get('cron_crontab'))
        args['trigger'] = cron_trigger
    git_create_repo(id_)
    scheduler.add_job(safe_run_job, id=id_, replace_existing=True, kwargs={'id_': id_}, **args)
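# For example, {"trigger": "interval", "interval_hours": "6"} becomes
# add_job(..., trigger='interval', hours=6), while {"trigger": "cron",
# "cron_crontab": "0 8 * * *"} schedules the job every day at 08:00.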


def scheduler_delete(scheduler, id_):
    scheduler.remove_job(job_id=id_)
    git_delete_repo(id_)


def reset_from_schedules(scheduler):
    scheduler.remove_all_jobs()
    for key in read_schedules().get('schedules', {}).keys():
        scheduler_update(scheduler, id_=key)


def git_create_repo(id_):
    repo_dir = 'storage/%s' % id_
    if os.path.isdir(repo_dir):
        return
    p = subprocess.Popen([GIT_CMD, 'init', repo_dir])
    p.communicate()


def git_delete_repo(id_):
    repo_dir = 'storage/%s' % id_
    if not os.path.isdir(repo_dir):
        return
    shutil.rmtree(repo_dir)


class DiffidoBaseException(Exception):
    """Base class for diffido custom exceptions.

    :param message: text message
    :type message: str
    :param status: numeric http status code
    :type status: int"""

    def __init__(self, message, status=400):
        super(DiffidoBaseException, self).__init__(message)
        self.message = message
        self.status = status


class BaseHandler(tornado.web.RequestHandler):
    """Base class for request handlers."""
    # Cache currently connected users.
    _users_cache = {}

    # set of documents we're managing (a collection in MongoDB or a table in a SQL database)
    document = None
    collection = None

    # A property to access the first value of each argument.
    arguments = property(lambda self: dict([(k, v[0].decode('utf-8'))
                                            for k, v in self.request.arguments.items()]))

    @property
    def clean_body(self):
        """Return a clean dictionary from a JSON body, suitable for a query on MongoDB.

        :returns: a clean copy of the body arguments
        :rtype: dict"""
        return escape.json_decode(self.request.body or '{}')

    def write_error(self, status_code, **kwargs):
        """Default error handler."""
        if isinstance(kwargs.get('exc_info', (None, None))[1], DiffidoBaseException):
            exc = kwargs['exc_info'][1]
            status_code = exc.status
            message = exc.message
        else:
            message = 'internal error'
        self.build_error(message, status=status_code)

    def is_api(self):
        """Return True if the path is from an API call."""
        return self.request.path.startswith('/v%s' % API_VERSION)

    def initialize(self, **kwargs):
        """Add every passed (key, value) as attributes of the instance."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def build_error(self, message='', status=400):
        """Build and write an error message.

        :param message: textual message
        :type message: str
        :param status: HTTP status code
        :type status: int
        """
        self.set_status(status)
        self.write({'error': True, 'message': message})

    def build_success(self, message='', status=200):
        """Build and write a success message.

        :param message: textual message
        :type message: str
        :param status: HTTP status code
        :type status: int
        """
        self.set_status(status)
        self.write({'error': False, 'message': message})
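# Both helpers reply with a small JSON document, e.g.
# {"error": true, "message": "schedule 7 not found"} from build_error and
# {"error": false, "message": "removed schedule 7"} from build_success.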


class SchedulesHandler(BaseHandler):
    @gen.coroutine
    def get(self, id_=None, *args, **kwargs):
        if id_ is not None:
            self.write({'schedule': get_schedule(id_)})
            return
        schedules = read_schedules()
        self.write(schedules)

    @gen.coroutine
    def put(self, id_=None, *args, **kwargs):
        if id_ is None:
            return self.build_error(message='update action requires an ID')
        data = self.clean_body
        schedules = read_schedules()
        if id_ not in schedules.get('schedules', {}):
            return self.build_error(message='schedule %s not found' % id_)
        schedules['schedules'][id_] = data
        write_schedules(schedules)
        scheduler_update(scheduler=self.scheduler, id_=id_)
        self.write(get_schedule(id_=id_))

    @gen.coroutine
    def post(self, *args, **kwargs):
        data = self.clean_body
        schedules = read_schedules()
        id_ = next_id(schedules)
        schedules['schedules'][id_] = data
        write_schedules(schedules)
        scheduler_update(scheduler=self.scheduler, id_=id_)
        self.write(get_schedule(id_=id_))

    @gen.coroutine
    def delete(self, id_=None, *args, **kwargs):
        if id_ is None:
            return self.build_error(message='an ID must be specified')
        schedules = read_schedules()
        if id_ in schedules.get('schedules', {}):
            del schedules['schedules'][id_]
            write_schedules(schedules)
            scheduler_delete(scheduler=self.scheduler, id_=id_)
        self.build_success(message='removed schedule %s' % id_)
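# Illustrative API usage against the routes registered in serve() below:
#   curl -X POST -d '{"title": "example", "url": "https://www.example.com/", "trigger": "interval", "interval_minutes": "30"}' http://localhost:3210/api/schedules
#   curl -X DELETE http://localhost:3210/api/schedules/1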


class ResetSchedulesHandler(BaseHandler):
    @gen.coroutine
    def post(self, *args, **kwargs):
        reset_from_schedules(self.scheduler)


class HistoryHandler(BaseHandler):
    @gen.coroutine
    def get(self, id_, *args, **kwargs):
        self.write(get_history(id_))


class DiffHandler(BaseHandler):
    @gen.coroutine
    def get(self, id_, commit_id, old_commit_id=None, *args, **kwargs):
        self.write(get_diff(id_, commit_id, old_commit_id))


class TemplateHandler(BaseHandler):
    """Handler for the / path."""
    app_path = os.path.join(os.path.dirname(__file__), "dist")

    @gen.coroutine
    def get(self, *args, **kwargs):
        page = 'index.html'
        if args and args[0]:
            page = args[0].strip('/')
        arguments = self.arguments
        self.render(page, **arguments)


def serve():
    global EMAIL_FROM
    jobstores = {'default': SQLAlchemyJobStore(url=JOBS_STORE)}
    scheduler = TornadoScheduler(jobstores=jobstores)
    scheduler.start()

    define('port', default=3210, help='run on the given port', type=int)
    define('address', default='', help='bind the server at the given address', type=str)
    define('ssl_cert', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_cert.pem'),
           help='specify the SSL certificate to use for secure connections')
    define('ssl_key', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_key.pem'),
           help='specify the SSL private key to use for secure connections')
    define('admin-email', default='', help='email address of the site administrator', type=str)
    define('debug', default=False, help='run in debug mode')
    define('config', help='read configuration file',
           callback=lambda path: tornado.options.parse_config_file(path, final=False))
    if not options.config and os.path.isfile(DEFAULT_CONF):
        tornado.options.parse_config_file(DEFAULT_CONF, final=False)
    tornado.options.parse_command_line()

    if options.admin_email:
        EMAIL_FROM = options.admin_email
    if options.debug:
        logger.setLevel(logging.DEBUG)

    ssl_options = {}
    if os.path.isfile(options.ssl_key) and os.path.isfile(options.ssl_cert):
        ssl_options = dict(certfile=options.ssl_cert, keyfile=options.ssl_key)

    init_params = dict(listen_port=options.port, logger=logger, ssl_options=ssl_options,
                       scheduler=scheduler)

    _reset_schedules_path = r'schedules/reset'
    _schedules_path = r'schedules/?(?P<id_>\d+)?'
    _history_path = r'history/?(?P<id_>\d+)'
    _diff_path = r'diff/(?P<id_>\d+)/(?P<commit_id>[0-9a-f]+)/?(?P<old_commit_id>[0-9a-f]+)?/?'
    application = tornado.web.Application([
            (r'/api/%s' % _reset_schedules_path, ResetSchedulesHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _reset_schedules_path), ResetSchedulesHandler, init_params),
            (r'/api/%s' % _schedules_path, SchedulesHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _schedules_path), SchedulesHandler, init_params),
            (r'/api/%s' % _history_path, HistoryHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _history_path), HistoryHandler, init_params),
            (r'/api/%s' % _diff_path, DiffHandler, init_params),
            (r'/api/v%s/%s' % (API_VERSION, _diff_path), DiffHandler, init_params),
            (r'/?(.*)', TemplateHandler, init_params),
        ],
        static_path=os.path.join(os.path.dirname(__file__), 'dist/static'),
        template_path=os.path.join(os.path.dirname(__file__), 'dist/'),
        debug=options.debug)
    http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None)
    logger.info('Start serving on %s://%s:%d', 'https' if ssl_options else 'http',
                options.address if options.address else '127.0.0.1',
                options.port)
    http_server.listen(options.port, options.address)
    try:
        IOLoop.instance().start()
    except (KeyboardInterrupt, SystemExit):
        pass
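

# Typical invocations (illustrative):
#   python3 diffido.py --debug
#   python3 diffido.py --port=8080 --admin-email=admin@example.com
#   python3 diffido.py --config=conf/diffido.conf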


if __name__ == '__main__':
    serve()