PMCUStandardParameter.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. from pimonitor.cu.PMCUContext import PMCUContext
  2. from pimonitor.cu.PMCUParameter import PMCUParameter
  3. __author__ = 'citan'
  class PMCUStandardParameter(PMCUParameter):
      """Parameter initialised with ``PMCUParameter.CU_TYPE_STD_PARAMETER()``.

      Holds the parameter's identity (id, name, description), the byte/bit
      position tested by ``is_supported``, a target value, an ordered list of
      conversion objects and an address object that is attached after
      construction through ``set_address``.
      """

      def __init__(self, pid, name, desc, byte_index, bit_index, target):
          """Store the descriptive fields.

          The conversion list starts empty and the address starts as ``None``;
          they are filled in later by ``add_conversion`` and ``set_address``.

          pid        -- identifier returned by ``get_id``
          name       -- value returned by ``get_name``
          desc       -- description, only reported by ``to_string``
          byte_index -- byte position used by ``is_supported``
          bit_index  -- bit position inside that byte used by ``is_supported``
          target     -- value returned unchanged by ``get_target``
          """
          PMCUParameter.__init__(self, PMCUParameter.CU_TYPE_STD_PARAMETER())
          self._id = pid
          self._name = name
          self._desc = desc
          self._byte_index = byte_index
          self._bit_index = bit_index
          self._target = target
          self._conversions = []
          self._address = None

      def get_id(self):
          """Return the identifier given to the constructor."""
          return self._id

      def set_address(self, address):
          """Attach the address object later read by ``get_value`` and ``to_string``."""
          self._address = address

      def get_address(self):
          """Return the attached address object, or ``None`` if none was set."""
          return self._address

      def get_target(self):
          """Return the target value given to the constructor."""
          return self._target

      def get_name(self):
          """Return the name given to the constructor."""
          return self._name

      def add_conversion(self, conversion):
          """Append a conversion; the first one added supplies the default unit."""
          self._conversions.append(conversion)

      # noinspection PyUnusedLocal
      def get_value(self, packet, unit=None):
          """Decode this parameter's raw value from ``packet`` and format it.

          packet -- object whose ``get_data()`` returns an indexable byte sequence
          unit   -- unit to convert to; ``None`` selects the first conversion's unit

          Returns the formatted value as a string, ``""`` when no conversion
          matches ``unit``, or ``"ERROR EVAL"`` when evaluating the conversion
          expression raises SyntaxError, ZeroDivisionError or NameError.
          When several conversions share the requested unit, the last one wins.
          """
          value = ""

          if self._conversions and unit is None:
              unit = self._conversions[0].get_unit()

          for conversion in self._conversions:
              if unit == conversion.get_unit():
                  expr = conversion.get_expr()
                  value_format = conversion.get_format()
                  address_length = self.get_address().get_length()

                  # ignore 0xe8 (the first data byte is skipped; presumably the
                  # response marker -- confirm against the packet layout)
                  index = 1
                  value_bytes = packet.get_data()[index:index + address_length]

                  # Assemble the bytes most-significant first. Lengths outside
                  # 1..4 deliberately leave x at 0, as before.
                  x = 0
                  if 1 <= address_length <= 4:
                      for position in range(address_length):
                          x = (x << 8) | value_bytes[position]

                  try:
                      # SECURITY: eval() runs arbitrary Python. The expression
                      # reads the local name ``x`` defined above, so that name
                      # must not be renamed. Only feed this from trusted
                      # conversion definitions.
                      value = eval(expr)
                  except (SyntaxError, ZeroDivisionError, NameError):
                      return "ERROR EVAL"

                  # The number of characters after the first "." in the format
                  # string selects the number of decimals printed.
                  format_tokens = value_format.split(".")
                  output_format = "%.0f"
                  if len(format_tokens) > 1:
                      output_format = "%." + str(len(format_tokens[1])) + "f"

                  value = output_format % value

          return value

      def get_default_unit(self):
          """Return the unit of the first conversion, or ``""`` when there is none."""
          if self._conversions:
              return self._conversions[0].get_unit()
          return ""

      def is_supported(self, data):
          """Tell whether this parameter's support bit is set in ``data``.

          The byte inspected sits at
          ``PMCUContext.RESPONSE_MARK_OFFSET() + 1 + byte_index``; the result is
          ``False`` when that offset lies beyond the end of ``data``.
          """
          offset = PMCUContext.RESPONSE_MARK_OFFSET() + 1 + self._byte_index
          # <, not <= because last one is checksum
          # NOTE(review): ``offset < len(data)`` still admits the final byte;
          # confirm whether the checksum byte was really meant to be excluded.
          if offset < len(data):
              bit_mask = 1 << self._bit_index
              return (data[offset] & bit_mask) == bit_mask
          return False

      def to_string(self):
          """Return a multi-line, human-readable dump of the parameter.

          Safe to call before ``set_address``: a missing address is reported as
          ``address=None`` instead of raising AttributeError, and non-string
          field values are rendered with ``%s`` instead of raising TypeError.
          """
          if self._address is None:
              address_text = "address=None"
          else:
              address_text = self._address.to_string()
          conversions_text = ',\n\t'.join(c.to_string() for c in self._conversions)
          return (
              "id=%s\nname=%s\ndesc=%s\nbyte=%s\n%s\nbit=%s\ntarget=%s\nconversion:\n\t%s"
              % (
                  self._id,
                  self._name,
                  self._desc,
                  self._byte_index,
                  address_text,
                  self._bit_index,
                  self._target,
                  conversions_text,
              )
          )