123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
"""
This module connects to a serial port, prints each decoded message to stdout
as JSON, and optionally forwards it to an HTTP endpoint.
"""
- import decoder
- import json
- from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
- from typing import Optional
- import atexit
- import time
- import logging
- import sys
- import multiprocessing
- import serial
- import requests
# Module-wide logger; main() sets its level according to --verbose.
log = logging.getLogger("inaria")
def read_auth(buf) -> tuple[str, str]:
    """Read a username/password pair from an open text buffer.

    The first line is the username and the second line is the password.
    All trailing whitespace is stripped from the username, while only the
    trailing newline is removed from the password, so a password may end
    with spaces.

    Args:
        buf: An open, readable text file-like object. It is always closed
            before this function returns, even if reading fails.

    Returns:
        A ``(username, password)`` tuple.
    """
    try:
        username = buf.readline()
        password = buf.readline()
    finally:
        # Close even when readline() raises, so the handle is never leaked.
        buf.close()
    return (username.rstrip(), password.rstrip("\n"))
class MessageForwarder:
    """Forward decoded messages to an HTTP endpoint using ``requests``.

    Forwarding is disabled until ``base_url`` is set: ``send_message`` is a
    no-op while it is ``None`` or empty. When ``auth`` is set it is passed
    to ``requests`` as the ``auth`` argument of every request.
    """

    # Seconds to wait for the HTTP server. Without a timeout, requests waits
    # indefinitely and a stalled server would block the sender forever.
    timeout: float = 10.0

    def __init__(self):
        self.base_url: Optional[str] = None
        self.auth: Optional[tuple[str, str]] = None

    def initialize_from_args(self, args):
        """Placeholder hook; currently does nothing."""
        pass

    @property
    def request_params(self) -> dict:
        """Extra keyword arguments added to every ``requests`` call."""
        r = {}
        if self.auth is not None:
            r["auth"] = self.auth
        return r

    def _endpoint(self, path: str) -> str:
        """Join ``base_url`` and ``path`` with exactly one slash between them."""
        # base_url may be given with a trailing slash; avoid ".../" + "/path".
        return f"{self.base_url.rstrip('/')}/{path}"

    def _post(self, path: str, message) -> None:
        """POST ``message.asdict()`` as JSON to ``path`` under ``base_url``."""
        # request_params comes last so it can override the default timeout.
        params = {"timeout": self.timeout, **self.request_params}
        requests.post(self._endpoint(path), json=message.asdict(), **params)

    def send_log(self, message: "decoder.LogMessage"):
        """POST a log message to the ``messages`` endpoint."""
        self._post("messages", message)

    def send_dump(self, message: "decoder.DumpMessage"):
        """POST a dump message to the ``variables`` endpoint."""
        self._post("variables", message)

    def send_message(self, message: "decoder.Message"):
        """Dispatch ``message`` to the sender matching its type.

        Does nothing when no ``base_url`` is configured. Messages that are
        neither ``LogMessage`` nor ``DumpMessage`` are ignored.
        """
        if not self.base_url:
            return
        if isinstance(message, decoder.LogMessage):
            self.send_log(message)
        elif isinstance(message, decoder.DumpMessage):
            self.send_dump(message)
def get_next_message(serial) -> Optional[bytes]:
    """
    Skip input up to the next start marker and return the line that follows.

    Bytes are read one at a time from ``serial`` (each is logged at INFO
    level) until a byte with value 1 is seen; the rest of that line, as
    returned by ``serial.readline()``, is the message. ``None`` is returned
    when a read yields nothing before a marker is found.

    >>> from io import BytesIO
    >>> msg = 'foo\\x01LOG D ciao\\n'
    >>> get_next_message(BytesIO(msg.encode('ascii'))).rstrip().decode('ascii')
    'LOG D ciao'
    >>> msg = 'foo\\nasd\\x01LOG D ciao\\n'
    >>> get_next_message(BytesIO(msg.encode('ascii'))).rstrip().decode('ascii')
    'LOG D ciao'
    """
    start_marker = 1
    byte = serial.read(1)
    log.info("%r", byte)
    # Discard everything that precedes the start marker.
    while byte and ord(byte) != start_marker:
        byte = serial.read(1)
        log.info("%r", byte)
    if not byte:
        return None
    return serial.readline()  # read a '\n' terminated line
def loop(serial, forwarder: MessageForwarder, args):
    """Read messages from ``serial`` forever, print them and forward them.

    Each successfully decoded message is written to stdout as a JSON array
    ``[type-string, message-dict]``. When the forwarder has a ``base_url``,
    the message is also handed to ``forwarder.send_message`` in a separate
    process so that a slow HTTP request does not delay reading.

    Args:
        serial: Readable object passed to ``get_next_message``.
        forwarder: Forwarder used to send each decoded message.
        args: Parsed command-line arguments (currently unused here).

    This function only returns by raising (for example when reading from
    ``serial`` fails).
    """
    dec = decoder.Decoder()
    while True:
        line = get_next_message(serial)
        if line is None:
            # Nothing was read before a start marker: nothing to decode.
            continue
        try:
            message = dec.decode(line)
        except Exception:
            # Best effort: a malformed line must not stop the bridge, but
            # leave a trace for anyone running with --verbose.
            log.info("Cannot decode %r", line, exc_info=True)
            continue
        if message is None:
            continue
        obj = (str(type(message)), message.asdict())
        print(json.dumps(obj))
        if forwarder.base_url:
            # send_message is a no-op without a base_url, so only pay for a
            # new process when there is somewhere to forward to.
            multiprocessing.Process(
                target=forwarder.send_message, args=(message,)
            ).start()
def close_all(serial):
    """Close the given serial connection by calling its ``close()`` method."""
    serial.close()
def get_parser() -> ArgumentParser:
    """Build the command-line parser for the serial bridge.

    Options are split into a "serial options" group (``--device``,
    ``--baudrate``, ``--wait``) and an "http options" group
    (``--http-endpoint``, ``--http-auth-file``), plus ``--verbose``/``-v``.
    ``--http-auth-file`` is opened by argparse, so the parsed value is an
    open file object (or ``None`` when the option is absent).

    Returns:
        The configured ``ArgumentParser``.
    """
    parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)

    serial_parser = parser.add_argument_group("serial options")
    serial_parser.add_argument(
        "--device",
        default="hwgrep://ttyUSB",
        help="Device path, or URL as in https://pyserial.readthedocs.io/en/latest/url_handlers.html",
    )
    serial_parser.add_argument("--baudrate", type=int, default=115200)
    serial_parser.add_argument(
        "--wait",
        action="store_true",
        default=False,
        help="Wait until serial is found, and retries upon failures",
    )

    http_parser = parser.add_argument_group("http options")
    http_parser.add_argument(
        "--http-endpoint", metavar="URL", help="sth like http://127.0.0.1:8000/"
    )
    http_parser.add_argument(
        "--http-auth-file",
        type=open,
        metavar="FILE",
        help="Path to a file with two lines: first is username, second is password",
    )

    parser.add_argument("--verbose", "-v", action="store_true", default=False)
    return parser
def main():
    """Parse the command line, open the serial port and run the read loop.

    Without ``--wait`` the process exits with status 1 as soon as the port
    cannot be opened or the connection fails. With ``--wait`` it retries
    opening the port once per second and reconnects after a serial failure.
    """
    args = get_parser().parse_args()
    logging.basicConfig(level=logging.INFO)
    log.setLevel(logging.INFO if args.verbose else logging.WARN)
    log.info("Connecting...")

    forwarder = MessageForwarder()
    if args.http_endpoint:
        forwarder.base_url = args.http_endpoint
    if args.http_auth_file is not None:
        # Always consume the file: read_auth also closes the handle that
        # argparse opened, even when no endpoint ends up using the auth.
        forwarder.auth = read_auth(args.http_auth_file)

    while True:
        try:
            s = serial.serial_for_url(args.device, do_not_open=True)
            s.baudrate = args.baudrate
            s.open()
        except Exception as exc:
            if not args.wait:
                # ERROR so the reason for exiting is visible without -v.
                log.error("Cannot connect: %s", exc)
                sys.exit(1)
            log.info("Cannot connect, will retry...")
            time.sleep(1)
            continue
        log.info("Connected!")
        try:
            loop(s, forwarder, args)
        except serial.serialutil.SerialException as exc:
            if not args.wait:
                log.error("Serial connection lost: %s", exc)
                sys.exit(1)
            log.info("Serial connection lost, will reconnect...")
        finally:
            # Release the port before reconnecting or exiting, instead of
            # piling up one atexit handler per connection attempt.
            close_all(s)
# Run the bridge only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|