123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- #!/usr/bin/env python3
- # SPDX-License-Identifier: LGPL-2.1-or-later
- import dbus
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
- import sys
- from dbus.mainloop.glib import DBusGMainLoop
- import bluezutils
# Runtime handles; both are assigned by main() before anything uses them.
bus = mainloop = None

# Well-known D-Bus names and interface identifiers.
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'

# UUIDs matched against the 'UUID' property of discovered GATT objects.
ROOMBA_SVC_UUID = '35f28386-3070-4f3b-ba38-27507e991760'
ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762'
ROOMBA_CHR_SENSORS_UUID = '35f28386-3070-4f3b-ba38-27507e991764'

# Address compared against each Device1 object's "Address" property.
ROOMBA = "F4:A4:72:BA:27:C1"

# The objects that we interact with. They stay None until discovery fills
# them in: roomba_service becomes (proxy, properties, object path) and each
# characteristic becomes (proxy, properties).
roomba_service = roomba_ctrl_chrc = roomba_sensors_chrc = None
def generic_error_cb(error):
    """Report a failed D-Bus call and ask the main loop to stop.

    Passed as the ``error_handler`` of the ReadValue calls in this file.

    Args:
        error: The error object handed over by the D-Bus binding; only its
            ``str()`` form is used.
    """
    # Wrap in a 1-tuple so a tuple-valued error is formatted as one value.
    print('D-Bus call failed: %s' % (error,))
    mainloop.quit()
def sensor_contact_val_to_str(val):
    """Translate a numeric code into its description string.

    Args:
        val: Code to translate; compared with ``==`` against 0, 1, 2 and 3.

    Returns:
        'not supported' for 0 or 1, 'no contact detected' for 2,
        'contact detected' for 3, and 'invalid value' for anything else.
    """
    descriptions = (
        ((0, 1), 'not supported'),
        ((2,), 'no contact detected'),
        ((3,), 'contact detected'),
    )
    for codes, text in descriptions:
        # Explicit == keeps the comparison semantics of a plain if-chain.
        if any(val == code for code in codes):
            return text
    return 'invalid value'
def mode_val_cb(value):
    """Print the first element of a read result in hexadecimal.

    Passed as the ``reply_handler`` when reading the control characteristic.

    Args:
        value: Indexable sequence of integers; only ``value[0]`` is used.

    Raises:
        IndexError: If ``value`` is empty.
    """
    first_byte = value[0]
    print('Mode value: %s' % hex(first_byte))
def sensors_val_cb(value):
    """Print the first four elements of a read result as one hex number.

    The elements are combined least-significant first: ``value[0]`` fills
    bits 0-7, ``value[1]`` bits 8-15, and so on up to ``value[3]``.
    Passed as the ``reply_handler`` when reading the sensors characteristic.

    Args:
        value: Indexable sequence of integers with at least four elements;
            anything past index 3 is ignored.

    Raises:
        IndexError: If ``value`` has fewer than four elements.
    """
    packed = 0
    for position in range(4):
        packed |= value[position] << (8 * position)
    print('Sensors Value : ' + hex(packed))
def roomba_sensors_start_notify_cb():
    """Print a confirmation that sensor notifications are enabled.

    Takes no arguments and returns nothing. No call site is visible in this
    part of the file.
    """
    confirmation = 'Roomba sensors: notification enabled'
    print(confirmation)
def roomba_sensors_changed_cb(iface, changed_props, invalidated_props):
    """Print an updated sensors value taken from a property-change report.

    The parameter list has the shape of a PropertiesChanged signal handler.
    No ``connect_to_signal`` call for it is visible in this part of the file.

    Args:
        iface: Interface name the change applies to; anything other than
            GATT_CHRC_IFACE is ignored.
        changed_props: Mapping of changed property names to new values;
            only the 'Value' entry is used.
        invalidated_props: Unused.

    Raises:
        IndexError: If 'Value' is non-empty but has fewer than four
            elements (after the header line has already been printed).
    """
    if iface != GATT_CHRC_IFACE or len(changed_props) == 0:
        return

    payload = changed_props.get('Value')
    if not payload:
        return

    print('New Sensor values')
    # Combine the first four elements least-significant first.
    packed = 0
    for position in range(4):
        packed |= payload[position] << (8 * position)
    print('\tValue : ' + hex(packed))
def start_client():
    """Start asynchronous reads of the control and sensors characteristics.

    Each read delivers its result to the matching callback (mode_val_cb or
    sensors_val_cb) and its failure to generic_error_cb.

    A characteristic that was never discovered is still None at this point.
    Indexing it would raise TypeError, so it is reported and skipped
    instead; the other read is still issued.
    """
    pending_reads = (
        ('control', roomba_ctrl_chrc, mode_val_cb),
        ('sensors', roomba_sensors_chrc, sensors_val_cb),
    )
    for label, chrc, reply_cb in pending_reads:
        if not chrc:
            print('Roomba ' + label + ' characteristic not found; '
                  'skipping read')
            continue
        # chrc is (proxy object, properties); the proxy carries ReadValue.
        chrc[0].ReadValue({}, reply_handler=reply_cb,
                          error_handler=generic_error_cb,
                          dbus_interface=GATT_CHRC_IFACE)
def process_chrc(chrc_path):
    """Look up one characteristic and remember it if it is a known one.

    Fetches all GATT_CHRC_IFACE properties of the object at ``chrc_path``
    and, depending on its 'UUID', stores ``(proxy, properties)`` in
    ``roomba_ctrl_chrc`` or ``roomba_sensors_chrc``. Any other UUID is only
    printed.

    Args:
        chrc_path: D-Bus object path of the characteristic.

    Returns:
        Always True.
    """
    global roomba_ctrl_chrc, roomba_sensors_chrc

    proxy = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
    props = proxy.GetAll(GATT_CHRC_IFACE, dbus_interface=DBUS_PROP_IFACE)
    chrc_uuid = props['UUID']
    entry = (proxy, props)

    if chrc_uuid == ROOMBA_CHR_CTL_UUID:
        roomba_ctrl_chrc = entry
    elif chrc_uuid == ROOMBA_CHR_SENSORS_UUID:
        roomba_sensors_chrc = entry
    else:
        print('Unrecognized characteristic: ' + chrc_uuid)

    return True
def process_roomba_service(service_path, chrc_paths):
    """Check whether a GATT service is the Roomba service and record it.

    Args:
        service_path: D-Bus object path of the candidate service.
        chrc_paths: Object paths of the characteristics to process if the
            service matches.

    Returns:
        False if the service's 'UUID' differs from ROOMBA_SVC_UUID.
        Otherwise True, after storing ``(proxy, properties, service_path)``
        in ``roomba_service`` and passing every entry of ``chrc_paths`` to
        process_chrc.
    """
    global roomba_service

    proxy = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
    props = proxy.GetAll(GATT_SERVICE_IFACE, dbus_interface=DBUS_PROP_IFACE)
    if props['UUID'] != ROOMBA_SVC_UUID:
        return False

    print('roomba GATT service found: ' + service_path)
    roomba_service = (proxy, props, service_path)

    # Process the characteristics.
    for path in chrc_paths:
        process_chrc(path)

    return True
def interfaces_removed_cb(object_path, interfaces):
    """Stop the main loop when the recorded Roomba service object goes away.

    Connected to the ObjectManager 'InterfacesRemoved' signal in main().

    Args:
        object_path: Path of the object that lost interfaces.
        interfaces: Names of the removed interfaces (unused).
    """
    # roomba_service[2] is the service's object path; nothing to compare
    # against until a service has been recorded.
    if roomba_service and object_path == roomba_service[2]:
        print('Service was removed')
        mainloop.quit()
def main():
    """Find the Roomba, connect to it, read its characteristics, run the loop.

    Steps:
      1. Install the GLib main loop for D-Bus and open the system bus.
      2. Fetch BlueZ's managed objects and look for a Device1 object whose
         "Address" equals ROOMBA; if found, connect to it via bluezutils.
      3. After a successful connect, fetch the managed objects again so that
         GATT objects exported because of the connection are visible.
      4. Collect the characteristic paths and find the Roomba GATT service.
      5. Start the reads and run the main loop.

    Exits the process with status 1 if no Roomba GATT service is found.
    """
    global bus, mainloop

    # Set up the main loop.
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    mainloop = GObject.MainLoop()

    om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
    om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)

    print('Getting objects...')
    objects = om.GetManagedObjects()

    # List devices found
    connected = False
    for interfaces in objects.values():
        props = interfaces.get("org.bluez.Device1")
        if props is None or props["Address"] != ROOMBA:
            continue
        print("Found ROOMBA!")
        print(props["Address"])
        # NOTE(review): the adapter name "hci1" is hard-coded here. The
        # None check is kept as-is; confirm whether bluezutils.find_device
        # can return None or raises when nothing matches.
        device = bluezutils.find_device(ROOMBA, "hci1")
        if device is None:
            print("Cannot 'find_device'")
        else:
            device.Connect()
            print("Connected")
            connected = True
        # The lookup above does not depend on which entry matched, so
        # further matches would only repeat the same connect.
        break

    if connected:
        # The first snapshot was taken before connecting, so it may lack
        # the device's GATT service/characteristic objects.
        # NOTE(review): assumes they are exported by the time Connect()
        # returns; confirm against the BlueZ version in use.
        objects = om.GetManagedObjects()

    # List characteristics found
    chrcs = [path for path, interfaces in objects.items()
             if GATT_CHRC_IFACE in interfaces]

    # List services found
    for path, interfaces in objects.items():
        if GATT_SERVICE_IFACE not in interfaces:
            continue
        # Characteristics live underneath their service's object path.
        chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
        if process_roomba_service(path, chrc_paths):
            break

    if not roomba_service:
        print('No ROOMBA found.')
        sys.exit(1)

    start_client()
    mainloop.run()
# Run the client only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
|