client.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. #!/usr/bin/python3
  2. #
  3. #
  4. # This Source Code Form is subject to the terms of the MIT License.
  5. # If a copy of the MIT License was not distributed with this file,
  6. # you can obtain one at https://opensource.org/licenses/MIT
  7. #
  8. import argparse
  9. import os.path
  10. from datetime import datetime
  11. import subprocess
  12. import signal
  13. import time
  14. from prompt_toolkit import prompt
  15. from prompt_toolkit.contrib.completers import WordCompleter
  16. from prompt_toolkit.history import InMemoryHistory
  17. from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
  18. def auto_int(x):
  19. return int(x, 0)
  20. def parse_args():
  21. parser = argparse.ArgumentParser(
  22. description='Firmware Extractor User Interface',
  23. )
  24. parser.add_argument(
  25. 'SerialDeviceFILE',
  26. help='Device File to read from (e.g., /dev/ttyUSB0)'
  27. )
  28. parser.add_argument(
  29. '-i',
  30. '--interactive',
  31. action='store_true',
  32. help='Use interactive mode'
  33. )
  34. parser.add_argument(
  35. '-s',
  36. '--start',
  37. type=auto_int,
  38. default=0x00,
  39. help='Set start address',
  40. )
  41. parser.add_argument(
  42. '-l',
  43. '--length',
  44. type=auto_int,
  45. default=0x10000,
  46. help='Number of bytes to extract',
  47. )
  48. parser.add_argument(
  49. '-e',
  50. '--endianess',
  51. default='little',
  52. choices=['little', 'big'],
  53. help='Set endianess',
  54. )
  55. parser.add_argument(
  56. '-o',
  57. '--outfile',
  58. default='data-{}.bin'.format(datetime.now().strftime('%Y%m%d_%H%M')),
  59. help='Set output file path',
  60. )
  61. return parser.parse_args()
  62. finish = False
  63. def sigalarm_handler(signo, frame):
  64. # I want to print the statistics in the end of the program.
  65. # We can create an infinite loop with this sigalarm_handler,
  66. # as multiline mode of send_cmd() uses SIGALARM as well.
  67. # The global variable finish handles this case. And helps
  68. # us terminate the program when the timeout is fired a second
  69. # time.
  70. global finish
  71. if finish:
  72. print()
  73. print('Programm finished.')
  74. exit()
  75. print('End of data.')
  76. print()
  77. finish = True
  78. UART('/dev/ttyUSB0').send_cmd('P', True)
  79. def decode_ascii(s, outfile):
  80. out = ''
  81. for i in range(0, len(s), 2):
  82. char = int(s[i:i+2], 16)
  83. outfile.write(char.to_bytes(1, 'little'))
  84. if char > 31 and char < 127:
  85. out += chr(char)
  86. else:
  87. out += '.'
  88. return out
  89. def print_error(errcode):
  90. print()
  91. print('StatusCode: {:02X}'.format(errcode))
  92. if errcode == 0x20:
  93. print('Status OK')
  94. elif errcode == 0x40:
  95. print('Wait/Retry requested (bus access was not granted in time)')
  96. elif errcode == 0x06:
  97. print('Wait requested + additional OK (previous command OK, but no bus access)')
  98. elif errcode == 0x80:
  99. print('Fault during command execution (command error (access denied etc.))')
  100. elif errcode == 0xA0:
  101. print('Fault after successful command execution (reading invalid memory address?).')
  102. elif errcode == 0xE0:
  103. print('Failure during communication (check connection, no valid status reply received)')
  104. else:
  105. print('Unknown status code')
  106. def _read_enddata(fd):
  107. buf = ''
  108. nchar = 0
  109. state = 'stat'
  110. print()
  111. # Print remaining data from uart until timeout is reached.
  112. while True:
  113. signal.alarm(1)
  114. char = fd.read(1)
  115. signal.alarm(0)
  116. print(char, end='')
  117. buf += char
  118. if char == '!':
  119. state = 'err'
  120. if state == 'err':
  121. if nchar == 25:
  122. print_error(int(buf[-8:], 16))
  123. nchar += 1
  124. def read_ascii(fd, outfile):
  125. l = 0
  126. c = 0
  127. line = ''
  128. lineraw = ''
  129. outfile = open(outfile, 'ab', 16)
  130. while True:
  131. line += '0x{:08X}: '.format(16*l)
  132. decoded_line = ''
  133. while c < 32:
  134. char = fd.read(1)
  135. if char == ' ':
  136. continue
  137. lineraw += char
  138. if c % 2 == 0 and c != 0:
  139. line += ' '
  140. if c % 16 == 0 and c != 0:
  141. line += ' '
  142. # Reached end of data.
  143. # Flush all buffers and read special stuff at the end.
  144. if char == '\r' or char == '\n':
  145. try:
  146. line += ' |' + decode_ascii(lineraw, outfile) + '|'
  147. except ValueError:
  148. pass
  149. print(line)
  150. outfile.flush()
  151. outfile.close()
  152. _read_enddata(fd)
  153. c += 1
  154. line += char
  155. line += ' |' + decode_ascii(lineraw, outfile) + '|'
  156. print(line)
  157. line = ''
  158. lineraw = ''
  159. l += 1
  160. c = 0
  161. class UART:
  162. def __init__(self, devnode):
  163. self.devnode = devnode
  164. subprocess.call( [
  165. 'stty',
  166. '--file={}'.format(self.devnode),
  167. '115200',
  168. 'raw',
  169. '-echok',
  170. '-echo',
  171. ])
  172. def send_cmd(self, code, multiline=False):
  173. readfd = open(self.devnode, 'r')
  174. writefd = open(self.devnode, 'w')
  175. time.sleep(0.1)
  176. writefd.write(code + '\n')
  177. while True:
  178. try:
  179. if multiline:
  180. signal.alarm(1)
  181. char = readfd.read(1)
  182. signal.alarm(0)
  183. if not multiline:
  184. if char == '\r' or char == '\n':
  185. print()
  186. break
  187. print(char, end='')
  188. except UnicodeDecodeError:
  189. print('Received garbage from target.')
  190. print('Is it already running?')
  191. print('Shutting down...')
  192. exit(1)
  193. writefd.close()
  194. readfd.close()
  195. def open(self, mode):
  196. return open(self.devnode, mode)
  197. class REPL:
  198. def __init__(self, start=0x00, length=0x10000, mode='bin',
  199. byteorder='little', devnode='/dev/ttyUSB0'):
  200. self.history = InMemoryHistory()
  201. self.promt = '> '
  202. self.config = {
  203. 'start': start,
  204. 'length': length,
  205. 'byteorder': byteorder,
  206. 'outfile': 'data-{}.bin'.format(datetime.now().strftime('%Y%m%d_%H%M')),
  207. }
  208. self.uart = UART(devnode)
  209. def handle_cmd(self, cmd, *args):
  210. if cmd == 'set':
  211. self.set_config(args[0], args[1])
  212. elif cmd == 'run':
  213. self.apply_config()
  214. self.uart.send_cmd('S')
  215. fd = self.uart.open('r')
  216. print()
  217. read_ascii(fd, self.config['outfile'])
  218. fd.close()
  219. elif cmd == 'cmd':
  220. self.uart.send_cmd(args[0], True)
  221. elif cmd == 'help':
  222. self.show_help()
  223. elif cmd == 'exit':
  224. print('Exit command received. Leaving...')
  225. exit(0)
  226. else:
  227. print('Error: Unknown command.')
  228. print("Try 'help'")
  229. def show_config(self):
  230. print('# Current Configuration')
  231. lines = []
  232. if self.config:
  233. tpl = ' {:<12}: '
  234. for key, val in self.config.items():
  235. if isinstance(val, int):
  236. fstr = tpl + '0x{:X}'
  237. else:
  238. fstr = tpl + '{}'
  239. lines.append(fstr.format(key, val))
  240. print('\n'.join(sorted(lines)))
  241. def show_help(self):
  242. print('# Supported commands')
  243. print(' set KEY VAL : Set configuration value KEY to VAL')
  244. print(' cmd CODE : Send command code to UART')
  245. print(' run : Start reading out code')
  246. print(' help : Show this help page')
  247. print(' exit : Terminate programm')
  248. def set_config(self, key, val):
  249. if key == 'byteorder':
  250. if val not in ('little', 'big'):
  251. print('Error: Wrong byteorder. Choose "little" or "big".')
  252. return
  253. elif key == 'start':
  254. val = int(val, 0)
  255. if val > 0x10000:
  256. print('Error: Start address is too large.')
  257. return
  258. elif key == 'length':
  259. val = int(val, 0)
  260. elif key == 'outfile':
  261. self.config[key] = str(val)
  262. else:
  263. print('Error: Config key does not exist.')
  264. return
  265. self.config[key] = val
  266. self.show_config()
  267. def apply_config(self):
  268. self.uart.send_cmd('A{:X}'.format(self.config['start']))
  269. self.uart.send_cmd('L{:X}'.format(self.config['length']))
  270. # Hardcode HEX mode
  271. self.uart.send_cmd('H')
  272. cmd = 'e' if self.config['byteorder'] == 'little' else 'E'
  273. self.uart.send_cmd(cmd)
  274. def run_loop(self):
  275. self.show_help()
  276. print()
  277. self.show_config()
  278. try:
  279. while True:
  280. cmd = prompt(self.promt,
  281. history=self.history,
  282. auto_suggest=AutoSuggestFromHistory(),
  283. )
  284. cmd = cmd.strip().split()
  285. cmd = cmd if cmd else ''
  286. if cmd != '':
  287. self.handle_cmd(*cmd)
  288. except KeyboardInterrupt:
  289. print('KeyboardInterrupt received. Shutting down...')
  290. exit(1)
  291. except EOFError:
  292. print('Reached end of line. Leaving...')
  293. exit(0)
  294. def main():
  295. args = parse_args()
  296. signal.signal(signal.SIGALRM, sigalarm_handler)
  297. if not os.path.exists(args.SerialDeviceFILE):
  298. print('Error: No such file.')
  299. exit(1)
  300. if args.interactive:
  301. REPL(devnode=args.SerialDeviceFILE).run_loop()
  302. exit(0)
  303. # If the script is not in interactive mode, issue this stuff
  304. # manually.
  305. uart = UART(args.SerialDeviceFILE)
  306. uart.send_cmd('A{:X}'.format(args.start))
  307. uart.send_cmd('L{:X}'.format(args.length))
  308. uart.send_cmd('H') # Hardcode HEX mode
  309. if args.endianess == 'big':
  310. uart.send_cmd('E')
  311. else:
  312. uart.send_cmd('e')
  313. uart.send_cmd('S')
  314. fd = uart.open('r')
  315. print()
  316. try:
  317. read_ascii(fd, args.outfile)
  318. except KeyboardInterrupt:
  319. print('Leaving...')
  320. finally:
  321. fd.close()
  322. if __name__ == '__main__':
  323. main()