sergio.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. #!/usr/bin/env python3
  2. import sys
  3. import serial
  4. import socket
  5. import argparse
  6. from typing import Union
  7. import logging
  8. import json
  9. import enum
  10. from datetime import datetime
  11. VARIABLES = [
  12. "ADR",
  13. "AF",
  14. "AFCH",
  15. "CC",
  16. "COMSPD",
  17. "CT",
  18. "DATE",
  19. "DI",
  20. "DPS2MSG",
  21. "DTTMOUT",
  22. "ECC",
  23. "ECCEN",
  24. "EQTEXT1",
  25. "EXTSYNC",
  26. "GRPSEQ",
  27. "LABPER",
  28. "LIC",
  29. "LEVEL",
  30. "LTO",
  31. "MJD",
  32. "MS",
  33. "MSGLIST",
  34. "PHASE",
  35. "PILOT",
  36. "PI",
  37. "PROGRAM",
  38. "PS",
  39. "PSNMAIN",
  40. "PSW",
  41. "PTY",
  42. "PTYN",
  43. "PTYNEN",
  44. "RDS2MOD",
  45. "RDSGEN",
  46. "READWEB",
  47. "RSTDPS",
  48. "RT2MSG",
  49. "RT2TYPE",
  50. "RTP",
  51. "RTPRUN",
  52. "RTPER",
  53. "RTTYPE",
  54. "RT1",
  55. "RT1EN",
  56. "RT2",
  57. "RT2EN",
  58. "SCRLSPD",
  59. "SEN",
  60. "SHORTRT",
  61. "SITE",
  62. "SLIST",
  63. "SPEED",
  64. "SPSPER",
  65. "TA",
  66. "TATMOUT",
  67. "TIME",
  68. "TP",
  69. "TPS",
  70. "UECP",
  71. "VER",
  72. ]
  73. class PiraSerial:
  74. def __init__(self, serial: serial.Serial):
  75. self.log = logging.getLogger("pira.backend.serial")
  76. self.serial = serial
  77. def send(self, content: bytes):
  78. self.log.debug(f"-> {content}")
  79. self.serial.write(content)
  80. def recv(self) -> bytes:
  81. content = self.serial.read_until(b"\n")
  82. self.log.debug(f"<- {content}")
  83. return content
  84. class TCPClient:
  85. def __init__(self, socket: socket.socket):
  86. self.log = logging.getLogger("pira.backend.tcpclient")
  87. self.socket = socket
  88. def connect(self):
  89. pass
  90. def send(self, content: bytes):
  91. self.log.debug(f"-> {content}")
  92. return self.socket.send(content)
  93. def recv(self) -> bytes:
  94. content = self.socket.recv(1)
  95. self.log.debug(f"<- {content}")
  96. return content
  97. Backends = Union[PiraSerial, TCPClient]
  98. class LookupEnum:
  99. @classmethod
  100. def lookup(cls, value):
  101. for field in cls:
  102. if field.value == value:
  103. return field
  104. raise ValueError()
  105. class CommandError(Exception):
  106. pass
  107. class UnknownCommandError(CommandError):
  108. pass
  109. class InvalidArgumentError(CommandError):
  110. pass
  111. class ProcessedPartiallyError(CommandError):
  112. pass
  113. class CommandStatus(LookupEnum, enum.StrEnum):
  114. SUCCESS = "+"
  115. UNKNOWN_COMMAND = "!"
  116. INVALID_ARGUMENT = "-"
  117. PROCESSED_PARTIALLY = "/"
  118. def to_exception(self):
  119. error_classes = {
  120. self.UNKNOWN_COMMAND: UnknownCommandError,
  121. self.INVALID_ARGUMENT: InvalidArgumentError,
  122. self.PROCESSED_PARTIALLY: ProcessedPartiallyError,
  123. }
  124. exc_cls = error_classes.get(self, None)
  125. if exc_cls is not None:
  126. return exc_cls()
  127. class Pira:
  128. END_OF_MESSAGE = "".join([chr(13), chr(10), chr(13), chr(10)])
  129. def __init__(self, backend: Backends):
  130. self.log = logging.getLogger("Pira")
  131. self.backend = backend
  132. self.set_variable("ECHO", "0")
  133. def recv_response(self) -> str:
  134. def msg_complete(s: str) -> bool:
  135. for field in CommandStatus:
  136. if s.endswith(field.value + self.END_OF_MESSAGE):
  137. return True
  138. return False
  139. ret = ""
  140. while True:
  141. ret += self.backend.recv().decode("ascii")
  142. if msg_complete(ret):
  143. value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n")
  144. status = CommandStatus.lookup(value[-1])
  145. if status != CommandStatus.SUCCESS:
  146. raise status.to_exception()
  147. return value[:-1].removesuffix("\r\n")
  148. def execute_command(self, command: str):
  149. self.backend.send(command.encode("ascii") + b"\r")
  150. return self.recv_response()
  151. def read_variable(self, variable: str):
  152. return self.execute_command(variable)
  153. def set_variable(self, variable: str, value: str):
  154. cmd = f"{variable}={value}"
  155. self.execute_command(cmd)
  156. def save_variable(self, variable: str):
  157. cmd = f"*{variable}"
  158. self.execute_command(cmd)
  159. def get_parser():
  160. p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  161. serial = p.add_argument_group("serial connection")
  162. serial.add_argument("--baudrate", type=int, default=19200)
  163. serial.add_argument("--device", default="/dev/ttyUSB0")
  164. tcp = p.add_argument_group("tcp connection")
  165. tcp.add_argument("--port", type=int)
  166. tcp.add_argument("--address")
  167. log = p.add_argument_group("logging")
  168. log.add_argument("--log-connection", action="store_true", default=False)
  169. out = p.add_argument_group("output")
  170. out.add_argument("--output", choices=["json", "text", "repr"], default="text")
  171. sub = p.add_subparsers()
  172. get = sub.add_parser("get")
  173. get.set_defaults(func=main_get)
  174. get.add_argument("variable")
  175. setvar = sub.add_parser("set")
  176. setvar.set_defaults(func=main_set)
  177. setvar.add_argument("variable")
  178. setvar.add_argument("value")
  179. save = sub.add_parser("save")
  180. save.set_defaults(func=main_save)
  181. save.add_argument("variable")
  182. dump = sub.add_parser("dump")
  183. dump.set_defaults(func=main_dump)
  184. synctime = sub.add_parser("synctime",
  185. description="Set the exact time on the encoder")
  186. synctime.set_defaults(func=main_synctime)
  187. synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo")
  188. synctime.add_argument("--save", action="store_true", default=False)
  189. return p
  190. def output(args, content):
  191. if args.output == "json":
  192. json.dump(content, sys.stdout)
  193. elif args.output == "text":
  194. print(content["value"])
  195. elif args.output == "repr":
  196. print(repr(content["value"]))
  197. def main_get(args, pira: Pira):
  198. try:
  199. out = pira.read_variable(args.variable)
  200. except UnknownCommandError:
  201. print("Variable not found: {args.variable}", file=sys.stderr)
  202. sys.exit(1)
  203. output(args, dict(variable=args.variable, value=out))
  204. def main_set(args, pira: Pira):
  205. try:
  206. pira.set_variable(args.variable, args.value)
  207. except UnknownCommandError:
  208. print("Variable not found: {args.variable}", file=sys.stderr)
  209. sys.exit(1)
  210. def main_save(args, pira: Pira):
  211. try:
  212. pira.save_variable(args.variable)
  213. except UnknownCommandError:
  214. print("Variable not found: {args.variable}", file=sys.stderr)
  215. sys.exit(1)
  216. def main_dump(args, pira: Pira):
  217. state = dict(vars={}, failed=[])
  218. for var in VARIABLES:
  219. try:
  220. value = pira.read_variable(var)
  221. except (UnknownCommandError, InvalidArgumentError):
  222. state["failed"].append(var)
  223. else:
  224. state["vars"][var] = value
  225. json.dump(state, sys.stdout, indent=2)
  226. def main_synctime(args, pira: Pira):
  227. from pytz import timezone
  228. tz = timezone(args.timezone)
  229. local_time = datetime.now(tz)
  230. date = local_time.strftime("%d.%m.%y")
  231. time = local_time.strftime("%H:%M:%S")
  232. pira.set_variable("TIME", time)
  233. pira.set_variable("DATE", date)
  234. delta = tz.utcoffset(datetime.now())
  235. offset = delta.total_seconds() / 1800 # multiple of half-hours. So +10800s => +6
  236. pira.set_variable("LTO", "%+d" % offset)
  237. if args.save:
  238. pira.save_variable("DATE")
  239. pira.save_variable("TIME")
  240. pira.save_variable("LTO")
  241. def make_connection(args) -> Backends:
  242. if args.address is not None:
  243. skt = socket.socket()
  244. skt.connect((args.address, args.port))
  245. return TCPClient(socket=skt)
  246. s = serial.Serial(baudrate=args.baudrate, port=args.device)
  247. return PiraSerial(serial=s)
  248. def main():
  249. args = get_parser().parse_args()
  250. if args.log_connection:
  251. logging.basicConfig(level=logging.DEBUG)
  252. logging.getLogger("pira.backend").setLevel(logging.DEBUG)
  253. connection = Pira(make_connection(args))
  254. args.func(args, connection)
  255. if __name__ == "__main__":
  256. main()