from __future__ import print_function import logging import signal import mpd from pkg_resources import iter_entry_points import gevent from gevent.queue import Queue from .audiogen import audiogenerate from .entrypoints_utils import get_avail_entrypoints from .event import Monitor from .eventutils import ParentedLet, Timer from .unused import UnusedCleaner def get_mpd_client(conf): client = mpd.MPDClient(use_unicode=True) client.connect(conf["MPD_HOST"], conf["MPD_PORT"]) return client class MPDWatcher(ParentedLet): """ MPDWatcher notifies parent about any mpd event """ def __init__(self, queue, conf, client=None): ParentedLet.__init__(self, queue) self.log = logging.getLogger(self.__class__.__name__) self.conf = conf self.client = client # assume it is already connected, or None def refresh_client(self): self.client = get_mpd_client(self.conf) def do_business(self): first_after_connection = True while True: try: if self.client is None: self.refresh_client() if first_after_connection: yield ("mpc", "connect") status = self.client.idle()[0] except ( mpd.ConnectionError, ConnectionRefusedError, FileNotFoundError, ) as exc: self.log.warning( "Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc, ) self.client = None first_after_connection = True gevent.sleep(5) continue else: first_after_connection = False yield ("mpc", status) class Player: """ The player contains different mpd-related methods check_playlist determines whether the playlist is long enough and run audiogenerator accordingly enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist """ def __init__(self, conf): self.conf = conf self.log = logging.getLogger(self.__class__.__name__) self.min_playlist_length = 10 self.tmpcleaner = UnusedCleaner(conf) self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"] self.events_enabled = True def _get_mpd(self): mpd_client = mpd.MPDClient(use_unicode=True) try: mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"]) except ( mpd.ConnectionError, ConnectionRefusedError, FileNotFoundError, ) as exc: self.log.warning( "Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc, ) raise gevent.GreenletExit() return mpd_client @property def continous_audiospec(self): return self._continous_audiospec @continous_audiospec.setter def continous_audiospec(self, spec): self._continous_audiospec = ( self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec ) def clear_everything_but_current_song(): mpdc = self._get_mpd() current = mpdc.currentsong() pos = int(current.get("pos", 0)) for song in mpdc.playlistid(): if int(song["pos"]) != pos: mpdc.deleteid(song["id"]) gevent.Greenlet.spawn(clear_everything_but_current_song) def check_playlist(self): mpd_client = self._get_mpd() songs = mpd_client.playlist() current = mpd_client.currentsong() pos = int(current.get("pos", 0)) + 1 if (len(songs) - pos) >= self.min_playlist_length: return self.log.info("need to add new songs") picker = gevent.Greenlet(audiogenerate, self.continous_audiospec) def add(greenlet): uris = greenlet.value for uri in uris: assert type(uri) is str, type(uri) mpd_client.add(uri.strip()) self.tmpcleaner.watch(uri.strip()) picker.link_value(add) picker.start() def enqueue_filter(self, songs): eventfilters = self.conf["EVENT_FILTERS"] if not eventfilters: return True, "" availfilters = get_avail_entrypoints("larigira.eventfilter") if len([ef for ef in eventfilters if ef in availfilters]) == 0: return True, "" mpdc = self._get_mpd() status = mpdc.status() ctx = {"playlist": mpdc.playlist(), "status": status, "durations": []} for entrypoint in iter_entry_points("larigira.eventfilter"): if entrypoint.name in eventfilters: ef = entrypoint.load() try: ret = ef(songs=songs, context=ctx, conf=self.conf) except ImportError as exc: self.log.warn( "Filter %s skipped: %s" % (entrypoint.name, exc) ) continue if ret is None: # bad behavior! continue if type(ret) is bool: reason = "" else: ret, reason = ret reason = "Filtered by %s (%s)" % (entrypoint.name, reason) if ret is False: return ret, reason else: if reason: self.log.debug( "filter %s says ok: %s", entrypoint.name, reason ) return True, "Passed through %s" % ",".join(availfilters) def enqueue(self, songs): assert type(songs) is dict assert "uris" in songs spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]] nicks = ",".join( (aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]) ) if not self.events_enabled: self.log.debug("Ignoring <%s> (events disabled)", nicks) return filterok, reason = self.enqueue_filter(songs) if not filterok: self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason) # delete those files for uri in reversed(songs["uris"]): self.tmpcleaner.watch(uri.strip()) return mpd_client = self._get_mpd() for uri in reversed(songs["uris"]): assert type(uri) is str self.log.info( "Adding %s to playlist (from <%s>:%s=%s)", uri, songs["timespec"].get("nick", ""), songs["aids"], spec, ) insert_pos = ( 0 if len(mpd_client.playlistid()) == 0 else int(mpd_client.currentsong().get("pos", 0)) + 1 ) try: mpd_client.addid(uri, insert_pos) except mpd.CommandError: self.log.exception("Cannot insert song %s", uri) self.tmpcleaner.watch(uri.strip()) def play(self): """make sure that MPD is playing""" mpd_client = self._get_mpd() mpd_client.play() class Controller(gevent.Greenlet): def __init__(self, conf): gevent.Greenlet.__init__(self) self.log = logging.getLogger(self.__class__.__name__) self.conf = conf self.q = Queue() self.player = Player(self.conf) if "DB_URI" in self.conf: self.monitor = Monitor(self.q, self.conf) self.monitor.parent_greenlet = self else: self.monitor = None def _run(self): if self.monitor is not None: self.monitor.start() mw = MPDWatcher(self.q, self.conf, client=None) mw.parent_greenlet = self mw.start() t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q) t.parent_greenlet = self t.start() # at the very start, run a check! gevent.Greenlet.spawn(self.player.check_playlist) while True: value = self.q.get() # emitter = value['emitter'] kind = value["kind"] args = value["args"] if kind == "timer" or ( kind == "mpc" and args[0] in ("player", "playlist", "connect") ): gevent.Greenlet.spawn(self.player.check_playlist) if self.conf["MPD_ENFORCE_ALWAYS_PLAYING"]: gevent.Greenlet.spawn(self.player.play) try: self.player.tmpcleaner.check_playlist() except: pass elif kind == "mpc": pass elif kind == "uris_enqueue": # TODO: uris_enqueue messages should be delivered directly to Player.enqueue # probably we need a MPDEnqueuer that receives every uri we want to add try: self.player.enqueue(args[0]) except AssertionError: raise except Exception: self.log.exception( "Error while adding to queue; " "bad audiogen output?" ) elif ( kind == "signal" and args[0] == signal.SIGALRM ) or kind == "refresh": # it's a tick! self.log.debug("Reload") self.monitor.q.put(dict(kind="forcetick")) gevent.Greenlet.spawn(self.player.check_playlist) else: self.log.warning("Unknown message: %s", str(value))