#!/usr/bin/env python3 # SPDX-License-Identifier: LGPL-2.1-or-later import dbus try: from gi.repository import GObject except ImportError: import gobject as GObject import sys import time from dbus.mainloop.glib import DBusGMainLoop import bluezutils import struct import keyboard # Settings ROOMBA="CE:E0:94:15:6A:3D" HCI="hci0" bus = None mainloop = None BLUEZ_SERVICE_NAME = 'org.bluez' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' GATT_SERVICE_IFACE = 'org.bluez.GattService1' GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' ROOMBA_SVC_UUID = '35f28386-3070-4f3b-ba38-27507e991760' ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762' ROOMBA_CHR_SENSORS_READ_UUID = '35f28386-3070-4f3b-ba38-27507e991764' ROOMBA_CHR_SENSORS_NOTIFY_UUID = '35f28386-3070-4f3b-ba38-27507e991766' # The objects that we interact with. roomba_service = None roomba_ctrl_chrc = None roomba_sensors_chrc = None roomba_notify_chrc = None bt_write_in_progress = False bt_write_buffer = None bt_write_idx = 0 bt_write_len = 0 cur_speed = struct.unpack('h', struct.pack('h', 0))[0] angle = struct.unpack('h', struct.pack('h', 0))[0] roomba_main_brush_on = False # 'j' roomba_vacuum_on = False # 'k' roomba_side_brush_on = False # 'l' def bt_write(L): global bt_write_buffer global bt_write_in_progress global bt_write_idx global bt_write_len while (bt_write_in_progress): time.sleep(0.1) bt_write_in_progress = True bt_write_buffer = L bt_write_len = len(L) bt_write_idx = 0 roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb, error_handler=generic_error_cb, dbus_interface=GATT_CHRC_IFACE) bt_write_idx = 1 def ctrl_write_cb(): global bt_write_buffer global bt_write_in_progress global bt_write_idx global bt_write_len if bt_write_idx < bt_write_len: # print("Writing next char %d / %d " % (bt_write_idx, bt_write_len - 1)) roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb, error_handler=generic_error_cb, dbus_interface=GATT_CHRC_IFACE) bt_write_idx += 1 else: bt_write_in_progress = False return def roomba_drive(s, a): print("driving %d %d" % (s, a)) si = struct.pack('!h', int(s)) if (a == 0): ai = struct.pack('!H', 0x8000) else: ai = struct.pack('!h', int(a)) bt_write([[0x44], [si[0]], [si[1]], [ai[0]], [ai[1]]]) while (bt_write_in_progress): time.sleep(0.100000) roomba_cmd_clean() def roomba_cmd_endsteer(arg): global angle global cur_speed if (angle): angle = 0 roomba_drive(cur_speed, angle) def roomba_cmd_clean(): global roomba_main_brush_on global roomba_vacuum_on global roomba_side_brush_on b0 = 0 b1 = 0 b2 = 0 if roomba_main_brush_on: b0 = 1 if roomba_vacuum_on: b1 = 1 if roomba_side_brush_on: b2 = 1 bt_write([[0x43], [b0], [b1], [b2]]) def toggle_brush(): global roomba_main_brush_on roomba_main_brush_on = not roomba_main_brush_on roomba_cmd_clean() def toggle_vacuum(): global roomba_vacuum_on roomba_vacuum_on = not roomba_vacuum_on roomba_cmd_clean() def toggle_side(): global roomba_side_brush_on roomba_side_brush_on = not roomba_side_brush_on roomba_cmd_clean() def roomba_mode(m): bt_write([[0x4d], [m]]) def key_parse(key): global cur_speed global angle old_speed = cur_speed old_angle = angle if keyboard.is_pressed('0'): roomba_mode(0) return if keyboard.is_pressed('1'): roomba_mode(1) return if keyboard.is_pressed('2'): roomba_mode(2) return if keyboard.is_pressed('3'): roomba_mode(3) return if keyboard.is_pressed('4'): roomba_mode(4) return if keyboard.is_pressed('5'): roomba_mode(5) return if keyboard.is_pressed('6'): roomba_mode(66) return if keyboard.is_pressed('j'): toggle_brush() return if keyboard.is_pressed('k'): toggle_vacuum() return if keyboard.is_pressed('l'): toggle_side() return if keyboard.is_pressed(' '): cur_speed = 0 angle = 0 if keyboard.is_pressed('a'): angle = 200 elif keyboard.is_pressed('d'): angle = (-200) else: angle = 0 if keyboard.is_pressed('w'): cur_speed += 50 if keyboard.is_pressed('s'): cur_speed -= 50 cur_speed = int(cur_speed) angle = int(angle) if old_speed == cur_speed and old_angle == angle: return if cur_speed == 0 and angle != 0: turn_speed = 150 if angle > 0: angle = 1 else: angle = -1 roomba_drive(turn_speed, angle) else: roomba_drive(cur_speed, angle) def generic_error_cb(error): print('D-Bus call failed: ' + str(error)) mainloop.quit() def sensor_contact_val_to_str(val): if val == 0 or val == 1: return 'not supported' if val == 2: return 'no contact detected' if val == 3: return 'contact detected' return 'invalid value' def mode_val_cb(value): print('Mode value: ' + hex(value[0])) def sensors_val_cb(value): print(bytes(value)) x = struct.unpack("I11B", bytes(value)) for y in x: print('Sensors Values : ' + str(y)) def roomba_sensors_start_notify_cb(): print('Roomba sensors: notification enabled') def roomba_sensors_changed_cb(iface, changed_props, invalidated_props): if iface != GATT_CHRC_IFACE: return if not len(changed_props): return value = changed_props.get('Value', None) if not value: return roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb, error_handler=generic_error_cb, dbus_interface=GATT_CHRC_IFACE) def sensors_start_notify_cb(): print("Sensors notifications: enabled.") def start_client(): roomba_ctrl_chrc[0].ReadValue({}, reply_handler=mode_val_cb, error_handler=generic_error_cb, dbus_interface=GATT_CHRC_IFACE) roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb, error_handler=generic_error_cb, dbus_interface=GATT_CHRC_IFACE) roomba_sensors_prop_iface = dbus.Interface(roomba_notify_chrc[0], DBUS_PROP_IFACE) roomba_sensors_prop_iface.connect_to_signal("PropertiesChanged", roomba_sensors_changed_cb) roomba_notify_chrc[0].StartNotify(reply_handler=sensors_start_notify_cb, error_handler=generic_error_cb, dbus_interface=GATT_CHRC_IFACE) roomba_mode(0x02) def process_chrc(chrc_path): chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path) chrc_props = chrc.GetAll(GATT_CHRC_IFACE, dbus_interface=DBUS_PROP_IFACE) uuid = chrc_props['UUID'] if uuid == ROOMBA_CHR_CTL_UUID: global roomba_ctrl_chrc roomba_ctrl_chrc = (chrc, chrc_props) elif uuid == ROOMBA_CHR_SENSORS_READ_UUID: global roomba_sensors_chrc roomba_sensors_chrc = (chrc, chrc_props) elif uuid == ROOMBA_CHR_SENSORS_NOTIFY_UUID: global roomba_notify_chrc roomba_notify_chrc = (chrc, chrc_props) else: print('Unrecognized characteristic: ' + uuid) return True def process_roomba_service(service_path, chrc_paths): service = bus.get_object(BLUEZ_SERVICE_NAME, service_path) service_props = service.GetAll(GATT_SERVICE_IFACE, dbus_interface=DBUS_PROP_IFACE) uuid = service_props['UUID'] if uuid != ROOMBA_SVC_UUID: return False print('roomba GATT service found: ' + service_path) global roomba_service roomba_service = (service, service_props, service_path) # Process the characteristics. for chrc_path in chrc_paths: process_chrc(chrc_path) return True def interfaces_removed_cb(object_path, interfaces): if not roomba_service: return if object_path == roomba_service[2]: print('Service was removed') mainloop.quit() def main(): # Set up the main loop. DBusGMainLoop(set_as_default=True) global bus bus = dbus.SystemBus() global mainloop mainloop = GObject.MainLoop() om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb) print('Getting objects...') objects = om.GetManagedObjects() chrcs = [] # List devices found for path, interfaces in objects.items(): device = interfaces.get("org.bluez.Device1") if (device is None): continue if (device["Address"] == ROOMBA): print("Found ROOMBA!") print(device["Address"]) device = bluezutils.find_device(ROOMBA,HCI) if (device is None): print("Cannot 'find_device'") else: device.Connect() print("Connected") # List characteristics found for path, interfaces in objects.items(): if GATT_CHRC_IFACE not in interfaces.keys(): continue chrcs.append(path) # List sevices found for path, interfaces in objects.items(): if GATT_SERVICE_IFACE not in interfaces.keys(): continue chrc_paths = [d for d in chrcs if d.startswith(path + "/")] if process_roomba_service(path, chrc_paths): break if not roomba_service: print('No ROOMBA found.') sys.exit(1) start_client() keyboard.on_press_key("w", key_parse) keyboard.on_press_key("a", key_parse) keyboard.on_press_key("s", key_parse) keyboard.on_press_key("d", key_parse) keyboard.on_press_key(" ", key_parse) keyboard.on_press_key("0", key_parse) keyboard.on_press_key("1", key_parse) keyboard.on_press_key("2", key_parse) keyboard.on_press_key("3", key_parse) keyboard.on_press_key("4", key_parse) keyboard.on_press_key("5", key_parse) keyboard.on_press_key("6", key_parse) keyboard.on_press_key("j", key_parse) keyboard.on_press_key("k", key_parse) keyboard.on_press_key("l", key_parse) keyboard.on_release_key("a", roomba_cmd_endsteer) keyboard.on_release_key("d", roomba_cmd_endsteer) mainloop.run() if __name__ == '__main__': main()