Do not singletonize the retriever
This commit is contained in:
parent
d967440a6d
commit
acc966b488
2 changed files with 27 additions and 50 deletions
|
@ -8,7 +8,7 @@ from time import sleep
|
||||||
from typing import Callable, Optional
|
from typing import Callable, Optional
|
||||||
|
|
||||||
from .config_manager import get_config
|
from .config_manager import get_config
|
||||||
from .http_retriever import RETRIEVER
|
from .http_retriever import download
|
||||||
|
|
||||||
logger = logging.getLogger("forge")
|
logger = logging.getLogger("forge")
|
||||||
Validator = Callable[[datetime, datetime, str], bool]
|
Validator = Callable[[datetime, datetime, str], bool]
|
||||||
|
@ -23,8 +23,10 @@ async def get_timefile_exact(time) -> str:
|
||||||
remote_path = os.path.join(
|
remote_path = os.path.join(
|
||||||
remote_repo, time.strftime(get_config()["AUDIO_INPUT_FORMAT"])
|
remote_repo, time.strftime(get_config()["AUDIO_INPUT_FORMAT"])
|
||||||
)
|
)
|
||||||
if remote.startswith("http"):
|
if remote_path.startswith("http://") or remote_path.startswith("https://"):
|
||||||
local = await RETRIEVER.get(remote_path)
|
logger.info(f"downloading {remote_path}")
|
||||||
|
print(f"DOWNLOADING -> {remote_path}")
|
||||||
|
local = await download(remote_path)
|
||||||
return local
|
return local
|
||||||
return local_path
|
return local_path
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
# -*- encoding: utf-8 -*-
|
# -*- encoding: utf-8 -*-
|
||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
|
from typing import Optional
|
||||||
|
from tempfile import mkdtemp
|
||||||
|
|
||||||
import aiohttp # type: ignore
|
import aiohttp # type: ignore
|
||||||
|
|
||||||
|
@ -9,52 +11,25 @@ from .config_manager import get_config
|
||||||
CHUNK_SIZE = 2 ** 12
|
CHUNK_SIZE = 2 ** 12
|
||||||
|
|
||||||
|
|
||||||
class HTTPRetriever(object):
|
async def download(remote: str, staging: Optional[str] = None) -> str:
|
||||||
"""
|
"""
|
||||||
This class offers the `get` method to retrieve the file from the local staging path
|
This will download to AUDIO_STAGING the remote file and return the local
|
||||||
or, if missing, from the given remote.
|
path of the downloaded file
|
||||||
"""
|
"""
|
||||||
_instance = None
|
_, filename = os.path.split(remote)
|
||||||
|
if staging:
|
||||||
def __new__(cls):
|
base = staging
|
||||||
if self._instance is None:
|
else:
|
||||||
self._instance = super().__new__(cls)
|
# if no staging is specified, and you want to clean the storage
|
||||||
return self._instance
|
# used by techrec: rm -rf /tmp/techrec*
|
||||||
|
base = mkdtemp(prefix="techrec-", dir="/tmp")
|
||||||
def __init__(self):
|
local = os.path.join(base, filename)
|
||||||
self.repo_path = get_config()["AUDIO_STAGING"]
|
async with aiohttp.ClientSession() as session:
|
||||||
self.repo = dict(
|
async with session.get(remote) as resp:
|
||||||
path=os.path.join(self.repo_path, path) for i in os.listdir(self.repo_path)
|
with open(local, "wb") as f:
|
||||||
)
|
while True:
|
||||||
|
chunk = await resp.content.read(CHUNK_SIZE)
|
||||||
async def get(remote: str) -> str:
|
if not chunk:
|
||||||
"""
|
break
|
||||||
This will look in the local staging path (ideally on a tmpfs or something
|
f.write(chunk)
|
||||||
similar), and return the file from there if present. Otherwise, it will download
|
return local
|
||||||
it from the remote.
|
|
||||||
"""
|
|
||||||
if remote in self.repo:
|
|
||||||
return self.repo[remote]
|
|
||||||
file = await self._download_from_remote(remote)
|
|
||||||
self.repo[] = file
|
|
||||||
return file
|
|
||||||
|
|
||||||
async def _download(remote: str) -> str:
|
|
||||||
"""
|
|
||||||
This will download to AUDIO_STAGING the remote file and return the local path
|
|
||||||
of the downloaded file
|
|
||||||
"""
|
|
||||||
_, filename = os.path.split(remote)
|
|
||||||
local = os.path.join(get_config()["AUDIO_STAGING"], filename)
|
|
||||||
async with aiohttp.ClientSession() as session:
|
|
||||||
async with session.get(remote) as resp:
|
|
||||||
with open(local) as f:
|
|
||||||
while True:
|
|
||||||
chunk = await resp.content.read(CHUNK_SIZE)
|
|
||||||
if not chunk:
|
|
||||||
break
|
|
||||||
f.write(chunk)
|
|
||||||
return local
|
|
||||||
|
|
||||||
|
|
||||||
RETRIEVER = HTTPRetriever()
|
|
||||||
|
|
Loading…
Reference in a new issue