sergio.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. #!/usr/bin/env python3
  2. import sys
  3. import serial
  4. import socket
  5. import argparse
  6. from typing import Union
  7. import logging
  8. import json
  9. import enum
# Variable names that main_dump() reads one by one, via Pira.read_variable,
# when the "dump" subcommand is used.
VARIABLES = [
    "ADR",
    "AF",
    "AFCH",
    "CC",
    "COMSPD",
    "CT",
    "DATE",
    "DI",
    "DPS2MSG",
    "DTTMOUT",
    "ECC",
    "ECCEN",
    "EQTEXT1",
    "EXTSYNC",
    "GRPSEQ",
    "LABPER",
    "LIC",
    "LEVEL",
    "LTO",
    "MJD",
    "MS",
    "MSGLIST",
    "PHASE",
    "PILOT",
    "PI",
    "PROGRAM",
    "PS",
    "PSNMAIN",
    "PSW",
    "PTY",
    "PTYN",
    "PTYNEN",
    "RDS2MOD",
    "RDSGEN",
    "READWEB",
    "RSTDPS",
    "RT2MSG",
    "RT2TYPE",
    "RTP",
    "RTPRUN",
    "RTPER",
    "RTTYPE",
    "RT1",
    "RT1EN",
    "RT2",
    "RT2EN",
    "SCRLSPD",
    "SEN",
    "SHORTRT",
    "SITE",
    "SLIST",
    "SPEED",
    "SPSPER",
    "TA",
    "TATMOUT",
    "TIME",
    "TP",
    "TPS",
    "UECP",
    "VER",
]
  72. class PiraSerial:
  73. def __init__(self, serial: serial.Serial):
  74. self.log = logging.getLogger("pira.backend.serial")
  75. self.serial = serial
  76. def send(self, content: bytes):
  77. self.log.debug(f"-> {content}")
  78. self.serial.write(content)
  79. def recv(self) -> bytes:
  80. content = self.serial.read_until(b"\n")
  81. self.log.debug(f"<- {content}")
  82. return content
  83. class TCPClient:
  84. def __init__(self, socket: socket.socket):
  85. self.log = logging.getLogger("pira.backend.tcpclient")
  86. self.socket = socket
  87. def connect(self):
  88. pass
  89. def send(self, content: bytes):
  90. self.log.debug(f"-> {content}")
  91. return self.socket.send(content)
  92. def recv(self) -> bytes:
  93. content = self.socket.recv(1)
  94. self.log.debug(f"<- {content}")
  95. return content
# Type alias for the transports accepted by Pira.__init__.
Backends = Union[PiraSerial, TCPClient]
  97. class LookupEnum:
  98. @classmethod
  99. def lookup(cls, value):
  100. for field in cls:
  101. if field.value == value:
  102. return field
  103. raise ValueError()
  104. class CommandError(Exception):
  105. pass
  106. class UnknownCommandError(CommandError):
  107. pass
  108. class InvalidArgumentError(CommandError):
  109. pass
  110. class ProcessedPartiallyError(CommandError):
  111. pass
  112. class CommandStatus(LookupEnum, enum.StrEnum):
  113. SUCCESS = "+"
  114. UNKNOWN_COMMAND = "!"
  115. INVALID_ARGUMENT = "-"
  116. PROCESSED_PARTIALLY = "/"
  117. def to_exception(self):
  118. error_classes = {
  119. self.UNKNOWN_COMMAND: UnknownCommandError,
  120. self.INVALID_ARGUMENT: InvalidArgumentError,
  121. self.PROCESSED_PARTIALLY: ProcessedPartiallyError,
  122. }
  123. exc_cls = error_classes.get(self, None)
  124. if exc_cls is not None:
  125. return exc_cls()
  126. class Pira:
  127. END_OF_MESSAGE = "".join([chr(13), chr(10), chr(13), chr(10)])
  128. def __init__(self, backend: Backends):
  129. self.log = logging.getLogger("Pira")
  130. self.backend = backend
  131. self.set_variable("ECHO", "0")
  132. def recv_response(self) -> str:
  133. def msg_complete(s: str) -> bool:
  134. for field in CommandStatus:
  135. if s.endswith(field.value + self.END_OF_MESSAGE):
  136. return True
  137. return False
  138. ret = ""
  139. while True:
  140. ret += self.backend.recv().decode("ascii")
  141. if msg_complete(ret):
  142. value = ret.removesuffix(self.END_OF_MESSAGE).removeprefix("\r\n")
  143. status = CommandStatus.lookup(value[-1])
  144. if status != CommandStatus.SUCCESS:
  145. raise status.to_exception()
  146. return value[:-1].removesuffix("\r\n")
  147. def execute_command(self, command: str):
  148. self.backend.send(command.encode("ascii") + b"\r")
  149. return self.recv_response()
  150. def read_variable(self, variable: str):
  151. return self.execute_command(variable)
  152. def set_variable(self, variable: str, value: str):
  153. cmd = f"{variable}={value}"
  154. self.execute_command(cmd)
  155. def save_variable(self, variable: str):
  156. cmd = f"*{variable}"
  157. self.execute_command(cmd)
  158. def get_parser():
  159. p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  160. serial = p.add_argument_group("serial connection")
  161. serial.add_argument("--baudrate", type=int, default=19200)
  162. serial.add_argument("--device", default="/dev/ttyUSB0")
  163. tcp = p.add_argument_group("tcp connection")
  164. tcp.add_argument("--port", type=int)
  165. tcp.add_argument("--address")
  166. log = p.add_argument_group("logging")
  167. log.add_argument("--log-connection", action="store_true", default=False)
  168. out = p.add_argument_group("output")
  169. out.add_argument("--output", choices=["json", "text", "repr"], default="text")
  170. sub = p.add_subparsers()
  171. get = sub.add_parser("get")
  172. get.set_defaults(func=main_get)
  173. get.add_argument("variable")
  174. setvar = sub.add_parser("set")
  175. setvar.set_defaults(func=main_set)
  176. setvar.add_argument("variable")
  177. setvar.add_argument("value")
  178. save = sub.add_parser("save")
  179. save.set_defaults(func=main_save)
  180. save.add_argument("variable")
  181. dump = sub.add_parser("dump")
  182. dump.set_defaults(func=main_dump)
  183. synctime = sub.add_parser("synctime",
  184. description="Set the exact time on the encoder")
  185. synctime.set_defaults(func=main_synctime)
  186. synctime.add_argument("timezone", type=str, help="Something like Africa/Cairo")
  187. synctime.add_argument("--save", action="store_true", default=False)
  188. return p
  189. def output(args, content):
  190. if args.output == "json":
  191. json.dump(content, sys.stdout)
  192. elif args.output == "text":
  193. print(content["value"])
  194. elif args.output == "repr":
  195. print(repr(content["value"]))
  196. def main_get(args, pira: Pira):
  197. try:
  198. out = pira.read_variable(args.variable)
  199. except UnknownCommandError:
  200. print("Variable not found: {args.variable}", file=sys.stderr)
  201. sys.exit(1)
  202. output(args, dict(variable=args.variable, value=out))
  203. def main_set(args, pira: Pira):
  204. try:
  205. pira.set_variable(args.variable, args.value)
  206. except UnknownCommandError:
  207. print("Variable not found: {args.variable}", file=sys.stderr)
  208. sys.exit(1)
  209. def main_save(args, pira: Pira):
  210. try:
  211. pira.save_variable(args.variable)
  212. except UnknownCommandError:
  213. print("Variable not found: {args.variable}", file=sys.stderr)
  214. sys.exit(1)
  215. def main_dump(args, pira: Pira):
  216. state = dict(vars={}, failed=[])
  217. for var in VARIABLES:
  218. try:
  219. value = pira.read_variable(var)
  220. except (UnknownCommandError, InvalidArgumentError):
  221. state["failed"].append(var)
  222. else:
  223. state["vars"][var] = value
  224. json.dump(state, sys.stdout, indent=2)
  225. def main_synctime(args, pira: Pira):
  226. from datetime import datetime
  227. from pytz import timezone
  228. tz = timezone(args.timezone)
  229. local_time = datetime.now(tz)
  230. date = local_time.strftime("%d.%m.%y")
  231. time = local_time.strftime("%H:%M:%S")
  232. pira.set_variable("TIME", time)
  233. pira.set_variable("DATE", date)
  234. delta = tz.utcoffset(datetime.now())
  235. offset = delta.total_seconds() / 1800 # multiple of half-hours. So +10800s => +6
  236. pira.set_variable("LTO", "%+d" % offset)
  237. if args.save:
  238. pira.save_variable("DATE")
  239. pira.save_variable("TIME")
  240. pira.save_variable("LTO")
  241. def make_connection(args) -> Backends:
  242. if args.address is not None:
  243. skt = socket.socket()
  244. skt.connect((args.address, args.port))
  245. return TCPClient(socket=skt)
  246. s = serial.Serial(baudrate=args.baudrate, port=args.device)
  247. return PiraSerial(serial=s)
  248. def main():
  249. args = get_parser().parse_args()
  250. if args.log_connection:
  251. logging.basicConfig(level=logging.DEBUG)
  252. logging.getLogger("pira.backend").setLevel(logging.DEBUG)
  253. connection = Pira(make_connection(args))
  254. args.func(args, connection)
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()