sergio/sergio.py
2023-12-25 15:31:24 +01:00

338 lines
8.3 KiB
Python
Executable file

#!/usr/bin/env python3
import sys
import serial
import socket
import argparse
from typing import Union
import logging
import json
import enum
from datetime import datetime
VARIABLES = [
"ADR",
"AF",
"AFCH",
"CC",
"COMSPD",
"CT",
"DATE",
"DI",
"DPS2MSG",
"DTTMOUT",
"ECC",
"ECCEN",
"EQTEXT1",
"EXTSYNC",
"GRPSEQ",
"LABPER",
"LIC",
"LEVEL",
"LTO",
"MJD",
"MS",
"MSGLIST",
"PHASE",
"PILOT",
"PI",
"PROGRAM",
"PS",
"PSNMAIN",
"PSW",
"PTY",
"PTYN",
"PTYNEN",
"RDS2MOD",
"RDSGEN",
"READWEB",
"RSTDPS",
"RT2MSG",
"RT2TYPE",
"RTP",
"RTPRUN",
"RTPER",
"RTTYPE",
"RT1",
"RT1EN",
"RT2",
"RT2EN",
"SCRLSPD",
"SEN",
"SHORTRT",
"SITE",
"SLIST",
"SPEED",
"SPSPER",
"TA",
"TATMOUT",
"TIME",
"TP",
"TPS",
"UECP",
"VER",
]
class PiraSerial:
def __init__(self, serial: serial.Serial):
self.log = logging.getLogger("pira.backend.serial")
self.serial = serial
def send(self, content: bytes):
self.log.debug(f"-> {content}")
self.serial.write(content)
def recv(self) -> bytes:
content = self.serial.read_until(b"\n")
self.log.debug(f"<- {content}")
return content
class TCPClient:
def __init__(self, socket: socket.socket):
self.log = logging.getLogger("pira.backend.tcpclient")
self.socket = socket
def connect(self):
pass
def send(self, content: bytes):
self.log.debug(f"-> {content}")
return self.socket.send(content)
def recv(self) -> bytes:
content = self.socket.recv(1)
self.log.debug(f"<- {content}")
return content
Backends = Union[PiraSerial, TCPClient]
class LookupEnum:
@classmethod
def lookup(cls, value):
for field in cls:
if field.value == value:
return field
raise ValueError()
class CommandError(Exception):
pass
class UnknownCommandError(CommandError):
pass
class InvalidArgumentError(CommandError):
pass
class ProcessedPartiallyError(CommandError):
pass
class CommandStatus(LookupEnum, enum.StrEnum):
SUCCESS = "+"
UNKNOWN_COMMAND = "!"
INVALID_ARGUMENT = "-"
PROCESSED_PARTIALLY = "/"
def to_exception(self):
error_classes = {
self.UNKNOWN_COMMAND: UnknownCommandError,
self.INVALID_ARGUMENT: InvalidArgumentError,
self.PROCESSED_PARTIALLY: ProcessedPartiallyError,
}
exc_cls = error_classes.get(self, None)
if exc_cls is not None:
return exc_cls()
class Pira:
END_OF_MESSAGE = "\r\n\r\n"
def __init__(self, backend: Backends):
self.log = logging.getLogger("Pira")
self.backend = backend
try:
self.set_variable("ECHO", "0")
except CommandError: # this sometimes fails, it's a fact
pass
def recv_response(self) -> str:
def msg_complete(s: str) -> bool:
for field in CommandStatus:
if s.endswith(field.value + self.END_OF_MESSAGE):
return True
return False
ret = ""
while True:
ret += self.backend.recv().decode("ascii")
if msg_complete(ret):
value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n")
status = CommandStatus.lookup(value[-1])
if status != CommandStatus.SUCCESS:
raise status.to_exception()
return value[:-1].removesuffix("\r\n")
def execute_command(self, command: str):
self.backend.send(command.encode("ascii") + b"\r")
return self.recv_response()
def read_variable(self, variable: str):
return self.execute_command(variable)
def set_variable(self, variable: str, value: str, save: bool = False):
cmd = f"{variable}={value}"
if save:
cmd = "*" + cmd
self.execute_command(cmd)
def save_variable(self, variable: str):
cmd = f"*{variable}"
self.execute_command(cmd)
def get_parser():
p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
serial = p.add_argument_group("serial connection")
serial.add_argument("--baudrate", type=int, default=19200)
serial.add_argument("--device", default="/dev/ttyUSB0")
tcp = p.add_argument_group("tcp connection")
tcp.add_argument("--port", type=int)
tcp.add_argument("--address")
log = p.add_argument_group("logging")
log.add_argument("--log-connection", action="store_true", default=False)
out = p.add_argument_group("output")
out.add_argument("--output", choices=["json", "text", "repr"], default="text")
sub = p.add_subparsers()
execute = sub.add_parser("exec")
execute.set_defaults(func=main_exec)
execute.add_argument("cmd")
get = sub.add_parser("get")
get.set_defaults(func=main_get)
get.add_argument("variable")
setvar = sub.add_parser("set")
setvar.set_defaults(func=main_set)
setvar.add_argument("--save", action="store_true", default=False)
setvar.add_argument("variable")
setvar.add_argument("value")
save = sub.add_parser("save")
save.set_defaults(func=main_save)
save.add_argument("variable")
dump = sub.add_parser("dump")
dump.set_defaults(func=main_dump)
synctime = sub.add_parser("synctime",
description="Set the exact time on the encoder")
synctime.set_defaults(func=main_synctime)
synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo")
synctime.add_argument("--save", action="store_true", default=False)
return p
def output(args, content):
if args.output == "json":
json.dump(content, sys.stdout)
elif args.output == "text":
print(content["value"])
elif args.output == "repr":
print(repr(content["value"]))
def main_exec(args, pira: Pira):
try:
out = pira.execute_command(args.cmd)
except CommandError as exc:
print(f"Command error: {exc}", file=sys.stderr)
sys.exit(1)
output(args, dict(cmd=args.cmd, value=out))
def main_get(args, pira: Pira):
try:
out = pira.read_variable(args.variable)
except UnknownCommandError:
print(f"Variable not found: {args.variable}", file=sys.stderr)
sys.exit(1)
output(args, dict(variable=args.variable, value=out))
def main_set(args, pira: Pira):
try:
pira.set_variable(args.variable, args.value, save=args.save)
except UnknownCommandError:
print(f"Variable not found: {args.variable}", file=sys.stderr)
sys.exit(1)
def main_save(args, pira: Pira):
try:
pira.save_variable(args.variable)
except UnknownCommandError:
print(f"Variable not found: {args.variable}", file=sys.stderr)
sys.exit(1)
def main_dump(args, pira: Pira):
state = dict(vars={}, failed=[])
for var in VARIABLES:
try:
value = pira.read_variable(var)
except (UnknownCommandError, InvalidArgumentError):
state["failed"].append(var)
else:
state["vars"][var] = value
json.dump(state, sys.stdout, indent=2)
def main_synctime(args, pira: Pira):
from pytz import timezone
tz = timezone(args.timezone)
local_time = datetime.now(tz)
date = local_time.strftime("%d.%m.%y")
time = local_time.strftime("%H:%M:%S")
pira.set_variable("TIME", time, save=args.save)
pira.set_variable("DATE", date, save=args.save)
delta = tz.utcoffset(datetime.now())
offset = delta.total_seconds() / 1800 # multiple of half-hours. So +10800s => +6
pira.set_variable("LTO", "%+d" % offset, save=args.save)
def make_connection(args) -> Backends:
if args.address is not None:
skt = socket.socket()
skt.connect((args.address, args.port))
return TCPClient(socket=skt)
s = serial.Serial(baudrate=args.baudrate, port=args.device)
return PiraSerial(serial=s)
def main():
args = get_parser().parse_args()
if args.log_connection:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("pira.backend").setLevel(logging.DEBUG)
connection = Pira(make_connection(args))
args.func(args, connection)
if __name__ == "__main__":
main()