|
- from __future__ import print_function
- import logging
- import signal
- import mpd
- from pkg_resources import iter_entry_points
- import gevent
- from gevent.queue import Queue
- from .audiogen import audiogenerate
- from .entrypoints_utils import get_avail_entrypoints
- from .event import Monitor
- from .eventutils import ParentedLet, Timer
- from .unused import UnusedCleaner
- def get_mpd_client(conf):
- client = mpd.MPDClient(use_unicode=True)
- client.connect(conf["MPD_HOST"], conf["MPD_PORT"])
- return client
- class MPDWatcher(ParentedLet):
- """
- MPDWatcher notifies parent about any mpd event
- """
- def __init__(self, queue, conf, client=None):
- ParentedLet.__init__(self, queue)
- self.log = logging.getLogger(self.__class__.__name__)
- self.conf = conf
- self.client = client # assume it is already connected, or None
- def refresh_client(self):
- self.client = get_mpd_client(self.conf)
- def do_business(self):
- first_after_connection = True
- while True:
- try:
- if self.client is None:
- self.refresh_client()
- if first_after_connection:
- yield ("mpc", "connect")
- status = self.client.idle()[0]
- except (
- mpd.ConnectionError,
- ConnectionRefusedError,
- FileNotFoundError,
- ) as exc:
- self.log.warning(
- "Connection to MPD failed (%s: %s)",
- exc.__class__.__name__,
- exc,
- )
- self.client = None
- first_after_connection = True
- gevent.sleep(5)
- continue
- else:
- first_after_connection = False
- yield ("mpc", status)
- class Player:
- """
- The player contains different mpd-related methods
- check_playlist determines whether the playlist is long enough and run audiogenerator accordingly
- enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist
- """
- def __init__(self, conf):
- self.conf = conf
- self.log = logging.getLogger(self.__class__.__name__)
- self.min_playlist_length = 10
- self.tmpcleaner = UnusedCleaner(conf)
- self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"]
- self.events_enabled = True
- def _get_mpd(self):
- mpd_client = mpd.MPDClient(use_unicode=True)
- try:
- mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"])
- except (
- mpd.ConnectionError,
- ConnectionRefusedError,
- FileNotFoundError,
- ) as exc:
- self.log.warning(
- "Connection to MPD failed (%s: %s)",
- exc.__class__.__name__,
- exc,
- )
- raise gevent.GreenletExit()
- return mpd_client
- @property
- def continous_audiospec(self):
- return self._continous_audiospec
- @continous_audiospec.setter
- def continous_audiospec(self, spec):
- self._continous_audiospec = (
- self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec
- )
- def clear_everything_but_current_song():
- mpdc = self._get_mpd()
- current = mpdc.currentsong()
- pos = int(current.get("pos", 0))
- for song in mpdc.playlistid():
- if int(song["pos"]) != pos:
- mpdc.deleteid(song["id"])
- gevent.Greenlet.spawn(clear_everything_but_current_song)
- def check_playlist(self):
- mpd_client = self._get_mpd()
- songs = mpd_client.playlist()
- current = mpd_client.currentsong()
- pos = int(current.get("pos", 0)) + 1
- if (len(songs) - pos) >= self.min_playlist_length:
- return
- self.log.info("need to add new songs")
- picker = gevent.Greenlet(audiogenerate, self.continous_audiospec)
- def add(greenlet):
- uris = greenlet.value
- for uri in uris:
- assert type(uri) is str, type(uri)
- mpd_client.add(uri.strip())
- self.tmpcleaner.watch(uri.strip())
- picker.link_value(add)
- picker.start()
- def enqueue_filter(self, songs):
- eventfilters = self.conf["EVENT_FILTERS"]
- if not eventfilters:
- return True, ""
- availfilters = get_avail_entrypoints("larigira.eventfilter")
- if len([ef for ef in eventfilters if ef in availfilters]) == 0:
- return True, ""
- mpdc = self._get_mpd()
- status = mpdc.status()
- ctx = {"playlist": mpdc.playlist(), "status": status, "durations": []}
- for entrypoint in iter_entry_points("larigira.eventfilter"):
- if entrypoint.name in eventfilters:
- ef = entrypoint.load()
- try:
- ret = ef(songs=songs, context=ctx, conf=self.conf)
- except ImportError as exc:
- self.log.warn(
- "Filter %s skipped: %s" % (entrypoint.name, exc)
- )
- continue
- if ret is None: # bad behavior!
- continue
- if type(ret) is bool:
- reason = ""
- else:
- ret, reason = ret
- reason = "Filtered by %s (%s)" % (entrypoint.name, reason)
- if ret is False:
- return ret, reason
- else:
- if reason:
- self.log.debug(
- "filter %s says ok: %s", entrypoint.name, reason
- )
- return True, "Passed through %s" % ",".join(availfilters)
- def enqueue(self, songs):
- assert type(songs) is dict
- assert "uris" in songs
- spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]]
- nicks = ",".join(
- (aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"])
- )
- if not self.events_enabled:
- self.log.debug("Ignoring <%s> (events disabled)", nicks)
- return
- filterok, reason = self.enqueue_filter(songs)
- if not filterok:
- self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason)
- # delete those files
- for uri in reversed(songs["uris"]):
- self.tmpcleaner.watch(uri.strip())
- return
- mpd_client = self._get_mpd()
- for uri in reversed(songs["uris"]):
- assert type(uri) is str
- self.log.info(
- "Adding %s to playlist (from <%s>:%s=%s)",
- uri,
- songs["timespec"].get("nick", ""),
- songs["aids"],
- spec,
- )
- insert_pos = (
- 0
- if len(mpd_client.playlistid()) == 0
- else int(mpd_client.currentsong().get("pos", 0)) + 1
- )
- try:
- mpd_client.addid(uri, insert_pos)
- except mpd.CommandError:
- self.log.exception("Cannot insert song %s", uri)
- self.tmpcleaner.watch(uri.strip())
- def play(self):
- """make sure that MPD is playing"""
- mpd_client = self._get_mpd()
- mpd_client.play()
- class Controller(gevent.Greenlet):
- def __init__(self, conf):
- gevent.Greenlet.__init__(self)
- self.log = logging.getLogger(self.__class__.__name__)
- self.conf = conf
- self.q = Queue()
- self.player = Player(self.conf)
- if "DB_URI" in self.conf:
- self.monitor = Monitor(self.q, self.conf)
- self.monitor.parent_greenlet = self
- else:
- self.monitor = None
- def _run(self):
- if self.monitor is not None:
- self.monitor.start()
- mw = MPDWatcher(self.q, self.conf, client=None)
- mw.parent_greenlet = self
- mw.start()
- t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q)
- t.parent_greenlet = self
- t.start()
- # at the very start, run a check!
- gevent.Greenlet.spawn(self.player.check_playlist)
- while True:
- value = self.q.get()
- # emitter = value['emitter']
- kind = value["kind"]
- args = value["args"]
- if kind == "timer" or (
- kind == "mpc" and args[0] in ("player", "playlist", "connect")
- ):
- gevent.Greenlet.spawn(self.player.check_playlist)
- if self.conf["MPD_ENFORCE_ALWAYS_PLAYING"]:
- gevent.Greenlet.spawn(self.player.play)
- try:
- self.player.tmpcleaner.check_playlist()
- except:
- pass
- elif kind == "mpc":
- pass
- elif kind == "uris_enqueue":
- # TODO: uris_enqueue messages should be delivered directly to Player.enqueue
- # probably we need a MPDEnqueuer that receives every uri we want to add
- try:
- self.player.enqueue(args[0])
- except AssertionError:
- raise
- except Exception:
- self.log.exception(
- "Error while adding to queue; " "bad audiogen output?"
- )
- elif (
- kind == "signal" and args[0] == signal.SIGALRM
- ) or kind == "refresh":
- # it's a tick!
- self.log.debug("Reload")
- self.monitor.q.put(dict(kind="forcetick"))
- gevent.Greenlet.spawn(self.player.check_playlist)
- else:
- self.log.warning("Unknown message: %s", str(value))
|