123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- '''
- Created on 29-03-2013
- @author: citan
- '''
- import serial
- import time
- from pimonitor.PMPacket import PMPacket
class PMConnection(object):
    '''
    Serial connection that sends PMPacket requests and parses the replies.

    A frame on the wire is read as: 3 header bytes, 1 size byte, ``size``
    data bytes and 1 checksum byte. Frames are handed to
    ``PMPacket.from_array`` as a flat list of integer byte values.
    '''

    # Source id placed in every request packet built by this class.
    _SOURCE_ID = 0xF0
    # Single data byte of the request sent by init().
    _INIT_REQUEST = (0xBF,)
    # First two data bytes of every read request; the address triples follow.
    _READ_REQUEST_HEADER = (0xA8, 0x00)
    # First data byte of a read response; it precedes the returned values.
    _READ_RESPONSE_MARKER = 0xE8
    # Frame layout: header bytes followed by one size byte.
    _HEADER_LENGTH = 3
    # Logical target -> destination id. Unknown targets are passed through.
    # NOTE(review): targets 1 and 3 both map to 0x10 in the original code.
    _DESTINATIONS = {1: 0x10, 2: 0x18, 3: 0x10}

    def __init__(self):
        '''
        Create a connection object; the serial port is not opened yet.
        '''
        self._ser = None

    def open(self, port='/dev/ttyUSB0', baudrate=4800):
        '''
        Open the serial port (8 data bits, no parity, one stop bit).

        Any port previously opened by this object is closed first so that
        calling open() twice does not leak the old handle.

        port     -- device name, e.g. '/dev/ttyUSB0' or
                    '/dev/tty.usbserial-000013FA'
        baudrate -- line speed
        '''
        self.close()
        # NOTE(review): pySerial documents timeouts in seconds; 2000 and 55
        # are kept unchanged from the original -- confirm they are intended.
        self._ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            timeout=2000,
            writeTimeout=55,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS)
        # Short pause after opening, before the first request is sent.
        time.sleep(0.2)

    def close(self):
        '''
        Close the serial port if it is open; safe to call repeatedly.
        '''
        if self._ser is not None:
            self._ser.close()
            self._ser = None

    def init(self, target):
        '''
        Send the init request to ``target`` and return the response packet
        (see send_packet for the possible return values).
        '''
        request_packet = PMPacket(self.get_destination(target),
                                  self._SOURCE_ID,
                                  list(self._INIT_REQUEST))
        return self.send_packet(request_packet)

    def send_packet(self, packet):
        '''
        Write ``packet`` to the port and return the response packet.

        Every frame waiting in the input buffer is read so the buffer is
        empty for the next request. Frames equal to ``packet`` (the echo of
        the request) are ignored. Returns the last non-echo packet, or None
        when nothing but the echo (or nothing at all) was received.

        Raises Exception('connection', ...) when a frame is cut short.
        '''
        self._ser.write(packet.to_string())
        # Pause before polling the input buffer for the reply.
        time.sleep(0.05)

        response = None
        while self._ser.inWaiting() > 0:
            received = PMPacket.from_array(self._read_frame())
            if packet.is_equal(received):
                # Echo of our own request; never report it as the answer.
                continue
            response = received
        return response

    def _read_frame(self):
        '''
        Read one complete frame and return it as a list of integers.
        '''
        # bytearray yields integers on both Python 2 (str) and 3 (bytes).
        head = bytearray(self._ser.read(self._HEADER_LENGTH + 1))
        if len(head) < self._HEADER_LENGTH + 1:
            raise Exception('connection',
                            "truncated packet header: " + str(len(head)) + " bytes")
        size = head[self._HEADER_LENGTH]

        # Data bytes plus the trailing checksum byte.
        body = bytearray(self._ser.read(size + 1))
        if len(body) < size + 1:
            raise Exception('connection',
                            "truncated packet body: " + str(len(body)) + " bytes")
        return list(head) + list(body)

    @staticmethod
    def _address_bytes(address, length):
        '''
        Encode ``length`` consecutive addresses starting at ``address`` as
        big-endian 3-byte groups and return them as one flat list.
        '''
        encoded = []
        for offset in range(length):
            target_address = address + offset
            encoded.append((target_address >> 16) & 0xff)
            encoded.append((target_address >> 8) & 0xff)
            encoded.append(target_address & 0xff)
        return encoded

    def read_parameter(self, parameter):
        '''
        Request every address of a single parameter and return the response
        packet (see send_packet for the possible return values).
        '''
        data = list(self._READ_REQUEST_HEADER)
        data.extend(self._address_bytes(parameter.get_address(),
                                        parameter.get_address_length()))

        request_packet = PMPacket(self.get_destination(parameter.get_target()),
                                  self._SOURCE_ID, data)
        return self.send_packet(request_packet)

    def read_parameters(self, parameters):
        '''
        Request several parameters in one packet and split the answer.

        The request is addressed to the target of the first parameter.
        Returns a list with one PMPacket per parameter, in the same order,
        each holding the response marker followed by that parameter's bytes.
        An empty ``parameters`` sequence yields an empty list.

        Raises Exception('connection', ...) when the targets are
        incompatible or when no response is received.
        '''
        if not parameters:
            return []

        target = parameters[0].get_target()
        data = list(self._READ_REQUEST_HEADER)
        for parameter in parameters:
            other = parameter.get_target()
            # TODO (carried over from the original): this check only fires
            # when the targets differ in both of their two lowest bits.
            if (target != other
                    and target & 0x01 != other & 0x01
                    and target & 0x02 != other & 0x02):
                raise Exception('connection',
                                "targets differ: " + str(target) + " vs " + str(other))
            data.extend(self._address_bytes(parameter.get_address(),
                                            parameter.get_address_length()))

        request_packet = PMPacket(self.get_destination(target),
                                  self._SOURCE_ID, data)
        out_packet = self.send_packet(request_packet)
        if out_packet is None:
            raise Exception('connection', "no response received")

        out_data = out_packet.get_data()
        out_packets = []
        data_offset = 1  # skip the response marker (E8)
        for parameter in parameters:
            address_len = parameter.get_address_length()
            single_out_data = [self._READ_RESPONSE_MARKER]
            single_out_data.extend(out_data[data_offset:data_offset + address_len])
            out_packets.append(PMPacket(out_packet.get_destination(),
                                        out_packet.get_source(),
                                        single_out_data))
            data_offset += address_len
        return out_packets

    def get_destination(self, target):
        '''
        Translate a logical target (1, 2 or 3) into a destination id.
        Any other value is returned unchanged.
        '''
        return self._DESTINATIONS.get(target, target)
|