123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- """
- Created on 29-03-2013
- @author: citan
- """
- import serial
- import time
- from pimonitor.PMPacket import PMPacket
- class PMConnection(object):
- """
- classdocs
- """
- def __init__(self):
- """
- Constructor
- """
- self._ser = None
- def open(self):
- self._ser = serial.Serial(
- port='/dev/ttyUSB0',
- # port='/dev/tty.usbserial-000013FA',
- baudrate=4800,
- timeout=2000,
- writeTimeout=55,
- parity=serial.PARITY_NONE,
- stopbits=serial.STOPBITS_ONE,
- bytesize=serial.EIGHTBITS)
- time.sleep(0.2)
- return self._ser is not None
- def close(self):
- if self._ser is not None:
- self._ser.close()
- def init(self, target):
- request_packet = PMPacket(self.get_destination(target), 0xF0, [0xBF])
- return self.send_packet(request_packet)
- def send_packet(self, packet):
- self._ser.write(packet.to_string())
- time.sleep(0.05)
- out_packet = None
- data = []
- while self._ser.inWaiting() > 0:
- # read header
- tmp = self._ser.read(3)
- data.extend(tmp)
- # read size
- sizebytes = self._ser.read()
- data.append(sizebytes[0])
- size = ord(sizebytes[0])
- # read data
- tmp = self._ser.read(size)
- data.extend(tmp)
- # read checksum
- data.extend(self._ser.read())
- data = map(ord, data)
- out_packet = PMPacket.from_array(data)
- data = []
- if packet.is_equal(out_packet):
- continue
- return out_packet
- def read_parameter(self, parameter):
- address = parameter.get_address().get_address()
- address_len = parameter.get_address().get_length()
- data = [0xA8, 0x00]
- for i in range(0, address_len):
- target_address = address + i
- data.append((target_address & 0xffffff) >> 16)
- data.append((target_address & 0xffff) >> 8)
- data.append(target_address & 0xff)
- request_packet = PMPacket(self.get_destination(parameter.get_target()), 0xf0, data)
- return self.send_packet(request_packet)
- def read_parameters(self, parameters):
- data = []
- target = parameters[0].get_target()
- data.append(0xA8)
- data.append(0x00)
- for parameter in parameters:
- # TODO:
- if target != parameter.get_target() and target & 0x01 != parameter.get_target() & 0x01 and target & 0x02 != parameter.get_target() & 0x02:
- raise Exception('connection', "targets differ: " + str(target) + " vs " + str(parameter.get_target()))
- address = parameter.get_address().get_address()
- address_len = parameter.get_address().get_length()
- for i in range(0, address_len):
- target_address = address + i
- data.append((target_address & 0xffffff) >> 16)
- data.append((target_address & 0xffff) >> 8)
- data.append(target_address & 0xff)
- request_packet = PMPacket(self.get_destination(target), 0xf0, data)
- out_packet = self.send_packet(request_packet)
- out_data = out_packet.get_data()
- out_packets = []
- data_offset = 1 # skip E8
- for parameter in parameters:
- address_len = parameter.get_address().get_length()
- single_out_data = [0xE8]
- single_out_data.extend(out_data[data_offset:address_len + data_offset])
- single_out_packet = PMPacket(out_packet.get_destination(), out_packet.get_source(), single_out_data)
- out_packets.append(single_out_packet)
- data_offset += address_len
- return out_packets
- @staticmethod
- def get_destination(target):
- dst = target
- if target == 1:
- dst = 0x10
- if target == 2:
- dst = 0x18
- if target == 3:
- dst = 0x10
- return dst
|