decoder.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
"""
This module decodes ASCII ``LOG`` and ``DMP`` lines into message dataclasses.
"""
  4. from typing import Optional
  5. from dataclasses import dataclass, asdict
  6. @dataclass
  7. class Message:
  8. orig_line: str
  9. def asdict(self):
  10. return asdict(self)
  11. @dataclass
  12. class LogMessage(Message):
  13. level: int
  14. message: str
  15. @dataclass
  16. class DumpMessage(Message):
  17. variables: dict[str, int]
  18. class Decoder:
  19. """
  20. >>> d = Decoder()
  21. >>> sorted(d.decode(b'DMP V=1 G=200 FREQ=1234567').variables.items())
  22. [('FREQ', 1234567), ('G', 200), ('V', 1)]
  23. >>> d.decode(b'LOG D ciao').level
  24. 1
  25. >>> d.decode(b'LOG D ciao').message
  26. 'ciao'
  27. """
  28. log_levels = {
  29. "D": 1,
  30. "I": 2,
  31. "W": 3,
  32. "E": 4,
  33. "C": 5,
  34. }
  35. def __init__(self):
  36. pass
  37. def decode_level(self, description: str) -> int:
  38. return self.log_levels[description]
  39. def decode_value(self, key: str, value: str) -> Optional[dict]:
  40. return int(value, base=10)
  41. def decode_log(self, line: str) -> LogMessage:
  42. level, message = line.split(" ", 1)
  43. level = self.decode_level(level)
  44. return LogMessage(level=level, message=message, orig_line=line)
  45. def decode_dump(self, line: str) -> DumpMessage:
  46. variables = {}
  47. variables_settings = line.split()
  48. for varset in variables_settings:
  49. key, val = varset.split("=", 1)
  50. val = self.decode_value(key, val)
  51. variables[key] = val
  52. return DumpMessage(variables=variables, orig_line=line)
  53. def decode(self, line: bytes) -> Optional[Message]:
  54. """
  55. Returns None if the line is not meant to be handled by this decoder.
  56. Raise if meant for us but invalid.
  57. """
  58. line = line.decode("ascii")
  59. if line.startswith("LOG "):
  60. return self.decode_log(line.removeprefix("LOG "))
  61. if line.startswith("DMP "):
  62. return self.decode_dump(line.removeprefix("DMP "))
  63. return None