roomba.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. #!/usr/bin/env python3
  2. # SPDX-License-Identifier: LGPL-2.1-or-later
  3. import dbus
  4. try:
  5. from gi.repository import GObject
  6. except ImportError:
  7. import gobject as GObject
  8. import sys
  9. import time
  10. from dbus.mainloop.glib import DBusGMainLoop
  11. import bluezutils
  12. import struct
  13. import keyboard
  14. # Settings
  15. ROOMBA="CE:E0:94:15:6A:3D"
  16. HCI="hci0"
  17. bus = None
  18. mainloop = None
  19. BLUEZ_SERVICE_NAME = 'org.bluez'
  20. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  21. DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
  22. GATT_SERVICE_IFACE = 'org.bluez.GattService1'
  23. GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
  24. ROOMBA_SVC_UUID = '35f28386-3070-4f3b-ba38-27507e991760'
  25. ROOMBA_CHR_CTL_UUID = '35f28386-3070-4f3b-ba38-27507e991762'
  26. ROOMBA_CHR_SENSORS_READ_UUID = '35f28386-3070-4f3b-ba38-27507e991764'
  27. ROOMBA_CHR_SENSORS_NOTIFY_UUID = '35f28386-3070-4f3b-ba38-27507e991766'
  28. # The objects that we interact with.
  29. roomba_service = None
  30. roomba_ctrl_chrc = None
  31. roomba_sensors_chrc = None
  32. roomba_notify_chrc = None
  33. bt_write_in_progress = False
  34. bt_write_buffer = None
  35. bt_write_idx = 0
  36. bt_write_len = 0
  37. cur_speed = struct.unpack('h', struct.pack('h', 0))[0]
  38. angle = struct.unpack('h', struct.pack('h', 0))[0]
  39. roomba_main_brush_on = False # 'j'
  40. roomba_vacuum_on = False # 'k'
  41. roomba_side_brush_on = False # 'l'
  42. def bt_write(L):
  43. global bt_write_buffer
  44. global bt_write_in_progress
  45. global bt_write_idx
  46. global bt_write_len
  47. while (bt_write_in_progress):
  48. time.sleep(0.1)
  49. bt_write_in_progress = True
  50. bt_write_buffer = L
  51. bt_write_len = len(L)
  52. bt_write_idx = 0
  53. roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb,
  54. error_handler=generic_error_cb,
  55. dbus_interface=GATT_CHRC_IFACE)
  56. bt_write_idx = 1
  57. def ctrl_write_cb():
  58. global bt_write_buffer
  59. global bt_write_in_progress
  60. global bt_write_idx
  61. global bt_write_len
  62. if bt_write_idx < bt_write_len:
  63. # print("Writing next char %d / %d " % (bt_write_idx, bt_write_len - 1))
  64. roomba_ctrl_chrc[0].WriteValue(bytes(bt_write_buffer[bt_write_idx]), {}, reply_handler=ctrl_write_cb,
  65. error_handler=generic_error_cb,
  66. dbus_interface=GATT_CHRC_IFACE)
  67. bt_write_idx += 1
  68. else:
  69. bt_write_in_progress = False
  70. return
  71. def roomba_drive(s, a):
  72. print("driving %d %d" % (s, a))
  73. si = struct.pack('!h', int(s))
  74. if (a == 0):
  75. ai = struct.pack('!H', 0x8000)
  76. else:
  77. ai = struct.pack('!h', int(a))
  78. bt_write([[0x44], [si[0]], [si[1]], [ai[0]], [ai[1]]])
  79. while (bt_write_in_progress):
  80. time.sleep(0.100000)
  81. roomba_cmd_clean()
  82. def roomba_cmd_endsteer(arg):
  83. global angle
  84. global cur_speed
  85. if (angle):
  86. angle = 0
  87. roomba_drive(cur_speed, angle)
  88. def roomba_cmd_clean():
  89. global roomba_main_brush_on
  90. global roomba_vacuum_on
  91. global roomba_side_brush_on
  92. b0 = 0
  93. b1 = 0
  94. b2 = 0
  95. if roomba_main_brush_on:
  96. b0 = 1
  97. if roomba_vacuum_on:
  98. b1 = 1
  99. if roomba_side_brush_on:
  100. b2 = 1
  101. bt_write([[0x43], [b0], [b1], [b2]])
  102. def toggle_brush():
  103. global roomba_main_brush_on
  104. roomba_main_brush_on = not roomba_main_brush_on
  105. roomba_cmd_clean()
  106. def toggle_vacuum():
  107. global roomba_vacuum_on
  108. roomba_vacuum_on = not roomba_vacuum_on
  109. roomba_cmd_clean()
  110. def toggle_side():
  111. global roomba_side_brush_on
  112. roomba_side_brush_on = not roomba_side_brush_on
  113. roomba_cmd_clean()
  114. def roomba_mode(m):
  115. bt_write([[0x4d], [m]])
  116. def key_parse(key):
  117. global cur_speed
  118. global angle
  119. old_speed = cur_speed
  120. old_angle = angle
  121. if keyboard.is_pressed('0'):
  122. roomba_mode(0)
  123. return
  124. if keyboard.is_pressed('1'):
  125. roomba_mode(1)
  126. return
  127. if keyboard.is_pressed('2'):
  128. roomba_mode(2)
  129. return
  130. if keyboard.is_pressed('3'):
  131. roomba_mode(3)
  132. return
  133. if keyboard.is_pressed('4'):
  134. roomba_mode(4)
  135. return
  136. if keyboard.is_pressed('5'):
  137. roomba_mode(5)
  138. return
  139. if keyboard.is_pressed('6'):
  140. roomba_mode(66)
  141. return
  142. if keyboard.is_pressed('j'):
  143. toggle_brush()
  144. return
  145. if keyboard.is_pressed('k'):
  146. toggle_vacuum()
  147. return
  148. if keyboard.is_pressed('l'):
  149. toggle_side()
  150. return
  151. if keyboard.is_pressed(' '):
  152. cur_speed = 0
  153. angle = 0
  154. if keyboard.is_pressed('a'):
  155. angle = 200
  156. elif keyboard.is_pressed('d'):
  157. angle = (-200)
  158. else:
  159. angle = 0
  160. if keyboard.is_pressed('w'):
  161. cur_speed += 50
  162. if keyboard.is_pressed('s'):
  163. cur_speed -= 50
  164. cur_speed = int(cur_speed)
  165. angle = int(angle)
  166. if old_speed == cur_speed and old_angle == angle:
  167. return
  168. if cur_speed == 0 and angle != 0:
  169. turn_speed = 150
  170. if angle > 0:
  171. angle = 1
  172. else:
  173. angle = -1
  174. roomba_drive(turn_speed, angle)
  175. else:
  176. roomba_drive(cur_speed, angle)
  177. def generic_error_cb(error):
  178. print('D-Bus call failed: ' + str(error))
  179. mainloop.quit()
  180. def sensor_contact_val_to_str(val):
  181. if val == 0 or val == 1:
  182. return 'not supported'
  183. if val == 2:
  184. return 'no contact detected'
  185. if val == 3:
  186. return 'contact detected'
  187. return 'invalid value'
  188. def mode_val_cb(value):
  189. print('Mode value: ' + hex(value[0]))
  190. def sensors_val_cb(value):
  191. print(bytes(value))
  192. x = struct.unpack("I11B", bytes(value))
  193. for y in x:
  194. print('Sensors Values : ' + str(y))
  195. def roomba_sensors_start_notify_cb():
  196. print('Roomba sensors: notification enabled')
  197. def roomba_sensors_changed_cb(iface, changed_props, invalidated_props):
  198. if iface != GATT_CHRC_IFACE:
  199. return
  200. if not len(changed_props):
  201. return
  202. value = changed_props.get('Value', None)
  203. if not value:
  204. return
  205. roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
  206. error_handler=generic_error_cb,
  207. dbus_interface=GATT_CHRC_IFACE)
  208. def sensors_start_notify_cb():
  209. print("Sensors notifications: enabled.")
  210. def start_client():
  211. roomba_ctrl_chrc[0].ReadValue({}, reply_handler=mode_val_cb,
  212. error_handler=generic_error_cb,
  213. dbus_interface=GATT_CHRC_IFACE)
  214. roomba_sensors_chrc[0].ReadValue({}, reply_handler=sensors_val_cb,
  215. error_handler=generic_error_cb,
  216. dbus_interface=GATT_CHRC_IFACE)
  217. roomba_sensors_prop_iface = dbus.Interface(roomba_notify_chrc[0], DBUS_PROP_IFACE)
  218. roomba_sensors_prop_iface.connect_to_signal("PropertiesChanged", roomba_sensors_changed_cb)
  219. roomba_notify_chrc[0].StartNotify(reply_handler=sensors_start_notify_cb,
  220. error_handler=generic_error_cb,
  221. dbus_interface=GATT_CHRC_IFACE)
  222. roomba_mode(0x02)
  223. def process_chrc(chrc_path):
  224. chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
  225. chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
  226. dbus_interface=DBUS_PROP_IFACE)
  227. uuid = chrc_props['UUID']
  228. if uuid == ROOMBA_CHR_CTL_UUID:
  229. global roomba_ctrl_chrc
  230. roomba_ctrl_chrc = (chrc, chrc_props)
  231. elif uuid == ROOMBA_CHR_SENSORS_READ_UUID:
  232. global roomba_sensors_chrc
  233. roomba_sensors_chrc = (chrc, chrc_props)
  234. elif uuid == ROOMBA_CHR_SENSORS_NOTIFY_UUID:
  235. global roomba_notify_chrc
  236. roomba_notify_chrc = (chrc, chrc_props)
  237. else:
  238. print('Unrecognized characteristic: ' + uuid)
  239. return True
  240. def process_roomba_service(service_path, chrc_paths):
  241. service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
  242. service_props = service.GetAll(GATT_SERVICE_IFACE,
  243. dbus_interface=DBUS_PROP_IFACE)
  244. uuid = service_props['UUID']
  245. if uuid != ROOMBA_SVC_UUID:
  246. return False
  247. print('roomba GATT service found: ' + service_path)
  248. global roomba_service
  249. roomba_service = (service, service_props, service_path)
  250. # Process the characteristics.
  251. for chrc_path in chrc_paths:
  252. process_chrc(chrc_path)
  253. return True
  254. def interfaces_removed_cb(object_path, interfaces):
  255. if not roomba_service:
  256. return
  257. if object_path == roomba_service[2]:
  258. print('Service was removed')
  259. mainloop.quit()
  260. def main():
  261. # Set up the main loop.
  262. DBusGMainLoop(set_as_default=True)
  263. global bus
  264. bus = dbus.SystemBus()
  265. global mainloop
  266. mainloop = GObject.MainLoop()
  267. om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
  268. om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
  269. print('Getting objects...')
  270. objects = om.GetManagedObjects()
  271. chrcs = []
  272. # List devices found
  273. for path, interfaces in objects.items():
  274. device = interfaces.get("org.bluez.Device1")
  275. if (device is None):
  276. continue
  277. if (device["Address"] == ROOMBA):
  278. print("Found ROOMBA!")
  279. print(device["Address"])
  280. device = bluezutils.find_device(ROOMBA,HCI)
  281. if (device is None):
  282. print("Cannot 'find_device'")
  283. else:
  284. device.Connect()
  285. print("Connected")
  286. # List characteristics found
  287. for path, interfaces in objects.items():
  288. if GATT_CHRC_IFACE not in interfaces.keys():
  289. continue
  290. chrcs.append(path)
  291. # List sevices found
  292. for path, interfaces in objects.items():
  293. if GATT_SERVICE_IFACE not in interfaces.keys():
  294. continue
  295. chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
  296. if process_roomba_service(path, chrc_paths):
  297. break
  298. if not roomba_service:
  299. print('No ROOMBA found.')
  300. sys.exit(1)
  301. start_client()
  302. keyboard.on_press_key("w", key_parse)
  303. keyboard.on_press_key("a", key_parse)
  304. keyboard.on_press_key("s", key_parse)
  305. keyboard.on_press_key("d", key_parse)
  306. keyboard.on_press_key(" ", key_parse)
  307. keyboard.on_press_key("0", key_parse)
  308. keyboard.on_press_key("1", key_parse)
  309. keyboard.on_press_key("2", key_parse)
  310. keyboard.on_press_key("3", key_parse)
  311. keyboard.on_press_key("4", key_parse)
  312. keyboard.on_press_key("5", key_parse)
  313. keyboard.on_press_key("6", key_parse)
  314. keyboard.on_press_key("j", key_parse)
  315. keyboard.on_press_key("k", key_parse)
  316. keyboard.on_press_key("l", key_parse)
  317. keyboard.on_release_key("a", roomba_cmd_endsteer)
  318. keyboard.on_release_key("d", roomba_cmd_endsteer)
  319. mainloop.run()
  320. if __name__ == '__main__':
  321. main()