PMPacket.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. """
  2. Created on 13-04-2013
  3. @author: citan
  4. """
  5. import array
  6. class PMPacket(object):
  7. """
  8. classdocs
  9. """
  10. # 0x80
  11. # destination byte
  12. # source byte
  13. # data size byte
  14. # ...
  15. # checksum byte sum of every byte in packet (incl. header)
  16. _header_byte = 0x80
  17. _valid_bytes = [0xFF, 0xA8, 0xE8]
  18. def __init__(self, dst, src, data):
  19. self._dst = dst
  20. self._src = src
  21. self._data = data
  22. @classmethod
  23. def from_array(cls, data):
  24. validate = PMPacket.is_valid(data)
  25. if not validate[0]:
  26. raise Exception('packet', validate[1])
  27. dst = data[1]
  28. src = data[2]
  29. data = data[4:-1]
  30. return cls(dst, src, data)
  31. @classmethod
  32. def is_valid(cls, data):
  33. # TODO: check E8
  34. valid = True
  35. msg = ""
  36. valid = valid and (len(data) > 5)
  37. msg += "invalid length (too short), " if (not valid) else ""
  38. valid = valid and (data[0] == PMPacket._header_byte)
  39. msg += "invalid header, " if (not valid) else ""
  40. # valid = data[4] in PMPacket._valid_bytes
  41. # msg += "invalid header, expected one of " + ', '.join(hex(s) for s in PMPacket._valid_bytes) +", got: " + hex(data[4]) + ", " if (not valid) else ""
  42. #valid = valid and ((data[1] == 0x10) or (data[1] == 0xf0))
  43. #valid = valid and ((data[2] == 0x10) or (data[2] == 0xf0))
  44. #valid = valid and (data[1] != data[2])
  45. #msg += "invalid source/target, " if (not valid) else ""
  46. current_len = len(data)
  47. expected_len = 5 + data[3]
  48. valid = valid and (current_len == expected_len)
  49. msg += "invalid length (is: " + str(current_len) + ", expected: " + str(expected_len) + "), " if (
  50. not valid) else ""
  51. checksum = 0
  52. for i in range(0, len(data) - 1):
  53. checksum = (checksum + data[i]) & 0xFF
  54. valid = valid and (checksum == data[-1])
  55. msg += "invalid checksum (is " + str(checksum) + ", expected: " + str(data[-1]) + "), " if (not valid) else ""
  56. return valid, msg
  57. def is_equal(self, packet):
  58. return self.to_bytes() == packet.to_bytes()
  59. def to_bytes(self):
  60. length = len(self._data)
  61. packet = [self._header_byte, self._dst, self._src, length]
  62. packet.extend(self._data)
  63. checksum = 0
  64. for b in packet:
  65. checksum = (checksum + b) & 0xFF
  66. packet.append(checksum)
  67. return packet
  68. def to_string(self):
  69. return array.array('B', self.to_bytes()).tostring()
  70. def dump(self):
  71. return "[" + ', '.join(("0x%0.2X" % s) for s in self.to_bytes()) + "], dst: " + hex(
  72. self._dst) + ", src: " + hex(self._src) + ", len: " + str(len(self._data))
  73. def get_data(self):
  74. return self._data
  75. def get_destination(self):
  76. return self._dst
  77. def get_source(self):
  78. return self._src
  79. def get_rom_id(self):
  80. return PMPacket.extract_rom_id(self._data)
  81. @classmethod
  82. def extract_rom_id(cls, data):
  83. if data[0] != 0xFF:
  84. raise Exception('packet', "not valid init response: " + hex(data[0]) + " instead 0xFF")
  85. if len(data) < 9:
  86. raise Exception('packet', "not valid init response")
  87. rom_id = ((data[4] << 32) | (data[5] << 24) | (data[6] << 16) | (data[7] << 8) | (data[8])) & 0xFFFFFFFFFF
  88. return hex(rom_id).lstrip("0x").upper()
  89. @classmethod
  90. def dump_header(cls, data):
  91. print("header [" + ', '.join(hex(s) for s in data) + "], len: " + str(len(data)))