SSM_PiMonitor/pimonitor/PMConnection.py

140 lines
4.2 KiB
Python
Raw Permalink Normal View History

2013-12-03 06:00:58 +01:00
'''
Created on 29-03-2013
@author: citan
'''
import serial
import time
from pimonitor.PMPacket import PMPacket
class PMConnection(object):
    '''
    Serial link to a control unit, exchanging PMPacket frames.

    A frame on the wire is laid out as: 3 header bytes, 1 size byte,
    ``size`` payload bytes and 1 checksum byte (see _read_frame).
    '''

    # Source id placed in every request packet built by this class.
    _SOURCE_ID = 0xF0

    # Logical target -> bus destination id. Unknown targets are passed
    # through unchanged by get_destination().
    # NOTE(review): 1 and 3 both map to 0x10 - presumably target is a bit
    # mask and 0x10 takes precedence; confirm against the callers.
    _DESTINATIONS = {1: 0x10, 2: 0x18, 3: 0x10}

    def __init__(self):
        '''
        Create a connection object; the port is not opened until open().
        '''
        self._ser = None

    def open(self, port='/dev/ttyUSB0', baudrate=4800, timeout=2000, write_timeout=55):
        '''
        Open the serial port (8 data bits, no parity, 1 stop bit).

        All arguments are optional and default to the values that used to
        be hard-coded, so ``open()`` behaves as before.

        port          -- device path, e.g. '/dev/ttyUSB0' or
                         '/dev/tty.usbserial-000013FA'
        baudrate      -- line speed
        timeout       -- read timeout handed to pyserial
        write_timeout -- write timeout handed to pyserial

        NOTE(review): pyserial interprets timeouts in seconds, so the
        defaults 2000 / 55 look like they were meant as milliseconds.
        They are kept unchanged here; confirm before lowering them.
        '''
        self._ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            timeout=timeout,
            # legacy keyword spelling, kept as in the original call
            writeTimeout=write_timeout,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS)
        # short pause after opening the port before it is used
        time.sleep(0.2)

    def close(self):
        '''
        Close the serial port if it was opened; otherwise do nothing.
        '''
        if self._ser is not None:
            self._ser.close()

    def init(self, target):
        '''
        Send the 0xBF request to ``target`` and return the reply packet
        (or None when no reply was received).
        '''
        request_packet = PMPacket(self.get_destination(target), self._SOURCE_ID, [0xBF])
        return self.send_packet(request_packet)

    def send_packet(self, packet):
        '''
        Write ``packet`` to the port and return the reply.

        Every frame waiting in the input buffer is consumed. Frames equal
        to the request (the echo of our own transmission) are skipped and
        never returned. Returns the last non-echo packet, or None when
        nothing but the echo (or nothing at all) arrived.
        '''
        self._ser.write(packet.to_string())
        # give the device time to answer before polling the input buffer
        time.sleep(0.05)

        response = None
        while self._ser.inWaiting() > 0:
            frame = self._read_frame()
            if frame is None:
                # read timed out in the middle of a frame - stop here
                # instead of parsing a truncated packet
                break
            received = PMPacket.from_array(frame)
            if packet.is_equal(received):
                # echo of our own request, not a reply
                continue
            response = received
        return response

    def _read_frame(self):
        '''
        Read one frame from the port.

        Returns the frame as a list of ints, or None when any of the
        reads came back short (timeout).
        '''
        # bytearray() yields ints on both Python 2 (str) and Python 3
        # (bytes), so no ord() juggling is needed.
        header = bytearray(self._ser.read(3))
        size_field = bytearray(self._ser.read(1))
        if len(header) < 3 or len(size_field) < 1:
            return None
        size = size_field[0]
        payload = bytearray(self._ser.read(size))
        checksum = bytearray(self._ser.read(1))
        if len(payload) < size or len(checksum) < 1:
            return None
        return list(header + size_field + payload + checksum)

    @staticmethod
    def _address_bytes(address, length):
        '''
        Encode ``length`` consecutive addresses starting at ``address`` as
        3 bytes each (high, middle, low) and return them as a flat list.
        '''
        encoded = []
        for offset in range(length):
            current = address + offset
            encoded.append((current & 0xffffff) >> 16)
            encoded.append((current & 0xffff) >> 8)
            encoded.append(current & 0xff)
        return encoded

    def read_parameter(self, parameter):
        '''
        Request the bytes of a single parameter and return the reply
        packet (or None when no reply was received).
        '''
        # 0xA8 request followed by a 0x00 byte, then the address list
        data = [0xA8, 0x00]
        data.extend(self._address_bytes(parameter.get_address(),
                                        parameter.get_address_length()))
        request_packet = PMPacket(self.get_destination(parameter.get_target()),
                                  self._SOURCE_ID, data)
        return self.send_packet(request_packet)

    def read_parameters(self, parameters):
        '''
        Request several parameters in one packet.

        Returns one packet per parameter, in the same order, each holding
        0xE8 followed by that parameter's share of the reply data.
        An empty ``parameters`` sequence yields an empty list.

        Raises Exception('connection', ...) when the parameters address
        incompatible targets or when no reply was received.
        '''
        if not parameters:
            return []

        target = parameters[0].get_target()
        data = [0xA8, 0x00]
        for parameter in parameters:
            # TODO: target compatibility rule carried over unchanged
            other = parameter.get_target()
            if (target != other
                    and (target & 0x01) != (other & 0x01)
                    and (target & 0x02) != (other & 0x02)):
                raise Exception('connection', "targets differ: " + str(target) + " vs " + str(other))
            data.extend(self._address_bytes(parameter.get_address(),
                                            parameter.get_address_length()))

        request_packet = PMPacket(self.get_destination(target), self._SOURCE_ID, data)
        out_packet = self.send_packet(request_packet)
        if out_packet is None:
            raise Exception('connection', "no response received")

        out_data = out_packet.get_data()
        out_packets = []
        data_offset = 1  # skip E8
        for parameter in parameters:
            address_len = parameter.get_address_length()
            single_out_data = [0xE8]
            single_out_data.extend(out_data[data_offset:data_offset + address_len])
            out_packets.append(PMPacket(out_packet.get_destination(),
                                        out_packet.get_source(),
                                        single_out_data))
            data_offset += address_len
        return out_packets

    def get_destination(self, target):
        '''
        Translate a logical target (1, 2 or 3) into its destination id;
        any other value is returned unchanged.
        '''
        return self._DESTINATIONS.get(target, target)