#!/usr/bin/env python3 # -*- coding: utf-8 -*- """toot-my-t-shirt Copyright 2018-2019 Davide Alberani Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ import os import re import json import base64 import logging import tempfile import datetime import tornado.httpserver import tornado.ioloop from tornado.options import define, options import tornado.web import tornado.websocket from tornado import gen, escape from mastodon import Mastodon API_VERSION = '1.0' # Keep track of WebSocket connections. _ws_clients = {} re_slashes = re.compile(r'//+') class Socialite: def __init__(self, options, logger=None): self.options = options self.logger = logger self.init() with_mastodon = property(lambda self: self.options.mastodon_token and self.options.mastodon_api_url) with_store = property(lambda self: bool(self.options.store_dir)) def init(self): self.mastodon = None if self.with_store: if not os.path.isdir(self.options.store_dir): os.makedirs(self.options.store_dir) if self.with_mastodon: self.mastodon = Mastodon(access_token=self.options.mastodon_token, api_base_url=self.options.mastodon_api_url) def post_image(self, img, mime_type='image/jpeg', message=None, description=None): errors = [] if message is None: message = self.options.default_message if description is None: description = self.options.default_image_description if self.with_store: try: self.store_image(img, mime_type, message, description) except Exception as e: errors.append(str(e)) if self.with_mastodon: try: self.mastodon_post_image(img, mime_type, message, description) except Exception as e: errors.append(str(e)) if errors and self.logger: for err in errors: self.logger.error("ERROR: %s" % err) return errors def mastodon_post_image(self, img, mime_type, message, description): mdict = self.mastodon.media_post(media_file=img, mime_type=mime_type, description=description) media_id = mdict['id'] self.mastodon.status_post(status=message, media_ids=[media_id], visibility=self.options.mastodon_visibility) def store_image(self, img, mime_type, message, description): suffix = '.jpg' if mime_type: ms = mime_type.split('/', 1) if len(ms) == 2 and ms[1]: suffix = '.' + ms[1] prefix = str(datetime.datetime.now()).replace(' ', 'T') + '-' fd, fname = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=self.options.store_dir) os.write(fd, img) os.close(fd) txt_fname = '%s.info' % fname with open(txt_fname, 'w') as tfd: tfd.write('message: %s\n' % message or '') tfd.write('description: %s\n' % description or '') class BaseException(Exception): """Base class for toot-my-t-shirt custom exceptions. :param message: text message :type message: str :param status: numeric http status code :type status: int""" def __init__(self, message, status=200): super(BaseException, self).__init__(message) self.message = message self.status = status class InputException(BaseException): """Exception raised by errors in input handling.""" pass class BaseHandler(tornado.web.RequestHandler): """Base class for request handlers.""" # A property to access the first value of each argument. arguments = property(lambda self: dict([(k, v[0].decode('utf-8')) for k, v in self.request.arguments.items()])) @property def json_body(self): """Return a dictionary from a JSON body. :returns: a copy of the body arguments :rtype: dict""" return escape.json_decode(self.request.body or '{}') def write_error(self, status_code, **kwargs): """Default error handler.""" if isinstance(kwargs.get('exc_info', (None, None))[1], BaseException): exc = kwargs['exc_info'][1] status_code = exc.status message = exc.message else: message = 'internal error' self.build_error(message, status=status_code) def is_api(self): """Return True if the path is from an API call.""" return self.request.path.startswith('/v%s' % API_VERSION) def initialize(self, **kwargs): """Add every passed (key, value) as attributes of the instance.""" for key, value in kwargs.items(): setattr(self, key, value) def build_error(self, message='', status=200): """Build and write an error message. :param message: textual message :type message: str :param status: HTTP status code :type status: int """ self.set_status(status) self.write({'success': False, 'message': message}) class RootHandler(BaseHandler): """Handler for the / path.""" app_path = os.path.join(os.path.dirname(__file__), "static") @gen.coroutine def get(self, *args, **kwargs): # serve the ./static/index.html file with open(self.app_path + "/index.html", 'r') as fd: self.write(fd.read()) class PublishHandler(BaseHandler): @gen.coroutine def post(self, **kwargs): reply = {'success': True} for info in self.request.files['selfie']: _, content_type = info['filename'], info['content_type'] body = info['body'] b64_image = body.split(b',')[1] image = base64.decodestring(b64_image) try: errors = self.socialite.post_image(image) if errors: reply['success'] = False reply['message'] = '
\n'.join(errors) except Exception as e: reply = {'success': False, 'message': 'something wrong sharing the image'} self.write(reply) class ButtonHandler(BaseHandler): @gen.coroutine def post(self, **kwargs): reply = {'success': True} self.send_ws_message('/ws', json.dumps({"source": "button", "action": "clicked"})) self.write(reply) @gen.coroutine def send_ws_message(self, path, message): """Send a WebSocket message to all the connected clients. :param path: partial path used to build the WebSocket url :type path: str :param message: message to send :type message: str """ try: url = '%s://localhost:%s/ws?uuid=bigredbutton' % ('wss' if self.ssl_options else 'ws', self.global_options.port) self.logger.info(url) req = tornado.httpclient.HTTPRequest(url, validate_cert=False) ws = yield tornado.websocket.websocket_connect(req) ws.write_message(message) ws.close() except Exception as e: self.logger.error('Error yielding WebSocket message: %s', e) class WSHandler(tornado.websocket.WebSocketHandler): def initialize(self, **kwargs): """Add every passed (key, value) as attributes of the instance.""" for key, value in kwargs.items(): setattr(self, key, value) def _clean_url(self, url): url = re_slashes.sub('/', url) ridx = url.rfind('?') if ridx != -1: url = url[:ridx] return url def open(self, *args, **kwargs): try: self.uuid = self.get_argument('uuid') except: self.uuid = None url = self._clean_url(self.request.uri) _ws_clients.setdefault(url, {}) if self.uuid and self.uuid not in _ws_clients[url]: _ws_clients[url][self.uuid] = self self.logger.debug('WSHandler.open %s clients connected' % len(_ws_clients[url])) def on_message(self, message): url = self._clean_url(self.request.uri) self.logger.debug('WSHandler.on_message url: %s' % url) count = 0 _to_delete = set() current_uuid = None try: current_uuid = self.get_argument('uuid') except: pass for uuid, client in _ws_clients.get(url, {}).items(): if uuid and uuid == current_uuid: continue try: client.write_message(message) except: _to_delete.add(uuid) continue count += 1 for uuid in _to_delete: try: del _ws_clients[url][uuid] except KeyError: pass self.logger.debug('WSHandler.on_message sent message to %d clients' % count) def run(): """Run the Tornado web application.""" # command line arguments; can also be written in a configuration file, # specified with the --config argument. define("port", default=9000, help="listen on the given port", type=int) define("address", default='', help="bind the server at the given address", type=str) define("default-message", help="Default message", type=str) define("default-image-description", help="Default image description", type=str) define("mastodon-token", help="Mastodon token", type=str) define("mastodon-api-url", help="Mastodon API URL", type=str) define("mastodon-visibility", help="Mastodon toot visibility", default='unlisted') define("store-dir", help="store images in this directory", type=str) define("ssl_cert", default=os.path.join(os.path.dirname(__file__), 'ssl', 'sb-cert.pem'), help="specify the SSL certificate to use for secure connections") define("ssl_key", default=os.path.join(os.path.dirname(__file__), 'ssl', 'sb-cert.key'), help="specify the SSL private key to use for secure connections") define("debug", default=False, help="run in debug mode") define("config", help="read configuration file", callback=lambda path: tornado.options.parse_config_file(path, final=False)) tornado.options.parse_command_line() logger = logging.getLogger() logger.setLevel(logging.INFO) if options.debug: logger.setLevel(logging.DEBUG) ssl_options = {} if os.path.isfile(options.ssl_key) and os.path.isfile(options.ssl_cert): ssl_options = dict(certfile=options.ssl_cert, keyfile=options.ssl_key) socialite = Socialite(options, logger=logger) init_params = dict(global_options=options, ssl_options=ssl_options, socialite=socialite, logger=logger) _publish_path = r"/publish/?" _button_path = r"/button/?" application = tornado.web.Application([ (r'/ws', WSHandler, init_params), (_publish_path, PublishHandler, init_params), (r'/v%s%s' % (API_VERSION, _publish_path), PublishHandler, init_params), (_button_path, ButtonHandler, init_params), (r'/v%s%s' % (API_VERSION, _button_path), ButtonHandler, init_params), (r"/(?:index.html)?", RootHandler, init_params), (r'/?(.*)', tornado.web.StaticFileHandler, {"path": "static"}) ], static_path=os.path.join(os.path.dirname(__file__), "static"), debug=options.debug) http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None) logger.info('Start serving on %s://%s:%d', 'https' if ssl_options else 'http', options.address if options.address else '127.0.0.1', options.port) http_server.listen(options.port, options.address) tornado.ioloop.IOLoop.instance().start() if __name__ == '__main__': try: run() except KeyboardInterrupt: print('Server stopped')