Some checks failed
Build documentation / build (push) Failing after 2s
Install and run tests / test (3.10) (push) Failing after 2s
Install and run tests / test (3.11) (push) Failing after 2s
Install and run tests / test (3.12) (push) Failing after 2s
Install and run tests / test (3.13) (push) Failing after 3s
Install and run tests / test (3.8) (push) Failing after 2s
Install and run tests / test (3.9) (push) Failing after 3s
Build documentation / deploy (push) Has been skipped
282 lines
9.6 KiB
Python
282 lines
9.6 KiB
Python
|
|
import logging
|
|
import signal
|
|
|
|
import mpd
|
|
from pkg_resources import iter_entry_points
|
|
|
|
import gevent
|
|
from gevent.queue import Queue
|
|
|
|
from .audiogen import audiogenerate
|
|
from .entrypoints_utils import get_avail_entrypoints
|
|
from .event import Monitor
|
|
from .eventutils import ParentedLet, Timer
|
|
from .unused import UnusedCleaner
|
|
|
|
|
|
def get_mpd_client(conf):
    """Build an MPD client and connect it using the settings in ``conf``.

    ``conf`` must provide the ``MPD_HOST`` and ``MPD_PORT`` keys, which are
    handed to ``MPDClient.connect`` unchanged. Connection errors are not
    handled here and propagate to the caller.

    Returns the connected ``mpd.MPDClient`` instance.
    """
    connection = mpd.MPDClient(use_unicode=True)
    host = conf["MPD_HOST"]
    port = conf["MPD_PORT"]
    connection.connect(host, port)
    return connection
|
|
|
|
|
|
class MPDWatcher(ParentedLet):
    """
    MPDWatcher notifies parent about any mpd event

    Every item produced by do_business is a ``("mpc", <what>)`` tuple, where
    ``<what>`` is either ``"connect"`` (a connection is in place and we are
    about to start waiting on it) or the name of a subsystem that MPD
    reported as changed.
    """

    def __init__(self, queue, conf, client=None):
        """
        :param queue: passed as-is to ``ParentedLet.__init__``
        :param conf: configuration mapping; ``MPD_HOST`` and ``MPD_PORT`` are
            read (through ``get_mpd_client``) whenever a connection is needed
        :param client: an already connected MPD client, or None to connect
            lazily inside do_business
        """
        ParentedLet.__init__(self, queue)
        self.log = logging.getLogger(self.__class__.__name__)
        self.conf = conf
        self.client = client  # assume it is already connected, or None

    def refresh_client(self):
        """Replace ``self.client`` with a freshly connected client."""
        self.client = get_mpd_client(self.conf)

    def do_business(self):
        """Endless generator of ``("mpc", ...)`` notifications.

        It (re)connects when needed, announces ``"connect"`` once per
        connection, then blocks on MPD's ``idle`` command and yields one
        notification per changed subsystem. On connection problems it logs a
        warning, drops the client, waits 5 seconds and tries again.
        """
        first_after_connection = True
        while True:
            try:
                if self.client is None:
                    self.refresh_client()
                if first_after_connection:
                    yield ("mpc", "connect")

                changed = self.client.idle()
            except (mpd.ConnectionError, OSError) as exc:
                # OSError covers ConnectionRefusedError and FileNotFoundError
                # (the errors handled historically) plus the other
                # socket-level failures (reset, timeout, unreachable host)
                # that would otherwise terminate this reconnect loop.
                self.log.warning(
                    "Connection to MPD failed (%s: %s)",
                    exc.__class__.__name__,
                    exc,
                )
                self.client = None
                first_after_connection = True
                gevent.sleep(5)
                continue

            first_after_connection = False
            # idle returns a list: several subsystems may have changed at
            # once, so report each of them instead of only the first one.
            for subsystem in changed:
                yield ("mpc", subsystem)
|
|
|
|
|
|
class Player:
    """
    The player contains different mpd-related methods

    check_playlist determines whether the playlist is long enough and runs
    the audiogenerator accordingly

    enqueue receives audios that have been generated by Monitor and (if
    filters allow it) enqueues them to the MPD playlist

    Every method opens its own short-lived MPD connection through _get_mpd
    and closes it with _release_mpd when done.
    """

    def __init__(self, conf):
        """
        :param conf: configuration mapping. This class reads ``MPD_HOST`` and
            ``MPD_PORT`` (via ``get_mpd_client``), ``CONTINOUS_AUDIOSPEC`` and
            ``EVENT_FILTERS``; it is also handed to ``UnusedCleaner`` and to
            every event filter.
        """
        self.conf = conf
        self.log = logging.getLogger(self.__class__.__name__)
        # check_playlist asks for new songs when fewer than this many entries
        # follow the current one
        self.min_playlist_length = 10
        self.tmpcleaner = UnusedCleaner(conf)
        self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"]
        # when False, enqueue() ignores whatever it receives
        self.events_enabled = True

    def _get_mpd(self):
        """Return a new, connected MPD client.

        The caller owns the connection and should hand it to _release_mpd
        when finished.

        :raises gevent.GreenletExit: when MPD cannot be reached; the failure
            is logged as a warning first
        """
        try:
            return get_mpd_client(self.conf)
        except (
            mpd.ConnectionError,
            ConnectionRefusedError,
            FileNotFoundError,
        ) as exc:
            self.log.warning(
                "Connection to MPD failed (%s: %s)",
                exc.__class__.__name__,
                exc,
            )
            raise gevent.GreenletExit()

    @staticmethod
    def _release_mpd(mpd_client):
        """Best-effort close of a connection obtained from _get_mpd."""
        try:
            mpd_client.disconnect()
        except (mpd.ConnectionError, OSError):
            # the connection is already unusable: nothing left to close
            pass

    @property
    def continous_audiospec(self):
        """The audiospec passed to ``audiogenerate`` by check_playlist."""
        return self._continous_audiospec

    @continous_audiospec.setter
    def continous_audiospec(self, spec):
        """Change the continuous audiospec; None restores the configured one.

        As a side effect a greenlet is spawned that removes from the MPD
        playlist every entry except the one at the current position.
        """
        self._continous_audiospec = (
            self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec
        )

        def clear_everything_but_current_song():
            mpdc = self._get_mpd()
            try:
                current = mpdc.currentsong()
                # position 0 is assumed when MPD reports no "pos"
                pos = int(current.get("pos", 0))
                for song in mpdc.playlistid():
                    if int(song["pos"]) != pos:
                        mpdc.deleteid(song["id"])
            finally:
                self._release_mpd(mpdc)

        gevent.Greenlet.spawn(clear_everything_but_current_song)

    def check_playlist(self):
        """Top up the playlist when too few songs follow the current one.

        If fewer than ``min_playlist_length`` entries come after the current
        position, ``audiogenerate`` is started in its own greenlet with the
        continuous audiospec; when it succeeds, every URI it returned is
        added to MPD and registered with the tmpcleaner.
        """
        mpd_client = self._get_mpd()
        # True once the greenlet callbacks are responsible for the connection
        handed_over = False
        try:
            songs = mpd_client.playlist()
            current = mpd_client.currentsong()
            pos = int(current.get("pos", 0)) + 1
            if (len(songs) - pos) >= self.min_playlist_length:
                return
            self.log.info("need to add new songs")
            picker = gevent.Greenlet(audiogenerate, self.continous_audiospec)

            def add(greenlet):
                try:
                    for uri in greenlet.value:
                        assert isinstance(uri, str), type(uri)
                        mpd_client.add(uri.strip())
                        self.tmpcleaner.watch(uri.strip())
                finally:
                    self._release_mpd(mpd_client)

            def release(_greenlet):
                self._release_mpd(mpd_client)

            picker.link_value(add)
            # add() is not invoked when the generator fails: close the
            # connection in that case too
            picker.link_exception(release)
            picker.start()
            handed_over = True
        finally:
            if not handed_over:
                self._release_mpd(mpd_client)

    def enqueue_filter(self, songs):
        """Ask the enabled event filters whether ``songs`` may be enqueued.

        Filters are the ``larigira.eventfilter`` entry points whose name is
        listed in ``conf["EVENT_FILTERS"]``. Each one is called with
        ``songs``, a context holding MPD's playlist and status, and the
        configuration. A filter may return a bool or a ``(bool, reason)``
        pair; a None result is ignored. The first filter answering False
        stops the evaluation.

        :returns: ``(allowed, reason)``
        """
        eventfilters = self.conf["EVENT_FILTERS"]
        if not eventfilters:
            return True, ""
        availfilters = get_avail_entrypoints("larigira.eventfilter")
        if not any(ef in availfilters for ef in eventfilters):
            return True, ""

        mpdc = self._get_mpd()
        try:
            status = mpdc.status()
            ctx = {
                "playlist": mpdc.playlist(),
                "status": status,
                "durations": [],
            }
        finally:
            # filters only receive the data collected above
            self._release_mpd(mpdc)

        for entrypoint in iter_entry_points("larigira.eventfilter"):
            if entrypoint.name not in eventfilters:
                continue
            try:
                # loading is part of the guarded section: importing the
                # filter module is the most likely source of ImportError
                ef = entrypoint.load()
                ret = ef(songs=songs, context=ctx, conf=self.conf)
            except ImportError as exc:
                self.log.warning("Filter %s skipped: %s", entrypoint.name, exc)
                continue
            if ret is None:  # bad behavior!
                continue
            if isinstance(ret, bool):
                reason = ""
            else:
                ret, reason = ret
                reason = "Filtered by %s (%s)" % (entrypoint.name, reason)
            if ret is False:
                return ret, reason
            if reason:
                self.log.debug(
                    "filter %s says ok: %s", entrypoint.name, reason
                )
        return True, "Passed through %s" % ",".join(availfilters)

    def enqueue(self, songs):
        """Insert the URIs of an event right after the current song.

        :param songs: dict with at least ``uris``, ``audiospecs``,
            ``timespec`` and ``aids``. Each audiospec must offer ``.get``
            and an ``eid`` attribute.

        Nothing is inserted when events are disabled or a filter rejects the
        event; in the latter case the URIs are still handed to the
        tmpcleaner. URIs are inserted in reverse order at the same position,
        so they end up in their original order.

        :raises AssertionError: when ``songs`` is malformed
        """
        assert isinstance(songs, dict)
        assert "uris" in songs
        spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]]
        nicks = ",".join(spec)
        if not self.events_enabled:
            self.log.debug("Ignoring <%s> (events disabled)", nicks)
            return
        filterok, reason = self.enqueue_filter(songs)
        if not filterok:
            self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason)
            # delete those files
            for uri in reversed(songs["uris"]):
                self.tmpcleaner.watch(uri.strip())
            return

        mpd_client = self._get_mpd()
        try:
            for uri in reversed(songs["uris"]):
                assert isinstance(uri, str)
                # use the same stripped form for MPD and for the cleaner,
                # as check_playlist does
                uri = uri.strip()
                self.log.info(
                    "Adding %s to playlist (from <%s>:%s=%s)",
                    uri,
                    songs["timespec"].get("nick", ""),
                    songs["aids"],
                    spec,
                )
                insert_pos = (
                    0
                    if not mpd_client.playlistid()
                    else int(mpd_client.currentsong().get("pos", 0)) + 1
                )
                try:
                    mpd_client.addid(uri, insert_pos)
                except mpd.CommandError:
                    self.log.exception("Cannot insert song %s", uri)
                self.tmpcleaner.watch(uri)
        finally:
            self._release_mpd(mpd_client)

    def play(self):
        """make sure that MPD is playing"""
        mpd_client = self._get_mpd()
        try:
            mpd_client.play()
        finally:
            self._release_mpd(mpd_client)
|
|
|
|
|
|
class Controller(gevent.Greenlet):
    """Greenlet that reads messages from a queue and drives the Player.

    Messages are dicts with a ``kind`` and an ``args`` entry. The queue is
    shared with the MPDWatcher, the Timer and (when ``DB_URI`` is configured)
    the Monitor, all started by _run.
    """

    def __init__(self, conf):
        """
        :param conf: configuration mapping; ``CHECK_SECS`` and
            ``MPD_ENFORCE_ALWAYS_PLAYING`` are read here, and a Monitor is
            created only when ``DB_URI`` is present
        """
        gevent.Greenlet.__init__(self)
        self.log = logging.getLogger(self.__class__.__name__)
        self.conf = conf
        self.q = Queue()
        self.player = Player(self.conf)
        if "DB_URI" in self.conf:
            self.monitor = Monitor(self.q, self.conf)
            self.monitor.parent_greenlet = self
        else:
            self.monitor = None

    def _run(self):
        """Start the helper greenlets, then dispatch queue messages forever.

        - ``timer``, or ``mpc`` with ``player``/``playlist``/``connect``:
          check the playlist, optionally force playback, run the cleaner
        - any other ``mpc``: ignored
        - ``uris_enqueue``: hand ``args[0]`` to ``Player.enqueue``
        - ``signal`` with SIGALRM, or ``refresh``: force a Monitor tick (when
          a Monitor exists) and check the playlist
        - anything else: logged as unknown
        """
        if self.monitor is not None:
            self.monitor.start()
        mw = MPDWatcher(self.q, self.conf, client=None)
        mw.parent_greenlet = self
        mw.start()
        # NOTE(review): the factor 1000 suggests Timer takes milliseconds —
        # confirm against Timer
        t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q)
        t.parent_greenlet = self
        t.start()
        # at the very start, run a check!
        gevent.Greenlet.spawn(self.player.check_playlist)
        while True:
            value = self.q.get()
            # emitter = value['emitter']
            kind = value["kind"]
            args = value["args"]
            if kind == "timer" or (
                kind == "mpc" and args[0] in ("player", "playlist", "connect")
            ):
                gevent.Greenlet.spawn(self.player.check_playlist)
                if self.conf["MPD_ENFORCE_ALWAYS_PLAYING"]:
                    gevent.Greenlet.spawn(self.player.play)
                try:
                    self.player.tmpcleaner.check_playlist()
                except Exception:
                    # cleaning is best-effort and must never stop this loop,
                    # but leave a trace instead of failing silently
                    self.log.debug("Unused files check failed", exc_info=True)
            elif kind == "mpc":
                pass
            elif kind == "uris_enqueue":
                # TODO: uris_enqueue messages should be delivered directly to Player.enqueue
                # probably we need a MPDEnqueuer that receives every uri we want to add
                # NOTE(review): Player._get_mpd raises gevent.GreenletExit
                # when MPD is unreachable; neither handler below catches it,
                # so it would leave this loop — confirm this is intended
                try:
                    self.player.enqueue(args[0])
                except AssertionError:
                    raise
                except Exception:
                    self.log.exception(
                        "Error while adding to queue; bad audiogen output?"
                    )
            elif (
                kind == "signal" and args[0] == signal.SIGALRM
            ) or kind == "refresh":
                # it's a tick!
                self.log.debug("Reload")
                # without DB_URI there is no Monitor to notify
                if self.monitor is not None:
                    self.monitor.q.put(dict(kind="forcetick"))
                gevent.Greenlet.spawn(self.player.check_playlist)
            else:
                self.log.warning("Unknown message: %s", str(value))
|