mpc.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. from __future__ import print_function
  2. import logging
  3. import signal
  4. from pkg_resources import iter_entry_points
  5. import gevent
  6. from gevent.queue import Queue
  7. import mpd
  8. from .event import Monitor
  9. from .eventutils import ParentedLet, Timer
  10. from .audiogen import audiogenerate
  11. from .unused import UnusedCleaner
  12. from .entrypoints_utils import get_avail_entrypoints
  13. def get_mpd_client(conf):
  14. client = mpd.MPDClient(use_unicode=True)
  15. client.connect(conf['MPD_HOST'], conf['MPD_PORT'])
  16. return client
  17. class MPDWatcher(ParentedLet):
  18. '''
  19. MPDWatcher notifies parent about any mpd event
  20. '''
  21. def __init__(self, queue, conf, client=None):
  22. ParentedLet.__init__(self, queue)
  23. self.log = logging.getLogger(self.__class__.__name__)
  24. self.conf = conf
  25. self.client = client # assume it is already connected, or None
  26. def refresh_client(self):
  27. self.client = get_mpd_client(self.conf)
  28. def do_business(self):
  29. first_after_connection = True
  30. while True:
  31. try:
  32. if self.client is None:
  33. self.refresh_client()
  34. if first_after_connection:
  35. yield('mpc', 'connect')
  36. status = self.client.idle()[0]
  37. except (mpd.ConnectionError, ConnectionRefusedError,
  38. FileNotFoundError) as exc:
  39. self.log.warning('Connection to MPD failed (%s: %s)',
  40. exc.__class__.__name__, exc)
  41. self.client = None
  42. first_after_connection = True
  43. gevent.sleep(5)
  44. continue
  45. else:
  46. first_after_connection = False
  47. yield ('mpc', status)
  48. class Player:
  49. '''
  50. The player contains different mpd-related methods
  51. check_playlist determines whether the playlist is long enough and run audiogenerator accordingly
  52. enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist
  53. '''
  54. def __init__(self, conf):
  55. self.conf = conf
  56. self.log = logging.getLogger(self.__class__.__name__)
  57. self.min_playlist_length = 10
  58. self.tmpcleaner = UnusedCleaner(conf)
  59. self._continous_audiospec = self.conf['CONTINOUS_AUDIOSPEC']
  60. self.events_enabled = True
  61. def _get_mpd(self):
  62. mpd_client = mpd.MPDClient(use_unicode=True)
  63. try:
  64. mpd_client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT'])
  65. except (mpd.ConnectionError, ConnectionRefusedError,
  66. FileNotFoundError) as exc:
  67. self.log.warning('Connection to MPD failed (%s: %s)',
  68. exc.__class__.__name__, exc)
  69. raise gevent.GreenletExit()
  70. return mpd_client
  71. @property
  72. def continous_audiospec(self):
  73. return self._continous_audiospec
  74. @continous_audiospec.setter
  75. def continous_audiospec(self, spec):
  76. self._continous_audiospec = self.conf['CONTINOUS_AUDIOSPEC'] \
  77. if spec is None else spec
  78. def clear_everything_but_current_song():
  79. mpdc = self._get_mpd()
  80. current = mpdc.currentsong()
  81. pos = int(current.get('pos', 0))
  82. for song in mpdc.playlistid():
  83. if int(song['pos']) != pos:
  84. mpdc.deleteid(song['id'])
  85. gevent.Greenlet.spawn(clear_everything_but_current_song)
  86. def check_playlist(self):
  87. mpd_client = self._get_mpd()
  88. songs = mpd_client.playlist()
  89. current = mpd_client.currentsong()
  90. pos = int(current.get('pos', 0)) + 1
  91. if (len(songs) - pos) >= self.min_playlist_length:
  92. return
  93. self.log.info('need to add new songs')
  94. picker = gevent.Greenlet(audiogenerate,
  95. self.continous_audiospec)
  96. def add(greenlet):
  97. uris = greenlet.value
  98. for uri in uris:
  99. assert type(uri) is str, type(uri)
  100. self.tmpcleaner.watch(uri.strip())
  101. mpd_client.add(uri.strip())
  102. picker.link_value(add)
  103. picker.start()
  104. def enqueue_filter(self, songs):
  105. eventfilters = self.conf['EVENT_FILTERS']
  106. if not eventfilters:
  107. return True, ''
  108. availfilters = get_avail_entrypoints('larigira.eventfilter')
  109. if len([ef for ef in eventfilters if ef in availfilters]) == 0:
  110. return True, ''
  111. mpdc = self._get_mpd()
  112. status = mpdc.status()
  113. ctx = {
  114. 'playlist': mpdc.playlist(),
  115. 'status': status,
  116. 'durations': []
  117. }
  118. for entrypoint in iter_entry_points('larigira.eventfilter'):
  119. if entrypoint.name in eventfilters:
  120. ef = entrypoint.load()
  121. try:
  122. ret = ef(songs=songs, context=ctx, conf=self.conf)
  123. except ImportError as exc:
  124. self.log.warn("Filter %s skipped: %s" % (entrypoint.name,
  125. exc))
  126. continue
  127. if ret is None: # bad behavior!
  128. continue
  129. if type(ret) is bool:
  130. reason = ''
  131. else:
  132. ret, reason = ret
  133. reason = 'Filtered by %s (%s)' % (entrypoint.name, reason)
  134. if ret is False:
  135. return ret, reason
  136. return True, 'Passed through %s' % ','.join(availfilters)
  137. def enqueue(self, songs):
  138. assert type(songs) is dict
  139. assert 'uris' in songs
  140. spec = [aspec.get('nick', aspec.eid) for aspec in songs['audiospecs']]
  141. nicks = ','.join((aspec.get('nick', aspec.eid)
  142. for aspec in songs['audiospecs']))
  143. if not self.events_enabled:
  144. self.log.debug('Ignoring <%s> (events disabled)', nicks
  145. )
  146. return
  147. filterok, reason = self.enqueue_filter(songs)
  148. if not filterok:
  149. self.log.debug('Ignoring <%s>, filtered: %s', nicks, reason)
  150. # delete those files
  151. for uri in reversed(songs['uris']):
  152. self.tmpcleaner.watch(uri.strip())
  153. return
  154. mpd_client = self._get_mpd()
  155. for uri in reversed(songs['uris']):
  156. assert type(uri) is str
  157. self.log.info('Adding %s to playlist (from <%s>:%s=%s)',
  158. uri,
  159. songs['timespec'].get('nick', ''),
  160. songs['aids'], spec)
  161. insert_pos = 0 if len(mpd_client.playlistid()) == 0 else \
  162. int(mpd_client.currentsong().get('pos', 0)) + 1
  163. try:
  164. mpd_client.addid(uri, insert_pos)
  165. except mpd.CommandError:
  166. self.log.exception("Cannot insert song %s", uri)
  167. self.tmpcleaner.watch(uri.strip())
  168. class Controller(gevent.Greenlet):
  169. def __init__(self, conf):
  170. gevent.Greenlet.__init__(self)
  171. self.log = logging.getLogger(self.__class__.__name__)
  172. self.conf = conf
  173. self.q = Queue()
  174. self.player = Player(self.conf)
  175. if 'DB_URI' in self.conf:
  176. self.monitor = Monitor(self.q, self.conf)
  177. self.monitor.parent_greenlet = self
  178. else:
  179. self.monitor = None
  180. def _run(self):
  181. if self.monitor is not None:
  182. self.monitor.start()
  183. mw = MPDWatcher(self.q, self.conf, client=None)
  184. mw.parent_greenlet = self
  185. mw.start()
  186. t = Timer(int(self.conf['CHECK_SECS']) * 1000, self.q)
  187. t.parent_greenlet = self
  188. t.start()
  189. # at the very start, run a check!
  190. gevent.Greenlet.spawn(self.player.check_playlist)
  191. while True:
  192. value = self.q.get()
  193. self.log.debug('<- %s', str(value))
  194. # emitter = value['emitter']
  195. kind = value['kind']
  196. args = value['args']
  197. if kind == 'timer' or (kind == 'mpc' and
  198. args[0] in ('player', 'playlist',
  199. 'connect')):
  200. gevent.Greenlet.spawn(self.player.check_playlist)
  201. try:
  202. self.player.tmpcleaner.check_playlist()
  203. except:
  204. pass
  205. elif kind == 'mpc':
  206. pass
  207. elif kind == 'uris_enqueue':
  208. # TODO: uris_enqueue messages should be delivered directly to Player.enqueue
  209. # probably we need a MPDEnqueuer that receives every uri we want to add
  210. try:
  211. self.player.enqueue(args[0])
  212. except AssertionError:
  213. raise
  214. except Exception:
  215. self.log.exception("Error while adding to queue; "
  216. "bad audiogen output?")
  217. elif (kind == 'signal' and args[0] == signal.SIGALRM) or \
  218. kind == 'refresh':
  219. # it's a tick!
  220. self.log.debug("Reload")
  221. self.monitor.q.put(dict(kind='forcetick'))
  222. gevent.Greenlet.spawn(self.player.check_playlist)
  223. else:
  224. self.log.warning("Unknown message: %s", str(value))