forge.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import asyncio
  2. from aiofiles.os import os as async_os
  3. import logging
  4. import tempfile
  5. import os
  6. from datetime import datetime, timedelta
  7. from subprocess import Popen
  8. from time import sleep
  9. from typing import Callable, Optional
  10. from techrec.config_manager import get_config
  11. from techrec.http_retriever import download
# Module-level logger used by the free functions in this file.
logger = logging.getLogger("forge")
# Signature of the optional output check accepted by create_mp3: it is called
# as validator(start, end, path_of_generated_file) and returns a bool; a falsy
# result makes create_mp3 discard the generated file.
Validator = Callable[[datetime, datetime, str], bool]
  14. def round_timefile(exact: datetime) -> datetime:
  15. """
  16. This will round the datetime, so to match the file organization structure
  17. """
  18. return datetime(exact.year, exact.month, exact.day, exact.hour)
  19. def get_files_and_intervals(start, end, rounder=round_timefile):
  20. """
  21. both arguments are datetime objects
  22. returns an iterator whose elements are (filename, start_cut, end_cut)
  23. Cuts are expressed in seconds
  24. """
  25. if end <= start:
  26. raise ValueError("end < start!")
  27. while start <= end:
  28. begin = rounder(start)
  29. start_cut = (start - begin).total_seconds()
  30. if end < begin + timedelta(seconds=3599):
  31. end_cut = (begin + timedelta(seconds=3599) - end).total_seconds()
  32. else:
  33. end_cut = 0
  34. yield (begin, start_cut, end_cut)
  35. start = begin + timedelta(hours=1)
  36. class InputBackend:
  37. def __init__(self, basepath):
  38. self.base = basepath
  39. self.log = logging.getLogger(self.__class__.__name__)
  40. async def search_files(self, start, end):
  41. # assumption: a day is not split in multiple folder
  42. start_dir = self.parent_dir(self.time_to_uri(start))
  43. end_dir = self.parent_dir(self.time_to_uri(end))
  44. files = {
  45. fpath
  46. for directory in {start_dir, end_dir}
  47. for fpath in await self.list_dir(directory)
  48. }
  49. files_date = [] # tuple of str, datetime
  50. for fpath in files:
  51. try:
  52. dt = self.uri_to_time(fpath)
  53. except Exception as exc:
  54. self.log.debug("Skipping %s", fpath)
  55. print(exc)
  56. continue
  57. if dt > end:
  58. continue
  59. files_date.append((fpath, dt))
  60. # The first file in the list will now be the last chunk to be added.
  61. files_date.sort(key=lambda fpath_dt: fpath_dt[1], reverse=True)
  62. final_files = []
  63. need_to_exit = False
  64. for fpath, dt in files_date:
  65. if need_to_exit:
  66. break
  67. if dt < start:
  68. need_to_exit = True
  69. final_files.insert(0, fpath)
  70. self.log.info("Relevant files: %s", ", ".join(final_files))
  71. return final_files
  72. async def list_dir(self, path):
  73. raise NotImplementedError()
  74. def parent_dir(self, path):
  75. return os.path.dirname(path)
  76. def time_to_uri(self, time: datetime) -> str:
  77. return os.path.join(
  78. str(self.base),
  79. time.strftime(get_config()["AUDIO_INPUT_FORMAT"])
  80. )
  81. def uri_to_time(self, fpath: str) -> datetime:
  82. return datetime.strptime(
  83. os.path.basename(fpath),
  84. get_config()["AUDIO_INPUT_FORMAT"].split('/')[-1])
  85. async def get_file(uri: str) -> str:
  86. return uri
  87. class DirBackend(InputBackend):
  88. def uri_to_relative(self, fpath: str) -> str:
  89. return os.path.relpath(fpath, str(self.base))
  90. async def list_dir(self, path):
  91. files = [os.path.join(path, f) for f in async_os.listdir(path)]
  92. return files
  93. class HttpBackend(InputBackend):
  94. async def get_file(uri: str) -> str:
  95. self.log.info(f"downloading: {uri}")
  96. local = await download(
  97. uri,
  98. basic_auth=get_config()['AUDIO_INPUT_BASICAUTH'],
  99. )
  100. return local
  101. def get_ffmpeg_cmdline(fpaths: list, backend, start: datetime, end: datetime) -> list:
  102. ffmpeg = get_config()["FFMPEG_PATH"]
  103. cmdline = [ffmpeg, "-i", "concat:%s" % "|".join(fpaths)]
  104. cmdline += get_config()["FFMPEG_OUT_CODEC"]
  105. startskip = (start - backend.uri_to_time(fpaths[0])).total_seconds()
  106. if startskip > 0:
  107. cmdline += ["-ss", "%d" % startskip]
  108. cmdline += ["-t", "%d" % (end - start).total_seconds()]
  109. return cmdline
  110. async def create_mp3(
  111. start: datetime,
  112. end: datetime,
  113. outfile: str,
  114. options={},
  115. validator: Optional[Validator] = None,
  116. **kwargs,
  117. ):
  118. be = DirBackend(get_config()['AUDIO_INPUT'])
  119. fpaths = await be.search_files(start, end)
  120. # metadata date/time formatted according to
  121. # https://wiki.xiph.org/VorbisComment#Date_and_time
  122. metadata = {}
  123. if outfile.endswith(".mp3"):
  124. metadata["TRDC"] = start.replace(microsecond=0).isoformat()
  125. metadata["RECORDINGTIME"] = metadata["TRDC"]
  126. metadata["ENCODINGTIME"] = datetime.now().replace(
  127. microsecond=0).isoformat()
  128. else:
  129. metadata["DATE"] = start.replace(microsecond=0).isoformat()
  130. metadata["ENCODER"] = "https://git.lattuga.net/techbloc/techrec"
  131. if "title" in options:
  132. metadata["TITLE"] = options["title"]
  133. if options.get("license_uri", None) is not None:
  134. metadata["RIGHTS-DATE"] = start.strftime("%Y-%m")
  135. metadata["RIGHTS-URI"] = options["license_uri"]
  136. if "extra_tags" in options:
  137. metadata.update(options["extra_tags"])
  138. metadata_list = []
  139. for tag, value in metadata.items():
  140. if "=" in tag:
  141. logger.error('Received a tag with "=" inside, skipping')
  142. continue
  143. metadata_list.append("-metadata")
  144. metadata_list.append("%s=%s" % (tag, value))
  145. prefix, suffix = os.path.basename(outfile).split(".", 1)
  146. tmp_file = tempfile.NamedTemporaryFile(
  147. suffix=".%s" % suffix,
  148. prefix="forge-%s" % prefix,
  149. delete=False,
  150. # This is needed to avoid errors with the rename across different mounts
  151. dir=os.path.dirname(outfile),
  152. )
  153. cmd = (
  154. get_ffmpeg_cmdline(fpaths, be, start, end)
  155. + metadata_list
  156. + ["-y"]
  157. + get_config()["FFMPEG_OPTIONS"]
  158. + [tmp_file.name]
  159. )
  160. logger.info("Running %s", " ".join(cmd))
  161. p = Popen(cmd)
  162. if get_config()["FORGE_TIMEOUT"] == 0:
  163. p.wait()
  164. else:
  165. start = datetime.now()
  166. while (datetime.now() - start).total_seconds() < get_config()["FORGE_TIMEOUT"]:
  167. p.poll()
  168. if p.returncode is None:
  169. sleep(1)
  170. else:
  171. break
  172. if p.returncode is None:
  173. os.kill(p.pid, 15)
  174. try:
  175. os.remove(tmp_file.name)
  176. except Exception:
  177. pass
  178. raise Exception("timeout") # TODO: make a specific TimeoutError
  179. if p.returncode != 0:
  180. raise OSError("return code was %d" % p.returncode)
  181. if validator is not None and not validator(start, end, tmp_file.name):
  182. os.unlink(tmp_file.name)
  183. return False
  184. os.rename(tmp_file.name, outfile)
  185. return True
  186. def main_cmd(options):
  187. log = logging.getLogger("forge_main")
  188. outfile = os.path.abspath(os.path.join(options.cwd, options.outfile))
  189. log.debug("will forge an mp3 into %s" % (outfile))
  190. asyncio.run(create_mp3(options.starttime, options.endtime, outfile))