#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-2.1-or-later
"""BlueZ D-Bus GATT client for a single, fixed-address "Roomba" device.

The script looks up the device with address ``ROOMBA`` among the objects
managed by BlueZ, connects to it, locates the GATT service with UUID
``ROOMBA_SVC_UUID`` and issues an asynchronous read on the control and
sensors characteristics, printing the values when the replies arrive.
"""

import dbus
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject
import sys

from dbus.mainloop.glib import DBusGMainLoop

import bluezutils

# Set by main(); shared with the callbacks below.
bus = None
mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'

ROOMBA_SVC_UUID = '35f28386-3070-4f3b-ba38-27507e991760'
ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762'
ROOMBA_CHR_SENSORS_UUID = '35f28386-3070-4f3b-ba38-27507e991764'

# Address of the device to connect to and the adapter used to reach it.
ROOMBA = "F4:A4:72:BA:27:C1"
ROOMBA_ADAPTER = "hci1"

# Number of bytes combined into one sensors word.
_SENSORS_WIDTH = 4

# The objects that we interact with.
# roomba_service is (proxy, properties, object path); the characteristic
# entries are (proxy, properties). All stay None until discovered.
roomba_service = None
roomba_ctrl_chrc = None
roomba_sensors_chrc = None


def generic_error_cb(error):
    """Report a failed asynchronous D-Bus call and stop the main loop."""
    print('D-Bus call failed: ' + str(error))
    mainloop.quit()


def sensor_contact_val_to_str(val):
    """Map a sensor-contact status code to a human-readable string.

    0 and 1 mean 'not supported', 2 'no contact detected', 3 'contact
    detected'; any other value yields 'invalid value'.
    """
    if val in (0, 1):
        return 'not supported'
    if val == 2:
        return 'no contact detected'
    if val == 3:
        return 'contact detected'
    return 'invalid value'


def _decode_sensors(value):
    """Combine the first four bytes of *value* as a little-endian integer.

    Returns None when *value* holds fewer than four bytes, so callers can
    report the short payload instead of raising IndexError.
    """
    if len(value) < _SENSORS_WIDTH:
        return None
    word = 0
    for position in range(_SENSORS_WIDTH):
        word |= value[position] << (8 * position)
    return word


def mode_val_cb(value):
    """Reply handler for the control characteristic read: print byte 0."""
    if not value:
        print('Mode value: empty reply')
        return
    print('Mode value: ' + hex(value[0]))


def sensors_val_cb(value):
    """Reply handler for the sensors characteristic read: print the word."""
    roomba_sensors = _decode_sensors(value)
    if roomba_sensors is None:
        print('Sensors Value : too short (' + str(len(value)) + ' bytes)')
        return
    print('Sensors Value : ' + str(hex(roomba_sensors)))


def roomba_sensors_start_notify_cb():
    """Reply handler confirming that sensor notifications were enabled.

    NOTE: nothing in this file calls StartNotify, so this handler is
    currently unused here.
    """
    print('Roomba sensors: notification enabled')


def roomba_sensors_changed_cb(iface, changed_props, invalidated_props):
    """PropertiesChanged handler that prints an updated sensors value.

    Ignores signals for other interfaces and updates that carry no
    (or an empty) 'Value' property.

    NOTE: nothing in this file connects this handler to a signal.
    """
    if iface != GATT_CHRC_IFACE:
        return

    if not changed_props:
        return

    value = changed_props.get('Value', None)
    if not value:
        return

    roomba_sensors = _decode_sensors(value)
    if roomba_sensors is None:
        # Short payload: previously this raised IndexError in the handler.
        print('Sensor value too short (' + str(len(value)) + ' bytes)')
        return

    print('New Sensor values')
    print('\tValue : ' + str(hex(roomba_sensors)))


def start_client():
    """Issue asynchronous reads on the discovered characteristics.

    A characteristic that was not discovered is reported and skipped
    rather than dereferenced.

    Returns:
        True if at least one read was issued, False otherwise.
    """
    pending_reads = (
        ('control', roomba_ctrl_chrc, mode_val_cb),
        ('sensors', roomba_sensors_chrc, sensors_val_cb),
    )

    issued = False
    for label, chrc, reply_cb in pending_reads:
        if chrc is None:
            print('Roomba ' + label + ' characteristic not found')
            continue
        chrc[0].ReadValue({}, reply_handler=reply_cb,
                          error_handler=generic_error_cb,
                          dbus_interface=GATT_CHRC_IFACE)
        issued = True
    return issued


def process_chrc(chrc_path):
    """Fetch the characteristic at *chrc_path* and remember it if known.

    Stores (proxy, properties) in roomba_ctrl_chrc or roomba_sensors_chrc
    depending on the characteristic UUID; other UUIDs are only reported.
    Always returns True.
    """
    global roomba_ctrl_chrc
    global roomba_sensors_chrc

    chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
    chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
                             dbus_interface=DBUS_PROP_IFACE)

    uuid = chrc_props['UUID']

    if uuid == ROOMBA_CHR_CTL_UUID:
        roomba_ctrl_chrc = (chrc, chrc_props)
    elif uuid == ROOMBA_CHR_SENSORS_UUID:
        roomba_sensors_chrc = (chrc, chrc_props)
    else:
        print('Unrecognized characteristic: ' + uuid)

    return True


def process_roomba_service(service_path, chrc_paths):
    """Record the service at *service_path* if it is the Roomba service.

    Args:
        service_path: D-Bus object path of a GATT service.
        chrc_paths: object paths of the characteristics under it.

    Returns:
        False if the service UUID is not ROOMBA_SVC_UUID; otherwise True
        after storing the service and processing each characteristic.
    """
    global roomba_service

    service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
    service_props = service.GetAll(GATT_SERVICE_IFACE,
                                   dbus_interface=DBUS_PROP_IFACE)

    if service_props['UUID'] != ROOMBA_SVC_UUID:
        return False

    print('roomba GATT service found: ' + service_path)

    roomba_service = (service, service_props, service_path)

    # Process the characteristics.
    for chrc_path in chrc_paths:
        process_chrc(chrc_path)

    return True


def interfaces_removed_cb(object_path, interfaces):
    """Stop the main loop when the tracked service object is removed."""
    if not roomba_service:
        return

    if object_path == roomba_service[2]:
        print('Service was removed')
        mainloop.quit()


def main():
    """Connect to the device, discover its GATT service and read it.

    Exits with status 1 when the service is not found or when none of
    its expected characteristics could be read.
    """
    global bus
    global mainloop

    # Set up the main loop.
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    mainloop = GObject.MainLoop()

    om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                        DBUS_OM_IFACE)
    om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)

    print('Getting objects...')
    objects = om.GetManagedObjects()

    # Look for the device among the managed objects and connect to it.
    connected = False
    for interfaces in objects.values():
        device = interfaces.get("org.bluez.Device1")
        if device is None:
            continue
        if device["Address"] != ROOMBA:
            continue

        print("Found ROOMBA!")
        print(device["Address"])
        device = bluezutils.find_device(ROOMBA, ROOMBA_ADAPTER)
        if device is None:
            print("Cannot 'find_device'")
        else:
            device.Connect()
            print("Connected")
            connected = True
        # The lookup above always targets ROOMBA_ADAPTER, so a second
        # match (same address under another adapter) must not trigger a
        # second Connect().
        break

    if connected:
        # The first snapshot predates Connect(); refresh it so GATT
        # objects exported since then are visible.
        # NOTE(review): service discovery may still be in progress here;
        # waiting for ServicesResolved would be more robust - confirm.
        objects = om.GetManagedObjects()

    # List characteristics found
    chrcs = [path for path, interfaces in objects.items()
             if GATT_CHRC_IFACE in interfaces]

    # List services found
    for path, interfaces in objects.items():
        if GATT_SERVICE_IFACE not in interfaces:
            continue

        chrc_paths = [d for d in chrcs if d.startswith(path + "/")]

        if process_roomba_service(path, chrc_paths):
            break

    if not roomba_service:
        print('No ROOMBA found.')
        sys.exit(1)

    if not start_client():
        # Nothing was requested, so there is nothing to wait for.
        sys.exit(1)

    mainloop.run()


if __name__ == '__main__':
    main()