sergio.py 8.3 KB


  1. #!/usr/bin/env python3
  2. import sys
  3. import serial
  4. import socket
  5. import argparse
  6. from typing import Union
  7. import logging
  8. import json
  9. import enum
  10. from datetime import datetime
  11. VARIABLES = [
  12. "ADR",
  13. "AF",
  14. "AFCH",
  15. "CC",
  16. "COMSPD",
  17. "CT",
  18. "DATE",
  19. "DI",
  20. "DPS2MSG",
  21. "DTTMOUT",
  22. "ECC",
  23. "ECCEN",
  24. "EQTEXT1",
  25. "EXTSYNC",
  26. "GRPSEQ",
  27. "LABPER",
  28. "LIC",
  29. "LEVEL",
  30. "LTO",
  31. "MJD",
  32. "MS",
  33. "MSGLIST",
  34. "PHASE",
  35. "PILOT",
  36. "PI",
  37. "PROGRAM",
  38. "PS",
  39. "PSNMAIN",
  40. "PSW",
  41. "PTY",
  42. "PTYN",
  43. "PTYNEN",
  44. "RDS2MOD",
  45. "RDSGEN",
  46. "READWEB",
  47. "RSTDPS",
  48. "RT2MSG",
  49. "RT2TYPE",
  50. "RTP",
  51. "RTPRUN",
  52. "RTPER",
  53. "RTTYPE",
  54. "RT1",
  55. "RT1EN",
  56. "RT2",
  57. "RT2EN",
  58. "SCRLSPD",
  59. "SEN",
  60. "SHORTRT",
  61. "SITE",
  62. "SLIST",
  63. "SPEED",
  64. "SPSPER",
  65. "TA",
  66. "TATMOUT",
  67. "TIME",
  68. "TP",
  69. "TPS",
  70. "UECP",
  71. "VER",
  72. ]
  73. class PiraSerial:
  74. def __init__(self, serial: serial.Serial):
  75. self.log = logging.getLogger("pira.backend.serial")
  76. self.serial = serial
  77. def send(self, content: bytes):
  78. self.log.debug(f"-> {content}")
  79. self.serial.write(content)
  80. def recv(self) -> bytes:
  81. content = self.serial.read_until(b"\n")
  82. self.log.debug(f"<- {content}")
  83. return content
  84. class TCPClient:
  85. def __init__(self, socket: socket.socket):
  86. self.log = logging.getLogger("pira.backend.tcpclient")
  87. self.socket = socket
  88. def connect(self):
  89. pass
  90. def send(self, content: bytes):
  91. self.log.debug(f"-> {content}")
  92. return self.socket.send(content)
  93. def recv(self) -> bytes:
  94. content = self.socket.recv(1)
  95. self.log.debug(f"<- {content}")
  96. return content
  97. Backends = Union[PiraSerial, TCPClient]
  98. class LookupEnum:
  99. @classmethod
  100. def lookup(cls, value):
  101. for field in cls:
  102. if field.value == value:
  103. return field
  104. raise ValueError()
  105. class CommandError(Exception):
  106. pass
  107. class UnknownCommandError(CommandError):
  108. pass
  109. class InvalidArgumentError(CommandError):
  110. pass
  111. class ProcessedPartiallyError(CommandError):
  112. pass
  113. class CommandStatus(LookupEnum, enum.StrEnum):
  114. SUCCESS = "+"
  115. UNKNOWN_COMMAND = "!"
  116. INVALID_ARGUMENT = "-"
  117. PROCESSED_PARTIALLY = "/"
  118. def to_exception(self):
  119. error_classes = {
  120. self.UNKNOWN_COMMAND: UnknownCommandError,
  121. self.INVALID_ARGUMENT: InvalidArgumentError,
  122. self.PROCESSED_PARTIALLY: ProcessedPartiallyError,
  123. }
  124. exc_cls = error_classes.get(self, None)
  125. if exc_cls is not None:
  126. return exc_cls()
  127. class Pira:
  128. END_OF_MESSAGE = "\r\n\r\n"
  129. def __init__(self, backend: Backends):
  130. self.log = logging.getLogger("Pira")
  131. self.backend = backend
  132. try:
  133. self.set_variable("ECHO", "0")
  134. except CommandError: # this sometimes fails, it's a fact
  135. pass
  136. def recv_response(self) -> str:
  137. def msg_complete(s: str) -> bool:
  138. for field in CommandStatus:
  139. if s.endswith(field.value + self.END_OF_MESSAGE):
  140. return True
  141. return False
  142. ret = ""
  143. while True:
  144. ret += self.backend.recv().decode("ascii")
  145. if msg_complete(ret):
  146. value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n")
  147. status = CommandStatus.lookup(value[-1])
  148. if status != CommandStatus.SUCCESS:
  149. raise status.to_exception()
  150. return value[:-1].removesuffix("\r\n")
  151. def execute_command(self, command: str):
  152. self.backend.send(command.encode("ascii") + b"\r")
  153. return self.recv_response()
  154. def read_variable(self, variable: str):
  155. return self.execute_command(variable)
  156. def set_variable(self, variable: str, value: str, save: bool = False):
  157. cmd = f"{variable}={value}"
  158. if save:
  159. cmd = "*" + cmd
  160. self.execute_command(cmd)
  161. def save_variable(self, variable: str):
  162. cmd = f"*{variable}"
  163. self.execute_command(cmd)
  164. def get_parser():
  165. p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  166. serial = p.add_argument_group("serial connection")
  167. serial.add_argument("--baudrate", type=int, default=19200)
  168. serial.add_argument("--device", default="/dev/ttyUSB0")
  169. tcp = p.add_argument_group("tcp connection")
  170. tcp.add_argument("--port", type=int)
  171. tcp.add_argument("--address")
  172. log = p.add_argument_group("logging")
  173. log.add_argument("--log-connection", action="store_true", default=False)
  174. out = p.add_argument_group("output")
  175. out.add_argument("--output", choices=["json", "text", "repr"], default="text")
  176. sub = p.add_subparsers()
  177. execute = sub.add_parser("exec")
  178. execute.set_defaults(func=main_exec)
  179. execute.add_argument("cmd")
  180. get = sub.add_parser("get")
  181. get.set_defaults(func=main_get)
  182. get.add_argument("variable")
  183. setvar = sub.add_parser("set")
  184. setvar.set_defaults(func=main_set)
  185. setvar.add_argument("--save", action="store_true", default=False)
  186. setvar.add_argument("variable")
  187. setvar.add_argument("value")
  188. save = sub.add_parser("save")
  189. save.set_defaults(func=main_save)
  190. save.add_argument("variable")
  191. dump = sub.add_parser("dump")
  192. dump.set_defaults(func=main_dump)
  193. synctime = sub.add_parser("synctime",
  194. description="Set the exact time on the encoder")
  195. synctime.set_defaults(func=main_synctime)
  196. synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo")
  197. synctime.add_argument("--save", action="store_true", default=False)
  198. return p
  199. def output(args, content):
  200. if args.output == "json":
  201. json.dump(content, sys.stdout)
  202. elif args.output == "text":
  203. print(content["value"])
  204. elif args.output == "repr":
  205. print(repr(content["value"]))
  206. def main_exec(args, pira: Pira):
  207. try:
  208. out = pira.execute_command(args.cmd)
  209. except CommandError as exc:
  210. print(f"Command error: {exc}", file=sys.stderr)
  211. sys.exit(1)
  212. output(args, dict(cmd=args.cmd, value=out))
  213. def main_get(args, pira: Pira):
  214. try:
  215. out = pira.read_variable(args.variable)
  216. except UnknownCommandError:
  217. print(f"Variable not found: {args.variable}", file=sys.stderr)
  218. sys.exit(1)
  219. output(args, dict(variable=args.variable, value=out))
  220. def main_set(args, pira: Pira):
  221. try:
  222. pira.set_variable(args.variable, args.value, save=args.save)
  223. except UnknownCommandError:
  224. print(f"Variable not found: {args.variable}", file=sys.stderr)
  225. sys.exit(1)
  226. def main_save(args, pira: Pira):
  227. try:
  228. pira.save_variable(args.variable)
  229. except UnknownCommandError:
  230. print(f"Variable not found: {args.variable}", file=sys.stderr)
  231. sys.exit(1)
  232. def main_dump(args, pira: Pira):
  233. state = dict(vars={}, failed=[])
  234. for var in VARIABLES:
  235. try:
  236. value = pira.read_variable(var)
  237. except (UnknownCommandError, InvalidArgumentError):
  238. state["failed"].append(var)
  239. else:
  240. state["vars"][var] = value
  241. json.dump(state, sys.stdout, indent=2)
  242. def main_synctime(args, pira: Pira):
  243. from pytz import timezone
  244. tz = timezone(args.timezone)
  245. local_time = datetime.now(tz)
  246. date = local_time.strftime("%d.%m.%y")
  247. time = local_time.strftime("%H:%M:%S")
  248. pira.set_variable("TIME", time, save=args.save)
  249. pira.set_variable("DATE", date, save=args.save)
  250. delta = tz.utcoffset(datetime.now())
  251. offset = delta.total_seconds() / 1800 # multiple of half-hours. So +10800s => +6
  252. pira.set_variable("LTO", "%+d" % offset, save=args.save)
  253. def make_connection(args) -> Backends:
  254. if args.address is not None:
  255. skt = socket.socket()
  256. skt.connect((args.address, args.port))
  257. return TCPClient(socket=skt)
  258. s = serial.Serial(baudrate=args.baudrate, port=args.device)
  259. return PiraSerial(serial=s)
  260. def main():
  261. args = get_parser().parse_args()
  262. if args.log_connection:
  263. logging.basicConfig(level=logging.DEBUG)
  264. logging.getLogger("pira.backend").setLevel(logging.DEBUG)
  265. connection = Pira(make_connection(args))
  266. args.func(args, connection)
  267. if __name__ == "__main__":
  268. main()