seriow/read.py
2023-07-01 14:23:33 +02:00

172 lines
4.7 KiB
Python

"""
This module connects to serial port and exposes the results in stdout.
"""
import decoder
import json
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from typing import Optional
import atexit
import time
import logging
import sys
import multiprocessing
import serial
import requests
log = logging.getLogger("inaria")
def read_auth(buf) -> tuple[str, str]:
username = buf.readline()
password = buf.readline()
buf.close()
return (username.rstrip(), password.rstrip("\n"))
class MessageForwarder:
def __init__(self):
self.base_url: Optional[str] = None
self.auth: Optional[tuple[str, str]] = None
def initialize_from_args(self, args):
pass
@property
def request_params(self) -> dict:
r = {}
if self.auth is not None:
r["auth"] = self.auth
return r
def send_log(self, message: decoder.LogMessage):
requests.post(
f"{self.base_url}/messages", json=message.asdict(), **self.request_params
)
def send_dump(self, message: decoder.DumpMessage):
requests.post(
f"{self.base_url}/variables", json=message.asdict(), **self.request_params
)
def send_message(self, message: decoder.Message):
if not self.base_url:
return
if isinstance(message, decoder.LogMessage):
self.send_log(message)
elif isinstance(message, decoder.DumpMessage):
self.send_dump(message)
def get_next_message(serial) -> Optional[bytes]:
"""
>>> from io import BytesIO
>>> msg = 'foo\\x01LOG D ciao\\n'
>>> get_next_message(BytesIO(msg.encode('ascii'))).rstrip().decode('ascii')
'LOG D ciao'
>>> msg = 'foo\\nasd\\x01LOG D ciao\\n'
>>> get_next_message(BytesIO(msg.encode('ascii'))).rstrip().decode('ascii')
'LOG D ciao'
"""
while True:
c = serial.read(1)
log.info("%r", c)
if not c:
return None
if ord(c) == 1:
break
return serial.readline() # read a '\n' terminated line
def loop(serial, forwarder: MessageForwarder, args):
dec = decoder.Decoder()
while True:
line = get_next_message(serial)
try:
message = dec.decode(line)
except Exception:
continue
if message is None:
continue
obj = (str(type(message)), message.asdict())
print(json.dumps(obj))
multiprocessing.Process(target=forwarder.send_message, args=(message,)).start()
def close_all(serial):
serial.close()
def get_parser() -> ArgumentParser:
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
serial_parser = parser.add_argument_group("serial options")
serial_parser.add_argument(
"--device",
default="hwgrep://ttyUSB",
help="Device path, or URL as in https://pyserial_parser.readthedocs.io/en/latest/url_handlers.html",
)
serial_parser.add_argument("--baudrate", type=int, default=115200)
serial_parser.add_argument(
"--wait",
action="store_true",
default=False,
help="Wait until serial is found, and retries upon failures",
)
http_parser = parser.add_argument_group("http options")
http_parser.add_argument(
"--http-endpoint", metavar="URL", help="sth like http://127.0.0.1:8000/"
)
http_parser.add_argument(
"--http-auth-file",
type=open,
metavar="FILE",
help="Path to a file with two lines: first is username, second is password",
)
parser.add_argument("--verbose", "-v", action="store_true", default=False)
return parser
def main():
args = get_parser().parse_args()
logging.basicConfig(level=logging.INFO)
log.setLevel(logging.INFO if args.verbose else logging.WARN)
log.info("Connecting...")
forwarder = MessageForwarder()
if args.http_endpoint:
forwarder.base_url = args.http_endpoint
if args.http_auth_file is not None:
forwarder.auth = read_auth(args.http_auth_file)
while True:
try:
s = serial.serial_for_url(args.device, do_not_open=True)
s.baudrate = args.baudrate
s.open()
except Exception as exc:
if not args.wait:
log.info("Cannot connect: %s", exc)
sys.exit(1)
log.info("Cannot connect, will retry...")
time.sleep(1)
continue
log.info("Connected!")
atexit.register(close_all, s)
try:
loop(s, forwarder, args)
except serial.serialutil.SerialException:
if not args.wait:
sys.exit(1)
if __name__ == "__main__":
main()