341 lines
12 KiB
Python
Executable file
341 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""toot-my-t-shirt

Copyright 2018-2019 Davide Alberani <da@mimante.net>

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
|
|
|
|
import os
|
|
import re
|
|
import json
|
|
import base64
|
|
import logging
|
|
import tempfile
|
|
import datetime
|
|
|
|
import tornado.httpserver
|
|
import tornado.ioloop
|
|
from tornado.options import define, options
|
|
import tornado.web
|
|
import tornado.websocket
|
|
from tornado import gen, escape
|
|
|
|
from mastodon import Mastodon
|
|
|
|
# Version string used to build the versioned API routes ('/v<API_VERSION>/...').
API_VERSION = '1.0'

# Registry of the registered WebSocket handlers, filled by WSHandler.open:
# cleaned request URL -> {client uuid -> handler instance}.
_ws_clients = {}

# Matches a run of two or more consecutive slashes.
re_slashes = re.compile(r'//+')
|
|
|
|
class Socialite:
    """Share an image by storing it on disk and/or posting it to Mastodon.

    :param options: configuration object; the attributes read by this class
        are store_dir, mastodon_token, mastodon_api_url, mastodon_visibility,
        default_message and default_image_description
    :param logger: optional logger used to report the collected errors
    """

    # MIME subtypes that can safely be used as a file name suffix.
    _re_safe_subtype = re.compile(r'[A-Za-z0-9.+-]+')

    def __init__(self, options, logger=None):
        self.options = options
        self.logger = logger
        self.init()

    @property
    def with_mastodon(self):
        """True if both the Mastodon token and the API URL are configured."""
        return bool(self.options.mastodon_token and self.options.mastodon_api_url)

    @property
    def with_store(self):
        """True if a directory where to store the images is configured."""
        return bool(self.options.store_dir)

    def init(self):
        """Create the store directory and the Mastodon client, when configured."""
        self.mastodon = None
        if self.with_store:
            os.makedirs(self.options.store_dir, exist_ok=True)
        if self.with_mastodon:
            self.mastodon = Mastodon(access_token=self.options.mastodon_token,
                                     api_base_url=self.options.mastodon_api_url)

    def post_image(self, img, mime_type='image/jpeg', message=None, description=None):
        """Store and/or toot an image, depending on the configuration.

        Failures of the single backends do not raise: they are collected,
        logged (if a logger is available) and returned to the caller.

        :param img: raw image data
        :type img: bytes
        :param mime_type: MIME type of the image
        :type mime_type: str
        :param message: text of the post; defaults to options.default_message
        :type message: str
        :param description: description of the image; defaults to
            options.default_image_description
        :type description: str

        :returns: the error messages, empty if everything went fine
        :rtype: list
        """
        errors = []
        if message is None:
            message = self.options.default_message
        if description is None:
            description = self.options.default_image_description
        if self.with_store:
            try:
                self.store_image(img, mime_type, message, description)
            except Exception as e:
                errors.append(str(e))
        if self.with_mastodon:
            try:
                self.mastodon_post_image(img, mime_type, message, description)
            except Exception as e:
                errors.append(str(e))
        if self.logger:
            for err in errors:
                self.logger.error("ERROR: %s", err)
        return errors

    def mastodon_post_image(self, img, mime_type, message, description):
        """Upload the image to Mastodon and publish a status that includes it."""
        mdict = self.mastodon.media_post(media_file=img, mime_type=mime_type,
                                         description=description)
        media_id = mdict['id']
        self.mastodon.status_post(status=message, media_ids=[media_id],
                                  visibility=self.options.mastodon_visibility)

    def store_image(self, img, mime_type, message, description):
        """Write the image, plus a companion '<image>.info' text file with
        message and description, into options.store_dir.

        The file name starts with the current timestamp; its suffix is taken
        from the MIME subtype, falling back to '.jpg' when the subtype is
        missing or contains characters not suitable for a file name.
        """
        suffix = '.jpg'
        if mime_type:
            # 'image/png; foo=bar' -> 'png'
            subtype = mime_type.partition('/')[2].partition(';')[0].strip()
            if subtype and self._re_safe_subtype.fullmatch(subtype):
                suffix = '.' + subtype
        prefix = datetime.datetime.now().isoformat() + '-'
        fd, fname = tempfile.mkstemp(suffix=suffix, prefix=prefix,
                                     dir=self.options.store_dir)
        # The file object closes the descriptor even if the write fails and
        # takes care of partial writes.
        with os.fdopen(fd, 'wb') as img_fd:
            img_fd.write(img)
        with open('%s.info' % fname, 'w', encoding='utf-8') as info_fd:
            # Parentheses are required: '%' binds tighter than 'or', so without
            # them a missing value would be written as the string "None".
            info_fd.write('message: %s\n' % (message or ''))
            info_fd.write('description: %s\n' % (description or ''))
|
|
|
|
|
|
class BaseException(Exception):
    """Root of the custom exceptions raised by toot-my-t-shirt.

    Note that, inside this module, the name shadows the builtin BaseException.

    :param message: text message
    :type message: str
    :param status: numeric http status code
    :type status: int"""

    def __init__(self, message, status=200):
        super().__init__(message)
        # Kept as attributes so that the error handler can build the reply.
        self.status = status
        self.message = message
|
|
|
|
|
|
class InputException(BaseException):
    """Error raised when the input of a request cannot be handled."""
|
|
|
|
|
|
class BaseHandler(tornado.web.RequestHandler):
    """Common ancestor of the HTTP request handlers."""

    def initialize(self, **kwargs):
        """Store every keyword argument received as an attribute of the handler."""
        for name, value in kwargs.items():
            setattr(self, name, value)

    @property
    def arguments(self):
        """Request arguments, each mapped to its first value decoded as UTF-8."""
        return {name: values[0].decode('utf-8')
                for name, values in self.request.arguments.items()}

    @property
    def json_body(self):
        """Return the request body decoded as JSON.

        An empty body is treated as an empty JSON object.

        :returns: the decoded body
        :rtype: dict"""
        raw_body = self.request.body or '{}'
        return escape.json_decode(raw_body)

    def is_api(self):
        """Tell whether the request path belongs to the versioned API."""
        api_prefix = '/v%s' % API_VERSION
        return self.request.path.startswith(api_prefix)

    def build_error(self, message='', status=200):
        """Set the HTTP status and write a failure reply.

        :param message: textual message
        :type message: str
        :param status: HTTP status code
        :type status: int
        """
        self.set_status(status)
        self.write({'success': False, 'message': message})

    def write_error(self, status_code, **kwargs):
        """Default error handler.

        The custom exceptions of this module (BaseException here is the class
        defined in this file, not the builtin) carry their own status and
        message; anything else is reported as a generic internal error.
        """
        exc = kwargs.get('exc_info', (None, None))[1]
        if isinstance(exc, BaseException):
            status_code = exc.status
            message = exc.message
        else:
            message = 'internal error'
        self.build_error(message, status=status_code)
|
|
|
|
|
|
class RootHandler(BaseHandler):
    """Handler for the / path."""

    # Directory of the static files, located beside this script.
    app_path = os.path.join(os.path.dirname(__file__), "static")

    @gen.coroutine
    def get(self, *args, **kwargs):
        """Serve the static/index.html file."""
        index_path = os.path.join(self.app_path, "index.html")
        # Explicit encoding: the default one depends on the locale of the
        # process and could fail on non-ASCII characters in the page.
        with open(index_path, 'r', encoding='utf-8') as fd:
            self.write(fd.read())
|
|
|
|
|
|
class PublishHandler(BaseHandler):
    """Handler that receives the images to share.

    Each file uploaded in the 'selfie' field is expected to contain a
    base64-encoded data URL ('<header>,<base64 payload>').
    """

    @staticmethod
    def _decode_data_url(body):
        """Return the binary payload of a base64 data URL.

        :param body: the data URL
        :type body: bytes

        :returns: the decoded payload
        :rtype: bytes
        :raises ValueError: if the header separator is missing or the payload
            is not valid base64 (binascii.Error is a subclass of ValueError)
        """
        _header, separator, payload = body.partition(b',')
        if not separator:
            raise ValueError('not a data URL')
        # base64.decodestring was removed in Python 3.9.
        return base64.decodebytes(payload)

    @gen.coroutine
    def post(self, **kwargs):
        """Share every uploaded image and reply with a JSON object holding a
        'success' flag and, on failure, a 'message' listing the errors."""
        uploads = self.request.files.get('selfie')
        if not uploads:
            self.build_error("missing 'selfie' file", status=400)
            return
        errors = []
        for info in uploads:
            try:
                image = self._decode_data_url(info['body'])
            except ValueError:
                self.logger.exception('unable to decode the uploaded image')
                errors.append('invalid image data')
                continue
            try:
                errors.extend(self.socialite.post_image(image))
            except Exception:
                self.logger.exception('unable to share the image')
                errors.append('something wrong sharing the image')
        reply = {'success': True}
        if errors:
            reply = {'success': False, 'message': '<br>\n'.join(errors)}
        self.write(reply)
|
|
|
|
|
|
class ButtonHandler(BaseHandler):
    """Handler that notifies the WebSocket clients that the button was clicked."""

    @gen.coroutine
    def post(self, **kwargs):
        """Broadcast the 'clicked' event and reply with a success object."""
        reply = {'success': True}
        # Not yielded on purpose: the reply does not wait for the delivery and
        # send_ws_message handles its own errors.
        self.send_ws_message('/ws', json.dumps({"source": "button", "action": "clicked"}))
        self.write(reply)

    @gen.coroutine
    def send_ws_message(self, path, message):
        """Send a WebSocket message to all the connected clients.

        The message is delivered by connecting to the WebSocket endpoint of
        this same server, which relays it to the other clients.

        :param path: partial path used to build the WebSocket url
        :type path: str
        :param message: message to send
        :type message: str
        """
        # Imported here because the module is not imported at file level.
        from tornado import httpclient
        try:
            url = '%s://localhost:%s%s?uuid=bigredbutton' % (
                'wss' if self.ssl_options else 'ws',
                self.global_options.port,
                path)
            self.logger.info(url)
            # Certificate validation is disabled for this loopback connection.
            req = httpclient.HTTPRequest(url, validate_cert=False)
            ws = yield tornado.websocket.websocket_connect(req)
            try:
                ws.write_message(message)
            finally:
                # Never leave the connection open, even if the write fails.
                ws.close()
        except Exception as e:
            self.logger.error('Error yielding WebSocket message: %s', e)
|
|
|
|
|
|
class WSHandler(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that relays each received message to the other
    clients registered on the same URL.

    Clients are registered in the module-level _ws_clients dictionary, keyed
    by cleaned URL and by the 'uuid' query argument; clients that do not
    provide a uuid can send messages but are not registered to receive them.
    """

    def initialize(self, **kwargs):
        """Add every passed (key, value) as attributes of the instance."""
        for key, value in kwargs.items():
            setattr(self, key, value)

    def _clean_url(self, url):
        """Collapse repeated slashes and drop the query string from the url."""
        url = re_slashes.sub('/', url)
        return url.rsplit('?', 1)[0]

    def _request_uuid(self):
        """Return the 'uuid' argument of the request, or None if it is
        missing or cannot be decoded."""
        try:
            return self.get_argument('uuid', None)
        except tornado.web.HTTPError:
            return None

    def open(self, *args, **kwargs):
        """Register the client, if it provides a uuid not already in use."""
        self.uuid = self._request_uuid()
        url = self._clean_url(self.request.uri)
        clients = _ws_clients.setdefault(url, {})
        if self.uuid and self.uuid not in clients:
            clients[self.uuid] = self
        self.logger.debug('WSHandler.open %s clients connected', len(clients))

    def on_close(self):
        """Unregister the client, so that closed connections do not pile up
        and the same uuid can register again on reconnection."""
        url = self._clean_url(self.request.uri)
        clients = _ws_clients.get(url)
        if not clients:
            return
        uuid = getattr(self, 'uuid', None)
        # Only remove the entry if it really belongs to this connection.
        if uuid and clients.get(uuid) is self:
            del clients[uuid]

    def on_message(self, message):
        """Relay the message to every other client registered on this URL.

        Clients that cannot be written to are removed from the registry.
        """
        url = self._clean_url(self.request.uri)
        self.logger.debug('WSHandler.on_message url: %s', url)
        current_uuid = self._request_uuid()
        clients = _ws_clients.get(url, {})
        count = 0
        broken = set()
        # Iterate on a copy: the registry may change while messages are sent.
        for uuid, client in list(clients.items()):
            if uuid and uuid == current_uuid:
                # Do not echo the message back to its sender.
                continue
            try:
                client.write_message(message)
            except Exception as e:
                self.logger.debug('WSHandler.on_message dropping client %s: %s', uuid, e)
                broken.add(uuid)
                continue
            count += 1
        for uuid in broken:
            clients.pop(uuid, None)
        self.logger.debug('WSHandler.on_message sent message to %d clients', count)
|
|
|
|
|
|
def run():
    """Run the Tornado web application.

    Defines and parses the command line options, sets up logging, the
    optional SSL configuration and the Socialite instance, then builds the
    application and starts serving until the I/O loop is stopped.
    """
    base_dir = os.path.dirname(__file__)
    static_dir = os.path.join(base_dir, "static")

    # command line arguments; can also be written in a configuration file,
    # specified with the --config argument.
    define("port", default=9000, help="listen on the given port", type=int)
    define("address", default='', help="bind the server at the given address", type=str)

    define("default-message", help="Default message", type=str)
    define("default-image-description", help="Default image description", type=str)

    define("mastodon-token", help="Mastodon token", type=str)
    define("mastodon-api-url", help="Mastodon API URL", type=str)
    define("mastodon-visibility", help="Mastodon toot visibility", default='unlisted')

    define("store-dir", help="store images in this directory", type=str)

    define("ssl_cert", default=os.path.join(base_dir, 'ssl', 'sb-cert.pem'),
           help="specify the SSL certificate to use for secure connections")
    define("ssl_key", default=os.path.join(base_dir, 'ssl', 'sb-cert.key'),
           help="specify the SSL private key to use for secure connections")

    define("debug", default=False, help="run in debug mode")
    define("config", help="read configuration file",
           callback=lambda path: tornado.options.parse_config_file(path, final=False))
    tornado.options.parse_command_line()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG if options.debug else logging.INFO)

    # SSL is enabled only if both the key and the certificate exist.
    ssl_options = {}
    if os.path.isfile(options.ssl_key) and os.path.isfile(options.ssl_cert):
        ssl_options = dict(certfile=options.ssl_cert, keyfile=options.ssl_key)

    socialite = Socialite(options, logger=logger)
    init_params = dict(global_options=options, ssl_options=ssl_options,
                       socialite=socialite, logger=logger)

    _publish_path = r"/publish/?"
    _button_path = r"/button/?"
    application = tornado.web.Application(
        [
            (r'/ws', WSHandler, init_params),
            (_publish_path, PublishHandler, init_params),
            (r'/v%s%s' % (API_VERSION, _publish_path), PublishHandler, init_params),
            (_button_path, ButtonHandler, init_params),
            (r'/v%s%s' % (API_VERSION, _button_path), ButtonHandler, init_params),
            # The dot is escaped so that only the literal 'index.html' matches.
            (r"/(?:index\.html)?", RootHandler, init_params),
            # Same directory used for static_path, instead of a path relative
            # to the current working directory.
            (r'/?(.*)', tornado.web.StaticFileHandler, {"path": static_dir}),
        ],
        static_path=static_dir,
        debug=options.debug)
    http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None)
    logger.info('Start serving on %s://%s:%d', 'https' if ssl_options else 'http',
                options.address or '127.0.0.1',
                options.port)
    http_server.listen(options.port, options.address)
    # IOLoop.instance() is a deprecated alias of IOLoop.current().
    tornado.ioloop.IOLoop.current().start()
|
|
|
|
|
|
if __name__ == '__main__':
    # Entry point when executed as a script. An interruption from the
    # keyboard (Ctrl-C) ends the server with a short notice rather than a
    # traceback.
    try:
        run()
    except KeyboardInterrupt:
        print('Server stopped')
|