248 lines
7.2 KiB
Python
Executable file
248 lines
7.2 KiB
Python
Executable file
#!/usr/bin/python3
|
|
'''
|
|
This script finds the barker code position in a WAV file
|
|
'''
|
|
|
|
from itertools import count
|
|
import argparse
|
|
from logging import getLogger, basicConfig
|
|
from math import log10
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import scipy.io.wavfile
|
|
|
|
BARKERS = {
|
|
2: [1, -1],
|
|
3: [1, 1, -1],
|
|
4: [1, 1, -1, 1],
|
|
5: [1, 1, 1, -1, 1],
|
|
7: [1, 1, 1, -1, -1, 1, -1],
|
|
11: [1, 1, 1, -1, -1, -1, 1, -1, -1, 1, -1],
|
|
13: [1, 1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1],
|
|
}
|
|
|
|
|
|
def get_parser():
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("fname")
|
|
p.add_argument(
|
|
"--barker-sequence-length", type=int, default=5, choices=BARKERS.keys()
|
|
)
|
|
p.add_argument(
|
|
"--barker-freq-1", default=880, type=int, help='Known as "+1" in barker code'
|
|
)
|
|
p.add_argument(
|
|
"--barker-freq-2", default=440, type=int, help='Known as "-1" in barker code'
|
|
)
|
|
p.add_argument(
|
|
"--log-level",
|
|
default="WARNING",
|
|
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
|
)
|
|
return p
|
|
|
|
def to_int(val):
|
|
return int(np.absolute(val))
|
|
|
|
def all_contigous_series(seq) -> list:
|
|
"""
|
|
Put every element of seq into a list of all elements to which it is contigous.
|
|
|
|
>>> all_contigous_series([1,2,10,11,12])
|
|
[[1, 2], [10, 11, 12]]
|
|
"""
|
|
|
|
out = [[]]
|
|
for elem in seq:
|
|
if not out[-1] or elem == out[-1][-1] + 1:
|
|
out[-1].append(elem)
|
|
else:
|
|
assert len(out[-1]) > 0
|
|
out.append([elem])
|
|
if not out[-1]:
|
|
out.pop()
|
|
return out
|
|
|
|
|
|
def largest_contigous_series(seq) -> list:
|
|
"""
|
|
Like all_contigous_series, but pick the biggest.
|
|
|
|
>>> largest_contigous_series([1,2,10,11,12])
|
|
[10, 11, 12]
|
|
"""
|
|
|
|
return max(all_contigous_series(seq), key=len)
|
|
|
|
|
|
class SequenceOutOfBoundsException(IndexError):
|
|
pass
|
|
|
|
|
|
class Barker:
|
|
def __init__(
|
|
self,
|
|
barker_sequence,
|
|
barker_freqs,
|
|
framerate: int,
|
|
wave,
|
|
threshold=0,
|
|
symbol_ms=100,
|
|
chunks_per_symbol=10,
|
|
):
|
|
self.barker = barker_sequence
|
|
self.freqs = barker_freqs
|
|
self.threshold = threshold
|
|
self.framerate = framerate
|
|
self.wave = wave
|
|
self.log = getLogger(self.__class__.__name__)
|
|
self.fitness_min = len(self.barker) * 2 / 3
|
|
|
|
self.symbol_ms = symbol_ms
|
|
self.chunks_per_symbol = chunks_per_symbol
|
|
# how many millisecond is a chunk long
|
|
self.chunk_ms = int(self.symbol_ms / self.chunks_per_symbol)
|
|
|
|
def decode_symbol(self, data) -> int:
|
|
"""
|
|
given some data, it will try to understand if that's a 1 or a -1
|
|
"""
|
|
r = np.fft.fft(data, self.framerate)
|
|
|
|
|
|
values = [to_int(r[freq]) for freq in self.freqs]
|
|
if max(values) < self.threshold:
|
|
return 0
|
|
is_symbol_1 = values[0] > values[1]
|
|
return 1 if is_symbol_1 else -1
|
|
|
|
def analyze_sequence(self, start_chunk: int) -> float:
|
|
frames_per_chunk = int(self.framerate * self.chunk_ms // 1000)
|
|
frames_per_symbol = frames_per_chunk * self.chunks_per_symbol
|
|
total_frames = frames_per_symbol * len(self.barker)
|
|
symbol_start = start_chunk * frames_per_chunk
|
|
sequence_end = symbol_start + total_frames
|
|
if sequence_end > len(self.wave):
|
|
raise SequenceOutOfBoundsException("Invalid start_chunk")
|
|
fitness = 0
|
|
|
|
# print()
|
|
for symbol in self.barker:
|
|
symbol_end = symbol_start + frames_per_symbol
|
|
# print(symbol_start, symbol_end)
|
|
to_analyze = self.wave[symbol_start:symbol_end]
|
|
symbol_is = self.decode_symbol(to_analyze)
|
|
|
|
# print('is', symbol_is, 'should', symbol)
|
|
fitness += symbol_is * symbol
|
|
|
|
symbol_start += frames_per_symbol
|
|
return fitness
|
|
|
|
def analyze(self):
|
|
total = len(self.wave)
|
|
self.log.debug("tot frames = %s", total)
|
|
self.log.debug("framerate = %s", self.framerate)
|
|
self.log.debug("duration = %.2fs", (total / self.framerate))
|
|
|
|
bestvalue = None
|
|
bestshifts = []
|
|
for shift in count():
|
|
try:
|
|
val = self.analyze_sequence(shift)
|
|
except SequenceOutOfBoundsException:
|
|
break
|
|
if (bestvalue is None) or (val > bestvalue):
|
|
bestvalue = val
|
|
bestshifts = [shift]
|
|
elif val == bestvalue:
|
|
bestshifts.append(shift)
|
|
|
|
if bestvalue < self.fitness_min:
|
|
raise ValueError("Barker code not found")
|
|
|
|
bestshifts = largest_contigous_series(bestshifts)
|
|
self.log.debug(
|
|
"largest contigous series of besthifts picked among %d", len(bestshifts)
|
|
)
|
|
bestshift = bestshifts[len(bestshifts) // 2]
|
|
shift_ms = self.chunk_ms * bestshift
|
|
shift_s = shift_ms / 1000.0
|
|
self.log.info(
|
|
"bestshift at chunk %d (%.2fs): %d", bestshift, shift_s, bestvalue
|
|
)
|
|
return (bestshift, shift_ms, bestvalue)
|
|
|
|
def get_frames(self, start_ms: int, duration_ms: int) -> list:
|
|
start_frame = (start_ms * self.framerate) // 1000
|
|
n_frames = (duration_ms * self.framerate) // 1000
|
|
data = self.wave[start_frame:start_frame + n_frames]
|
|
return data
|
|
|
|
def cerca_tono(self, data: list, freq: int) -> float:
|
|
'''ritorna una metrica di qualità'''
|
|
r = np.fft.fft(data, self.framerate)
|
|
|
|
# for i, val in enumerate(r):
|
|
# v = to_int(val)
|
|
# if v != 0:
|
|
# print(i, v)
|
|
margin = 20
|
|
good = 0.0
|
|
bad = 0.0
|
|
|
|
for f, val in enumerate(r):
|
|
if f < 50:
|
|
continue
|
|
v = to_int(val)
|
|
if f > (freq - margin) and f < (freq + margin):
|
|
good += v
|
|
bad += v
|
|
if f == 5000:
|
|
break
|
|
# plt.plot(range(len(r)), r)
|
|
# plt.grid()
|
|
# plt.show()
|
|
|
|
return good / bad
|
|
|
|
|
|
def main():
|
|
args = get_parser().parse_args()
|
|
basicConfig(level=args.log_level)
|
|
framerate, data = scipy.io.wavfile.read(args.fname)
|
|
if len(data.shape) > 1:
|
|
raise ValueError("audio not mono maybe?")
|
|
data = data[:, 0]
|
|
|
|
barker_sequence = BARKERS[args.barker_sequence_length]
|
|
barker = Barker(
|
|
barker_sequence, [args.barker_freq_1, args.barker_freq_2], framerate, data
|
|
)
|
|
chunk, barker_start, fitness = barker.analyze() # in milliseconds
|
|
print("shift = %dms" % barker_start)
|
|
barker_end = barker_start + len(barker_sequence) * barker.symbol_ms
|
|
toneduration = 500
|
|
tonestart = barker_end + toneduration
|
|
|
|
freq_qual = []
|
|
for tonefreq in [261, 1244, 2793]:
|
|
campiona_da = tonestart + toneduration / 5.
|
|
campiona_durata = toneduration * 3./5
|
|
campiona_a = campiona_da + campiona_durata
|
|
data = barker.get_frames(int(campiona_da), int(campiona_durata))
|
|
qualita = barker.cerca_tono(data, tonefreq)
|
|
print(tonefreq, campiona_da, campiona_a, qualita, log10(qualita))
|
|
freq_qual.append(qualita)
|
|
|
|
tonestart += toneduration
|
|
|
|
tot_qual = sum(freq_qual)
|
|
norm_qual = tot_qual / 1.614218155663699 # ad-hoc!
|
|
print('=>', norm_qual)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|