12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- """
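- This module defines the message dataclasses (Message, LogMessage, DumpMessage)
- and the Decoder that parses ASCII "LOG ..." and "DMP ..." lines into them.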
- """
- from typing import Optional
- from dataclasses import dataclass, asdict
@dataclass
class Message:
    """Common base for decoded messages.

    Every message keeps the line text it was built from, exactly as
    supplied by whoever constructs the instance.
    """

    # Line text this message was created from.
    orig_line: str

    def asdict(self):
        """Return the dataclass fields of this message as a dictionary."""
        # Inside a method body the class scope is skipped during name
        # lookup, so this resolves to the imported dataclasses.asdict
        # rather than recursing into this method.
        fields_as_dict = asdict(self)
        return fields_as_dict
@dataclass
class LogMessage(Message):
    """Message that carries a numeric level and a text body.

    Constructor field order is ``orig_line`` (inherited), ``level``,
    ``message``.
    """

    # Numeric level of the entry.
    level: int

    # Text body of the entry.
    message: str
@dataclass
class DumpMessage(Message):
    """Message that carries a mapping of named integer variables.

    Constructor field order is ``orig_line`` (inherited), ``variables``.
    """

    # Variable name -> integer value.
    variables: dict[str, int]
class Decoder:
    """
    Decode raw ``LOG`` and ``DMP`` lines into message objects.

    ``LOG <level> <text>`` lines become a ``LogMessage`` and
    ``DMP <KEY>=<int> ...`` lines become a ``DumpMessage``.  Any other
    line is not handled and yields ``None``.

    >>> d = Decoder()
    >>> sorted(d.decode(b'DMP V=1 G=200 FREQ=1234567').variables.items())
    [('FREQ', 1234567), ('G', 200), ('V', 1)]
    >>> d.decode(b'LOG D ciao').level
    1
    >>> d.decode(b'LOG D ciao').message
    'ciao'
    >>> d.decode(b'something else') is None
    True
    """

    # Level tag as it appears in a LOG line -> numeric level.
    log_levels = {
        "D": 1,
        "I": 2,
        "W": 3,
        "E": 4,
        "C": 5,
    }

    def decode_level(self, description: str) -> int:
        """Return the numeric level for a level tag.

        Raises:
            KeyError: if ``description`` is not a key of ``log_levels``.
        """
        return self.log_levels[description]

    def decode_value(self, key: str, value: str) -> int:
        """Parse the textual value of a dumped variable as a base-10 integer.

        ``key`` is accepted but not used by this implementation.

        Raises:
            ValueError: if ``value`` is not a valid base-10 integer.
        """
        return int(value, base=10)

    def decode_log(self, line: str) -> "LogMessage":
        """Build a ``LogMessage`` from ``"<level> <text>"``.

        ``line`` is split at the first space only, so the text part may
        itself contain spaces.  ``orig_line`` of the result is ``line`` as
        received here (``decode`` passes it with the ``LOG `` prefix
        already removed).

        Raises:
            ValueError: if ``line`` contains no space to split on.
            KeyError: if the level tag is unknown.
        """
        level_tag, message = line.split(" ", 1)
        return LogMessage(
            level=self.decode_level(level_tag),
            message=message,
            orig_line=line,
        )

    def decode_dump(self, line: str) -> "DumpMessage":
        """Build a ``DumpMessage`` from whitespace-separated ``KEY=VALUE`` items.

        Each item is split at its first ``=``; when a key occurs more than
        once the last value wins.  ``orig_line`` of the result is ``line``
        as received here (``decode`` passes it with the ``DMP `` prefix
        already removed).

        Raises:
            ValueError: if an item has no ``=`` or its value is not an
                integer.
        """
        variables = {}
        for assignment in line.split():
            key, raw_value = assignment.split("=", 1)
            variables[key] = self.decode_value(key, raw_value)
        return DumpMessage(variables=variables, orig_line=line)

    def decode(self, line: bytes) -> Optional["Message"]:
        """
        Decode one raw line.

        Returns None if the line is not meant to be handled by this decoder.
        Raise if meant for us but invalid (for example ``UnicodeDecodeError``
        for non-ASCII content, ``ValueError`` or ``KeyError`` for a
        malformed payload).
        """
        # The prefix is matched on the raw bytes so that lines belonging to
        # someone else (possibly not ASCII at all) return None as promised
        # instead of failing in the ASCII decode step.
        if line.startswith(b"LOG "):
            return self.decode_log(line.decode("ascii").removeprefix("LOG "))
        if line.startswith(b"DMP "):
            return self.decode_dump(line.decode("ascii").removeprefix("DMP "))
        return None
|