#!/usr/bin/python3 ''' This script finds the barker code position in a WAV file ''' from itertools import count import argparse from logging import getLogger, basicConfig from math import log10 import matplotlib.pyplot as plt import numpy as np import scipy.io.wavfile BARKERS = { 2: [1, -1], 3: [1, 1, -1], 4: [1, 1, -1, 1], 5: [1, 1, 1, -1, 1], 7: [1, 1, 1, -1, -1, 1, -1], 11: [1, 1, 1, -1, -1, -1, 1, -1, -1, 1, -1], 13: [1, 1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1], } def get_parser(): p = argparse.ArgumentParser() p.add_argument("fname") p.add_argument( "--barker-sequence-length", type=int, default=5, choices=BARKERS.keys() ) p.add_argument( "--barker-freq-1", default=880, type=int, help='Known as "+1" in barker code' ) p.add_argument( "--barker-freq-2", default=440, type=int, help='Known as "-1" in barker code' ) p.add_argument( "--log-level", default="WARNING", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], ) return p def to_int(val): return int(np.absolute(val)) def all_contigous_series(seq) -> list: """ Put every element of seq into a list of all elements to which it is contigous. >>> all_contigous_series([1,2,10,11,12]) [[1, 2], [10, 11, 12]] """ out = [[]] for elem in seq: if not out[-1] or elem == out[-1][-1] + 1: out[-1].append(elem) else: assert len(out[-1]) > 0 out.append([elem]) if not out[-1]: out.pop() return out def largest_contigous_series(seq) -> list: """ Like all_contigous_series, but pick the biggest. >>> largest_contigous_series([1,2,10,11,12]) [10, 11, 12] """ return max(all_contigous_series(seq), key=len) class SequenceOutOfBoundsException(IndexError): pass class Barker: def __init__( self, barker_sequence, barker_freqs, framerate: int, wave, threshold=0, symbol_ms=100, chunks_per_symbol=10, ): self.barker = barker_sequence self.freqs = barker_freqs self.threshold = threshold self.framerate = framerate self.wave = wave self.log = getLogger(self.__class__.__name__) self.fitness_min = len(self.barker) * 2 / 3 self.symbol_ms = symbol_ms self.chunks_per_symbol = chunks_per_symbol # how many millisecond is a chunk long self.chunk_ms = int(self.symbol_ms / self.chunks_per_symbol) def decode_symbol(self, data) -> int: """ given some data, it will try to understand if that's a 1 or a -1 """ r = np.fft.fft(data, self.framerate) values = [to_int(r[freq]) for freq in self.freqs] if max(values) < self.threshold: return 0 is_symbol_1 = values[0] > values[1] return 1 if is_symbol_1 else -1 def analyze_sequence(self, start_chunk: int) -> float: frames_per_chunk = int(self.framerate * self.chunk_ms // 1000) frames_per_symbol = frames_per_chunk * self.chunks_per_symbol total_frames = frames_per_symbol * len(self.barker) symbol_start = start_chunk * frames_per_chunk sequence_end = symbol_start + total_frames if sequence_end > len(self.wave): raise SequenceOutOfBoundsException("Invalid start_chunk") fitness = 0 # print() for symbol in self.barker: symbol_end = symbol_start + frames_per_symbol # print(symbol_start, symbol_end) to_analyze = self.wave[symbol_start:symbol_end] symbol_is = self.decode_symbol(to_analyze) # print('is', symbol_is, 'should', symbol) fitness += symbol_is * symbol symbol_start += frames_per_symbol return fitness def analyze(self): total = len(self.wave) self.log.debug("tot frames = %s", total) self.log.debug("framerate = %s", self.framerate) self.log.debug("duration = %.2fs", (total / self.framerate)) bestvalue = None bestshifts = [] for shift in count(): try: val = self.analyze_sequence(shift) except SequenceOutOfBoundsException: break if (bestvalue is None) or (val > bestvalue): bestvalue = val bestshifts = [shift] elif val == bestvalue: bestshifts.append(shift) if bestvalue < self.fitness_min: raise ValueError("Barker code not found") bestshifts = largest_contigous_series(bestshifts) self.log.debug( "largest contigous series of besthifts picked among %d", len(bestshifts) ) bestshift = bestshifts[len(bestshifts) // 2] shift_ms = self.chunk_ms * bestshift shift_s = shift_ms / 1000.0 self.log.info( "bestshift at chunk %d (%.2fs): %d", bestshift, shift_s, bestvalue ) return (bestshift, shift_ms, bestvalue) def get_frames(self, start_ms: int, duration_ms: int) -> list: start_frame = (start_ms * self.framerate) // 1000 n_frames = (duration_ms * self.framerate) // 1000 data = self.wave[start_frame:start_frame + n_frames] return data def cerca_tono(self, data: list, freq: int) -> float: '''ritorna una metrica di qualità''' r = np.fft.fft(data, self.framerate) # for i, val in enumerate(r): # v = to_int(val) # if v != 0: # print(i, v) margin = 20 good = 0.0 bad = 0.0 for f, val in enumerate(r): if f < 50: continue v = to_int(val) if f > (freq - margin) and f < (freq + margin): good += v bad += v if f == 5000: break # plt.plot(range(len(r)), r) # plt.grid() # plt.show() return good / bad def main(): args = get_parser().parse_args() basicConfig(level=args.log_level) framerate, data = scipy.io.wavfile.read(args.fname) if len(data.shape) > 1: raise ValueError("audio not mono maybe?") data = data[:, 0] barker_sequence = BARKERS[args.barker_sequence_length] barker = Barker( barker_sequence, [args.barker_freq_1, args.barker_freq_2], framerate, data ) chunk, barker_start, fitness = barker.analyze() # in milliseconds print("shift = %dms" % barker_start) barker_end = barker_start + len(barker_sequence) * barker.symbol_ms toneduration = 500 tonestart = barker_end + toneduration freq_qual = [] for tonefreq in [261, 1244, 2793]: campiona_da = tonestart + toneduration / 5. campiona_durata = toneduration * 3./5 campiona_a = campiona_da + campiona_durata data = barker.get_frames(int(campiona_da), int(campiona_durata)) qualita = barker.cerca_tono(data, tonefreq) print(tonefreq, campiona_da, campiona_a, qualita, log10(qualita)) freq_qual.append(qualita) tonestart += toneduration tot_qual = sum(freq_qual) norm_qual = tot_qual / 1.614218155663699 # ad-hoc! print('=>', norm_qual) if __name__ == "__main__": main()