roomba.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. #!/usr/bin/env python3
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. import dbus
  4. try:
  5. from gi.repository import GObject
  6. except ImportError:
  7. import gobject as GObject
  8. import sys
  9. import time
  10. from dbus.mainloop.glib import DBusGMainLoop
  11. import bluezutils
  12. import struct
  13. import keyboard
  14. bus = None
  15. mainloop = None
  16. BLUEZ_SERVICE_NAME = 'org.bluez'
  17. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  18. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  19. GATT_SERVICE_IFACE = 'org.bluez.GattService1'
  20. GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
  21. ROOMBA_SVC_UUID = '35f28386-3070-4f3b-ba38-27507e991760'
  22. ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762'
  23. ROOMBA_CHR_SENSORS_READ_UUID = '35f28386-3070-4f3b-ba38-27507e991764'
  24. ROOMBA_CHR_SENSORS_NOTIFY_UUID = '35f28386-3070-4f3b-ba38-27507e991766'
  25. ROOMBA="F4:A4:72:BA:27:C1"
  26. # The objects that we interact with.
  27. roomba_service = None
  28. roomba_ctrl_chrc = None
  29. roomba_sensors_chrc = None
  30. roomba_notify_chrc = None
  31. bt_write_in_progress = False
  32. bt_write_buffer = None
  33. bt_write_idx = 0
  34. bt_write_len = 0
  35. cur_speed = struct.unpack('h', struct.pack('h', 0))[0]
  36. angle = struct.unpack('h', struct.pack('h', 0))[0]
  37. def bt_write(L):
  38. global bt_write_buffer
  39. global bt_write_in_progress
  40. global bt_write_idx
  41. global bt_write_len
  42. while (bt_write_in_progress):
  43. time.sleep(0.1)
  44. bt_write_in_progress = True
  45. bt_write_buffer = L
  46. bt_write_len = len(L)
  47. bt_write_idx = 0
  48. roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb,
  49. error_handler=generic_error_cb,
  50. dbus_interface=GATT_CHRC_IFACE)
  51. bt_write_idx = 1
  52. def ctrl_write_cb():
  53. global bt_write_buffer
  54. global bt_write_in_progress
  55. global bt_write_idx
  56. global bt_write_len
  57. if bt_write_idx < bt_write_len:
  58. print("Writing next char %d / %d " % (bt_write_idx, bt_write_len - 1))
  59. roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb,
  60. error_handler=generic_error_cb,
  61. dbus_interface=GATT_CHRC_IFACE)
  62. bt_write_idx += 1
  63. else:
  64. bt_write_in_progress = False
  65. print("written.")
  66. return
  67. def roomba_drive(s, a):
  68. print("driving %d %d" % (s, a))
  69. si = struct.pack('!h', int(s))
  70. if (a == 0):
  71. ai = struct.pack('!H', 0x8000)
  72. else:
  73. ai = struct.pack('!h', int(a))
  74. print("integers: %02x %02x %02x %02x" % (si[0], si[1], ai[0], ai[1]))
  75. bt_write([[0x44], [si[0]], [si[1]], [ai[0]], [ai[1]], [0xFF]])
  76. def roomba_mode(m):
  77. bt_write([[0x4d], [m], [0xFF]])
  78. def key_parse(key):
  79. print("keypress")
  80. global cur_speed
  81. global angle
  82. print("spd: %d ang: %d" % (cur_speed, angle))
  83. if keyboard.is_pressed('w'):
  84. cur_speed += 50
  85. if keyboard.is_pressed('s'):
  86. cur_speed -= 50
  87. if keyboard.is_pressed('a'):
  88. angle = 200
  89. if keyboard.is_pressed('d'):
  90. angle = (-200)
  91. if keyboard.is_pressed('0'):
  92. roomba_mode(0)
  93. return
  94. if keyboard.is_pressed('1'):
  95. roomba_mode(1)
  96. return
  97. if keyboard.is_pressed('2'):
  98. roomba_mode(2)
  99. return
  100. if keyboard.is_pressed('3'):
  101. roomba_mode(3)
  102. return
  103. if keyboard.is_pressed('4'):
  104. roomba_mode(4)
  105. return
  106. if keyboard.is_pressed('5'):
  107. roomba_mode(5)
  108. return
  109. if keyboard.is_pressed(' '):
  110. cur_speed = 0
  111. angle = 0
  112. cur_speed = int(cur_speed)
  113. angle = int(angle)
  114. roomba_drive(cur_speed, angle)
  115. def generic_error_cb(error):
  116. print('D-Bus call failed: ' + str(error))
  117. mainloop.quit()
  118. def sensor_contact_val_to_str(val):
  119. if val == 0 or val == 1:
  120. return 'not supported'
  121. if val == 2:
  122. return 'no contact detected'
  123. if val == 3:
  124. return 'contact detected'
  125. return 'invalid value'
  126. def mode_val_cb(value):
  127. print('Mode value: ' + hex(value[0]))
  128. def sensors_val_cb(value):
  129. print(bytes(value))
  130. x = struct.unpack("I11B", bytes(value))
  131. for y in x:
  132. print('Sensors Values : ' + str(y))
  133. def roomba_sensors_start_notify_cb():
  134. print('Roomba sensors: notification enabled')
  135. def roomba_sensors_changed_cb(iface, changed_props, invalidated_props):
  136. if iface != GATT_CHRC_IFACE:
  137. return
  138. if not len(changed_props):
  139. return
  140. value = changed_props.get('Value', None)
  141. if not value:
  142. return
  143. print("sensor values: changed.\n")
  144. roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
  145. error_handler=generic_error_cb,
  146. dbus_interface=GATT_CHRC_IFACE)
  147. def sensors_start_notify_cb():
  148. print("Sensors notifications: enabled.")
  149. def start_client():
  150. roomba_ctrl_chrc[0].ReadValue({}, reply_handler=mode_val_cb,
  151. error_handler=generic_error_cb,
  152. dbus_interface=GATT_CHRC_IFACE)
  153. roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
  154. error_handler=generic_error_cb,
  155. dbus_interface=GATT_CHRC_IFACE)
  156. roomba_sensors_prop_iface = dbus.Interface(roomba_notify_chrc[0], DBUS_PROP_IFACE)
  157. roomba_sensors_prop_iface.connect_to_signal("PropertiesChanged", roomba_sensors_changed_cb)
  158. roomba_notify_chrc[0].StartNotify(reply_handler=sensors_start_notify_cb,
  159. error_handler=generic_error_cb,
  160. dbus_interface=GATT_CHRC_IFACE)
  161. roomba_mode(0x02)
  162. def process_chrc(chrc_path):
  163. chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
  164. chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
  165. dbus_interface=DBUS_PROP_IFACE)
  166. uuid = chrc_props['UUID']
  167. if uuid == ROOMBA_CHR_CTL_UUID:
  168. global roomba_ctrl_chrc
  169. roomba_ctrl_chrc = (chrc, chrc_props)
  170. elif uuid == ROOMBA_CHR_SENSORS_READ_UUID:
  171. global roomba_sensors_chrc
  172. roomba_sensors_chrc = (chrc, chrc_props)
  173. elif uuid == ROOMBA_CHR_SENSORS_NOTIFY_UUID:
  174. global roomba_notify_chrc
  175. roomba_notify_chrc = (chrc, chrc_props)
  176. else:
  177. print('Unrecognized characteristic: ' + uuid)
  178. return True
  179. def process_roomba_service(service_path, chrc_paths):
  180. service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
  181. service_props = service.GetAll(GATT_SERVICE_IFACE,
  182. dbus_interface=DBUS_PROP_IFACE)
  183. uuid = service_props['UUID']
  184. if uuid != ROOMBA_SVC_UUID:
  185. return False
  186. print('roomba GATT service found: ' + service_path)
  187. global roomba_service
  188. roomba_service = (service, service_props, service_path)
  189. # Process the characteristics.
  190. for chrc_path in chrc_paths:
  191. process_chrc(chrc_path)
  192. return True
  193. def interfaces_removed_cb(object_path, interfaces):
  194. if not roomba_service:
  195. return
  196. if object_path == roomba_service[2]:
  197. print('Service was removed')
  198. mainloop.quit()
  199. def main():
  200. # Set up the main loop.
  201. DBusGMainLoop(set_as_default=True)
  202. global bus
  203. bus = dbus.SystemBus()
  204. global mainloop
  205. mainloop = GObject.MainLoop()
  206. om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
  207. om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
  208. print('Getting objects...')
  209. objects = om.GetManagedObjects()
  210. chrcs = []
  211. # List devices found
  212. for path, interfaces in objects.items():
  213. device = interfaces.get("org.bluez.Device1")
  214. if (device is None):
  215. continue
  216. if (device["Address"] == ROOMBA):
  217. print("Found ROOMBA!")
  218. print(device["Address"])
  219. device = bluezutils.find_device(ROOMBA, "hci1")
  220. if (device is None):
  221. print("Cannot 'find_device'")
  222. else:
  223. device.Connect()
  224. print("Connected")
  225. # List characteristics found
  226. for path, interfaces in objects.items():
  227. if GATT_CHRC_IFACE not in interfaces.keys():
  228. continue
  229. chrcs.append(path)
  230. # List sevices found
  231. for path, interfaces in objects.items():
  232. if GATT_SERVICE_IFACE not in interfaces.keys():
  233. continue
  234. chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
  235. if process_roomba_service(path, chrc_paths):
  236. break
  237. if not roomba_service:
  238. print('No ROOMBA found.')
  239. sys.exit(1)
  240. start_client()
  241. keyboard.on_press_key("w", key_parse)
  242. keyboard.on_press_key("a", key_parse)
  243. keyboard.on_press_key("s", key_parse)
  244. keyboard.on_press_key("d", key_parse)
  245. keyboard.on_press_key(" ", key_parse)
  246. keyboard.on_press_key("0", key_parse)
  247. keyboard.on_press_key("1", key_parse)
  248. keyboard.on_press_key("2", key_parse)
  249. keyboard.on_press_key("3", key_parse)
  250. keyboard.on_press_key("4", key_parse)
  251. keyboard.on_press_key("5", key_parse)
  252. mainloop.run()
# Run the client only when this file is executed as a script.
if __name__ == '__main__':
    main()