123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """toot-my-t-shirt
- Copyright 2018-2019 Davide Alberani <da@mimante.net>
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- """
- import os
- import re
- import json
- import base64
- import logging
- import tempfile
- import datetime
- import tornado.httpserver
- import tornado.ioloop
- from tornado.options import define, options
- import tornado.web
- import tornado.websocket
- from tornado import gen, escape
- from mastodon import Mastodon
- API_VERSION = '1.0'
- # Keep track of WebSocket connections.
- _ws_clients = {}
- re_slashes = re.compile(r'//+')
- class Socialite:
- def __init__(self, options, logger=None):
- self.options = options
- self.logger = logger
- self.init()
- with_mastodon = property(lambda self: self.options.mastodon_token and
- self.options.mastodon_api_url)
- with_store = property(lambda self: bool(self.options.store_dir))
- def init(self):
- self.mastodon = None
- if self.with_store:
- if not os.path.isdir(self.options.store_dir):
- os.makedirs(self.options.store_dir)
- if self.with_mastodon:
- self.mastodon = Mastodon(access_token=self.options.mastodon_token,
- api_base_url=self.options.mastodon_api_url)
- def post_image(self, img, mime_type='image/jpeg', message=None, description=None):
- errors = []
- if message is None:
- message = self.options.default_message
- if description is None:
- description = self.options.default_image_description
- if self.with_store:
- try:
- self.store_image(img, mime_type, message, description)
- except Exception as e:
- errors.append(str(e))
- if self.with_mastodon:
- try:
- self.mastodon_post_image(img, mime_type, message, description)
- except Exception as e:
- errors.append(str(e))
- if errors and self.logger:
- for err in errors:
- self.logger.error("ERROR: %s" % err)
- return errors
- def mastodon_post_image(self, img, mime_type, message, description):
- mdict = self.mastodon.media_post(media_file=img, mime_type=mime_type, description=description)
- media_id = mdict['id']
- self.mastodon.status_post(status=message, media_ids=[media_id],
- visibility=self.options.mastodon_visibility)
- def store_image(self, img, mime_type, message, description):
- suffix = '.jpg'
- if mime_type:
- ms = mime_type.split('/', 1)
- if len(ms) == 2 and ms[1]:
- suffix = '.' + ms[1]
- prefix = str(datetime.datetime.now()).replace(' ', 'T') + '-'
- fd, fname = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=self.options.store_dir)
- os.write(fd, img)
- os.close(fd)
- txt_fname = '%s.info' % fname
- with open(txt_fname, 'w') as tfd:
- tfd.write('message: %s\n' % message or '')
- tfd.write('description: %s\n' % description or '')
- class BaseException(Exception):
- """Base class for toot-my-t-shirt custom exceptions.
- :param message: text message
- :type message: str
- :param status: numeric http status code
- :type status: int"""
- def __init__(self, message, status=200):
- super(BaseException, self).__init__(message)
- self.message = message
- self.status = status
- class InputException(BaseException):
- """Exception raised by errors in input handling."""
- pass
- class BaseHandler(tornado.web.RequestHandler):
- """Base class for request handlers."""
- # A property to access the first value of each argument.
- arguments = property(lambda self: dict([(k, v[0].decode('utf-8'))
- for k, v in self.request.arguments.items()]))
- @property
- def json_body(self):
- """Return a dictionary from a JSON body.
- :returns: a copy of the body arguments
- :rtype: dict"""
- return escape.json_decode(self.request.body or '{}')
- def write_error(self, status_code, **kwargs):
- """Default error handler."""
- if isinstance(kwargs.get('exc_info', (None, None))[1], BaseException):
- exc = kwargs['exc_info'][1]
- status_code = exc.status
- message = exc.message
- else:
- message = 'internal error'
- self.build_error(message, status=status_code)
- def is_api(self):
- """Return True if the path is from an API call."""
- return self.request.path.startswith('/v%s' % API_VERSION)
- def initialize(self, **kwargs):
- """Add every passed (key, value) as attributes of the instance."""
- for key, value in kwargs.items():
- setattr(self, key, value)
- def build_error(self, message='', status=200):
- """Build and write an error message.
- :param message: textual message
- :type message: str
- :param status: HTTP status code
- :type status: int
- """
- self.set_status(status)
- self.write({'success': False, 'message': message})
- class RootHandler(BaseHandler):
- """Handler for the / path."""
- app_path = os.path.join(os.path.dirname(__file__), "static")
- @gen.coroutine
- def get(self, *args, **kwargs):
- # serve the ./static/index.html file
- with open(self.app_path + "/index.html", 'r') as fd:
- self.write(fd.read())
- class PublishHandler(BaseHandler):
- @gen.coroutine
- def post(self, **kwargs):
- reply = {'success': True}
- for info in self.request.files['selfie']:
- _, content_type = info['filename'], info['content_type']
- body = info['body']
- b64_image = body.split(b',')[1]
- image = base64.decodestring(b64_image)
- try:
- errors = self.socialite.post_image(image)
- if errors:
- reply['success'] = False
- reply['message'] = '<br>\n'.join(errors)
- except Exception as e:
- reply = {'success': False, 'message': 'something wrong sharing the image'}
- self.write(reply)
- class ButtonHandler(BaseHandler):
- @gen.coroutine
- def post(self, **kwargs):
- reply = {'success': True}
- self.send_ws_message('/ws', json.dumps({"source": "button", "action": "clicked"}))
- self.write(reply)
- @gen.coroutine
- def send_ws_message(self, path, message):
- """Send a WebSocket message to all the connected clients.
- :param path: partial path used to build the WebSocket url
- :type path: str
- :param message: message to send
- :type message: str
- """
- try:
- url = '%s://localhost:%s/ws?uuid=bigredbutton' % ('wss' if self.ssl_options else 'ws',
- self.global_options.port)
- self.logger.info(url)
- req = tornado.httpclient.HTTPRequest(url, validate_cert=False)
- ws = yield tornado.websocket.websocket_connect(req)
- ws.write_message(message)
- ws.close()
- except Exception as e:
- self.logger.error('Error yielding WebSocket message: %s', e)
- class WSHandler(tornado.websocket.WebSocketHandler):
- def initialize(self, **kwargs):
- """Add every passed (key, value) as attributes of the instance."""
- for key, value in kwargs.items():
- setattr(self, key, value)
- def _clean_url(self, url):
- url = re_slashes.sub('/', url)
- ridx = url.rfind('?')
- if ridx != -1:
- url = url[:ridx]
- return url
- def open(self, *args, **kwargs):
- try:
- self.uuid = self.get_argument('uuid')
- except:
- self.uuid = None
- url = self._clean_url(self.request.uri)
- _ws_clients.setdefault(url, {})
- if self.uuid and self.uuid not in _ws_clients[url]:
- _ws_clients[url][self.uuid] = self
- self.logger.debug('WSHandler.open %s clients connected' % len(_ws_clients[url]))
- def on_message(self, message):
- url = self._clean_url(self.request.uri)
- self.logger.debug('WSHandler.on_message url: %s' % url)
- count = 0
- _to_delete = set()
- current_uuid = None
- try:
- current_uuid = self.get_argument('uuid')
- except:
- pass
- for uuid, client in _ws_clients.get(url, {}).items():
- if uuid and uuid == current_uuid:
- continue
- try:
- client.write_message(message)
- except:
- _to_delete.add(uuid)
- continue
- count += 1
- for uuid in _to_delete:
- try:
- del _ws_clients[url][uuid]
- except KeyError:
- pass
- self.logger.debug('WSHandler.on_message sent message to %d clients' % count)
- def run():
- """Run the Tornado web application."""
- # command line arguments; can also be written in a configuration file,
- # specified with the --config argument.
- define("port", default=9000, help="listen on the given port", type=int)
- define("address", default='', help="bind the server at the given address", type=str)
- define("default-message", help="Default message", type=str)
- define("default-image-description", help="Default image description", type=str)
- define("mastodon-token", help="Mastodon token", type=str)
- define("mastodon-api-url", help="Mastodon API URL", type=str)
- define("mastodon-visibility", help="Mastodon toot visibility", default='unlisted')
- define("store-dir", help="store images in this directory", type=str)
- define("ssl_cert", default=os.path.join(os.path.dirname(__file__), 'ssl', 'sb-cert.pem'),
- help="specify the SSL certificate to use for secure connections")
- define("ssl_key", default=os.path.join(os.path.dirname(__file__), 'ssl', 'sb-cert.key'),
- help="specify the SSL private key to use for secure connections")
- define("debug", default=False, help="run in debug mode")
- define("config", help="read configuration file",
- callback=lambda path: tornado.options.parse_config_file(path, final=False))
- tornado.options.parse_command_line()
- logger = logging.getLogger()
- logger.setLevel(logging.INFO)
- if options.debug:
- logger.setLevel(logging.DEBUG)
- ssl_options = {}
- if os.path.isfile(options.ssl_key) and os.path.isfile(options.ssl_cert):
- ssl_options = dict(certfile=options.ssl_cert, keyfile=options.ssl_key)
- socialite = Socialite(options, logger=logger)
- init_params = dict(global_options=options, ssl_options=ssl_options, socialite=socialite, logger=logger)
- _publish_path = r"/publish/?"
- _button_path = r"/button/?"
- application = tornado.web.Application([
- (r'/ws', WSHandler, init_params),
- (_publish_path, PublishHandler, init_params),
- (r'/v%s%s' % (API_VERSION, _publish_path), PublishHandler, init_params),
- (_button_path, ButtonHandler, init_params),
- (r'/v%s%s' % (API_VERSION, _button_path), ButtonHandler, init_params),
- (r"/(?:index.html)?", RootHandler, init_params),
- (r'/?(.*)', tornado.web.StaticFileHandler, {"path": "static"})
- ],
- static_path=os.path.join(os.path.dirname(__file__), "static"),
- debug=options.debug)
- http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None)
- logger.info('Start serving on %s://%s:%d', 'https' if ssl_options else 'http',
- options.address if options.address else '127.0.0.1',
- options.port)
- http_server.listen(options.port, options.address)
- tornado.ioloop.IOLoop.instance().start()
- if __name__ == '__main__':
- try:
- run()
- except KeyboardInterrupt:
- print('Server stopped')
|