audiogen_podcast.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. import datetime
  2. import logging
  3. import os
  4. import random
  5. import sys
  6. from subprocess import CalledProcessError, check_output
  7. import requests
  8. from lxml import html
  9. from pytimeparse.timeparse import timeparse
  10. from larigira.fsutils import download_http
  11. def delta_humanreadable(tdelta):
  12. if tdelta is None:
  13. return ""
  14. days = tdelta.days
  15. hours = (tdelta - datetime.timedelta(days=days)).seconds // 3600
  16. if days:
  17. return "{}d{}h".format(days, hours)
  18. return "{}h".format(hours)
  19. def get_duration(url):
  20. try:
  21. lineout = check_output(
  22. [
  23. "ffprobe",
  24. "-v",
  25. "error",
  26. "-show_entries",
  27. "format=duration",
  28. "-i",
  29. url,
  30. ]
  31. ).split(b"\n")
  32. except CalledProcessError as exc:
  33. raise ValueError("error probing `%s`" % url) from exc
  34. duration = next(l for l in lineout if l.startswith(b"duration="))
  35. value = duration.split(b"=")[1]
  36. return int(float(value))
  37. class Audio(object):
  38. def __init__(self, url, duration=None, date=None):
  39. self.url = url
  40. self._duration = duration
  41. self.date = date
  42. self.end_date = datetime.datetime(
  43. 9999, 12, 31, tzinfo=datetime.timezone.utc
  44. )
  45. def __str__(self):
  46. return self.url
  47. def __repr__(self):
  48. return "<Audio {} ({} {})>".format(
  49. self.url, self._duration, delta_humanreadable(self.age)
  50. )
  51. @property
  52. def duration(self):
  53. """lazy-calculation"""
  54. if self._duration is None:
  55. try:
  56. self._duration = get_duration(self.url.encode("utf-8"))
  57. except:
  58. logging.exception(
  59. "Errore nel calcolo della lunghezza di %s; imposto a 0"
  60. )
  61. self._duration = 0
  62. return self._duration
  63. @property
  64. def urls(self):
  65. return [self.url]
  66. @property
  67. def age(self):
  68. if self.date is None:
  69. return None
  70. now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
  71. return now - self.date
  72. @property
  73. def valid(self):
  74. return self.end_date >= datetime.datetime.utcnow().replace(
  75. tzinfo=datetime.timezone.utc
  76. )
  77. def get_tree(feed_url):
  78. if feed_url.startswith("http:") or feed_url.startswith("https:"):
  79. tree = html.fromstring(requests.get(feed_url).content)
  80. else:
  81. if not os.path.exists(feed_url):
  82. raise ValueError("file not found: {}".format(feed_url))
  83. tree = html.parse(open(feed_url))
  84. return tree
  85. def get_item_date(el):
  86. el_date = el.find("pubdate")
  87. if el_date is None:
  88. return None
  89. for time_format in ("%Y-%m-%dT%H:%M:%S%z", "%a, %d %b %Y %H:%M:%S %z"):
  90. try:
  91. return datetime.datetime.strptime(el_date.text, time_format)
  92. except:
  93. continue
  94. def get_audio_from_item(item):
  95. encl = item.find("enclosure")
  96. url = encl.get("url")
  97. audio_args = {}
  98. if item.find("duration") is not None:
  99. duration_parts = item.findtext("duration").split(":")
  100. total_seconds = 0
  101. for i, num in enumerate(reversed(duration_parts)):
  102. total_seconds += int(num) * (60 ** i)
  103. if total_seconds:
  104. audio_args["duration"] = total_seconds
  105. return Audio(url, **audio_args)
  106. def get_urls(tree):
  107. items = tree.xpath("//item")
  108. for it in items:
  109. # title = it.find("title").text
  110. audio = get_audio_from_item(it)
  111. if audio.date is None:
  112. audio.date = get_item_date(it)
  113. yield audio
  114. def parse_duration(arg):
  115. if arg.isdecimal():
  116. secs = int(arg)
  117. else:
  118. secs = timeparse(arg)
  119. if secs is None:
  120. raise ValueError("%r is not a valid duration" % arg)
  121. return secs
  122. def generate(spec):
  123. if "url" not in spec:
  124. raise ValueError("Malformed audiospec: missing 'url'")
  125. audios = list(get_urls(get_tree(spec["url"])))
  126. if spec.get("min_len", False):
  127. audios = [
  128. a for a in audios if a.duration >= parse_duration(spec["min_len"])
  129. ]
  130. if spec.get("max_len", False):
  131. audios = [
  132. a for a in audios if a.duration <= parse_duration(spec["max_len"])
  133. ]
  134. # sort
  135. sort_by = spec.get("sort_by", "none")
  136. if sort_by == "random":
  137. random.shuffle(audios)
  138. elif sort_by == "date":
  139. audios.sort(key=lambda x: x.age)
  140. elif sort_by == "duration":
  141. audios.sort(key=lambda x: x.duration)
  142. if spec.get("reverse", False):
  143. audios.reverse()
  144. # slice
  145. audios = audios[int(spec.get("start", 0)) :]
  146. audios = audios[: int(spec.get("howmany", 1))]
  147. # copy local
  148. local_audios = [
  149. download_http(a.url, copy=spec.get("copy", True), prefix="podcast")
  150. for a in audios
  151. ]
  152. return local_audios
  153. # TODO: testing
  154. # TODO: lxml should maybe be optional?
  155. # TODO: ui
  156. if __name__ == "__main__":
  157. # less than proper testing
  158. logging.basicConfig(level=logging.DEBUG)
  159. for u in get_urls(get_tree(sys.argv[1])):
  160. print(" -", repr(u))