roomba.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. #!/usr/bin/env python3
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. import dbus
  4. try:
  5. from gi.repository import GObject
  6. except ImportError:
  7. import gobject as GObject
  8. import sys
  9. from dbus.mainloop.glib import DBusGMainLoop
  10. import bluezutils
  11. bus = None
  12. mainloop = None
  13. BLUEZ_SERVICE_NAME = 'org.bluez'
  14. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  15. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  16. GATT_SERVICE_IFACE = 'org.bluez.GattService1'
  17. GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
  18. ROOMBA_SVC_UUID = '35f28386-3070-4f3b-ba38-27507e991760'
  19. ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762'
  20. ROOMBA_CHR_SENSORS_UUID = '35f28386-3070-4f3b-ba38-27507e991764'
  21. ROOMBA="F4:A4:72:BA:27:C1"
  22. # The objects that we interact with.
  23. roomba_service = None
  24. roomba_ctrl_chrc = None
  25. roomba_sensors_chrc = None
  26. def generic_error_cb(error):
  27. print('D-Bus call failed: ' + str(error))
  28. mainloop.quit()
  29. def sensor_contact_val_to_str(val):
  30. if val == 0 or val == 1:
  31. return 'not supported'
  32. if val == 2:
  33. return 'no contact detected'
  34. if val == 3:
  35. return 'contact detected'
  36. return 'invalid value'
  37. def mode_val_cb(value):
  38. print('Mode value: ' + hex(value[0]))
  39. def sensors_val_cb(value):
  40. roomba_sensors = value[0] | (value[1] << 8) | (value[2] << 16) | (value[3] << 24)
  41. print('Sensors Value : ' + str(hex(roomba_sensors)))
  42. def roomba_sensors_start_notify_cb():
  43. print('Roomba sensors: notification enabled')
  44. def roomba_sensors_changed_cb(iface, changed_props, invalidated_props):
  45. if iface != GATT_CHRC_IFACE:
  46. return
  47. if not len(changed_props):
  48. return
  49. value = changed_props.get('Value', None)
  50. if not value:
  51. return
  52. print('New Sensor values')
  53. roomba_sensors = value[0] | (value[1] << 8) | (value[2] << 16) | (value[3] << 24)
  54. print('\tValue : ' + str(hex(roomba_sensors)))
  55. def start_client():
  56. roomba_ctrl_chrc[0].ReadValue({}, reply_handler=mode_val_cb,
  57. error_handler=generic_error_cb,
  58. dbus_interface=GATT_CHRC_IFACE)
  59. roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
  60. error_handler=generic_error_cb,
  61. dbus_interface=GATT_CHRC_IFACE)
  62. def process_chrc(chrc_path):
  63. chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
  64. chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
  65. dbus_interface=DBUS_PROP_IFACE)
  66. uuid = chrc_props['UUID']
  67. if uuid == ROOMBA_CHR_CTL_UUID:
  68. global roomba_ctrl_chrc
  69. roomba_ctrl_chrc = (chrc, chrc_props)
  70. elif uuid == ROOMBA_CHR_SENSORS_UUID:
  71. global roomba_sensors_chrc
  72. roomba_sensors_chrc = (chrc, chrc_props)
  73. else:
  74. print('Unrecognized characteristic: ' + uuid)
  75. return True
  76. def process_roomba_service(service_path, chrc_paths):
  77. service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
  78. service_props = service.GetAll(GATT_SERVICE_IFACE,
  79. dbus_interface=DBUS_PROP_IFACE)
  80. uuid = service_props['UUID']
  81. if uuid != ROOMBA_SVC_UUID:
  82. return False
  83. print('roomba GATT service found: ' + service_path)
  84. global roomba_service
  85. roomba_service = (service, service_props, service_path)
  86. # Process the characteristics.
  87. for chrc_path in chrc_paths:
  88. process_chrc(chrc_path)
  89. return True
  90. def interfaces_removed_cb(object_path, interfaces):
  91. if not roomba_service:
  92. return
  93. if object_path == roomba_service[2]:
  94. print('Service was removed')
  95. mainloop.quit()
  96. def main():
  97. # Set up the main loop.
  98. DBusGMainLoop(set_as_default=True)
  99. global bus
  100. bus = dbus.SystemBus()
  101. global mainloop
  102. mainloop = GObject.MainLoop()
  103. om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
  104. om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
  105. print('Getting objects...')
  106. objects = om.GetManagedObjects()
  107. chrcs = []
  108. # List devices found
  109. for path, interfaces in objects.items():
  110. device = interfaces.get("org.bluez.Device1")
  111. if (device is None):
  112. continue
  113. if (device["Address"] == ROOMBA):
  114. print("Found ROOMBA!")
  115. print(device["Address"])
  116. device = bluezutils.find_device(ROOMBA, "hci1")
  117. if (device is None):
  118. print("Cannot 'find_device'")
  119. else:
  120. device.Connect()
  121. print("Connected")
  122. # List characteristics found
  123. for path, interfaces in objects.items():
  124. if GATT_CHRC_IFACE not in interfaces.keys():
  125. continue
  126. chrcs.append(path)
  127. # List sevices found
  128. for path, interfaces in objects.items():
  129. if GATT_SERVICE_IFACE not in interfaces.keys():
  130. continue
  131. chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
  132. if process_roomba_service(path, chrc_paths):
  133. break
  134. if not roomba_service:
  135. print('No ROOMBA found.')
  136. sys.exit(1)
  137. start_client()
  138. mainloop.run()
  139. if __name__ == '__main__':
  140. main()