# diffido.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import json
import shutil
import logging
import datetime
import subprocess
import multiprocessing
import urllib.parse

import requests
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado import gen, escape
from tornado.ioloop import IOLoop
from tornado.options import define, options
from apscheduler.triggers.cron import CronTrigger
from apscheduler.schedulers.tornado import TornadoScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  22. JOBS_STORE = 'sqlite:///conf/jobs.db'
  23. API_VERSION = '1.0'
  24. SCHEDULES_FILE = 'conf/schedules.json'
  25. GIT_CMD = 'git'
  26. logger = logging.getLogger()
  27. logger.setLevel(logging.INFO)
  28. def read_schedules():
  29. if not os.path.isfile(SCHEDULES_FILE):
  30. return {'schedules': {}}
  31. try:
  32. with open(SCHEDULES_FILE, 'r') as fd:
  33. return json.loads(fd.read())
  34. except Exception as e:
  35. logger.error('unable to read %s: %s' % (SCHEDULES_FILE, e))
  36. return {'schedules': {}}
  37. def write_schedules(schedules):
  38. with open(SCHEDULES_FILE, 'w') as fd:
  39. fd.write(json.dumps(schedules, indent=2))
  40. def next_id(schedules):
  41. ids = schedules.get('schedules', {}).keys()
  42. if not ids:
  43. return '1'
  44. return str(max([int(i) for i in ids]) + 1)
  45. def get_schedule(id_, addID=True):
  46. schedules = read_schedules()
  47. data = schedules.get('schedules', {}).get(id_, {})
  48. if addID:
  49. data['id'] = str(id_)
  50. return data
  51. def run_job(id_=None, *args, **kwargs):
  52. schedule = get_schedule(id_, addID=False)
  53. url = schedule.get('url')
  54. if not url:
  55. return
  56. print('Running job id:%s title:%s url: %s' % (id_, schedule.get('title', ''), url))
  57. req = requests.get(url, allow_redirects=True, timeout=(30.10, 240))
  58. content = req.text
  59. req_path = urllib.parse.urlparse(req.url).path
  60. base_name = os.path.basename(req_path) or 'index'
  61. def _commit(id_, filename, content):
  62. os.chdir('storage/%s' % id_)
  63. with open(filename, 'w') as fd:
  64. fd.write(content)
  65. p = subprocess.Popen([GIT_CMD, 'add', filename])
  66. p.communicate()
  67. p = subprocess.Popen([GIT_CMD, 'commit', '-m', '%s' % datetime.datetime.utcnow(), '--allow-empty'])
  68. p.communicate()
  69. p = multiprocessing.Process(target=_commit, args=(id_, base_name, content))
  70. p.start()
  71. p.join()
  72. def scheduler_update(scheduler, id_):
  73. schedule = get_schedule(id_, addID=False)
  74. if not schedule:
  75. return
  76. trigger = schedule.get('trigger')
  77. if trigger not in ('interval', 'cron'):
  78. return
  79. args = {}
  80. if trigger == 'interval':
  81. args['trigger'] = 'interval'
  82. for unit in 'weeks', 'days', 'hours', 'minutes', 'seconds':
  83. if 'interval_%s' % unit not in schedule:
  84. continue
  85. args[unit] = int(schedule['interval_%s' % unit])
  86. elif trigger == 'cron':
  87. cron_trigger = CronTrigger.from_crontab(schedule.get('cron_crontab'))
  88. args['trigger'] = cron_trigger
  89. git_create_repo(id_)
  90. scheduler.add_job(run_job, id=id_, replace_existing=True, kwargs={'id_': id_}, **args)
  91. def scheduler_delete(scheduler, id_):
  92. scheduler.remove_job(job_id=id_)
  93. git_delete_repo(id_)
  94. def reset_from_schedules(scheduler):
  95. scheduler.remove_all_jobs()
  96. for key in read_schedules().get('schedules', {}).keys():
  97. scheduler_update(scheduler, id_=key)
  98. def git_create_repo(id_):
  99. repo_dir = 'storage/%s' % id_
  100. if os.path.isdir(repo_dir):
  101. return
  102. p = subprocess.Popen([GIT_CMD, 'init', repo_dir])
  103. p.communicate()
  104. def git_delete_repo(id_):
  105. repo_dir = 'storage/%s' % id_
  106. if not os.path.isdir(repo_dir):
  107. return
  108. shutil.rmtree(repo_dir)
  109. class DiffidoBaseException(Exception):
  110. """Base class for diffido custom exceptions.
  111. :param message: text message
  112. :type message: str
  113. :param status: numeric http status code
  114. :type status: int"""
  115. def __init__(self, message, status=400):
  116. super(DiffidoBaseException, self).__init__(message)
  117. self.message = message
  118. self.status = status
  119. class BaseHandler(tornado.web.RequestHandler):
  120. """Base class for request handlers."""
  121. # Cache currently connected users.
  122. _users_cache = {}
  123. # set of documents we're managing (a collection in MongoDB or a table in a SQL database)
  124. document = None
  125. collection = None
  126. # A property to access the first value of each argument.
  127. arguments = property(lambda self: dict([(k, v[0].decode('utf-8'))
  128. for k, v in self.request.arguments.items()]))
  129. @property
  130. def clean_body(self):
  131. """Return a clean dictionary from a JSON body, suitable for a query on MongoDB.
  132. :returns: a clean copy of the body arguments
  133. :rtype: dict"""
  134. return escape.json_decode(self.request.body or '{}')
  135. def write_error(self, status_code, **kwargs):
  136. """Default error handler."""
  137. if isinstance(kwargs.get('exc_info', (None, None))[1], DiffidoBaseException):
  138. exc = kwargs['exc_info'][1]
  139. status_code = exc.status
  140. message = exc.message
  141. else:
  142. message = 'internal error'
  143. self.build_error(message, status=status_code)
  144. def is_api(self):
  145. """Return True if the path is from an API call."""
  146. return self.request.path.startswith('/v%s' % API_VERSION)
  147. def initialize(self, **kwargs):
  148. """Add every passed (key, value) as attributes of the instance."""
  149. for key, value in kwargs.items():
  150. setattr(self, key, value)
  151. def build_error(self, message='', status=400):
  152. """Build and write an error message.
  153. :param message: textual message
  154. :type message: str
  155. :param status: HTTP status code
  156. :type status: int
  157. """
  158. self.set_status(status)
  159. self.write({'error': True, 'message': message})
  160. def build_success(self, message='', status=200):
  161. """Build and write a success message.
  162. :param message: textual message
  163. :type message: str
  164. :param status: HTTP status code
  165. :type status: int
  166. """
  167. self.set_status(status)
  168. self.write({'error': False, 'message': message})
  169. class SchedulesHandler(BaseHandler):
  170. @gen.coroutine
  171. def get(self, id_=None, *args, **kwargs):
  172. if id_ is not None:
  173. self.write({'schedule': get_schedule(id_)})
  174. return
  175. schedules = read_schedules()
  176. self.write(schedules)
  177. @gen.coroutine
  178. def put(self, id_=None, *args, **kwargs):
  179. if id_ is None:
  180. return self.build_error(message='update action requires an ID')
  181. data = self.clean_body
  182. schedules = read_schedules()
  183. if id_ not in schedules.get('schedules', {}):
  184. return self.build_error(message='schedule %s not found' % id_)
  185. schedules['schedules'][id_] = data
  186. write_schedules(schedules)
  187. scheduler_update(scheduler=self.scheduler, id_=id_)
  188. self.write(get_schedule(id_=id_))
  189. @gen.coroutine
  190. def post(self, *args, **kwargs):
  191. data = self.clean_body
  192. schedules = read_schedules()
  193. id_ = next_id(schedules)
  194. schedules['schedules'][id_] = data
  195. write_schedules(schedules)
  196. scheduler_update(scheduler=self.scheduler, id_=id_)
  197. self.write(get_schedule(id_=id_))
  198. @gen.coroutine
  199. def delete(self, id_=None, *args, **kwargs):
  200. if id_ is None:
  201. return self.build_error(message='an ID must be specified')
  202. schedules = read_schedules()
  203. if id_ in schedules.get('schedules', {}):
  204. del schedules['schedules'][id_]
  205. write_schedules(schedules)
  206. scheduler_delete(scheduler=self.scheduler, id_=id_)
  207. self.build_success(message='removed schedule %s' % id_)
  208. class ResetSchedulesHandler(BaseHandler):
  209. @gen.coroutine
  210. def post(self, *args, **kwargs):
  211. reset_from_schedules(self.scheduler)
  212. class TemplateHandler(BaseHandler):
  213. """Handler for the / path."""
  214. app_path = os.path.join(os.path.dirname(__file__), "dist")
  215. @gen.coroutine
  216. def get(self, *args, **kwargs):
  217. page = 'index.html'
  218. if args and args[0]:
  219. page = args[0].strip('/')
  220. arguments = self.arguments
  221. self.render(page, **arguments)
  222. def serve():
  223. jobstores = {'default': SQLAlchemyJobStore(url=JOBS_STORE)}
  224. scheduler = TornadoScheduler(jobstores=jobstores)
  225. scheduler.start()
  226. define('port', default=3210, help='run on the given port', type=int)
  227. define('address', default='', help='bind the server at the given address', type=str)
  228. define('ssl_cert', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_cert.pem'),
  229. help='specify the SSL certificate to use for secure connections')
  230. define('ssl_key', default=os.path.join(os.path.dirname(__file__), 'ssl', 'diffido_key.pem'),
  231. help='specify the SSL private key to use for secure connections')
  232. define('debug', default=False, help='run in debug mode')
  233. define('config', help='read configuration file',
  234. callback=lambda path: tornado.options.parse_config_file(path, final=False))
  235. tornado.options.parse_command_line()
  236. if options.debug:
  237. logger.setLevel(logging.DEBUG)
  238. ssl_options = {}
  239. if os.path.isfile(options.ssl_key) and os.path.isfile(options.ssl_cert):
  240. ssl_options = dict(certfile=options.ssl_cert, keyfile=options.ssl_key)
  241. init_params = dict(listen_port=options.port, logger=logger, ssl_options=ssl_options,
  242. scheduler=scheduler)
  243. _reset_schedules_path = r'schedules/reset'
  244. _schedules_path = r'schedules/?(?P<id_>\d+)?'
  245. application = tornado.web.Application([
  246. ('/api/%s' % _reset_schedules_path, ResetSchedulesHandler, init_params),
  247. (r'/api/v%s/%s' % (API_VERSION, _reset_schedules_path), ResetSchedulesHandler, init_params),
  248. ('/api/%s' % _schedules_path, SchedulesHandler, init_params),
  249. (r'/api/v%s/%s' % (API_VERSION, _schedules_path), SchedulesHandler, init_params),
  250. (r'/?(.*)', TemplateHandler, init_params),
  251. ],
  252. static_path=os.path.join(os.path.dirname(__file__), 'dist/static'),
  253. template_path=os.path.join(os.path.dirname(__file__), 'dist/'),
  254. debug=options.debug)
  255. http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None)
  256. logger.info('Start serving on %s://%s:%d', 'https' if ssl_options else 'http',
  257. options.address if options.address else '127.0.0.1',
  258. options.port)
  259. http_server.listen(options.port, options.address)
  260. try:
  261. IOLoop.instance().start()
  262. except (KeyboardInterrupt, SystemExit):
  263. pass
  264. if __name__ == '__main__':
  265. serve()