#!/usr/bin/env python3 """ This module connects to serial port and exposes the results in stdout. """ import decoder import json from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from typing import Optional import atexit import time import logging import sys import multiprocessing import serial import requests log = logging.getLogger("seriow") def read_auth(buf) -> tuple[str, str]: username = buf.readline() password = buf.readline() buf.close() return (username.rstrip(), password.rstrip("\n")) class MessageForwarder: def __init__(self): self.base_url: Optional[str] = None self.auth: Optional[tuple[str, str]] = None def initialize_from_args(self, args): pass @property def request_params(self) -> dict: r = {} if self.auth is not None: r["auth"] = self.auth return r def send_log(self, message: decoder.LogMessage): requests.post( f"{self.base_url}/messages", json=message.asdict(), **self.request_params ) def send_dump(self, message: decoder.DumpMessage): requests.post( f"{self.base_url}/variables", json=message.asdict(), **self.request_params ) def send_message(self, message: decoder.Message): if not self.base_url: return if isinstance(message, decoder.LogMessage): self.send_log(message) elif isinstance(message, decoder.DumpMessage): self.send_dump(message) def get_next_message(serial) -> Optional[bytes]: """ >>> from io import BytesIO >>> msg = 'foo\\x01LOG D ciao\\n' >>> get_next_message(BytesIO(msg.encode('ascii'))).rstrip().decode('ascii') 'LOG D ciao' >>> msg = 'foo\\nasd\\x01LOG D ciao\\n' >>> get_next_message(BytesIO(msg.encode('ascii'))).rstrip().decode('ascii') 'LOG D ciao' """ while True: c = serial.read(1) log.info("%r", c) if not c: return None if ord(c) == 1: break return serial.readline() # read a '\n' terminated line def loop(serial, forwarder: MessageForwarder, args): dec = decoder.Decoder() while True: line = get_next_message(serial) try: message = dec.decode(line) except Exception: continue if message is None: continue obj = (str(type(message)), message.asdict()) print(json.dumps(obj)) multiprocessing.Process(target=forwarder.send_message, args=(message,)).start() def close_all(serial): serial.close() def get_parser() -> ArgumentParser: parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) serial_parser = parser.add_argument_group("serial options") serial_parser.add_argument( "--device", default="hwgrep://ttyUSB", help="Device path, or URL as in https://pyserial_parser.readthedocs.io/en/latest/url_handlers.html", ) serial_parser.add_argument("--baudrate", type=int, default=115200) serial_parser.add_argument( "--wait", action="store_true", default=False, help="Wait until serial is found, and retries upon failures", ) http_parser = parser.add_argument_group("http options") http_parser.add_argument( "--http-endpoint", metavar="URL", help="sth like http://127.0.0.1:8000/" ) http_parser.add_argument( "--http-auth-file", type=open, metavar="FILE", help="Path to a file with two lines: first is username, second is password", ) parser.add_argument("--verbose", "-v", action="store_true", default=False) return parser def main(): args = get_parser().parse_args() logging.basicConfig(level=logging.INFO) log.setLevel(logging.INFO if args.verbose else logging.WARN) log.info("Connecting...") forwarder = MessageForwarder() if args.http_endpoint: forwarder.base_url = args.http_endpoint if args.http_auth_file is not None: forwarder.auth = read_auth(args.http_auth_file) while True: try: s = serial.serial_for_url(args.device, do_not_open=True) s.baudrate = args.baudrate s.open() except Exception as exc: if not args.wait: log.info("Cannot connect: %s", exc) sys.exit(1) log.info("Cannot connect, will retry...") time.sleep(1) continue log.info("Connected!") atexit.register(close_all, s) try: loop(s, forwarder, args) except serial.serialutil.SerialException: if not args.wait: sys.exit(1) if __name__ == "__main__": main()