audiogen_podcast.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. import datetime
  2. import logging
  3. import os
  4. import random
  5. import sys
  6. from subprocess import CalledProcessError, check_output
  7. import requests
  8. from lxml import html
  9. from pytimeparse.timeparse import timeparse
  10. from larigira.fsutils import download_http
  def delta_humanreadable(tdelta):
      """Render a timedelta compactly as days and whole hours.

      Returns an empty string when ``tdelta`` is None, ``"<D>d<H>h"`` when the
      day count is non-zero and ``"<H>h"`` otherwise. Minutes and seconds are
      dropped.
      """
      if tdelta is None:
          return ""
      day_count = tdelta.days
      # Strip the whole days, then count the full hours in what is left.
      leftover = tdelta - datetime.timedelta(days=day_count)
      hour_count = leftover.seconds // 3600
      if not day_count:
          return "{}h".format(hour_count)
      return "{}d{}h".format(day_count, hour_count)
  def get_duration(url):
      """Return the duration of the media at ``url`` in whole seconds.

      Runs ``ffprobe`` on ``url`` and reads the ``duration=`` line of its
      output.

      :param url: location handed to ``ffprobe -i``; str or bytes
      :returns: the reported duration, truncated to an int
      :raises ValueError: if ffprobe cannot be started, exits with an error,
          or does not report a numeric duration
      """
      # Only used for error messages: avoid the b'...' repr for bytes input.
      shown = url.decode("utf-8", "replace") if isinstance(url, bytes) else url
      command = [
          "ffprobe",
          "-v",
          "error",
          "-show_entries",
          "format=duration",
          "-i",
          url,
      ]
      try:
          lineout = check_output(command).split(b"\n")
      except (CalledProcessError, OSError) as exc:
          # OSError covers a missing or non-executable ffprobe binary, so every
          # probing failure surfaces as the same ValueError.
          raise ValueError("error probing `%s`" % shown) from exc

      for line in lineout:
          if line.startswith(b"duration="):
              value = line.split(b"=", 1)[1].strip()
              break
      else:
          # A bare next() here would leak StopIteration to the caller.
          raise ValueError("no duration reported for `%s`" % shown)

      try:
          return int(float(value))
      except ValueError as exc:
          raise ValueError(
              "invalid duration %r for `%s`" % (value, shown)
          ) from exc
  class Audio(object):
      """One audio item, identified by its URL.

      Attributes set by ``__init__``:
          url: location of the audio.
          date: value passed by the caller (None when unknown); ``age``
              subtracts it from a timezone-aware UTC "now".
          end_date: instant compared against "now" by ``valid``; initialised
              to 9999-12-31 UTC.
      """

      def __init__(self, url, duration=None, date=None):
          self.url = url
          # None means "not known yet"; see the ``duration`` property.
          self._duration = duration
          self.date = date
          self.end_date = datetime.datetime(
              9999, 12, 31, tzinfo=datetime.timezone.utc
          )

      def __str__(self):
          return self.url

      def __repr__(self):
          return "<Audio {} ({} {})>".format(
              self.url, self._duration, delta_humanreadable(self.age)
          )

      @property
      def duration(self):
          """Duration in seconds, computed on first access and then cached.

          If probing fails the error is logged and the duration is set to 0.
          """
          if self._duration is None:
              try:
                  self._duration = get_duration(self.url.encode("utf-8"))
              except Exception:
                  # Deliberately best-effort, but KeyboardInterrupt and
                  # SystemExit must still propagate (a bare except ate them).
                  logging.exception(
                      "Error while computing duration of %s; set it to 0",
                      self.url,
                  )
                  self._duration = 0
          return self._duration

      @property
      def urls(self):
          """Single-element list holding ``url``."""
          return [self.url]

      @property
      def age(self):
          """Time elapsed since ``date``, or None when ``date`` is None."""
          if self.date is None:
              return None
          # datetime.utcnow() is deprecated; build the aware "now" directly.
          now = datetime.datetime.now(datetime.timezone.utc)
          return now - self.date

      @property
      def valid(self):
          """True while ``end_date`` has not passed yet."""
          return self.end_date >= datetime.datetime.now(datetime.timezone.utc)
  def get_tree(feed_url):
      """Load a feed and parse it with ``lxml.html``.

      :param feed_url: an ``http:``/``https:`` URL, or the path of a local file
      :returns: the result of ``html.fromstring`` for URLs, or of
          ``html.parse`` for local files
      :raises ValueError: if ``feed_url`` is a local path that does not exist
      """
      if feed_url.startswith(("http:", "https:")):
          return html.fromstring(requests.get(feed_url).content)
      if not os.path.exists(feed_url):
          raise ValueError("file not found: {}".format(feed_url))
      # Binary mode hands the parser raw bytes, as the HTTP branch does, and
      # the ``with`` block closes the handle that was previously leaked.
      with open(feed_url, "rb") as feed_file:
          return html.parse(feed_file)
  def get_item_date(el):
      """Return the publication date of a feed item, or None.

      Reads the text of the item's ``pubdate`` child and tries two formats in
      order: ISO-like ``%Y-%m-%dT%H:%M:%S%z`` and RFC-822-like
      ``%a, %d %b %Y %H:%M:%S %z``.

      :param el: element exposing ``find()``; its ``pubdate`` child, if any,
          exposes ``text``
      :returns: a ``datetime.datetime``, or None when the child is missing,
          empty, or matches neither format
      """
      # NOTE(review): the tag is looked up in lower case, presumably because
      # the feed is parsed with lxml's HTML parser -- confirm.
      el_date = el.find("pubdate")
      if el_date is None or not el_date.text:
          return None
      # Feeds often pad the value with whitespace, which strptime rejects.
      text = el_date.text.strip()
      for time_format in ("%Y-%m-%dT%H:%M:%S%z", "%a, %d %b %Y %H:%M:%S %z"):
          try:
              return datetime.datetime.strptime(text, time_format)
          except ValueError:
              continue
      return None
  def _duration_to_seconds(text):
      """Convert ``[[HH:]MM:]SS`` text to whole seconds; None if malformed."""
      total = 0.0
      try:
          # Right-most field is seconds; each field to the left is worth 60x.
          for power, field in enumerate(reversed(text.strip().split(":"))):
              total += float(field) * (60 ** power)
          return int(total)
      except (ValueError, OverflowError):
          return None


  def get_audio_from_item(item):
      """Build an Audio from a feed ``<item>`` element.

      The URL comes from the ``url`` attribute of the item's ``enclosure``
      child. If the item has a ``duration`` child that parses to a non-zero
      number of seconds, that duration is passed to Audio. An empty, zero or
      malformed duration is ignored, so Audio is created without one.

      :param item: element exposing ``find()`` and ``findtext()``
      :returns: an Audio for the enclosure URL
      :raises ValueError: if the item has no ``enclosure`` child
      """
      encl = item.find("enclosure")
      if encl is None:
          raise ValueError("feed item has no <enclosure>")
      audio_args = {}
      duration_text = item.findtext("duration")
      if duration_text is not None:
          total_seconds = _duration_to_seconds(duration_text)
          if total_seconds:
              audio_args["duration"] = total_seconds
      return Audio(encl.get("url"), **audio_args)
  def get_urls(tree):
      """Yield an Audio for every ``<item>`` in ``tree`` that has an enclosure.

      Items without an ``enclosure`` child are skipped, so one such entry no
      longer aborts the whole feed. Each yielded Audio gets its ``date``
      filled in from the item when it is not already set.

      :param tree: parsed feed exposing ``xpath()``
      """
      for item in tree.xpath("//item"):
          if item.find("enclosure") is None:
              logging.debug("skipping feed item without enclosure")
              continue
          audio = get_audio_from_item(item)
          if audio.date is None:
              audio.date = get_item_date(item)
          yield audio
  def parse_duration(arg):
      """Convert a duration specification to a number of seconds.

      :param arg: an int or float number of seconds, a string of decimal
          digits, or any string that ``timeparse`` can parse
      :returns: the number of seconds; ints for numeric and digit-only input,
          otherwise whatever ``timeparse`` returns
      :raises ValueError: if ``arg`` is a bool or ``timeparse`` returns None
      """
      if isinstance(arg, bool):
          # bool is an int subclass, but True/False is never a real duration.
          raise ValueError("%r is not a valid duration" % arg)
      if isinstance(arg, (int, float)):
          # Specs loaded from JSON may carry plain numbers instead of strings.
          return int(arg)
      text = arg.strip()
      if text.isdecimal():
          return int(text)
      secs = timeparse(text)
      if secs is None:
          raise ValueError("%r is not a valid duration" % arg)
      return secs
  def _age_sort_key(audio):
      """Sort key ordering by age, with undated audios after dated ones."""
      age = audio.age
      if age is None:
          return (1, 0.0)
      return (0, age.total_seconds())


  def generate(spec):
      """Select audios from a podcast feed and fetch them with download_http.

      Keys read from ``spec``:
          url: feed URL or local path (required).
          min_len / max_len: optional duration bounds, parsed by
              ``parse_duration``.
          sort_by: ``"random"``, ``"date"`` or ``"duration"``; any other
              value keeps feed order.
          reverse: if truthy, reverse the order after sorting.
          start: number of leading audios to skip (default 0).
          howmany: maximum number of audios to keep (default 1).
          copy: forwarded to ``download_http`` (default True).

      :returns: list with the result of ``download_http`` for each selected
          audio
      :raises ValueError: if ``spec`` has no ``url`` key
      """
      if "url" not in spec:
          raise ValueError("Malformed audiospec: missing 'url'")
      audios = list(get_urls(get_tree(spec["url"])))

      # Parse each bound once instead of once per audio. This also reports a
      # malformed bound when the feed turns out to be empty.
      if spec.get("min_len", False):
          min_len = parse_duration(spec["min_len"])
          audios = [a for a in audios if a.duration >= min_len]
      if spec.get("max_len", False):
          max_len = parse_duration(spec["max_len"])
          audios = [a for a in audios if a.duration <= max_len]

      # sort
      sort_by = spec.get("sort_by", "none")
      if sort_by == "random":
          random.shuffle(audios)
      elif sort_by == "date":
          # age is None for undated items; comparing None with a timedelta
          # raises TypeError, hence the dedicated key.
          audios.sort(key=_age_sort_key)
      elif sort_by == "duration":
          audios.sort(key=lambda x: x.duration)

      if spec.get("reverse", False):
          audios.reverse()

      # slice
      audios = audios[int(spec.get("start", 0)) :]
      audios = audios[: int(spec.get("howmany", 1))]

      # copy local
      return [
          download_http(a.url, copy=spec.get("copy", True), prefix="podcast")
          for a in audios
      ]
  154. # TODO: testing
  155. # TODO: lxml should maybe be optional?
  156. # TODO: ui
  if __name__ == "__main__":
      # less than proper testing: list the audios found in the feed given as
      # the first command-line argument
      logging.basicConfig(level=logging.DEBUG)
      if len(sys.argv) < 2:
          # Explain what is expected instead of failing with an IndexError.
          sys.exit("usage: {} FEED_URL_OR_PATH".format(sys.argv[0]))
      for u in get_urls(get_tree(sys.argv[1])):
          print(" -", repr(u))
  161. print(" -", repr(u))