Browse Source

barker codes detection

boyska 2 years ago
commit
4435fc32c8
4 changed files with 194 additions and 0 deletions
  1. 2 0
      .gitignore
  2. BIN
      barker/audio/barker13.ogg
  3. BIN
      barker/audio/barker5.ogg
  4. 192 0
      barker/barker.py

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+__pycache__/
+.mypy_cache/

BIN
barker/audio/barker13.ogg


BIN
barker/audio/barker5.ogg


+ 192 - 0
barker/barker.py

@@ -0,0 +1,192 @@
+#!/usr/bin/python3
+'''
+This script finds the barker code position in a WAV file
+'''
+
+from itertools import count
+import argparse
+from logging import getLogger, basicConfig
+
+import numpy as np
+import scipy.io.wavfile
+
+BARKERS = {
+    2: [1, -1],
+    3: [1, 1, -1],
+    4: [1, 1, -1, 1],
+    5: [1, 1, 1, -1, 1],
+    7: [1, 1, 1, -1, -1, 1, -1],
+    11: [1, 1, 1, -1, -1, -1, 1, -1, -1, 1, -1],
+    13: [1, 1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1],
+}
+
+
+def get_parser():
+    p = argparse.ArgumentParser()
+    p.add_argument("fname")
+    p.add_argument(
+        "--barker-sequence-length", type=int, default=5, choices=BARKERS.keys()
+    )
+    p.add_argument(
+        "--barker-freq-1", default=1000, type=int, help='Known as "+1" in barker code'
+    )
+    p.add_argument(
+        "--barker-freq-2", default=500, type=int, help='Known as "-1" in barker code'
+    )
+    p.add_argument(
+        "--log-level",
+        default="WARNING",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+    )
+    return p
+
+
+def all_contigous_series(seq) -> list:
+    """
+    Put every element of seq into a list of all elements to which it is contigous.
+
+    >>> all_contigous_series([1,2,10,11,12])
+    [[1, 2], [10, 11, 12]]
+    """
+
+    out = [[]]
+    for elem in seq:
+        if not out[-1] or elem == out[-1][-1] + 1:
+            out[-1].append(elem)
+        else:
+            assert len(out[-1]) > 0
+            out.append([elem])
+    if not out[-1]:
+        out.pop()
+    return out
+
+
+def largest_contigous_series(seq) -> list:
+    """
+    Like all_contigous_series, but pick the biggest.
+
+    >>> largest_contigous_series([1,2,10,11,12])
+    [10, 11, 12]
+    """
+
+    return max(all_contigous_series(seq), key=len)
+
+
+class SequenceOutOfBoundsException(IndexError):
+    pass
+
+
+class Barker:
+    def __init__(
+        self,
+        barker_sequence,
+        barker_freqs,
+        framerate: int,
+        wave,
+        threshold=0,
+        symbol_ms=100,
+        chunks_per_symbol=10,
+    ):
+        self.barker = barker_sequence
+        self.freqs = barker_freqs
+        self.threshold = threshold
+        self.framerate = framerate
+        self.wave = wave
+        self.log = getLogger(self.__class__.__name__)
+
+        self.symbol_ms = symbol_ms
+        self.chunks_per_symbol = chunks_per_symbol
+        # how many millisecond is a chunk long
+        self.chunk_ms = int(self.symbol_ms / self.chunks_per_symbol)
+
+    def decode_symbol(self, data) -> int:
+        """
+        given some data, it will try to understand if that's a 1 or a -1
+        """
+        r = np.fft.fft(data, self.framerate)
+
+        def to_int(val):
+            return int(np.absolute(val))
+
+        values = [to_int(r[freq]) for freq in self.freqs]
+        if max(values) < self.threshold:
+            return 0
+        is_symbol_1 = values[0] > values[1]
+        return 1 if is_symbol_1 else -1
+
+    def analyze_sequence(self, start_chunk: int) -> float:
+        frames_per_chunk = int(self.framerate * self.chunk_ms // 1000)
+        frames_per_symbol = frames_per_chunk * self.chunks_per_symbol
+        total_frames = frames_per_symbol * len(self.barker)
+        symbol_start = start_chunk * frames_per_chunk
+        sequence_end = symbol_start + total_frames
+        if sequence_end > len(self.wave):
+            raise SequenceOutOfBoundsException("Invalid start_chunk")
+        fitness = 0
+
+        # print()
+        for symbol in self.barker:
+            symbol_end = symbol_start + frames_per_symbol
+            # print(symbol_start, symbol_end)
+            to_analyze = self.wave[symbol_start:symbol_end]
+            symbol_is = self.decode_symbol(to_analyze)
+
+            # print('is', symbol_is, 'should', symbol)
+            fitness += symbol_is * symbol
+
+            symbol_start += frames_per_symbol
+        return fitness
+
+    def analyze(self):
+        total = len(self.wave)
+        self.log.debug("tot frames = %s", total)
+        self.log.debug("framerate = %s", self.framerate)
+        self.log.debug("duration = %.2fs", (total / self.framerate))
+
+        bestvalue = None
+        bestshifts = []
+        for shift in count():
+            try:
+                val = self.analyze_sequence(shift)
+            except SequenceOutOfBoundsException:
+                break
+            if (bestvalue is None) or (val > bestvalue):
+                bestvalue = val
+                bestshifts = [shift]
+            elif val == bestvalue:
+                bestshifts.append(shift)
+
+        if bestvalue < int(len(self.barker) * 2 / 3):
+            raise ValueError("Could not find the barker code")
+
+        bestshifts = largest_contigous_series(bestshifts)
+        self.log.debug(
+            "largest contigous series of besthifts picked among %d", len(bestshifts)
+        )
+        bestshift = bestshifts[len(bestshifts) // 2]
+        shift_ms = self.chunk_ms * bestshift
+        shift_s = shift_ms / 1000.0
+        self.log.info(
+            "bestshift at chunk %d (%.2fs): %d", bestshift, shift_s, bestvalue
+        )
+        return (bestshift, shift_ms, bestvalue)
+
+
+def main():
+    args = get_parser().parse_args()
+    basicConfig(level=args.log_level)
+    framerate, data = scipy.io.wavfile.read(args.fname)
+    if len(data.shape) > 1:
+        raise ValueError("audio not mono maybe?")
+        data = data[:, 0]
+
+    barker_sequence = BARKERS[args.barker_sequence_length]
+    barker = Barker(
+        barker_sequence, [args.barker_freq_1, args.barker_freq_2], framerate, data
+    )
+    chunk, shift, fitness = barker.analyze()  # in milliseconds
+    print(shift)
+
+
+if __name__ == "__main__":
+    main()