123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- #!/usr/bin/python3
- '''
- This script finds the barker code position in a WAV file
- '''
- from itertools import count
- import argparse
- from logging import getLogger, basicConfig
- from math import log10
- import matplotlib.pyplot as plt
- import numpy as np
- import scipy.io.wavfile
- BARKERS = {
- 2: [1, -1],
- 3: [1, 1, -1],
- 4: [1, 1, -1, 1],
- 5: [1, 1, 1, -1, 1],
- 7: [1, 1, 1, -1, -1, 1, -1],
- 11: [1, 1, 1, -1, -1, -1, 1, -1, -1, 1, -1],
- 13: [1, 1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1],
- }
- def get_parser():
- p = argparse.ArgumentParser()
- p.add_argument("fname")
- p.add_argument(
- "--barker-sequence-length", type=int, default=5, choices=BARKERS.keys()
- )
- p.add_argument(
- "--barker-freq-1", default=880, type=int, help='Known as "+1" in barker code'
- )
- p.add_argument(
- "--barker-freq-2", default=440, type=int, help='Known as "-1" in barker code'
- )
- p.add_argument(
- "--log-level",
- default="WARNING",
- choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
- )
- return p
- def to_int(val):
- return int(np.absolute(val))
- def all_contigous_series(seq) -> list:
- """
- Put every element of seq into a list of all elements to which it is contigous.
- >>> all_contigous_series([1,2,10,11,12])
- [[1, 2], [10, 11, 12]]
- """
- out = [[]]
- for elem in seq:
- if not out[-1] or elem == out[-1][-1] + 1:
- out[-1].append(elem)
- else:
- assert len(out[-1]) > 0
- out.append([elem])
- if not out[-1]:
- out.pop()
- return out
- def largest_contigous_series(seq) -> list:
- """
- Like all_contigous_series, but pick the biggest.
- >>> largest_contigous_series([1,2,10,11,12])
- [10, 11, 12]
- """
- return max(all_contigous_series(seq), key=len)
- class SequenceOutOfBoundsException(IndexError):
- pass
- class Barker:
- def __init__(
- self,
- barker_sequence,
- barker_freqs,
- framerate: int,
- wave,
- threshold=0,
- symbol_ms=100,
- chunks_per_symbol=10,
- ):
- self.barker = barker_sequence
- self.freqs = barker_freqs
- self.threshold = threshold
- self.framerate = framerate
- self.wave = wave
- self.log = getLogger(self.__class__.__name__)
- self.fitness_min = len(self.barker) * 2 / 3
- self.symbol_ms = symbol_ms
- self.chunks_per_symbol = chunks_per_symbol
- # how many millisecond is a chunk long
- self.chunk_ms = int(self.symbol_ms / self.chunks_per_symbol)
- def decode_symbol(self, data) -> int:
- """
- given some data, it will try to understand if that's a 1 or a -1
- """
- r = np.fft.fft(data, self.framerate)
- values = [to_int(r[freq]) for freq in self.freqs]
- if max(values) < self.threshold:
- return 0
- is_symbol_1 = values[0] > values[1]
- return 1 if is_symbol_1 else -1
- def analyze_sequence(self, start_chunk: int) -> float:
- frames_per_chunk = int(self.framerate * self.chunk_ms // 1000)
- frames_per_symbol = frames_per_chunk * self.chunks_per_symbol
- total_frames = frames_per_symbol * len(self.barker)
- symbol_start = start_chunk * frames_per_chunk
- sequence_end = symbol_start + total_frames
- if sequence_end > len(self.wave):
- raise SequenceOutOfBoundsException("Invalid start_chunk")
- fitness = 0
- # print()
- for symbol in self.barker:
- symbol_end = symbol_start + frames_per_symbol
- # print(symbol_start, symbol_end)
- to_analyze = self.wave[symbol_start:symbol_end]
- symbol_is = self.decode_symbol(to_analyze)
- # print('is', symbol_is, 'should', symbol)
- fitness += symbol_is * symbol
- symbol_start += frames_per_symbol
- return fitness
- def analyze(self):
- total = len(self.wave)
- self.log.debug("tot frames = %s", total)
- self.log.debug("framerate = %s", self.framerate)
- self.log.debug("duration = %.2fs", (total / self.framerate))
- bestvalue = None
- bestshifts = []
- for shift in count():
- try:
- val = self.analyze_sequence(shift)
- except SequenceOutOfBoundsException:
- break
- if (bestvalue is None) or (val > bestvalue):
- bestvalue = val
- bestshifts = [shift]
- elif val == bestvalue:
- bestshifts.append(shift)
- if bestvalue < self.fitness_min:
- raise ValueError("Barker code not found")
- bestshifts = largest_contigous_series(bestshifts)
- self.log.debug(
- "largest contigous series of besthifts picked among %d", len(bestshifts)
- )
- bestshift = bestshifts[len(bestshifts) // 2]
- shift_ms = self.chunk_ms * bestshift
- shift_s = shift_ms / 1000.0
- self.log.info(
- "bestshift at chunk %d (%.2fs): %d", bestshift, shift_s, bestvalue
- )
- return (bestshift, shift_ms, bestvalue)
- def get_frames(self, start_ms: int, duration_ms: int) -> list:
- start_frame = (start_ms * self.framerate) // 1000
- n_frames = (duration_ms * self.framerate) // 1000
- data = self.wave[start_frame:start_frame + n_frames]
- return data
- def cerca_tono(self, data: list, freq: int) -> float:
- '''ritorna una metrica di qualità'''
- r = np.fft.fft(data, self.framerate)
- # for i, val in enumerate(r):
- # v = to_int(val)
- # if v != 0:
- # print(i, v)
- margin = 20
- good = 0.0
- bad = 0.0
- for f, val in enumerate(r):
- if f < 50:
- continue
- v = to_int(val)
- if f > (freq - margin) and f < (freq + margin):
- good += v
- bad += v
- if f == 5000:
- break
- # plt.plot(range(len(r)), r)
- # plt.grid()
- # plt.show()
- return good / bad
- def main():
- args = get_parser().parse_args()
- basicConfig(level=args.log_level)
- framerate, data = scipy.io.wavfile.read(args.fname)
- if len(data.shape) > 1:
- raise ValueError("audio not mono maybe?")
- data = data[:, 0]
- barker_sequence = BARKERS[args.barker_sequence_length]
- barker = Barker(
- barker_sequence, [args.barker_freq_1, args.barker_freq_2], framerate, data
- )
- chunk, barker_start, fitness = barker.analyze() # in milliseconds
- print("shift = %dms" % barker_start)
- barker_end = barker_start + len(barker_sequence) * barker.symbol_ms
- toneduration = 500
- tonestart = barker_end + toneduration
- freq_qual = []
- for tonefreq in [261, 1244, 2793]:
- campiona_da = tonestart + toneduration / 5.
- campiona_durata = toneduration * 3./5
- campiona_a = campiona_da + campiona_durata
- data = barker.get_frames(int(campiona_da), int(campiona_durata))
- qualita = barker.cerca_tono(data, tonefreq)
- print(tonefreq, campiona_da, campiona_a, qualita, log10(qualita))
- freq_qual.append(qualita)
- tonestart += toneduration
- tot_qual = sum(freq_qual)
- norm_qual = tot_qual / 1.614218155663699 # ad-hoc!
- print('=>', norm_qual)
- if __name__ == "__main__":
- main()
|