|
@@ -15,6 +15,10 @@ import bluezutils
|
|
|
import struct
|
|
|
import keyboard
|
|
|
|
|
|
+# Settings
|
|
|
+ROOMBA="F4:A4:72:BA:27:C1"
|
|
|
+HCI="hci1"
|
|
|
+
|
|
|
|
|
|
bus = None
|
|
|
mainloop = None
|
|
@@ -32,7 +36,6 @@ ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762'
|
|
|
ROOMBA_CHR_SENSORS_READ_UUID = '35f28386-3070-4f3b-ba38-27507e991764'
|
|
|
ROOMBA_CHR_SENSORS_NOTIFY_UUID = '35f28386-3070-4f3b-ba38-27507e991766'
|
|
|
|
|
|
-ROOMBA="F4:A4:72:BA:27:C1"
|
|
|
|
|
|
|
|
|
# The objects that we interact with.
|
|
@@ -47,6 +50,10 @@ bt_write_len = 0
|
|
|
cur_speed = struct.unpack('h', struct.pack('h', 0))[0]
|
|
|
angle = struct.unpack('h', struct.pack('h', 0))[0]
|
|
|
|
|
|
+roomba_main_brush_on = False # 'j'
|
|
|
+roomba_vacuum_on = False # 'k'
|
|
|
+roomba_side_brush_on = False # 'l'
|
|
|
+
|
|
|
def bt_write(L):
|
|
|
global bt_write_buffer
|
|
|
global bt_write_in_progress
|
|
@@ -72,14 +79,13 @@ def ctrl_write_cb():
|
|
|
global bt_write_idx
|
|
|
global bt_write_len
|
|
|
if bt_write_idx < bt_write_len:
|
|
|
- print("Writing next char %d / %d " % (bt_write_idx, bt_write_len - 1))
|
|
|
+ # print("Writing next char %d / %d " % (bt_write_idx, bt_write_len - 1))
|
|
|
roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb,
|
|
|
error_handler=generic_error_cb,
|
|
|
dbus_interface=GATT_CHRC_IFACE)
|
|
|
bt_write_idx += 1
|
|
|
else:
|
|
|
bt_write_in_progress = False
|
|
|
- print("written.")
|
|
|
return
|
|
|
|
|
|
def roomba_drive(s, a):
|
|
@@ -89,26 +95,60 @@ def roomba_drive(s, a):
|
|
|
ai = struct.pack('!H', 0x8000)
|
|
|
else:
|
|
|
ai = struct.pack('!h', int(a))
|
|
|
- print("integers: %02x %02x %02x %02x" % (si[0], si[1], ai[0], ai[1]))
|
|
|
- bt_write([[0x44], [si[0]], [si[1]], [ai[0]], [ai[1]], [0xFF]])
|
|
|
+ bt_write([[0x44], [si[0]], [si[1]], [ai[0]], [ai[1]]])
|
|
|
+ while (bt_write_in_progress):
|
|
|
+ time.sleep(0.100000)
|
|
|
+ roomba_cmd_clean()
|
|
|
+
|
|
|
+def roomba_cmd_endsteer(arg):
|
|
|
+ global angle
|
|
|
+ global cur_speed
|
|
|
+ if (angle):
|
|
|
+ angle = 0
|
|
|
+ roomba_drive(cur_speed, angle)
|
|
|
+
|
|
|
+
|
|
|
+def roomba_cmd_clean():
|
|
|
+ global roomba_main_brush_on
|
|
|
+ global roomba_vacuum_on
|
|
|
+ global roomba_side_brush_on
|
|
|
+ b0 = 0
|
|
|
+ b1 = 0
|
|
|
+ b2 = 0
|
|
|
+ if roomba_main_brush_on:
|
|
|
+ b0 = 1
|
|
|
+ if roomba_vacuum_on:
|
|
|
+ b1 = 1
|
|
|
+ if roomba_side_brush_on:
|
|
|
+ b2 = 1
|
|
|
+ bt_write([[0x43], [b0], [b1], [b2]])
|
|
|
+
|
|
|
+
|
|
|
+def toggle_brush():
|
|
|
+ global roomba_main_brush_on
|
|
|
+ roomba_main_brush_on = not roomba_main_brush_on
|
|
|
+ roomba_cmd_clean()
|
|
|
+
|
|
|
+def toggle_vacuum():
|
|
|
+ global roomba_vacuum_on
|
|
|
+ roomba_vacuum_on = not roomba_vacuum_on
|
|
|
+ roomba_cmd_clean()
|
|
|
+
|
|
|
+def toggle_side():
|
|
|
+ global roomba_side_brush_on
|
|
|
+ roomba_side_brush_on = not roomba_side_brush_on
|
|
|
+ roomba_cmd_clean()
|
|
|
|
|
|
|
|
|
def roomba_mode(m):
|
|
|
- bt_write([[0x4d], [m], [0xFF]])
|
|
|
+ bt_write([[0x4d], [m]])
|
|
|
|
|
|
def key_parse(key):
|
|
|
- print("keypress")
|
|
|
global cur_speed
|
|
|
global angle
|
|
|
- print("spd: %d ang: %d" % (cur_speed, angle))
|
|
|
- if keyboard.is_pressed('w'):
|
|
|
- cur_speed += 50
|
|
|
- if keyboard.is_pressed('s'):
|
|
|
- cur_speed -= 50
|
|
|
- if keyboard.is_pressed('a'):
|
|
|
- angle = 200
|
|
|
- if keyboard.is_pressed('d'):
|
|
|
- angle = (-200)
|
|
|
+ old_speed = cur_speed
|
|
|
+ old_angle = angle
|
|
|
+
|
|
|
if keyboard.is_pressed('0'):
|
|
|
roomba_mode(0)
|
|
|
return
|
|
@@ -127,13 +167,49 @@ def key_parse(key):
|
|
|
if keyboard.is_pressed('5'):
|
|
|
roomba_mode(5)
|
|
|
return
|
|
|
+ if keyboard.is_pressed('6'):
|
|
|
+ roomba_mode(66)
|
|
|
+ return
|
|
|
+ if keyboard.is_pressed('j'):
|
|
|
+ toggle_brush()
|
|
|
+ return
|
|
|
+ if keyboard.is_pressed('k'):
|
|
|
+ toggle_vacuum()
|
|
|
+ return
|
|
|
+ if keyboard.is_pressed('l'):
|
|
|
+ toggle_side()
|
|
|
+ return
|
|
|
+
|
|
|
if keyboard.is_pressed(' '):
|
|
|
cur_speed = 0
|
|
|
angle = 0
|
|
|
+
|
|
|
+
|
|
|
+ if keyboard.is_pressed('a'):
|
|
|
+ angle = 200
|
|
|
+ elif keyboard.is_pressed('d'):
|
|
|
+ angle = (-200)
|
|
|
+ else:
|
|
|
+ angle = 0
|
|
|
+
|
|
|
+ if keyboard.is_pressed('w'):
|
|
|
+ cur_speed += 50
|
|
|
+ if keyboard.is_pressed('s'):
|
|
|
+ cur_speed -= 50
|
|
|
cur_speed = int(cur_speed)
|
|
|
angle = int(angle)
|
|
|
+ if old_speed == cur_speed and old_angle == angle:
|
|
|
+ return
|
|
|
+ if cur_speed == 0 and angle != 0:
|
|
|
+ turn_speed = 150
|
|
|
+ if angle > 0:
|
|
|
+ angle = 1
|
|
|
+ else:
|
|
|
+ angle = -1
|
|
|
+ roomba_drive(turn_speed, angle)
|
|
|
+ else:
|
|
|
+ roomba_drive(cur_speed, angle)
|
|
|
|
|
|
- roomba_drive(cur_speed, angle)
|
|
|
|
|
|
def generic_error_cb(error):
|
|
|
print('D-Bus call failed: ' + str(error))
|
|
@@ -174,7 +250,6 @@ def roomba_sensors_changed_cb(iface, changed_props, invalidated_props):
|
|
|
value = changed_props.get('Value', None)
|
|
|
if not value:
|
|
|
return
|
|
|
- print("sensor values: changed.\n")
|
|
|
roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
|
|
|
error_handler=generic_error_cb,
|
|
|
dbus_interface=GATT_CHRC_IFACE)
|
|
@@ -283,7 +358,7 @@ def main():
|
|
|
print("Found ROOMBA!")
|
|
|
print(device["Address"])
|
|
|
|
|
|
- device = bluezutils.find_device(ROOMBA, "hci1")
|
|
|
+ device = bluezutils.find_device(ROOMBA,HCI)
|
|
|
if (device is None):
|
|
|
print("Cannot 'find_device'")
|
|
|
else:
|
|
@@ -323,6 +398,12 @@ def main():
|
|
|
keyboard.on_press_key("3", key_parse)
|
|
|
keyboard.on_press_key("4", key_parse)
|
|
|
keyboard.on_press_key("5", key_parse)
|
|
|
+ keyboard.on_press_key("6", key_parse)
|
|
|
+ keyboard.on_press_key("j", key_parse)
|
|
|
+ keyboard.on_press_key("k", key_parse)
|
|
|
+ keyboard.on_press_key("l", key_parse)
|
|
|
+ keyboard.on_release_key("a", roomba_cmd_endsteer)
|
|
|
+ keyboard.on_release_key("d", roomba_cmd_endsteer)
|
|
|
mainloop.run()
|
|
|
|
|
|
|