mpc.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. from __future__ import print_function
  2. import logging
  3. import signal
  4. from pkg_resources import iter_entry_points
  5. import gevent
  6. from gevent.queue import Queue
  7. import mpd
  8. from .event import Monitor
  9. from .eventutils import ParentedLet, Timer
  10. from .audiogen import audiogenerate
  11. from .unused import UnusedCleaner
  12. from .entrypoints_utils import get_avail_entrypoints
  13. def get_mpd_client(conf):
  14. client = mpd.MPDClient(use_unicode=True)
  15. client.connect(conf["MPD_HOST"], conf["MPD_PORT"])
  16. return client
  17. class MPDWatcher(ParentedLet):
  18. """
  19. MPDWatcher notifies parent about any mpd event
  20. """
  21. def __init__(self, queue, conf, client=None):
  22. ParentedLet.__init__(self, queue)
  23. self.log = logging.getLogger(self.__class__.__name__)
  24. self.conf = conf
  25. self.client = client # assume it is already connected, or None
  26. def refresh_client(self):
  27. self.client = get_mpd_client(self.conf)
  28. def do_business(self):
  29. first_after_connection = True
  30. while True:
  31. try:
  32. if self.client is None:
  33. self.refresh_client()
  34. if first_after_connection:
  35. yield ("mpc", "connect")
  36. status = self.client.idle()[0]
  37. except (
  38. mpd.ConnectionError,
  39. ConnectionRefusedError,
  40. FileNotFoundError,
  41. ) as exc:
  42. self.log.warning(
  43. "Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc
  44. )
  45. self.client = None
  46. first_after_connection = True
  47. gevent.sleep(5)
  48. continue
  49. else:
  50. first_after_connection = False
  51. yield ("mpc", status)
  52. class Player:
  53. """
  54. The player contains different mpd-related methods
  55. check_playlist determines whether the playlist is long enough and run audiogenerator accordingly
  56. enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist
  57. """
  58. def __init__(self, conf):
  59. self.conf = conf
  60. self.log = logging.getLogger(self.__class__.__name__)
  61. self.min_playlist_length = 10
  62. self.tmpcleaner = UnusedCleaner(conf)
  63. self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"]
  64. self.events_enabled = True
  65. def _get_mpd(self):
  66. mpd_client = mpd.MPDClient(use_unicode=True)
  67. try:
  68. mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"])
  69. except (mpd.ConnectionError, ConnectionRefusedError, FileNotFoundError) as exc:
  70. self.log.warning(
  71. "Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc
  72. )
  73. raise gevent.GreenletExit()
  74. return mpd_client
  75. @property
  76. def continous_audiospec(self):
  77. return self._continous_audiospec
  78. @continous_audiospec.setter
  79. def continous_audiospec(self, spec):
  80. self._continous_audiospec = (
  81. self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec
  82. )
  83. def clear_everything_but_current_song():
  84. mpdc = self._get_mpd()
  85. current = mpdc.currentsong()
  86. pos = int(current.get("pos", 0))
  87. for song in mpdc.playlistid():
  88. if int(song["pos"]) != pos:
  89. mpdc.deleteid(song["id"])
  90. gevent.Greenlet.spawn(clear_everything_but_current_song)
  91. def check_playlist(self):
  92. mpd_client = self._get_mpd()
  93. songs = mpd_client.playlist()
  94. current = mpd_client.currentsong()
  95. pos = int(current.get("pos", 0)) + 1
  96. if (len(songs) - pos) >= self.min_playlist_length:
  97. return
  98. self.log.info("need to add new songs")
  99. picker = gevent.Greenlet(audiogenerate, self.continous_audiospec)
  100. def add(greenlet):
  101. uris = greenlet.value
  102. for uri in uris:
  103. assert type(uri) is str, type(uri)
  104. mpd_client.add(uri.strip())
  105. self.tmpcleaner.watch(uri.strip())
  106. picker.link_value(add)
  107. picker.start()
  108. def enqueue_filter(self, songs):
  109. eventfilters = self.conf["EVENT_FILTERS"]
  110. if not eventfilters:
  111. return True, ""
  112. availfilters = get_avail_entrypoints("larigira.eventfilter")
  113. if len([ef for ef in eventfilters if ef in availfilters]) == 0:
  114. return True, ""
  115. mpdc = self._get_mpd()
  116. status = mpdc.status()
  117. ctx = {"playlist": mpdc.playlist(), "status": status, "durations": []}
  118. for entrypoint in iter_entry_points("larigira.eventfilter"):
  119. if entrypoint.name in eventfilters:
  120. ef = entrypoint.load()
  121. try:
  122. ret = ef(songs=songs, context=ctx, conf=self.conf)
  123. except ImportError as exc:
  124. self.log.warn("Filter %s skipped: %s" % (entrypoint.name, exc))
  125. continue
  126. if ret is None: # bad behavior!
  127. continue
  128. if type(ret) is bool:
  129. reason = ""
  130. else:
  131. ret, reason = ret
  132. reason = "Filtered by %s (%s)" % (entrypoint.name, reason)
  133. if ret is False:
  134. return ret, reason
  135. else:
  136. if reason:
  137. self.log.debug('filter %s says ok: %s', entrypoint.name, reason)
  138. return True, "Passed through %s" % ",".join(availfilters)
  139. def enqueue(self, songs):
  140. assert type(songs) is dict
  141. assert "uris" in songs
  142. spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]]
  143. nicks = ",".join(
  144. (aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"])
  145. )
  146. if not self.events_enabled:
  147. self.log.debug("Ignoring <%s> (events disabled)", nicks)
  148. return
  149. filterok, reason = self.enqueue_filter(songs)
  150. if not filterok:
  151. self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason)
  152. # delete those files
  153. for uri in reversed(songs["uris"]):
  154. self.tmpcleaner.watch(uri.strip())
  155. return
  156. mpd_client = self._get_mpd()
  157. for uri in reversed(songs["uris"]):
  158. assert type(uri) is str
  159. self.log.info(
  160. "Adding %s to playlist (from <%s>:%s=%s)",
  161. uri,
  162. songs["timespec"].get("nick", ""),
  163. songs["aids"],
  164. spec,
  165. )
  166. insert_pos = (
  167. 0
  168. if len(mpd_client.playlistid()) == 0
  169. else int(mpd_client.currentsong().get("pos", 0)) + 1
  170. )
  171. try:
  172. mpd_client.addid(uri, insert_pos)
  173. except mpd.CommandError:
  174. self.log.exception("Cannot insert song %s", uri)
  175. self.tmpcleaner.watch(uri.strip())
  176. class Controller(gevent.Greenlet):
  177. def __init__(self, conf):
  178. gevent.Greenlet.__init__(self)
  179. self.log = logging.getLogger(self.__class__.__name__)
  180. self.conf = conf
  181. self.q = Queue()
  182. self.player = Player(self.conf)
  183. if "DB_URI" in self.conf:
  184. self.monitor = Monitor(self.q, self.conf)
  185. self.monitor.parent_greenlet = self
  186. else:
  187. self.monitor = None
  188. def _run(self):
  189. if self.monitor is not None:
  190. self.monitor.start()
  191. mw = MPDWatcher(self.q, self.conf, client=None)
  192. mw.parent_greenlet = self
  193. mw.start()
  194. t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q)
  195. t.parent_greenlet = self
  196. t.start()
  197. # at the very start, run a check!
  198. gevent.Greenlet.spawn(self.player.check_playlist)
  199. while True:
  200. value = self.q.get()
  201. # emitter = value['emitter']
  202. kind = value["kind"]
  203. args = value["args"]
  204. if kind == "timer" or (
  205. kind == "mpc" and args[0] in ("player", "playlist", "connect")
  206. ):
  207. gevent.Greenlet.spawn(self.player.check_playlist)
  208. try:
  209. self.player.tmpcleaner.check_playlist()
  210. except:
  211. pass
  212. elif kind == "mpc":
  213. pass
  214. elif kind == "uris_enqueue":
  215. # TODO: uris_enqueue messages should be delivered directly to Player.enqueue
  216. # probably we need a MPDEnqueuer that receives every uri we want to add
  217. try:
  218. self.player.enqueue(args[0])
  219. except AssertionError:
  220. raise
  221. except Exception:
  222. self.log.exception(
  223. "Error while adding to queue; " "bad audiogen output?"
  224. )
  225. elif (kind == "signal" and args[0] == signal.SIGALRM) or kind == "refresh":
  226. # it's a tick!
  227. self.log.debug("Reload")
  228. self.monitor.q.put(dict(kind="forcetick"))
  229. gevent.Greenlet.spawn(self.player.check_playlist)
  230. else:
  231. self.log.warning("Unknown message: %s", str(value))