barker.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #!/usr/bin/python3
  2. '''
  3. This script finds the barker code position in a WAV file
  4. '''
  5. from itertools import count
  6. import argparse
  7. from logging import getLogger, basicConfig
  8. import numpy as np
  9. import scipy.io.wavfile
  10. BARKERS = {
  11. 2: [1, -1],
  12. 3: [1, 1, -1],
  13. 4: [1, 1, -1, 1],
  14. 5: [1, 1, 1, -1, 1],
  15. 7: [1, 1, 1, -1, -1, 1, -1],
  16. 11: [1, 1, 1, -1, -1, -1, 1, -1, -1, 1, -1],
  17. 13: [1, 1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1],
  18. }
  19. def get_parser():
  20. p = argparse.ArgumentParser()
  21. p.add_argument("fname")
  22. p.add_argument(
  23. "--barker-sequence-length", type=int, default=5, choices=BARKERS.keys()
  24. )
  25. p.add_argument(
  26. "--barker-freq-1", default=1000, type=int, help='Known as "+1" in barker code'
  27. )
  28. p.add_argument(
  29. "--barker-freq-2", default=500, type=int, help='Known as "-1" in barker code'
  30. )
  31. p.add_argument(
  32. "--log-level",
  33. default="WARNING",
  34. choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
  35. )
  36. return p
  37. def all_contigous_series(seq) -> list:
  38. """
  39. Put every element of seq into a list of all elements to which it is contigous.
  40. >>> all_contigous_series([1,2,10,11,12])
  41. [[1, 2], [10, 11, 12]]
  42. """
  43. out = [[]]
  44. for elem in seq:
  45. if not out[-1] or elem == out[-1][-1] + 1:
  46. out[-1].append(elem)
  47. else:
  48. assert len(out[-1]) > 0
  49. out.append([elem])
  50. if not out[-1]:
  51. out.pop()
  52. return out
  53. def largest_contigous_series(seq) -> list:
  54. """
  55. Like all_contigous_series, but pick the biggest.
  56. >>> largest_contigous_series([1,2,10,11,12])
  57. [10, 11, 12]
  58. """
  59. return max(all_contigous_series(seq), key=len)
  60. class SequenceOutOfBoundsException(IndexError):
  61. pass
  62. class Barker:
  63. def __init__(
  64. self,
  65. barker_sequence,
  66. barker_freqs,
  67. framerate: int,
  68. wave,
  69. threshold=0,
  70. symbol_ms=100,
  71. chunks_per_symbol=10,
  72. ):
  73. self.barker = barker_sequence
  74. self.freqs = barker_freqs
  75. self.threshold = threshold
  76. self.framerate = framerate
  77. self.wave = wave
  78. self.log = getLogger(self.__class__.__name__)
  79. self.symbol_ms = symbol_ms
  80. self.chunks_per_symbol = chunks_per_symbol
  81. # how many millisecond is a chunk long
  82. self.chunk_ms = int(self.symbol_ms / self.chunks_per_symbol)
  83. def decode_symbol(self, data) -> int:
  84. """
  85. given some data, it will try to understand if that's a 1 or a -1
  86. """
  87. r = np.fft.fft(data, self.framerate)
  88. def to_int(val):
  89. return int(np.absolute(val))
  90. values = [to_int(r[freq]) for freq in self.freqs]
  91. if max(values) < self.threshold:
  92. return 0
  93. is_symbol_1 = values[0] > values[1]
  94. return 1 if is_symbol_1 else -1
  95. def analyze_sequence(self, start_chunk: int) -> float:
  96. frames_per_chunk = int(self.framerate * self.chunk_ms // 1000)
  97. frames_per_symbol = frames_per_chunk * self.chunks_per_symbol
  98. total_frames = frames_per_symbol * len(self.barker)
  99. symbol_start = start_chunk * frames_per_chunk
  100. sequence_end = symbol_start + total_frames
  101. if sequence_end > len(self.wave):
  102. raise SequenceOutOfBoundsException("Invalid start_chunk")
  103. fitness = 0
  104. # print()
  105. for symbol in self.barker:
  106. symbol_end = symbol_start + frames_per_symbol
  107. # print(symbol_start, symbol_end)
  108. to_analyze = self.wave[symbol_start:symbol_end]
  109. symbol_is = self.decode_symbol(to_analyze)
  110. # print('is', symbol_is, 'should', symbol)
  111. fitness += symbol_is * symbol
  112. symbol_start += frames_per_symbol
  113. return fitness
  114. def analyze(self):
  115. total = len(self.wave)
  116. self.log.debug("tot frames = %s", total)
  117. self.log.debug("framerate = %s", self.framerate)
  118. self.log.debug("duration = %.2fs", (total / self.framerate))
  119. bestvalue = None
  120. bestshifts = []
  121. for shift in count():
  122. try:
  123. val = self.analyze_sequence(shift)
  124. except SequenceOutOfBoundsException:
  125. break
  126. if (bestvalue is None) or (val > bestvalue):
  127. bestvalue = val
  128. bestshifts = [shift]
  129. elif val == bestvalue:
  130. bestshifts.append(shift)
  131. if bestvalue < int(len(self.barker) * 2 / 3):
  132. raise ValueError("Could not find the barker code")
  133. bestshifts = largest_contigous_series(bestshifts)
  134. self.log.debug(
  135. "largest contigous series of besthifts picked among %d", len(bestshifts)
  136. )
  137. bestshift = bestshifts[len(bestshifts) // 2]
  138. shift_ms = self.chunk_ms * bestshift
  139. shift_s = shift_ms / 1000.0
  140. self.log.info(
  141. "bestshift at chunk %d (%.2fs): %d", bestshift, shift_s, bestvalue
  142. )
  143. return (bestshift, shift_ms, bestvalue)
  144. def main():
  145. args = get_parser().parse_args()
  146. basicConfig(level=args.log_level)
  147. framerate, data = scipy.io.wavfile.read(args.fname)
  148. if len(data.shape) > 1:
  149. raise ValueError("audio not mono maybe?")
  150. data = data[:, 0]
  151. barker_sequence = BARKERS[args.barker_sequence_length]
  152. barker = Barker(
  153. barker_sequence, [args.barker_freq_1, args.barker_freq_2], framerate, data
  154. )
  155. chunk, shift, fitness = barker.analyze() # in milliseconds
  156. print(shift)
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()