#!/usr/bin/env python3 import sys import serial import socket import argparse from typing import Union import logging import json import enum from datetime import datetime VARIABLES = [ "ADR", "AF", "AFCH", "CC", "COMSPD", "CT", "DATE", "DI", "DPS2MSG", "DTTMOUT", "ECC", "ECCEN", "EQTEXT1", "EXTSYNC", "GRPSEQ", "LABPER", "LIC", "LEVEL", "LTO", "MJD", "MS", "MSGLIST", "PHASE", "PILOT", "PI", "PROGRAM", "PS", "PSNMAIN", "PSW", "PTY", "PTYN", "PTYNEN", "RDS2MOD", "RDSGEN", "READWEB", "RSTDPS", "RT2MSG", "RT2TYPE", "RTP", "RTPRUN", "RTPER", "RTTYPE", "RT1", "RT1EN", "RT2", "RT2EN", "SCRLSPD", "SEN", "SHORTRT", "SITE", "SLIST", "SPEED", "SPSPER", "TA", "TATMOUT", "TIME", "TP", "TPS", "UECP", "VER", ] class PiraSerial: def __init__(self, serial: serial.Serial): self.log = logging.getLogger("pira.backend.serial") self.serial = serial def send(self, content: bytes): self.log.debug(f"-> {content}") self.serial.write(content) def recv(self) -> bytes: content = self.serial.read_until(b"\n") self.log.debug(f"<- {content}") return content class TCPClient: def __init__(self, socket: socket.socket): self.log = logging.getLogger("pira.backend.tcpclient") self.socket = socket def connect(self): pass def send(self, content: bytes): self.log.debug(f"-> {content}") return self.socket.send(content) def recv(self) -> bytes: content = self.socket.recv(1) self.log.debug(f"<- {content}") return content Backends = Union[PiraSerial, TCPClient] class LookupEnum: @classmethod def lookup(cls, value): for field in cls: if field.value == value: return field raise ValueError() class CommandError(Exception): pass class UnknownCommandError(CommandError): pass class InvalidArgumentError(CommandError): pass class ProcessedPartiallyError(CommandError): pass class CommandStatus(LookupEnum, enum.StrEnum): SUCCESS = "+" UNKNOWN_COMMAND = "!" INVALID_ARGUMENT = "-" PROCESSED_PARTIALLY = "/" def to_exception(self): error_classes = { self.UNKNOWN_COMMAND: UnknownCommandError, self.INVALID_ARGUMENT: InvalidArgumentError, self.PROCESSED_PARTIALLY: ProcessedPartiallyError, } exc_cls = error_classes.get(self, None) if exc_cls is not None: return exc_cls() class Pira: END_OF_MESSAGE = "".join([chr(13), chr(10), chr(13), chr(10)]) def __init__(self, backend: Backends): self.log = logging.getLogger("Pira") self.backend = backend try: self.set_variable("ECHO", "0") except CommandError: # this sometimes fails, it's a fact pass def recv_response(self) -> str: def msg_complete(s: str) -> bool: for field in CommandStatus: if s.endswith(field.value + self.END_OF_MESSAGE): return True return False ret = "" while True: ret += self.backend.recv().decode("ascii") if msg_complete(ret): value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n") status = CommandStatus.lookup(value[-1]) if status != CommandStatus.SUCCESS: raise status.to_exception() return value[:-1].removesuffix("\r\n") def execute_command(self, command: str): self.backend.send(command.encode("ascii") + b"\r") return self.recv_response() def read_variable(self, variable: str): return self.execute_command(variable) def set_variable(self, variable: str, value: str): cmd = f"{variable}={value}" self.execute_command(cmd) def save_variable(self, variable: str): cmd = f"*{variable}" self.execute_command(cmd) def get_parser(): p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) serial = p.add_argument_group("serial connection") serial.add_argument("--baudrate", type=int, default=19200) serial.add_argument("--device", default="/dev/ttyUSB0") tcp = p.add_argument_group("tcp connection") tcp.add_argument("--port", type=int) tcp.add_argument("--address") log = p.add_argument_group("logging") log.add_argument("--log-connection", action="store_true", default=False) out = p.add_argument_group("output") out.add_argument("--output", choices=["json", "text", "repr"], default="text") sub = p.add_subparsers() get = sub.add_parser("get") get.set_defaults(func=main_get) get.add_argument("variable") setvar = sub.add_parser("set") setvar.set_defaults(func=main_set) setvar.add_argument("variable") setvar.add_argument("value") save = sub.add_parser("save") save.set_defaults(func=main_save) save.add_argument("variable") dump = sub.add_parser("dump") dump.set_defaults(func=main_dump) synctime = sub.add_parser("synctime", description="Set the exact time on the encoder") synctime.set_defaults(func=main_synctime) synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo") synctime.add_argument("--save", action="store_true", default=False) return p def output(args, content): if args.output == "json": json.dump(content, sys.stdout) elif args.output == "text": print(content["value"]) elif args.output == "repr": print(repr(content["value"])) def main_get(args, pira: Pira): try: out = pira.read_variable(args.variable) except UnknownCommandError: print("Variable not found: {args.variable}", file=sys.stderr) sys.exit(1) output(args, dict(variable=args.variable, value=out)) def main_set(args, pira: Pira): try: pira.set_variable(args.variable, args.value) except UnknownCommandError: print("Variable not found: {args.variable}", file=sys.stderr) sys.exit(1) def main_save(args, pira: Pira): try: pira.save_variable(args.variable) except UnknownCommandError: print("Variable not found: {args.variable}", file=sys.stderr) sys.exit(1) def main_dump(args, pira: Pira): state = dict(vars={}, failed=[]) for var in VARIABLES: try: value = pira.read_variable(var) except (UnknownCommandError, InvalidArgumentError): state["failed"].append(var) else: state["vars"][var] = value json.dump(state, sys.stdout, indent=2) def main_synctime(args, pira: Pira): from pytz import timezone tz = timezone(args.timezone) local_time = datetime.now(tz) date = local_time.strftime("%d.%m.%y") time = local_time.strftime("%H:%M:%S") pira.set_variable("TIME", time) pira.set_variable("DATE", date) delta = tz.utcoffset(datetime.now()) offset = delta.total_seconds() / 1800 # multiple of half-hours. So +10800s => +6 pira.set_variable("LTO", "%+d" % offset) if args.save: pira.save_variable("DATE") pira.save_variable("TIME") pira.save_variable("LTO") def make_connection(args) -> Backends: if args.address is not None: skt = socket.socket() skt.connect((args.address, args.port)) return TCPClient(socket=skt) s = serial.Serial(baudrate=args.baudrate, port=args.device) return PiraSerial(serial=s) def main(): args = get_parser().parse_args() if args.log_connection: logging.basicConfig(level=logging.DEBUG) logging.getLogger("pira.backend").setLevel(logging.DEBUG) connection = Pira(make_connection(args)) args.func(args, connection) if __name__ == "__main__": main()