123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- #!/usr/bin/env python3
- # SPDX-License-Identifier: LGPL-2.1-or-later
- import dbus
- try:
- from gi.repository import GObject
- except ImportError:
- import gobject as GObject
- import sys
- import time
- from dbus.mainloop.glib import DBusGMainLoop
- import bluezutils
- import struct
- import keyboard
- bus = None
- mainloop = None
- BLUEZ_SERVICE_NAME = 'org.bluez'
- DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
- DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
- GATT_SERVICE_IFACE = 'org.bluez.GattService1'
- GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
- ROOMBA_SVC_UUID = '35f28386-3070-4f3b-ba38-27507e991760'
- ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762'
- ROOMBA_CHR_SENSORS_READ_UUID = '35f28386-3070-4f3b-ba38-27507e991764'
- ROOMBA_CHR_SENSORS_NOTIFY_UUID = '35f28386-3070-4f3b-ba38-27507e991766'
- ROOMBA="F4:A4:72:BA:27:C1"
- # The objects that we interact with.
- roomba_service = None
- roomba_ctrl_chrc = None
- roomba_sensors_chrc = None
- roomba_notify_chrc = None
- bt_write_in_progress = False
- bt_write_buffer = None
- bt_write_idx = 0
- bt_write_len = 0
- cur_speed = struct.unpack('h', struct.pack('h', 0))[0]
- angle = struct.unpack('h', struct.pack('h', 0))[0]
- def bt_write(L):
- global bt_write_buffer
- global bt_write_in_progress
- global bt_write_idx
- global bt_write_len
- while (bt_write_in_progress):
- time.sleep(0.1)
- bt_write_in_progress = True
- bt_write_buffer = L
- bt_write_len = len(L)
- bt_write_idx = 0
- roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- bt_write_idx = 1
- def ctrl_write_cb():
- global bt_write_buffer
- global bt_write_in_progress
- global bt_write_idx
- global bt_write_len
- if bt_write_idx < bt_write_len:
- print("Writing next char %d / %d " % (bt_write_idx, bt_write_len - 1))
- roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- bt_write_idx += 1
- else:
- bt_write_in_progress = False
- print("written.")
- return
- def roomba_drive(s, a):
- print("driving %d %d" % (s, a))
- si = struct.pack('!h', int(s))
- if (a == 0):
- ai = struct.pack('!H', 0x8000)
- else:
- ai = struct.pack('!h', int(a))
- print("integers: %02x %02x %02x %02x" % (si[0], si[1], ai[0], ai[1]))
- bt_write([[0x44], [si[0]], [si[1]], [ai[0]], [ai[1]], [0xFF]])
- def roomba_mode(m):
- bt_write([[0x4d], [m], [0xFF]])
- def key_parse(key):
- print("keypress")
- global cur_speed
- global angle
- print("spd: %d ang: %d" % (cur_speed, angle))
- if keyboard.is_pressed('w'):
- cur_speed += 50
- if keyboard.is_pressed('s'):
- cur_speed -= 50
- if keyboard.is_pressed('a'):
- angle = 200
- if keyboard.is_pressed('d'):
- angle = (-200)
- if keyboard.is_pressed('0'):
- roomba_mode(0)
- return
- if keyboard.is_pressed('1'):
- roomba_mode(1)
- return
- if keyboard.is_pressed('2'):
- roomba_mode(2)
- return
- if keyboard.is_pressed('3'):
- roomba_mode(3)
- return
- if keyboard.is_pressed('4'):
- roomba_mode(4)
- return
- if keyboard.is_pressed('5'):
- roomba_mode(5)
- return
- if keyboard.is_pressed(' '):
- cur_speed = 0
- angle = 0
- cur_speed = int(cur_speed)
- angle = int(angle)
- roomba_drive(cur_speed, angle)
- def generic_error_cb(error):
- print('D-Bus call failed: ' + str(error))
- mainloop.quit()
- def sensor_contact_val_to_str(val):
- if val == 0 or val == 1:
- return 'not supported'
- if val == 2:
- return 'no contact detected'
- if val == 3:
- return 'contact detected'
- return 'invalid value'
- def mode_val_cb(value):
- print('Mode value: ' + hex(value[0]))
- def sensors_val_cb(value):
- print(bytes(value))
- x = struct.unpack("I11B", bytes(value))
- for y in x:
- print('Sensors Values : ' + str(y))
- def roomba_sensors_start_notify_cb():
- print('Roomba sensors: notification enabled')
- def roomba_sensors_changed_cb(iface, changed_props, invalidated_props):
- if iface != GATT_CHRC_IFACE:
- return
- if not len(changed_props):
- return
- value = changed_props.get('Value', None)
- if not value:
- return
- print("sensor values: changed.\n")
- roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- def sensors_start_notify_cb():
- print("Sensors notifications: enabled.")
- def start_client():
- roomba_ctrl_chrc[0].ReadValue({}, reply_handler=mode_val_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- roomba_sensors_prop_iface = dbus.Interface(roomba_notify_chrc[0], DBUS_PROP_IFACE)
- roomba_sensors_prop_iface.connect_to_signal("PropertiesChanged", roomba_sensors_changed_cb)
- roomba_notify_chrc[0].StartNotify(reply_handler=sensors_start_notify_cb,
- error_handler=generic_error_cb,
- dbus_interface=GATT_CHRC_IFACE)
- roomba_mode(0x02)
- def process_chrc(chrc_path):
- chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
- chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
- dbus_interface=DBUS_PROP_IFACE)
- uuid = chrc_props['UUID']
- if uuid == ROOMBA_CHR_CTL_UUID:
- global roomba_ctrl_chrc
- roomba_ctrl_chrc = (chrc, chrc_props)
- elif uuid == ROOMBA_CHR_SENSORS_READ_UUID:
- global roomba_sensors_chrc
- roomba_sensors_chrc = (chrc, chrc_props)
- elif uuid == ROOMBA_CHR_SENSORS_NOTIFY_UUID:
- global roomba_notify_chrc
- roomba_notify_chrc = (chrc, chrc_props)
- else:
- print('Unrecognized characteristic: ' + uuid)
- return True
- def process_roomba_service(service_path, chrc_paths):
- service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
- service_props = service.GetAll(GATT_SERVICE_IFACE,
- dbus_interface=DBUS_PROP_IFACE)
- uuid = service_props['UUID']
- if uuid != ROOMBA_SVC_UUID:
- return False
- print('roomba GATT service found: ' + service_path)
- global roomba_service
- roomba_service = (service, service_props, service_path)
-
- # Process the characteristics.
- for chrc_path in chrc_paths:
- process_chrc(chrc_path)
- return True
- def interfaces_removed_cb(object_path, interfaces):
- if not roomba_service:
- return
- if object_path == roomba_service[2]:
- print('Service was removed')
- mainloop.quit()
- def main():
- # Set up the main loop.
- DBusGMainLoop(set_as_default=True)
- global bus
- bus = dbus.SystemBus()
- global mainloop
- mainloop = GObject.MainLoop()
- om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
- om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
- print('Getting objects...')
- objects = om.GetManagedObjects()
- chrcs = []
- # List devices found
- for path, interfaces in objects.items():
- device = interfaces.get("org.bluez.Device1")
- if (device is None):
- continue
- if (device["Address"] == ROOMBA):
- print("Found ROOMBA!")
- print(device["Address"])
- device = bluezutils.find_device(ROOMBA, "hci1")
- if (device is None):
- print("Cannot 'find_device'")
- else:
- device.Connect()
- print("Connected")
-
- # List characteristics found
- for path, interfaces in objects.items():
- if GATT_CHRC_IFACE not in interfaces.keys():
- continue
- chrcs.append(path)
- # List sevices found
- for path, interfaces in objects.items():
- if GATT_SERVICE_IFACE not in interfaces.keys():
- continue
- chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
- if process_roomba_service(path, chrc_paths):
- break
- if not roomba_service:
- print('No ROOMBA found.')
- sys.exit(1)
- start_client()
- keyboard.on_press_key("w", key_parse)
- keyboard.on_press_key("a", key_parse)
- keyboard.on_press_key("s", key_parse)
- keyboard.on_press_key("d", key_parse)
- keyboard.on_press_key(" ", key_parse)
- keyboard.on_press_key("0", key_parse)
- keyboard.on_press_key("1", key_parse)
- keyboard.on_press_key("2", key_parse)
- keyboard.on_press_key("3", key_parse)
- keyboard.on_press_key("4", key_parse)
- keyboard.on_press_key("5", key_parse)
- mainloop.run()
- if __name__ == '__main__':
- main()
|