radio-monitoring/barker/barker.py
2022-05-08 00:11:29 +02:00

248 lines
7.2 KiB
Python
Executable file

#!/usr/bin/python3
'''
This script finds the barker code position in a WAV file
'''
from itertools import count
import argparse
from logging import getLogger, basicConfig
from math import log10
import matplotlib.pyplot as plt
import numpy as np
import scipy.io.wavfile
BARKERS = {
2: [1, -1],
3: [1, 1, -1],
4: [1, 1, -1, 1],
5: [1, 1, 1, -1, 1],
7: [1, 1, 1, -1, -1, 1, -1],
11: [1, 1, 1, -1, -1, -1, 1, -1, -1, 1, -1],
13: [1, 1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1],
}
def get_parser():
p = argparse.ArgumentParser()
p.add_argument("fname")
p.add_argument(
"--barker-sequence-length", type=int, default=5, choices=BARKERS.keys()
)
p.add_argument(
"--barker-freq-1", default=880, type=int, help='Known as "+1" in barker code'
)
p.add_argument(
"--barker-freq-2", default=440, type=int, help='Known as "-1" in barker code'
)
p.add_argument(
"--log-level",
default="WARNING",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
)
return p
def to_int(val):
return int(np.absolute(val))
def all_contigous_series(seq) -> list:
"""
Put every element of seq into a list of all elements to which it is contigous.
>>> all_contigous_series([1,2,10,11,12])
[[1, 2], [10, 11, 12]]
"""
out = [[]]
for elem in seq:
if not out[-1] or elem == out[-1][-1] + 1:
out[-1].append(elem)
else:
assert len(out[-1]) > 0
out.append([elem])
if not out[-1]:
out.pop()
return out
def largest_contigous_series(seq) -> list:
"""
Like all_contigous_series, but pick the biggest.
>>> largest_contigous_series([1,2,10,11,12])
[10, 11, 12]
"""
return max(all_contigous_series(seq), key=len)
class SequenceOutOfBoundsException(IndexError):
pass
class Barker:
def __init__(
self,
barker_sequence,
barker_freqs,
framerate: int,
wave,
threshold=0,
symbol_ms=100,
chunks_per_symbol=10,
):
self.barker = barker_sequence
self.freqs = barker_freqs
self.threshold = threshold
self.framerate = framerate
self.wave = wave
self.log = getLogger(self.__class__.__name__)
self.fitness_min = len(self.barker) * 2 / 3
self.symbol_ms = symbol_ms
self.chunks_per_symbol = chunks_per_symbol
# how many millisecond is a chunk long
self.chunk_ms = int(self.symbol_ms / self.chunks_per_symbol)
def decode_symbol(self, data) -> int:
"""
given some data, it will try to understand if that's a 1 or a -1
"""
r = np.fft.fft(data, self.framerate)
values = [to_int(r[freq]) for freq in self.freqs]
if max(values) < self.threshold:
return 0
is_symbol_1 = values[0] > values[1]
return 1 if is_symbol_1 else -1
def analyze_sequence(self, start_chunk: int) -> float:
frames_per_chunk = int(self.framerate * self.chunk_ms // 1000)
frames_per_symbol = frames_per_chunk * self.chunks_per_symbol
total_frames = frames_per_symbol * len(self.barker)
symbol_start = start_chunk * frames_per_chunk
sequence_end = symbol_start + total_frames
if sequence_end > len(self.wave):
raise SequenceOutOfBoundsException("Invalid start_chunk")
fitness = 0
# print()
for symbol in self.barker:
symbol_end = symbol_start + frames_per_symbol
# print(symbol_start, symbol_end)
to_analyze = self.wave[symbol_start:symbol_end]
symbol_is = self.decode_symbol(to_analyze)
# print('is', symbol_is, 'should', symbol)
fitness += symbol_is * symbol
symbol_start += frames_per_symbol
return fitness
def analyze(self):
total = len(self.wave)
self.log.debug("tot frames = %s", total)
self.log.debug("framerate = %s", self.framerate)
self.log.debug("duration = %.2fs", (total / self.framerate))
bestvalue = None
bestshifts = []
for shift in count():
try:
val = self.analyze_sequence(shift)
except SequenceOutOfBoundsException:
break
if (bestvalue is None) or (val > bestvalue):
bestvalue = val
bestshifts = [shift]
elif val == bestvalue:
bestshifts.append(shift)
if bestvalue < self.fitness_min:
raise ValueError("Barker code not found")
bestshifts = largest_contigous_series(bestshifts)
self.log.debug(
"largest contigous series of besthifts picked among %d", len(bestshifts)
)
bestshift = bestshifts[len(bestshifts) // 2]
shift_ms = self.chunk_ms * bestshift
shift_s = shift_ms / 1000.0
self.log.info(
"bestshift at chunk %d (%.2fs): %d", bestshift, shift_s, bestvalue
)
return (bestshift, shift_ms, bestvalue)
def get_frames(self, start_ms: int, duration_ms: int) -> list:
start_frame = (start_ms * self.framerate) // 1000
n_frames = (duration_ms * self.framerate) // 1000
data = self.wave[start_frame:start_frame + n_frames]
return data
def cerca_tono(self, data: list, freq: int) -> float:
'''ritorna una metrica di qualità'''
r = np.fft.fft(data, self.framerate)
# for i, val in enumerate(r):
# v = to_int(val)
# if v != 0:
# print(i, v)
margin = 20
good = 0.0
bad = 0.0
for f, val in enumerate(r):
if f < 50:
continue
v = to_int(val)
if f > (freq - margin) and f < (freq + margin):
good += v
bad += v
if f == 5000:
break
# plt.plot(range(len(r)), r)
# plt.grid()
# plt.show()
return good / bad
def main():
args = get_parser().parse_args()
basicConfig(level=args.log_level)
framerate, data = scipy.io.wavfile.read(args.fname)
if len(data.shape) > 1:
raise ValueError("audio not mono maybe?")
data = data[:, 0]
barker_sequence = BARKERS[args.barker_sequence_length]
barker = Barker(
barker_sequence, [args.barker_freq_1, args.barker_freq_2], framerate, data
)
chunk, barker_start, fitness = barker.analyze() # in milliseconds
print("shift = %dms" % barker_start)
barker_end = barker_start + len(barker_sequence) * barker.symbol_ms
toneduration = 500
tonestart = barker_end + toneduration
freq_qual = []
for tonefreq in [261, 1244, 2793]:
campiona_da = tonestart + toneduration / 5.
campiona_durata = toneduration * 3./5
campiona_a = campiona_da + campiona_durata
data = barker.get_frames(int(campiona_da), int(campiona_durata))
qualita = barker.cerca_tono(data, tonefreq)
print(tonefreq, campiona_da, campiona_a, qualita, log10(qualita))
freq_qual.append(qualita)
tonestart += toneduration
tot_qual = sum(freq_qual)
norm_qual = tot_qual / 1.614218155663699 # ad-hoc!
print('=>', norm_qual)
if __name__ == "__main__":
main()