Compare commits

...

3 commits

Author SHA1 Message Date
17e3539085
Plug http retriever in current logic 2021-08-26 21:47:08 -03:00
1718c4c331
Add http retriever 2021-08-26 21:46:47 -03:00
a192501570
Please the linter
- Do not use lambda
 - Replace assert with raise
 - Minor fixes
2021-08-25 15:48:12 -03:00
3 changed files with 94 additions and 21 deletions

View file

@ -7,19 +7,25 @@ from time import sleep
from typing import Callable, Optional from typing import Callable, Optional
from .config_manager import get_config from .config_manager import get_config
from .http_retriever import RETRIEVER
logger = logging.getLogger("forge") logger = logging.getLogger("forge")
Validator = Callable[[datetime, datetime, str], bool] Validator = Callable[[datetime, datetime, str], bool]
def get_timefile_exact(time) -> str: async def get_timefile_exact(time) -> str:
""" """
time is of type `datetime`; it is not "rounded" to match the real file; time is of type `datetime`; it is not "rounded" to match the real file;
that work is done in get_timefile(time) that work is done in get_timefile(time)
""" """
return os.path.join( remote_repo = get_config()["AUDIO_INPUT"]
get_config()["AUDIO_INPUT"], time.strftime(get_config()["AUDIO_INPUT_FORMAT"]) remote_path = os.path.join(
remote_repo, time.strftime(get_config()["AUDIO_INPUT_FORMAT"])
) )
if remote.startswith("http"):
local = await RETRIEVER.get(remote_path)
return local
return local_path
def round_timefile(exact: datetime) -> datetime: def round_timefile(exact: datetime) -> datetime:
@ -29,8 +35,9 @@ def round_timefile(exact: datetime) -> datetime:
return datetime(exact.year, exact.month, exact.day, exact.hour) return datetime(exact.year, exact.month, exact.day, exact.hour)
def get_timefile(exact: datetime) -> str: async def get_timefile(exact: datetime) -> str:
return get_timefile_exact(round_timefile(exact)) file = await get_timefile_exact(round_timefile(exact))
return file
def get_files_and_intervals(start, end, rounder=round_timefile): def get_files_and_intervals(start, end, rounder=round_timefile):
@ -70,13 +77,16 @@ def mp3_join(named_intervals):
for (filename, start_cut, end_cut) in named_intervals: for (filename, start_cut, end_cut) in named_intervals:
# this happens only one time, and only at the first iteration # this happens only one time, and only at the first iteration
if start_cut: if start_cut:
assert startskip is None if startskip is not None:
raise Exception("error in first cut iteration")
startskip = start_cut startskip = start_cut
# this happens only one time, and only at the first iteration # this happens only one time, and only at the last iteration
if end_cut: if end_cut:
assert endskip is None if endskip is not None:
raise Exception("error in last iteration")
endskip = end_cut endskip = end_cut
assert "|" not in filename if "|" in filename:
raise Exception(f"'|' in {filename}")
files.append(filename) files.append(filename)
cmdline = [ffmpeg, "-i", "concat:%s" % "|".join(files)] cmdline = [ffmpeg, "-i", "concat:%s" % "|".join(files)]
@ -90,20 +100,23 @@ def mp3_join(named_intervals):
return cmdline return cmdline
def create_mp3( async def create_mp3(
start: datetime, start: datetime,
end: datetime, end: datetime,
outfile: str, outfile: str,
options={}, options={},
validator: Optional[Validator] = None, validator: Optional[Validator] = None,
**kwargs **kwargs,
): ):
if validator is None: if validator is None:
validator = lambda s, e, f: True
intervals = [ def validator(s, e, f):
(get_timefile(begin), start_cut, end_cut) return True
for begin, start_cut, end_cut in get_files_and_intervals(start, end)
] intervals = []
for begin, start_cut, end_cut in get_files_and_intervals(start, end):
file = await get_timefile(begin)
interval.append(file, start_cut, end_cut)
if os.path.exists(outfile): if os.path.exists(outfile):
raise OSError("file '%s' already exists" % outfile) raise OSError("file '%s' already exists" % outfile)
for path, _s, _e in intervals: for path, _s, _e in intervals:
@ -162,7 +175,7 @@ def create_mp3(
os.kill(p.pid, 15) os.kill(p.pid, 15)
try: try:
os.remove(tmp_file.name) os.remove(tmp_file.name)
except: except Exception:
pass pass
raise Exception("timeout") # TODO: make a specific TimeoutError raise Exception("timeout") # TODO: make a specific TimeoutError
if p.returncode != 0: if p.returncode != 0:
@ -174,8 +187,8 @@ def create_mp3(
return True return True
def main_cmd(options): async def main_cmd(options):
log = logging.getLogger("forge_main") log = logging.getLogger("forge_main")
outfile = os.path.abspath(os.path.join(options.cwd, options.outfile)) outfile = os.path.abspath(os.path.join(options.cwd, options.outfile))
log.debug("will forge an mp3 into %s" % (outfile)) log.debug("will forge an mp3 into %s" % (outfile))
create_mp3(options.starttime, options.endtime, outfile) await create_mp3(options.starttime, options.endtime, outfile)

60
techrec/http_retriever.py Normal file
View file

@ -0,0 +1,60 @@
# -*- encoding: utf-8 -*-
import asyncio
import os
import aiohttp # type: ignore
from .config_manager import get_config
CHUNK_SIZE = 2 ** 12
class HTTPRetriever(object):
"""
This class offers the `get` method to retrieve the file from the local staging path
or, if missing, from the given remote.
"""
_instance = None
def __new__(cls):
if self._instance is None:
self._instance = super().__new__(cls)
return self._instance
def __init__(self):
self.repo_path = get_config()["AUDIO_STAGING"]
self.repo = dict(
path=os.path.join(self.repo_path, path) for i in os.listdir(self.repo_path)
)
async def get(remote: str) -> str:
"""
This will look in the local staging path (ideally on a tmpfs or something
similar), and return the file from there if present. Otherwise, it will download
it from the remote.
"""
if remote in self.repo:
return self.repo[remote]
file = await self._download_from_remote(remote)
self.repo[] = file
return file
async def _download(remote: str) -> str:
"""
This will download to AUDIO_STAGING the remote file and return the local path
of the downloaded file
"""
_, filename = os.path.split(remote)
local = os.path.join(get_config()["AUDIO_STAGING"], filename)
async with aiohttp.ClientSession() as session:
async with session.get(remote) as resp:
with open(local) as f:
while True:
chunk = await resp.content.read(CHUNK_SIZE)
if not chunk:
break
f.write(chunk)
return local
RETRIEVER = HTTPRetriever()

View file

@ -256,7 +256,7 @@ def get_validator(expected_duration_s: float, error_threshold_s: float) -> Valid
return validator return validator
def generate_mp3(db_id: int, **kwargs): async def generate_mp3(db_id: int, **kwargs):
"""creates and mark it as ready in the db""" """creates and mark it as ready in the db"""
if get_config()["FORGE_VERIFY"]: if get_config()["FORGE_VERIFY"]:
validator = get_validator( validator = get_validator(
@ -269,7 +269,7 @@ def generate_mp3(db_id: int, **kwargs):
retries = 1 retries = 1
for i in range(retries): for i in range(retries):
result = create_mp3(validator=validator, **kwargs) result = await create_mp3(validator=validator, **kwargs)
logger.debug("Create mp3 for %d -> %s", db_id, result) logger.debug("Create mp3 for %d -> %s", db_id, result)
if result: if result:
break break