commit a143faecb26071a3b456b7bed70fa6d2beb1830c Author: boyska Date: Mon Dec 25 00:16:33 2023 +0100 initial commit diff --git a/sergio.py b/sergio.py new file mode 100755 index 0000000..959fff7 --- /dev/null +++ b/sergio.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python3 + +import sys +import serial +import socket +import argparse +from typing import Union +import logging +import json +import enum + + +VARIABLES = [ + "ADR", + "AF", + "AFCH", + "CC", + "COMSPD", + "CT", + "DATE", + "DI", + "DPS2MSG", + "DTTMOUT", + "ECC", + "ECCEN", + "EQTEXT1", + "EXTSYNC", + "GRPSEQ", + "LABPER", + "LIC", + "LEVEL", + "LTO", + "MJD", + "MS", + "MSGLIST", + "PHASE", + "PILOT", + "PI", + "PROGRAM", + "PS", + "PSNMAIN", + "PSW", + "PTY", + "PTYN", + "PTYNEN", + "RDS2MOD", + "RDSGEN", + "READWEB", + "RSTDPS", + "RT2MSG", + "RT2TYPE", + "RTP", + "RTPRUN", + "RTPER", + "RTTYPE", + "RT1", + "RT1EN", + "RT2", + "RT2EN", + "SCRLSPD", + "SEN", + "SHORTRT", + "SITE", + "SLIST", + "SPEED", + "SPSPER", + "TA", + "TATMOUT", + "TIME", + "TP", + "TPS", + "UECP", + "VER", +] + + +class PiraSerial: + def __init__(self, serial: serial.Serial): + self.log = logging.getLogger("pira.backend.serial") + self.serial = serial + + def send(self, content: bytes): + self.log.debug(f"-> {content}") + self.serial.write(content) + + def recv(self) -> bytes: + content = self.serial.read_until(b"\n") + self.log.debug(f"<- {content}") + return content + + +class TCPClient: + def __init__(self, socket: socket.socket): + self.log = logging.getLogger("pira.backend.tcpclient") + self.socket = socket + + def connect(self): + pass + + def send(self, content: bytes): + self.log.debug(f"-> {content}") + return self.socket.send(content) + + def recv(self) -> bytes: + content = self.socket.recv(1) + self.log.debug(f"<- {content}") + return content + + +Backends = Union[PiraSerial, TCPClient] + + +class LookupEnum: + @classmethod + def lookup(cls, value): + for field in cls: + if field.value == value: + return field + raise ValueError() + + +class CommandError(Exception): + pass + + +class UnknownCommandError(CommandError): + pass + + +class InvalidArgumentError(CommandError): + pass + + +class ProcessedPartiallyError(CommandError): + pass + + +class CommandStatus(LookupEnum, enum.StrEnum): + SUCCESS = "+" + UNKNOWN_COMMAND = "!" + INVALID_ARGUMENT = "-" + PROCESSED_PARTIALLY = "/" + + def to_exception(self): + error_classes = { + self.UNKNOWN_COMMAND: UnknownCommandError, + self.INVALID_ARGUMENT: InvalidArgumentError, + self.PROCESSED_PARTIALLY: ProcessedPartiallyError, + } + exc_cls = error_classes.get(self, None) + if exc_cls is not None: + return exc_cls() + + +class Pira: + END_OF_MESSAGE = "".join([chr(13), chr(10), chr(13), chr(10)]) + + def __init__(self, backend: Backends): + self.log = logging.getLogger("Pira") + self.backend = backend + self.set_variable("ECHO", "0") + + def recv_response(self) -> str: + def msg_complete(s: str) -> bool: + for field in CommandStatus: + if s.endswith(field.value + self.END_OF_MESSAGE): + return True + return False + + ret = "" + while True: + ret += self.backend.recv().decode("ascii") + if msg_complete(ret): + value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n") + status = CommandStatus.lookup(value[-1]) + if status != CommandStatus.SUCCESS: + raise status.to_exception() + return value[:-1].removesuffix("\r\n") + + def execute_command(self, command: str): + self.backend.send(command.encode("ascii") + b"\r") + return self.recv_response() + + def read_variable(self, variable: str): + return self.execute_command(variable) + + def set_variable(self, variable: str, value: str): + cmd = f"{variable}={value}" + self.execute_command(cmd) + + def save_variable(self, variable: str): + cmd = f"*{variable}" + self.execute_command(cmd) + + +def get_parser(): + p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + serial = p.add_argument_group("serial connection") + serial.add_argument("--baudrate", type=int, default=19200) + serial.add_argument("--device", default="/dev/ttyUSB0") + tcp = p.add_argument_group("tcp connection") + tcp.add_argument("--port", type=int) + tcp.add_argument("--address") + + log = p.add_argument_group("logging") + log.add_argument("--log-connection", action="store_true", default=False) + + out = p.add_argument_group("output") + out.add_argument("--output", choices=["json", "text", "repr"], default="text") + + sub = p.add_subparsers() + + get = sub.add_parser("get") + get.set_defaults(func=main_get) + get.add_argument("variable") + + setvar = sub.add_parser("set") + setvar.set_defaults(func=main_set) + setvar.add_argument("variable") + setvar.add_argument("value") + + save = sub.add_parser("save") + save.set_defaults(func=main_save) + save.add_argument("variable") + + dump = sub.add_parser("dump") + dump.set_defaults(func=main_dump) + + synctime = sub.add_parser("synctime", + description="Set the exact time on the encoder") + synctime.set_defaults(func=main_synctime) + synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo") + synctime.add_argument("--save", action="store_true", default=False) + + return p + + +def output(args, content): + if args.output == "json": + json.dump(content, sys.stdout) + elif args.output == "text": + print(content["value"]) + elif args.output == "repr": + print(repr(content["value"])) + + +def main_get(args, pira: Pira): + try: + out = pira.read_variable(args.variable) + except UnknownCommandError: + print("Variable not found: {args.variable}", file=sys.stderr) + sys.exit(1) + output(args, dict(variable=args.variable, value=out)) + + +def main_set(args, pira: Pira): + try: + pira.set_variable(args.variable, args.value) + except UnknownCommandError: + print("Variable not found: {args.variable}", file=sys.stderr) + sys.exit(1) + + +def main_save(args, pira: Pira): + try: + pira.save_variable(args.variable) + except UnknownCommandError: + print("Variable not found: {args.variable}", file=sys.stderr) + sys.exit(1) + + +def main_dump(args, pira: Pira): + state = dict(vars={}, failed=[]) + for var in VARIABLES: + try: + value = pira.read_variable(var) + except (UnknownCommandError, InvalidArgumentError): + state["failed"].append(var) + else: + state["vars"][var] = value + json.dump(state, sys.stdout, indent=2) + + +def main_synctime(args, pira: Pira): + from datetime import datetime + from pytz import timezone + + tz = timezone(args.timezone) + local_time = datetime.now(tz) + date = local_time.strftime("%d.%m.%y") + time = local_time.strftime("%H:%M:%S") + pira.set_variable("TIME", time) + pira.set_variable("DATE", date) + delta = tz.utcoffset(datetime.now()) + offset = delta.total_seconds() / 1800 # multiple of half-hours. So +10800s => +6 + pira.set_variable("LTO", "%+d" % offset) + + if args.save: + pira.save_variable("DATE") + pira.save_variable("TIME") + pira.save_variable("LTO") + + +def make_connection(args) -> Backends: + if args.address is not None: + skt = socket.socket() + skt.connect((args.address, args.port)) + return TCPClient(socket=skt) + s = serial.Serial(baudrate=args.baudrate, port=args.device) + return PiraSerial(serial=s) + + +def main(): + args = get_parser().parse_args() + if args.log_connection: + logging.basicConfig(level=logging.DEBUG) + logging.getLogger("pira.backend").setLevel(logging.DEBUG) + connection = Pira(make_connection(args)) + args.func(args, connection) + + +if __name__ == "__main__": + main()