325 lines
7.8 KiB
Python
325 lines
7.8 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import sys
|
||
|
import serial
|
||
|
import socket
|
||
|
import argparse
|
||
|
from typing import Union
|
||
|
import logging
|
||
|
import json
|
||
|
import enum
|
||
|
|
||
|
|
||
|
# Names of the encoder variables that ``main_dump`` reads, in this order,
# when it produces a full dump of the device state.
VARIABLES = [
    "ADR", "AF", "AFCH", "CC", "COMSPD", "CT",
    "DATE", "DI", "DPS2MSG", "DTTMOUT", "ECC", "ECCEN",
    "EQTEXT1", "EXTSYNC", "GRPSEQ", "LABPER", "LIC", "LEVEL",
    "LTO", "MJD", "MS", "MSGLIST", "PHASE", "PILOT",
    "PI", "PROGRAM", "PS", "PSNMAIN", "PSW", "PTY",
    "PTYN", "PTYNEN", "RDS2MOD", "RDSGEN", "READWEB", "RSTDPS",
    "RT2MSG", "RT2TYPE", "RTP", "RTPRUN", "RTPER", "RTTYPE",
    "RT1", "RT1EN", "RT2", "RT2EN", "SCRLSPD", "SEN",
    "SHORTRT", "SITE", "SLIST", "SPEED", "SPSPER", "TA",
    "TATMOUT", "TIME", "TP", "TPS", "UECP", "VER",
]
|
||
|
|
||
|
|
||
|
class PiraSerial:
    """Backend that exchanges raw bytes with the encoder through a serial port.

    Everything sent and received is logged at DEBUG level on the
    ``pira.backend.serial`` logger.
    """

    def __init__(self, serial: serial.Serial):
        """Keep the given serial port object as ``self.serial``."""
        self.serial = serial
        self.log = logging.getLogger("pira.backend.serial")

    def send(self, content: bytes):
        """Log ``content`` and write it to the serial port."""
        self.log.debug(f"-> {content}")
        self.serial.write(content)

    def recv(self) -> bytes:
        """Return what the port's ``read_until`` yields for a line-feed delimiter."""
        content = self.serial.read_until(b"\n")
        self.log.debug(f"<- {content}")
        return content
|
||
|
|
||
|
|
||
|
class TCPClient:
    """Backend that exchanges raw bytes with the encoder over a TCP socket.

    Everything sent and received is logged at DEBUG level on the
    ``pira.backend.tcpclient`` logger.
    """

    def __init__(self, socket: socket.socket):
        """Keep the given socket object as ``self.socket``."""
        self.log = logging.getLogger("pira.backend.tcpclient")
        self.socket = socket

    def connect(self):
        """Do nothing; the socket handed to the constructor is used as-is."""
        pass

    def send(self, content: bytes):
        """Send all of ``content`` and return the number of bytes sent.

        Raises:
            OSError: if the socket reports an error while sending.
        """
        self.log.debug(f"-> {content}")
        # sendall() keeps writing until every byte is out; a plain send() may
        # transmit only a prefix and silently truncate the command.
        self.socket.sendall(content)
        return len(content)

    def recv(self) -> bytes:
        """Receive exactly one byte from the socket and return it.

        Raises:
            ConnectionError: if the peer closed the connection (the socket
                returned no data), so callers that loop until a terminator
                arrives cannot spin forever on an empty read.
        """
        content = self.socket.recv(1)
        self.log.debug(f"<- {content}")
        if not content:
            raise ConnectionError("connection closed by peer while waiting for data")
        return content
|
||
|
|
||
|
|
||
|
# Type alias for the transport objects used as a ``Pira`` backend; each one
# provides ``send(bytes)`` and ``recv() -> bytes``.
Backends = Union[PiraSerial, TCPClient]
|
||
|
|
||
|
|
||
|
class LookupEnum:
    """Mixin adding lookup-by-value to an enum class.

    ``lookup`` iterates over ``cls``, so this only works when mixed into a
    class that is iterable over its members (such as an ``enum.Enum``).
    """

    @classmethod
    def lookup(cls, value):
        """Return the member of ``cls`` whose ``value`` equals ``value``.

        Raises:
            ValueError: if no member carries that value; the message names
                both the offending value and the enum class.
        """
        for field in cls:
            if field.value == value:
                return field
        raise ValueError(f"{value!r} is not a valid {cls.__name__} value")
|
||
|
|
||
|
|
||
|
class CommandError(Exception):
    """Base class for errors raised when a command does not report success."""
|
||
|
|
||
|
|
||
|
class UnknownCommandError(CommandError):
    """Error for a reply carrying the ``UNKNOWN_COMMAND`` status."""
|
||
|
|
||
|
|
||
|
class InvalidArgumentError(CommandError):
    """Error for a reply carrying the ``INVALID_ARGUMENT`` status."""
|
||
|
|
||
|
|
||
|
class ProcessedPartiallyError(CommandError):
    """Error for a reply carrying the ``PROCESSED_PARTIALLY`` status."""
|
||
|
|
||
|
|
||
|
class CommandStatus(LookupEnum, enum.StrEnum):
    """Status character found at the end of a reply, just before the terminator."""

    SUCCESS = "+"
    UNKNOWN_COMMAND = "!"
    INVALID_ARGUMENT = "-"
    PROCESSED_PARTIALLY = "/"

    def to_exception(self):
        """Create the exception instance that corresponds to this status.

        Returns:
            A new ``UnknownCommandError``, ``InvalidArgumentError`` or
            ``ProcessedPartiallyError`` for the matching failure status, or
            ``None`` for ``SUCCESS``, which has no associated exception.
        """
        if self is CommandStatus.UNKNOWN_COMMAND:
            return UnknownCommandError()
        if self is CommandStatus.INVALID_ARGUMENT:
            return InvalidArgumentError()
        if self is CommandStatus.PROCESSED_PARTIALLY:
            return ProcessedPartiallyError()
        return None
|
||
|
|
||
|
|
||
|
class Pira:
    """Client for the ASCII command protocol spoken over a backend transport.

    Commands are sent ASCII-encoded and terminated by a carriage return. A
    reply is considered complete once it ends with a ``CommandStatus``
    character followed by ``END_OF_MESSAGE``.
    """

    # A reply ends with its status character followed by two CR LF pairs.
    END_OF_MESSAGE = "\r\n\r\n"

    def __init__(self, backend: Backends):
        """Store the backend and immediately send ``ECHO=0`` through it.

        Raises:
            CommandError: if the device does not acknowledge ``ECHO=0``.
        """
        self.log = logging.getLogger("Pira")
        self.backend = backend
        # NOTE(review): presumably switches off the device's command echo so
        # that echoed input does not end up in parsed replies -- confirm
        # against the device manual.
        self.set_variable("ECHO", "0")

    def recv_response(self) -> str:
        """Read one complete reply from the backend and return its payload.

        Returns:
            The reply text with a leading CR LF, the trailing status
            character and the terminator removed.

        Raises:
            CommandError: the matching subclass when the status character is
                anything other than ``SUCCESS``.
            ConnectionError: when the backend returns no data before the
                reply is complete (e.g. closed connection or read timeout);
                without this check the loop would never terminate.
        """
        # str.endswith accepts a tuple, so all status endings are tested at once.
        endings = tuple(field.value + self.END_OF_MESSAGE for field in CommandStatus)

        ret = ""
        while True:
            chunk = self.backend.recv()
            if not chunk:
                raise ConnectionError(
                    "backend returned no data before the response was complete"
                )
            ret += chunk.decode("ascii")
            if ret.endswith(endings):
                value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n")
                # The last remaining character is the status marker.
                status = CommandStatus.lookup(value[-1])
                if status != CommandStatus.SUCCESS:
                    raise status.to_exception()
                return value[:-1].removesuffix("\r\n")

    def execute_command(self, command: str):
        """Send ``command`` followed by a carriage return and return the reply payload.

        Raises:
            CommandError: if the reply status is not ``SUCCESS``.
            UnicodeEncodeError: if ``command`` contains non-ASCII characters.
        """
        self.backend.send(command.encode("ascii") + b"\r")
        return self.recv_response()

    def read_variable(self, variable: str):
        """Send the bare variable name as a command and return the reply payload."""
        return self.execute_command(variable)

    def set_variable(self, variable: str, value: str):
        """Send ``<variable>=<value>``; the reply payload is discarded."""
        cmd = f"{variable}={value}"
        self.execute_command(cmd)

    def save_variable(self, variable: str):
        """Send ``*<variable>``; the reply payload is discarded.

        NOTE(review): presumably asks the device to store the variable
        persistently -- confirm against the device manual.
        """
        cmd = f"*{variable}"
        self.execute_command(cmd)
|
||
|
|
||
|
|
||
|
def get_parser():
    """Build the command-line parser.

    Global options select the connection (serial or TCP), connection logging
    and the output format. One of the sub-commands ``get``, ``set``,
    ``save``, ``dump`` or ``synctime`` is required; each stores its handler
    in ``args.func`` and its name in ``args.command``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # Local names avoid shadowing the ``serial`` module imported by this file.
    serial_group = parser.add_argument_group("serial connection")
    serial_group.add_argument("--baudrate", type=int, default=19200)
    serial_group.add_argument("--device", default="/dev/ttyUSB0")
    tcp_group = parser.add_argument_group("tcp connection")
    tcp_group.add_argument("--port", type=int)
    tcp_group.add_argument("--address")

    log_group = parser.add_argument_group("logging")
    log_group.add_argument("--log-connection", action="store_true", default=False)

    out_group = parser.add_argument_group("output")
    out_group.add_argument("--output", choices=["json", "text", "repr"], default="text")

    # Without required=True, running the tool with no sub-command parses
    # successfully and then fails later with AttributeError on ``args.func``.
    sub = parser.add_subparsers(dest="command", required=True)

    get = sub.add_parser("get")
    get.set_defaults(func=main_get)
    get.add_argument("variable")

    setvar = sub.add_parser("set")
    setvar.set_defaults(func=main_set)
    setvar.add_argument("variable")
    setvar.add_argument("value")

    save = sub.add_parser("save")
    save.set_defaults(func=main_save)
    save.add_argument("variable")

    dump = sub.add_parser("dump")
    dump.set_defaults(func=main_dump)

    synctime = sub.add_parser("synctime",
                              description="Set the exact time on the encoder")
    synctime.set_defaults(func=main_synctime)
    synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo")
    synctime.add_argument("--save", action="store_true", default=False)

    return parser
|
||
|
|
||
|
|
||
|
def output(args, content):
|
||
|
if args.output == "json":
|
||
|
json.dump(content, sys.stdout)
|
||
|
elif args.output == "text":
|
||
|
print(content["value"])
|
||
|
elif args.output == "repr":
|
||
|
print(repr(content["value"]))
|
||
|
|
||
|
|
||
|
def main_get(args, pira: Pira):
    """Handle ``get``: read ``args.variable`` and print it via ``output``.

    If the device reports the name as an unknown command, a message naming
    the variable is written to stderr and the process exits with status 1.
    """
    try:
        out = pira.read_variable(args.variable)
    except UnknownCommandError:
        # Must be an f-string; otherwise the literal placeholder text is
        # printed instead of the variable name.
        print(f"Variable not found: {args.variable}", file=sys.stderr)
        sys.exit(1)
    output(args, dict(variable=args.variable, value=out))
|
||
|
|
||
|
|
||
|
def main_set(args, pira: Pira):
    """Handle ``set``: assign ``args.value`` to ``args.variable`` on the device.

    If the device reports the name as an unknown command, a message naming
    the variable is written to stderr and the process exits with status 1.
    """
    try:
        pira.set_variable(args.variable, args.value)
    except UnknownCommandError:
        # Must be an f-string; otherwise the literal placeholder text is
        # printed instead of the variable name.
        print(f"Variable not found: {args.variable}", file=sys.stderr)
        sys.exit(1)
|
||
|
|
||
|
|
||
|
def main_save(args, pira: Pira):
    """Handle ``save``: issue the save command for ``args.variable``.

    If the device reports the name as an unknown command, a message naming
    the variable is written to stderr and the process exits with status 1.
    """
    try:
        pira.save_variable(args.variable)
    except UnknownCommandError:
        # Must be an f-string; otherwise the literal placeholder text is
        # printed instead of the variable name.
        print(f"Variable not found: {args.variable}", file=sys.stderr)
        sys.exit(1)
|
||
|
|
||
|
|
||
|
def main_dump(args, pira: Pira):
    """Handle ``dump``: read every name in ``VARIABLES`` and print a JSON report.

    The report is an object with ``vars`` (name -> value for each variable
    that could be read) and ``failed`` (names the device rejected with an
    unknown-command or invalid-argument status). It is written to stdout
    with two-space indentation; ``args`` is not consulted.
    """
    values = {}
    rejected = []
    for name in VARIABLES:
        try:
            values[name] = pira.read_variable(name)
        except (UnknownCommandError, InvalidArgumentError):
            rejected.append(name)
    json.dump(dict(vars=values, failed=rejected), sys.stdout, indent=2)
|
||
|
|
||
|
|
||
|
def main_synctime(args, pira: Pira):
    """Handle ``synctime``: set TIME, DATE and LTO from the current time.

    The current time is taken once in the timezone named by
    ``args.timezone``. TIME is sent as ``HH:MM:SS``, DATE as ``DD.MM.YY`` and
    LTO as the signed UTC offset counted in half-hours (truncated toward
    zero). With ``args.save`` set, the three variables are also saved.

    Raises:
        pytz.UnknownTimeZoneError: if ``args.timezone`` is not a known zone.
        CommandError: if the device rejects one of the commands.
    """
    from datetime import datetime
    from pytz import timezone

    tz = timezone(args.timezone)
    local_time = datetime.now(tz)
    date = local_time.strftime("%d.%m.%y")
    time = local_time.strftime("%H:%M:%S")
    pira.set_variable("TIME", time)
    pira.set_variable("DATE", date)

    # Use the offset of the same aware timestamp that produced TIME and DATE.
    # Asking the zone for the offset of a second, naive, system-local "now"
    # can give a different offset around DST changes, and pytz may raise for
    # wall times that are ambiguous or non-existent in the target zone.
    delta = local_time.utcoffset()
    offset = int(delta.total_seconds() / 1800)  # multiple of half-hours. So +10800s => +6
    pira.set_variable("LTO", f"{offset:+d}")

    if args.save:
        for variable in ("DATE", "TIME", "LTO"):
            pira.save_variable(variable)
|
||
|
|
||
|
|
||
|
def make_connection(args) -> Backends:
    """Open the transport selected by the parsed command-line arguments.

    When ``args.address`` is given, a TCP connection to
    ``(args.address, args.port)`` is opened and wrapped in ``TCPClient``.
    Otherwise the serial device ``args.device`` is opened at
    ``args.baudrate`` and wrapped in ``PiraSerial``.

    Raises:
        ValueError: if ``args.address`` is given without ``args.port``.
        OSError: if the TCP connection cannot be established.
    """
    if args.address is not None:
        if args.port is None:
            # Fail with a clear message instead of an obscure TypeError from
            # the socket layer.
            raise ValueError("--port is required when --address is given")
        # create_connection() resolves the host, tries each resulting address
        # and closes the socket itself if connecting fails.
        skt = socket.create_connection((args.address, args.port))
        return TCPClient(socket=skt)
    s = serial.Serial(baudrate=args.baudrate, port=args.device)
    return PiraSerial(serial=s)
|
||
|
|
||
|
|
||
|
def main():
    """Parse the command line, open the connection and run the chosen handler.

    With ``--log-connection``, logging is configured at DEBUG level and the
    ``pira.backend`` logger is set to DEBUG before the connection is opened.
    """
    options = get_parser().parse_args()
    if options.log_connection:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger("pira.backend").setLevel(logging.DEBUG)
    backend = make_connection(options)
    pira = Pira(backend)
    options.func(options, pira)
|
||
|
|
||
|
|
||
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|