82 baris
1,9 KiB
Python
82 baris
1,9 KiB
Python
"""
|
|
This module
|
|
"""
|
|
|
|
from typing import Optional
|
|
from dataclasses import dataclass, asdict
|
|
|
|
|
|
@dataclass
|
|
class Message:
|
|
orig_line: str
|
|
|
|
def asdict(self):
|
|
return asdict(self)
|
|
|
|
|
|
@dataclass
|
|
class LogMessage(Message):
|
|
level: int
|
|
message: str
|
|
|
|
|
|
@dataclass
|
|
class DumpMessage(Message):
|
|
variables: dict[str, int]
|
|
|
|
|
|
class Decoder:
|
|
"""
|
|
>>> d = Decoder()
|
|
>>> sorted(d.decode(b'DMP V=1 G=200 FREQ=1234567').variables.items())
|
|
[('FREQ', 1234567), ('G', 200), ('V', 1)]
|
|
>>> d.decode(b'LOG D ciao').level
|
|
1
|
|
>>> d.decode(b'LOG D ciao').message
|
|
'ciao'
|
|
"""
|
|
|
|
log_levels = {
|
|
"D": 1,
|
|
"I": 2,
|
|
"W": 3,
|
|
"E": 4,
|
|
"C": 5,
|
|
}
|
|
|
|
def __init__(self):
|
|
pass
|
|
|
|
def decode_level(self, description: str) -> int:
|
|
return self.log_levels[description]
|
|
|
|
def decode_value(self, key: str, value: str) -> Optional[dict]:
|
|
return int(value, base=10)
|
|
|
|
def decode_log(self, line: str) -> LogMessage:
|
|
level, message = line.split(" ", 1)
|
|
level = self.decode_level(level)
|
|
return LogMessage(level=level, message=message, orig_line=line)
|
|
|
|
def decode_dump(self, line: str) -> DumpMessage:
|
|
variables = {}
|
|
variables_settings = line.split()
|
|
for varset in variables_settings:
|
|
key, val = varset.split("=", 1)
|
|
val = self.decode_value(key, val)
|
|
variables[key] = val
|
|
return DumpMessage(variables=variables, orig_line=line)
|
|
|
|
def decode(self, line: bytes) -> Optional[Message]:
|
|
"""
|
|
Returns None if the line is not meant to be handled by this decoder.
|
|
|
|
Raise if meant for us but invalid.
|
|
"""
|
|
line = line.decode("ascii")
|
|
|
|
if line.startswith("LOG "):
|
|
return self.decode_log(line.removeprefix("LOG "))
|
|
if line.startswith("DMP "):
|
|
return self.decode_dump(line.removeprefix("DMP "))
|
|
return None
|