mpc.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. from __future__ import print_function
  2. import logging
  3. import signal
  4. import mpd
  5. from pkg_resources import iter_entry_points
  6. import gevent
  7. from gevent.queue import Queue
  8. from .audiogen import audiogenerate
  9. from .entrypoints_utils import get_avail_entrypoints
  10. from .event import Monitor
  11. from .eventutils import ParentedLet, Timer
  12. from .unused import UnusedCleaner
  13. def get_mpd_client(conf):
  14. client = mpd.MPDClient(use_unicode=True)
  15. client.connect(conf["MPD_HOST"], conf["MPD_PORT"])
  16. return client
  17. class MPDWatcher(ParentedLet):
  18. """
  19. MPDWatcher notifies parent about any mpd event
  20. """
  21. def __init__(self, queue, conf, client=None):
  22. ParentedLet.__init__(self, queue)
  23. self.log = logging.getLogger(self.__class__.__name__)
  24. self.conf = conf
  25. self.client = client # assume it is already connected, or None
  26. def refresh_client(self):
  27. self.client = get_mpd_client(self.conf)
  28. def do_business(self):
  29. first_after_connection = True
  30. while True:
  31. try:
  32. if self.client is None:
  33. self.refresh_client()
  34. if first_after_connection:
  35. yield ("mpc", "connect")
  36. status = self.client.idle()[0]
  37. except (
  38. mpd.ConnectionError,
  39. ConnectionRefusedError,
  40. FileNotFoundError,
  41. ) as exc:
  42. self.log.warning(
  43. "Connection to MPD failed (%s: %s)",
  44. exc.__class__.__name__,
  45. exc,
  46. )
  47. self.client = None
  48. first_after_connection = True
  49. gevent.sleep(5)
  50. continue
  51. else:
  52. first_after_connection = False
  53. yield ("mpc", status)
  54. class Player:
  55. """
  56. The player contains different mpd-related methods
  57. check_playlist determines whether the playlist is long enough and run audiogenerator accordingly
  58. enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist
  59. """
  60. def __init__(self, conf):
  61. self.conf = conf
  62. self.log = logging.getLogger(self.__class__.__name__)
  63. self.min_playlist_length = 10
  64. self.tmpcleaner = UnusedCleaner(conf)
  65. self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"]
  66. self.events_enabled = True
  67. def _get_mpd(self):
  68. mpd_client = mpd.MPDClient(use_unicode=True)
  69. try:
  70. mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"])
  71. except (
  72. mpd.ConnectionError,
  73. ConnectionRefusedError,
  74. FileNotFoundError,
  75. ) as exc:
  76. self.log.warning(
  77. "Connection to MPD failed (%s: %s)",
  78. exc.__class__.__name__,
  79. exc,
  80. )
  81. raise gevent.GreenletExit()
  82. return mpd_client
  83. @property
  84. def continous_audiospec(self):
  85. return self._continous_audiospec
  86. @continous_audiospec.setter
  87. def continous_audiospec(self, spec):
  88. self._continous_audiospec = (
  89. self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec
  90. )
  91. def clear_everything_but_current_song():
  92. mpdc = self._get_mpd()
  93. current = mpdc.currentsong()
  94. pos = int(current.get("pos", 0))
  95. for song in mpdc.playlistid():
  96. if int(song["pos"]) != pos:
  97. mpdc.deleteid(song["id"])
  98. gevent.Greenlet.spawn(clear_everything_but_current_song)
  99. def check_playlist(self):
  100. mpd_client = self._get_mpd()
  101. songs = mpd_client.playlist()
  102. current = mpd_client.currentsong()
  103. pos = int(current.get("pos", 0)) + 1
  104. if (len(songs) - pos) >= self.min_playlist_length:
  105. return
  106. self.log.info("need to add new songs")
  107. picker = gevent.Greenlet(audiogenerate, self.continous_audiospec)
  108. def add(greenlet):
  109. uris = greenlet.value
  110. for uri in uris:
  111. assert type(uri) is str, type(uri)
  112. mpd_client.add(uri.strip())
  113. self.tmpcleaner.watch(uri.strip())
  114. picker.link_value(add)
  115. picker.start()
  116. def enqueue_filter(self, songs):
  117. eventfilters = self.conf["EVENT_FILTERS"]
  118. if not eventfilters:
  119. return True, ""
  120. availfilters = get_avail_entrypoints("larigira.eventfilter")
  121. if len([ef for ef in eventfilters if ef in availfilters]) == 0:
  122. return True, ""
  123. mpdc = self._get_mpd()
  124. status = mpdc.status()
  125. ctx = {"playlist": mpdc.playlist(), "status": status, "durations": []}
  126. for entrypoint in iter_entry_points("larigira.eventfilter"):
  127. if entrypoint.name in eventfilters:
  128. ef = entrypoint.load()
  129. try:
  130. ret = ef(songs=songs, context=ctx, conf=self.conf)
  131. except ImportError as exc:
  132. self.log.warn(
  133. "Filter %s skipped: %s" % (entrypoint.name, exc)
  134. )
  135. continue
  136. if ret is None: # bad behavior!
  137. continue
  138. if type(ret) is bool:
  139. reason = ""
  140. else:
  141. ret, reason = ret
  142. reason = "Filtered by %s (%s)" % (entrypoint.name, reason)
  143. if ret is False:
  144. return ret, reason
  145. else:
  146. if reason:
  147. self.log.debug(
  148. "filter %s says ok: %s", entrypoint.name, reason
  149. )
  150. return True, "Passed through %s" % ",".join(availfilters)
  151. def enqueue(self, songs):
  152. assert type(songs) is dict
  153. assert "uris" in songs
  154. spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]]
  155. nicks = ",".join(
  156. (aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"])
  157. )
  158. if not self.events_enabled:
  159. self.log.debug("Ignoring <%s> (events disabled)", nicks)
  160. return
  161. filterok, reason = self.enqueue_filter(songs)
  162. if not filterok:
  163. self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason)
  164. # delete those files
  165. for uri in reversed(songs["uris"]):
  166. self.tmpcleaner.watch(uri.strip())
  167. return
  168. mpd_client = self._get_mpd()
  169. for uri in reversed(songs["uris"]):
  170. assert type(uri) is str
  171. self.log.info(
  172. "Adding %s to playlist (from <%s>:%s=%s)",
  173. uri,
  174. songs["timespec"].get("nick", ""),
  175. songs["aids"],
  176. spec,
  177. )
  178. insert_pos = (
  179. 0
  180. if len(mpd_client.playlistid()) == 0
  181. else int(mpd_client.currentsong().get("pos", 0)) + 1
  182. )
  183. try:
  184. mpd_client.addid(uri, insert_pos)
  185. except mpd.CommandError:
  186. self.log.exception("Cannot insert song %s", uri)
  187. self.tmpcleaner.watch(uri.strip())
  188. def play(self):
  189. """make sure that MPD is playing"""
  190. mpd_client = self._get_mpd()
  191. mpd_client.play()
  192. class Controller(gevent.Greenlet):
  193. def __init__(self, conf):
  194. gevent.Greenlet.__init__(self)
  195. self.log = logging.getLogger(self.__class__.__name__)
  196. self.conf = conf
  197. self.q = Queue()
  198. self.player = Player(self.conf)
  199. if "DB_URI" in self.conf:
  200. self.monitor = Monitor(self.q, self.conf)
  201. self.monitor.parent_greenlet = self
  202. else:
  203. self.monitor = None
  204. def _run(self):
  205. if self.monitor is not None:
  206. self.monitor.start()
  207. mw = MPDWatcher(self.q, self.conf, client=None)
  208. mw.parent_greenlet = self
  209. mw.start()
  210. t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q)
  211. t.parent_greenlet = self
  212. t.start()
  213. # at the very start, run a check!
  214. gevent.Greenlet.spawn(self.player.check_playlist)
  215. while True:
  216. value = self.q.get()
  217. # emitter = value['emitter']
  218. kind = value["kind"]
  219. args = value["args"]
  220. if kind == "timer" or (
  221. kind == "mpc" and args[0] in ("player", "playlist", "connect")
  222. ):
  223. gevent.Greenlet.spawn(self.player.check_playlist)
  224. if self.conf["MPD_ENFORCE_ALWAYS_PLAYING"]:
  225. gevent.Greenlet.spawn(self.player.play)
  226. try:
  227. self.player.tmpcleaner.check_playlist()
  228. except:
  229. pass
  230. elif kind == "mpc":
  231. pass
  232. elif kind == "uris_enqueue":
  233. # TODO: uris_enqueue messages should be delivered directly to Player.enqueue
  234. # probably we need a MPDEnqueuer that receives every uri we want to add
  235. try:
  236. self.player.enqueue(args[0])
  237. except AssertionError:
  238. raise
  239. except Exception:
  240. self.log.exception(
  241. "Error while adding to queue; " "bad audiogen output?"
  242. )
  243. elif (
  244. kind == "signal" and args[0] == signal.SIGALRM
  245. ) or kind == "refresh":
  246. # it's a tick!
  247. self.log.debug("Reload")
  248. self.monitor.q.put(dict(kind="forcetick"))
  249. gevent.Greenlet.spawn(self.player.check_playlist)
  250. else:
  251. self.log.warning("Unknown message: %s", str(value))