123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- #!/usr/bin/env python3
- import sys
- import serial
- import socket
- import argparse
- from typing import Union
- import logging
- import json
- import enum
- from datetime import datetime
# Names of the device variables that the `dump` sub-command reads, in the
# order in which they are queried.
VARIABLES = [
    "ADR", "AF", "AFCH", "CC", "COMSPD", "CT",
    "DATE", "DI", "DPS2MSG", "DTTMOUT",
    "ECC", "ECCEN", "EQTEXT1", "EXTSYNC",
    "GRPSEQ",
    "LABPER", "LIC", "LEVEL", "LTO",
    "MJD", "MS", "MSGLIST",
    "PHASE", "PILOT", "PI", "PROGRAM", "PS", "PSNMAIN", "PSW",
    "PTY", "PTYN", "PTYNEN",
    "RDS2MOD", "RDSGEN", "READWEB", "RSTDPS",
    "RT2MSG", "RT2TYPE", "RTP", "RTPRUN", "RTPER", "RTTYPE",
    "RT1", "RT1EN", "RT2", "RT2EN",
    "SCRLSPD", "SEN", "SHORTRT", "SITE", "SLIST", "SPEED", "SPSPER",
    "TA", "TATMOUT", "TIME", "TP", "TPS",
    "UECP",
    "VER",
]
class PiraSerial:
    """Backend that talks to the device through a pyserial port."""

    def __init__(self, serial: serial.Serial):
        """Wrap an already opened serial port."""
        self.serial = serial
        self.log = logging.getLogger("pira.backend.serial")

    def send(self, content: bytes):
        """Log ``content`` at DEBUG level and write it to the port."""
        self.log.debug("-> %s", content)
        self.serial.write(content)

    def recv(self) -> bytes:
        """Read from the port up to and including the next newline byte."""
        received = self.serial.read_until(b"\n")
        self.log.debug("<- %s", received)
        return received
class TCPClient:
    """Backend that talks to the device through a connected TCP socket."""

    def __init__(self, socket: socket.socket):
        """Wrap an already connected socket."""
        self.log = logging.getLogger("pira.backend.tcpclient")
        self.socket = socket

    def connect(self):
        """Do nothing; the socket is expected to be connected by the caller."""

    def send(self, content: bytes):
        """Send all of ``content`` and return the number of bytes sent.

        ``socket.send`` may transmit only part of the buffer, which would
        silently truncate a command; ``sendall`` keeps sending until every
        byte is written or raises ``OSError``.
        """
        self.log.debug("-> %s", content)
        self.socket.sendall(content)
        # Keep returning a byte count, as the original ``socket.send`` did.
        return len(content)

    def recv(self) -> bytes:
        """Read a single byte from the socket.

        Returns ``b""`` when the peer has closed the connection.
        """
        # NOTE(review): presumably one byte at a time so the caller, which
        # checks for the end-of-message suffix after every chunk, never reads
        # past the end of a response - confirm before enlarging the buffer.
        content = self.socket.recv(1)
        self.log.debug("<- %s", content)
        return content
# Any transport accepted by Pira: both classes expose send(bytes) and recv().
Backends = Union[PiraSerial, TCPClient]
class LookupEnum:
    """Mixin for ``enum.Enum`` subclasses that adds lookup by member value."""

    @classmethod
    def lookup(cls, value):
        """Return the member of ``cls`` whose ``value`` equals ``value``.

        Raises:
            ValueError: if no member has that value. The message names the
                offending value and the enum so the failure can be diagnosed.
        """
        for field in cls:
            if field.value == value:
                return field
        raise ValueError(f"{value!r} is not a valid {cls.__name__} value")
class CommandError(Exception):
    """Base class for every non-success status reported for a command."""


class UnknownCommandError(CommandError):
    """The command was answered with the "unknown command" status."""


class InvalidArgumentError(CommandError):
    """The command was answered with the "invalid argument" status."""


class ProcessedPartiallyError(CommandError):
    """The command was answered with the "processed partially" status."""
- class CommandStatus(LookupEnum, enum.StrEnum):
- SUCCESS = "+"
- UNKNOWN_COMMAND = "!"
- INVALID_ARGUMENT = "-"
- PROCESSED_PARTIALLY = "/"
- def to_exception(self):
- error_classes = {
- self.UNKNOWN_COMMAND: UnknownCommandError,
- self.INVALID_ARGUMENT: InvalidArgumentError,
- self.PROCESSED_PARTIALLY: ProcessedPartiallyError,
- }
- exc_cls = error_classes.get(self, None)
- if exc_cls is not None:
- return exc_cls()
class Pira:
    """Client for the device's text command protocol.

    Commands are sent ASCII-encoded and terminated by a carriage return.
    A response is complete once it ends with one ``CommandStatus`` character
    followed by ``END_OF_MESSAGE``.
    """

    END_OF_MESSAGE = "\r\n\r\n"

    def __init__(self, backend: Backends):
        """Store ``backend`` and try to switch off command echo."""
        self.log = logging.getLogger("Pira")
        self.backend = backend
        try:
            self.set_variable("ECHO", "0")
        except CommandError:  # this sometimes fails, it's a fact
            pass

    def recv_response(self) -> str:
        """Read one complete response and return its payload.

        Returns:
            The response text without the leading line break, the status
            character and the end-of-message marker.

        Raises:
            CommandError: (a subclass) when the status is not ``SUCCESS``.
            ConnectionError: when the backend returns no data before the
                response is complete, e.g. because the TCP peer closed the
                connection. Without this check the loop would spin forever.
        """
        terminators = tuple(
            status.value + self.END_OF_MESSAGE for status in CommandStatus
        )
        received = ""
        while not received.endswith(terminators):
            chunk = self.backend.recv()
            if not chunk:
                raise ConnectionError(
                    "backend returned no data before the response was complete"
                )
            received += chunk.decode("ascii")

        value = received.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n")
        status = CommandStatus.lookup(value[-1])
        if status != CommandStatus.SUCCESS:
            raise status.to_exception()
        # Drop the status character, then the line break that preceded it.
        return value[:-1].removesuffix("\r\n")

    def execute_command(self, command: str):
        """Send ``command`` followed by a carriage return; return the response payload."""
        self.backend.send(command.encode("ascii") + b"\r")
        return self.recv_response()

    def read_variable(self, variable: str):
        """Return the current value of ``variable``."""
        return self.execute_command(variable)

    def set_variable(self, variable: str, value: str, save: bool = False):
        """Send ``variable=value``; with ``save`` the command is prefixed with ``*``."""
        cmd = f"{variable}={value}"
        if save:
            cmd = "*" + cmd
        self.execute_command(cmd)

    def save_variable(self, variable: str):
        """Send ``*variable``, the save form of the command without a new value."""
        self.execute_command(f"*{variable}")
def get_parser():
    """Build the command-line parser.

    Connection, logging and output options are global; the action is chosen
    by a sub-command, each of which stores its handler in ``func``.  The
    sub-command is mandatory: without it ``func`` would be missing and the
    program would crash with ``AttributeError`` instead of a usage message.
    """
    p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # Local names avoid shadowing the imported ``serial`` module.
    serial_group = p.add_argument_group("serial connection")
    serial_group.add_argument("--baudrate", type=int, default=19200)
    serial_group.add_argument("--device", default="/dev/ttyUSB0")

    tcp_group = p.add_argument_group("tcp connection")
    tcp_group.add_argument("--port", type=int)
    tcp_group.add_argument("--address")

    log_group = p.add_argument_group("logging")
    log_group.add_argument("--log-connection", action="store_true", default=False)

    out_group = p.add_argument_group("output")
    out_group.add_argument("--output", choices=["json", "text", "repr"], default="text")

    sub = p.add_subparsers(dest="command", metavar="command", required=True)

    execute = sub.add_parser("exec")
    execute.set_defaults(func=main_exec)
    execute.add_argument("cmd")

    get = sub.add_parser("get")
    get.set_defaults(func=main_get)
    get.add_argument("variable")

    setvar = sub.add_parser("set")
    setvar.set_defaults(func=main_set)
    setvar.add_argument("--save", action="store_true", default=False)
    setvar.add_argument("variable")
    setvar.add_argument("value")

    save = sub.add_parser("save")
    save.set_defaults(func=main_save)
    save.add_argument("variable")

    dump = sub.add_parser("dump")
    dump.set_defaults(func=main_dump)

    synctime = sub.add_parser("synctime",
                              description="Set the exact time on the encoder")
    synctime.set_defaults(func=main_synctime)
    synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo")
    synctime.add_argument("--save", action="store_true", default=False)

    return p
def output(args, content):
    """Write ``content`` to stdout in the format selected by ``args.output``.

    ``json`` dumps the whole mapping; ``text`` and ``repr`` print only
    ``content["value"]``. Any other format prints nothing.
    """
    fmt = args.output
    if fmt == "json":
        json.dump(content, sys.stdout)
        return
    if fmt not in ("text", "repr"):
        return
    value = content["value"]
    print(value if fmt == "text" else repr(value))
def main_exec(args, pira: Pira):
    """Handle ``exec``: run ``args.cmd`` verbatim and print the response.

    Exits with status 1 when the command is answered with an error status.
    """
    try:
        out = pira.execute_command(args.cmd)
    except CommandError as exc:
        # The exception may carry no message of its own, which would leave
        # the report blank; fall back to the exception class name.
        reason = str(exc) or type(exc).__name__
        print(f"Command error: {reason}", file=sys.stderr)
        sys.exit(1)
    output(args, dict(cmd=args.cmd, value=out))
def main_get(args, pira: Pira):
    """Handle ``get``: read ``args.variable`` and print its value.

    Exits with status 1 when the variable is reported as unknown.
    """
    try:
        out = pira.read_variable(args.variable)
    except UnknownCommandError:
        # f-string: the message previously printed the literal placeholder.
        print(f"Variable not found: {args.variable}", file=sys.stderr)
        sys.exit(1)
    output(args, dict(variable=args.variable, value=out))
def main_set(args, pira: Pira):
    """Handle ``set``: assign ``args.value`` to ``args.variable``.

    With ``--save`` the save form of the command is used. Exits with
    status 1 when the variable is reported as unknown.
    """
    try:
        pira.set_variable(args.variable, args.value, save=args.save)
    except UnknownCommandError:
        # f-string: the message previously printed the literal placeholder.
        print(f"Variable not found: {args.variable}", file=sys.stderr)
        sys.exit(1)
def main_save(args, pira: Pira):
    """Handle ``save``: issue the save command for ``args.variable``.

    Exits with status 1 when the variable is reported as unknown.
    """
    try:
        pira.save_variable(args.variable)
    except UnknownCommandError:
        # f-string: the message previously printed the literal placeholder.
        print(f"Variable not found: {args.variable}", file=sys.stderr)
        sys.exit(1)
def main_dump(args, pira: Pira):
    """Handle ``dump``: read every name in ``VARIABLES`` and print JSON.

    The document has two keys: ``vars`` maps each readable variable to its
    value, ``failed`` lists the names answered with an unknown-command or
    invalid-argument status.
    """
    values = {}
    unreadable = []
    for name in VARIABLES:
        try:
            values[name] = pira.read_variable(name)
        except (UnknownCommandError, InvalidArgumentError):
            unreadable.append(name)
    json.dump(dict(vars=values, failed=unreadable), sys.stdout, indent=2)
def main_synctime(args, pira: Pira):
    """Handle ``synctime``: set TIME, DATE and LTO from the current time.

    The wall-clock time is taken in the time zone named by ``args.timezone``.
    LTO is written as a signed count of half-hours between that zone and UTC.
    """
    from pytz import timezone

    tz = timezone(args.timezone)
    local_time = datetime.now(tz)
    date = local_time.strftime("%d.%m.%y")
    time = local_time.strftime("%H:%M:%S")
    pira.set_variable("TIME", time, save=args.save)
    pira.set_variable("DATE", date, save=args.save)

    # Take the offset from the aware timestamp itself. Asking the zone for
    # the offset of a naive ``datetime.now()`` interprets the *system* wall
    # clock as if it were in ``tz``, which is wrong (or raises) around DST
    # transitions and whenever the system zone differs from ``tz``.
    delta = local_time.utcoffset()
    offset = int(delta.total_seconds() / 1800)  # multiple of half-hours. So +10800s => +6
    pira.set_variable("LTO", "%+d" % offset, save=args.save)
def make_connection(args) -> Backends:
    """Open the transport selected on the command line.

    A TCP connection is used when ``args.address`` is set; otherwise the
    serial device ``args.device`` is opened at ``args.baudrate``.

    Raises:
        ValueError: if ``--address`` is given without ``--port``.
        OSError: if the TCP connection cannot be established.
    """
    if args.address is not None:
        if args.port is None:
            # Otherwise connect() fails with an opaque TypeError.
            raise ValueError("--port is required when --address is given")
        # create_connection closes the socket itself if connecting fails.
        skt = socket.create_connection((args.address, args.port))
        return TCPClient(socket=skt)
    s = serial.Serial(baudrate=args.baudrate, port=args.device)
    return PiraSerial(serial=s)
def main():
    """Parse the command line, open the connection and run the sub-command."""
    parser = get_parser()
    args = parser.parse_args()

    if args.log_connection:
        logging.basicConfig(level=logging.DEBUG)
        backend_log = logging.getLogger("pira.backend")
        backend_log.setLevel(logging.DEBUG)

    backend = make_connection(args)
    pira = Pira(backend)
    args.func(args, pira)


if __name__ == "__main__":
    main()
|