barker.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. #!/usr/bin/python3
  2. '''
  3. This script finds the barker code position in a WAV file
  4. '''
  5. from itertools import count
  6. import argparse
  7. from logging import getLogger, basicConfig
  8. from math import log10
  9. import matplotlib.pyplot as plt
  10. import numpy as np
  11. import scipy.io.wavfile
  12. BARKERS = {
  13. 2: [1, -1],
  14. 3: [1, 1, -1],
  15. 4: [1, 1, -1, 1],
  16. 5: [1, 1, 1, -1, 1],
  17. 7: [1, 1, 1, -1, -1, 1, -1],
  18. 11: [1, 1, 1, -1, -1, -1, 1, -1, -1, 1, -1],
  19. 13: [1, 1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1],
  20. }
  21. def get_parser():
  22. p = argparse.ArgumentParser()
  23. p.add_argument("fname")
  24. p.add_argument(
  25. "--barker-sequence-length", type=int, default=5, choices=BARKERS.keys()
  26. )
  27. p.add_argument(
  28. "--barker-freq-1", default=880, type=int, help='Known as "+1" in barker code'
  29. )
  30. p.add_argument(
  31. "--barker-freq-2", default=440, type=int, help='Known as "-1" in barker code'
  32. )
  33. p.add_argument(
  34. "--log-level",
  35. default="WARNING",
  36. choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
  37. )
  38. return p
  39. def to_int(val):
  40. return int(np.absolute(val))
  41. def all_contigous_series(seq) -> list:
  42. """
  43. Put every element of seq into a list of all elements to which it is contigous.
  44. >>> all_contigous_series([1,2,10,11,12])
  45. [[1, 2], [10, 11, 12]]
  46. """
  47. out = [[]]
  48. for elem in seq:
  49. if not out[-1] or elem == out[-1][-1] + 1:
  50. out[-1].append(elem)
  51. else:
  52. assert len(out[-1]) > 0
  53. out.append([elem])
  54. if not out[-1]:
  55. out.pop()
  56. return out
  57. def largest_contigous_series(seq) -> list:
  58. """
  59. Like all_contigous_series, but pick the biggest.
  60. >>> largest_contigous_series([1,2,10,11,12])
  61. [10, 11, 12]
  62. """
  63. return max(all_contigous_series(seq), key=len)
  64. class SequenceOutOfBoundsException(IndexError):
  65. pass
  66. class Barker:
  67. def __init__(
  68. self,
  69. barker_sequence,
  70. barker_freqs,
  71. framerate: int,
  72. wave,
  73. threshold=0,
  74. symbol_ms=100,
  75. chunks_per_symbol=10,
  76. ):
  77. self.barker = barker_sequence
  78. self.freqs = barker_freqs
  79. self.threshold = threshold
  80. self.framerate = framerate
  81. self.wave = wave
  82. self.log = getLogger(self.__class__.__name__)
  83. self.fitness_min = len(self.barker) * 2 / 3
  84. self.symbol_ms = symbol_ms
  85. self.chunks_per_symbol = chunks_per_symbol
  86. # how many millisecond is a chunk long
  87. self.chunk_ms = int(self.symbol_ms / self.chunks_per_symbol)
  88. def decode_symbol(self, data) -> int:
  89. """
  90. given some data, it will try to understand if that's a 1 or a -1
  91. """
  92. r = np.fft.fft(data, self.framerate)
  93. values = [to_int(r[freq]) for freq in self.freqs]
  94. if max(values) < self.threshold:
  95. return 0
  96. is_symbol_1 = values[0] > values[1]
  97. return 1 if is_symbol_1 else -1
  98. def analyze_sequence(self, start_chunk: int) -> float:
  99. frames_per_chunk = int(self.framerate * self.chunk_ms // 1000)
  100. frames_per_symbol = frames_per_chunk * self.chunks_per_symbol
  101. total_frames = frames_per_symbol * len(self.barker)
  102. symbol_start = start_chunk * frames_per_chunk
  103. sequence_end = symbol_start + total_frames
  104. if sequence_end > len(self.wave):
  105. raise SequenceOutOfBoundsException("Invalid start_chunk")
  106. fitness = 0
  107. # print()
  108. for symbol in self.barker:
  109. symbol_end = symbol_start + frames_per_symbol
  110. # print(symbol_start, symbol_end)
  111. to_analyze = self.wave[symbol_start:symbol_end]
  112. symbol_is = self.decode_symbol(to_analyze)
  113. # print('is', symbol_is, 'should', symbol)
  114. fitness += symbol_is * symbol
  115. symbol_start += frames_per_symbol
  116. return fitness
  117. def analyze(self):
  118. total = len(self.wave)
  119. self.log.debug("tot frames = %s", total)
  120. self.log.debug("framerate = %s", self.framerate)
  121. self.log.debug("duration = %.2fs", (total / self.framerate))
  122. bestvalue = None
  123. bestshifts = []
  124. for shift in count():
  125. try:
  126. val = self.analyze_sequence(shift)
  127. except SequenceOutOfBoundsException:
  128. break
  129. if (bestvalue is None) or (val > bestvalue):
  130. bestvalue = val
  131. bestshifts = [shift]
  132. elif val == bestvalue:
  133. bestshifts.append(shift)
  134. if bestvalue < self.fitness_min:
  135. raise ValueError("Barker code not found")
  136. bestshifts = largest_contigous_series(bestshifts)
  137. self.log.debug(
  138. "largest contigous series of besthifts picked among %d", len(bestshifts)
  139. )
  140. bestshift = bestshifts[len(bestshifts) // 2]
  141. shift_ms = self.chunk_ms * bestshift
  142. shift_s = shift_ms / 1000.0
  143. self.log.info(
  144. "bestshift at chunk %d (%.2fs): %d", bestshift, shift_s, bestvalue
  145. )
  146. return (bestshift, shift_ms, bestvalue)
  147. def get_frames(self, start_ms: int, duration_ms: int) -> list:
  148. start_frame = (start_ms * self.framerate) // 1000
  149. n_frames = (duration_ms * self.framerate) // 1000
  150. data = self.wave[start_frame:start_frame + n_frames]
  151. return data
  152. def cerca_tono(self, data: list, freq: int) -> float:
  153. '''ritorna una metrica di qualità'''
  154. r = np.fft.fft(data, self.framerate)
  155. # for i, val in enumerate(r):
  156. # v = to_int(val)
  157. # if v != 0:
  158. # print(i, v)
  159. margin = 20
  160. good = 0.0
  161. bad = 0.0
  162. for f, val in enumerate(r):
  163. if f < 50:
  164. continue
  165. v = to_int(val)
  166. if f > (freq - margin) and f < (freq + margin):
  167. good += v
  168. bad += v
  169. if f == 5000:
  170. break
  171. # plt.plot(range(len(r)), r)
  172. # plt.grid()
  173. # plt.show()
  174. return good / bad
  175. def main():
  176. args = get_parser().parse_args()
  177. basicConfig(level=args.log_level)
  178. framerate, data = scipy.io.wavfile.read(args.fname)
  179. if len(data.shape) > 1:
  180. raise ValueError("audio not mono maybe?")
  181. data = data[:, 0]
  182. barker_sequence = BARKERS[args.barker_sequence_length]
  183. barker = Barker(
  184. barker_sequence, [args.barker_freq_1, args.barker_freq_2], framerate, data
  185. )
  186. chunk, barker_start, fitness = barker.analyze() # in milliseconds
  187. print("shift = %dms" % barker_start)
  188. barker_end = barker_start + len(barker_sequence) * barker.symbol_ms
  189. toneduration = 500
  190. tonestart = barker_end + toneduration
  191. freq_qual = []
  192. for tonefreq in [261, 1244, 2793]:
  193. campiona_da = tonestart + toneduration / 5.
  194. campiona_durata = toneduration * 3./5
  195. campiona_a = campiona_da + campiona_durata
  196. data = barker.get_frames(int(campiona_da), int(campiona_durata))
  197. qualita = barker.cerca_tono(data, tonefreq)
  198. print(tonefreq, campiona_da, campiona_a, qualita, log10(qualita))
  199. freq_qual.append(qualita)
  200. tonestart += toneduration
  201. tot_qual = sum(freq_qual)
  202. norm_qual = tot_qual / 1.614218155663699 # ad-hoc!
  203. print('=>', norm_qual)
  204. if __name__ == "__main__":
  205. main()