Browse Source

initial commit

boyska 4 months ago
commit
a143faecb2
1 changed files with 324 additions and 0 deletions
  1. 324 0
      sergio.py

+ 324 - 0
sergio.py

@@ -0,0 +1,324 @@
+#!/usr/bin/env python3
+
+import sys
+import serial
+import socket
+import argparse
+from typing import Union
+import logging
+import json
+import enum
+
+
+VARIABLES = [
+    "ADR",
+    "AF",
+    "AFCH",
+    "CC",
+    "COMSPD",
+    "CT",
+    "DATE",
+    "DI",
+    "DPS2MSG",
+    "DTTMOUT",
+    "ECC",
+    "ECCEN",
+    "EQTEXT1",
+    "EXTSYNC",
+    "GRPSEQ",
+    "LABPER",
+    "LIC",
+    "LEVEL",
+    "LTO",
+    "MJD",
+    "MS",
+    "MSGLIST",
+    "PHASE",
+    "PILOT",
+    "PI",
+    "PROGRAM",
+    "PS",
+    "PSNMAIN",
+    "PSW",
+    "PTY",
+    "PTYN",
+    "PTYNEN",
+    "RDS2MOD",
+    "RDSGEN",
+    "READWEB",
+    "RSTDPS",
+    "RT2MSG",
+    "RT2TYPE",
+    "RTP",
+    "RTPRUN",
+    "RTPER",
+    "RTTYPE",
+    "RT1",
+    "RT1EN",
+    "RT2",
+    "RT2EN",
+    "SCRLSPD",
+    "SEN",
+    "SHORTRT",
+    "SITE",
+    "SLIST",
+    "SPEED",
+    "SPSPER",
+    "TA",
+    "TATMOUT",
+    "TIME",
+    "TP",
+    "TPS",
+    "UECP",
+    "VER",
+]
+
+
+class PiraSerial:
+    def __init__(self, serial: serial.Serial):
+        self.log = logging.getLogger("pira.backend.serial")
+        self.serial = serial
+
+    def send(self, content: bytes):
+        self.log.debug(f"-> {content}")
+        self.serial.write(content)
+
+    def recv(self) -> bytes:
+        content = self.serial.read_until(b"\n")
+        self.log.debug(f"<- {content}")
+        return content
+
+
+class TCPClient:
+    def __init__(self, socket: socket.socket):
+        self.log = logging.getLogger("pira.backend.tcpclient")
+        self.socket = socket
+
+    def connect(self):
+        pass
+
+    def send(self, content: bytes):
+        self.log.debug(f"-> {content}")
+        return self.socket.send(content)
+
+    def recv(self) -> bytes:
+        content = self.socket.recv(1)
+        self.log.debug(f"<- {content}")
+        return content
+
+
+Backends = Union[PiraSerial, TCPClient]
+
+
+class LookupEnum:
+    @classmethod
+    def lookup(cls, value):
+        for field in cls:
+            if field.value == value:
+                return field
+        raise ValueError()
+
+
+class CommandError(Exception):
+    pass
+
+
+class UnknownCommandError(CommandError):
+    pass
+
+
+class InvalidArgumentError(CommandError):
+    pass
+
+
+class ProcessedPartiallyError(CommandError):
+    pass
+
+
+class CommandStatus(LookupEnum, enum.StrEnum):
+    SUCCESS = "+"
+    UNKNOWN_COMMAND = "!"
+    INVALID_ARGUMENT = "-"
+    PROCESSED_PARTIALLY = "/"
+
+    def to_exception(self):
+        error_classes = {
+            self.UNKNOWN_COMMAND: UnknownCommandError,
+            self.INVALID_ARGUMENT: InvalidArgumentError,
+            self.PROCESSED_PARTIALLY: ProcessedPartiallyError,
+        }
+        exc_cls = error_classes.get(self, None)
+        if exc_cls is not None:
+            return exc_cls()
+
+
+class Pira:
+    END_OF_MESSAGE = "".join([chr(13), chr(10), chr(13), chr(10)])
+
+    def __init__(self, backend: Backends):
+        self.log = logging.getLogger("Pira")
+        self.backend = backend
+        self.set_variable("ECHO", "0")
+
+    def recv_response(self) -> str:
+        def msg_complete(s: str) -> bool:
+            for field in CommandStatus:
+                if s.endswith(field.value + self.END_OF_MESSAGE):
+                    return True
+            return False
+
+        ret = ""
+        while True:
+            ret += self.backend.recv().decode("ascii")
+            if msg_complete(ret):
+                value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n")
+                status = CommandStatus.lookup(value[-1])
+                if status != CommandStatus.SUCCESS:
+                    raise status.to_exception()
+                return value[:-1].removesuffix("\r\n")
+
+    def execute_command(self, command: str):
+        self.backend.send(command.encode("ascii") + b"\r")
+        return self.recv_response()
+
+    def read_variable(self, variable: str):
+        return self.execute_command(variable)
+
+    def set_variable(self, variable: str, value: str):
+        cmd = f"{variable}={value}"
+        self.execute_command(cmd)
+
+    def save_variable(self, variable: str):
+        cmd = f"*{variable}"
+        self.execute_command(cmd)
+
+
+def get_parser():
+    p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+    serial = p.add_argument_group("serial connection")
+    serial.add_argument("--baudrate", type=int, default=19200)
+    serial.add_argument("--device", default="/dev/ttyUSB0")
+    tcp = p.add_argument_group("tcp connection")
+    tcp.add_argument("--port", type=int)
+    tcp.add_argument("--address")
+
+    log = p.add_argument_group("logging")
+    log.add_argument("--log-connection", action="store_true", default=False)
+
+    out = p.add_argument_group("output")
+    out.add_argument("--output", choices=["json", "text", "repr"], default="text")
+
+    sub = p.add_subparsers()
+
+    get = sub.add_parser("get")
+    get.set_defaults(func=main_get)
+    get.add_argument("variable")
+
+    setvar = sub.add_parser("set")
+    setvar.set_defaults(func=main_set)
+    setvar.add_argument("variable")
+    setvar.add_argument("value")
+
+    save = sub.add_parser("save")
+    save.set_defaults(func=main_save)
+    save.add_argument("variable")
+
+    dump = sub.add_parser("dump")
+    dump.set_defaults(func=main_dump)
+
+    synctime = sub.add_parser("synctime",
+                              description="Set the exact time on the encoder")
+    synctime.set_defaults(func=main_synctime)
+    synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo")
+    synctime.add_argument("--save", action="store_true", default=False)
+
+    return p
+
+
+def output(args, content):
+    if args.output == "json":
+        json.dump(content, sys.stdout)
+    elif args.output == "text":
+        print(content["value"])
+    elif args.output == "repr":
+        print(repr(content["value"]))
+
+
+def main_get(args, pira: Pira):
+    try:
+        out = pira.read_variable(args.variable)
+    except UnknownCommandError:
+        print("Variable not found: {args.variable}", file=sys.stderr)
+        sys.exit(1)
+    output(args, dict(variable=args.variable, value=out))
+
+
+def main_set(args, pira: Pira):
+    try:
+        pira.set_variable(args.variable, args.value)
+    except UnknownCommandError:
+        print("Variable not found: {args.variable}", file=sys.stderr)
+        sys.exit(1)
+
+
+def main_save(args, pira: Pira):
+    try:
+        pira.save_variable(args.variable)
+    except UnknownCommandError:
+        print("Variable not found: {args.variable}", file=sys.stderr)
+        sys.exit(1)
+
+
+def main_dump(args, pira: Pira):
+    state = dict(vars={}, failed=[])
+    for var in VARIABLES:
+        try:
+            value = pira.read_variable(var)
+        except (UnknownCommandError, InvalidArgumentError):
+            state["failed"].append(var)
+        else:
+            state["vars"][var] = value
+    json.dump(state, sys.stdout, indent=2)
+
+
+def main_synctime(args, pira: Pira):
+    from datetime import datetime
+    from pytz import timezone
+
+    tz = timezone(args.timezone)
+    local_time = datetime.now(tz)
+    date = local_time.strftime("%d.%m.%y")
+    time = local_time.strftime("%H:%M:%S")
+    pira.set_variable("TIME", time)
+    pira.set_variable("DATE", date)
+    delta = tz.utcoffset(datetime.now())
+    offset = delta.total_seconds() / 1800  # multiple of half-hours. So +10800s => +6
+    pira.set_variable("LTO", "%+d" % offset)
+
+    if args.save:
+        pira.save_variable("DATE")
+        pira.save_variable("TIME")
+        pira.save_variable("LTO")
+
+
+def make_connection(args) -> Backends:
+    if args.address is not None:
+        skt = socket.socket()
+        skt.connect((args.address, args.port))
+        return TCPClient(socket=skt)
+    s = serial.Serial(baudrate=args.baudrate, port=args.device)
+    return PiraSerial(serial=s)
+
+
+def main():
+    args = get_parser().parse_args()
+    if args.log_connection:
+        logging.basicConfig(level=logging.DEBUG)
+        logging.getLogger("pira.backend").setLevel(logging.DEBUG)
+    connection = Pira(make_connection(args))
+    args.func(args, connection)
+
+
+if __name__ == "__main__":
+    main()