larigira/larigira/mpc.py
boyska d1a1202552
Some checks failed
Build documentation / build (push) Failing after 2s
Install and run tests / test (3.10) (push) Failing after 2s
Install and run tests / test (3.11) (push) Failing after 2s
Install and run tests / test (3.12) (push) Failing after 2s
Install and run tests / test (3.13) (push) Failing after 3s
Install and run tests / test (3.8) (push) Failing after 2s
Install and run tests / test (3.9) (push) Failing after 3s
Build documentation / deploy (push) Has been skipped
remove obsolete __future__
2025-03-01 01:25:26 +01:00

282 lines
9.6 KiB
Python

import logging
import signal
import mpd
from pkg_resources import iter_entry_points
import gevent
from gevent.queue import Queue
from .audiogen import audiogenerate
from .entrypoints_utils import get_avail_entrypoints
from .event import Monitor
from .eventutils import ParentedLet, Timer
from .unused import UnusedCleaner
def get_mpd_client(conf):
client = mpd.MPDClient(use_unicode=True)
client.connect(conf["MPD_HOST"], conf["MPD_PORT"])
return client
class MPDWatcher(ParentedLet):
"""
MPDWatcher notifies parent about any mpd event
"""
def __init__(self, queue, conf, client=None):
ParentedLet.__init__(self, queue)
self.log = logging.getLogger(self.__class__.__name__)
self.conf = conf
self.client = client # assume it is already connected, or None
def refresh_client(self):
self.client = get_mpd_client(self.conf)
def do_business(self):
first_after_connection = True
while True:
try:
if self.client is None:
self.refresh_client()
if first_after_connection:
yield ("mpc", "connect")
status = self.client.idle()[0]
except (
mpd.ConnectionError,
ConnectionRefusedError,
FileNotFoundError,
) as exc:
self.log.warning(
"Connection to MPD failed (%s: %s)",
exc.__class__.__name__,
exc,
)
self.client = None
first_after_connection = True
gevent.sleep(5)
continue
else:
first_after_connection = False
yield ("mpc", status)
class Player:
"""
The player contains different mpd-related methods
check_playlist determines whether the playlist is long enough and run audiogenerator accordingly
enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist
"""
def __init__(self, conf):
self.conf = conf
self.log = logging.getLogger(self.__class__.__name__)
self.min_playlist_length = 10
self.tmpcleaner = UnusedCleaner(conf)
self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"]
self.events_enabled = True
def _get_mpd(self):
mpd_client = mpd.MPDClient(use_unicode=True)
try:
mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"])
except (
mpd.ConnectionError,
ConnectionRefusedError,
FileNotFoundError,
) as exc:
self.log.warning(
"Connection to MPD failed (%s: %s)",
exc.__class__.__name__,
exc,
)
raise gevent.GreenletExit()
return mpd_client
@property
def continous_audiospec(self):
return self._continous_audiospec
@continous_audiospec.setter
def continous_audiospec(self, spec):
self._continous_audiospec = (
self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec
)
def clear_everything_but_current_song():
mpdc = self._get_mpd()
current = mpdc.currentsong()
pos = int(current.get("pos", 0))
for song in mpdc.playlistid():
if int(song["pos"]) != pos:
mpdc.deleteid(song["id"])
gevent.Greenlet.spawn(clear_everything_but_current_song)
def check_playlist(self):
mpd_client = self._get_mpd()
songs = mpd_client.playlist()
current = mpd_client.currentsong()
pos = int(current.get("pos", 0)) + 1
if (len(songs) - pos) >= self.min_playlist_length:
return
self.log.info("need to add new songs")
picker = gevent.Greenlet(audiogenerate, self.continous_audiospec)
def add(greenlet):
uris = greenlet.value
for uri in uris:
assert type(uri) is str, type(uri)
mpd_client.add(uri.strip())
self.tmpcleaner.watch(uri.strip())
picker.link_value(add)
picker.start()
def enqueue_filter(self, songs):
eventfilters = self.conf["EVENT_FILTERS"]
if not eventfilters:
return True, ""
availfilters = get_avail_entrypoints("larigira.eventfilter")
if len([ef for ef in eventfilters if ef in availfilters]) == 0:
return True, ""
mpdc = self._get_mpd()
status = mpdc.status()
ctx = {"playlist": mpdc.playlist(), "status": status, "durations": []}
for entrypoint in iter_entry_points("larigira.eventfilter"):
if entrypoint.name in eventfilters:
ef = entrypoint.load()
try:
ret = ef(songs=songs, context=ctx, conf=self.conf)
except ImportError as exc:
self.log.warn(
"Filter %s skipped: %s" % (entrypoint.name, exc)
)
continue
if ret is None: # bad behavior!
continue
if type(ret) is bool:
reason = ""
else:
ret, reason = ret
reason = "Filtered by %s (%s)" % (entrypoint.name, reason)
if ret is False:
return ret, reason
else:
if reason:
self.log.debug(
"filter %s says ok: %s", entrypoint.name, reason
)
return True, "Passed through %s" % ",".join(availfilters)
def enqueue(self, songs):
assert type(songs) is dict
assert "uris" in songs
spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]]
nicks = ",".join(
(aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"])
)
if not self.events_enabled:
self.log.debug("Ignoring <%s> (events disabled)", nicks)
return
filterok, reason = self.enqueue_filter(songs)
if not filterok:
self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason)
# delete those files
for uri in reversed(songs["uris"]):
self.tmpcleaner.watch(uri.strip())
return
mpd_client = self._get_mpd()
for uri in reversed(songs["uris"]):
assert type(uri) is str
self.log.info(
"Adding %s to playlist (from <%s>:%s=%s)",
uri,
songs["timespec"].get("nick", ""),
songs["aids"],
spec,
)
insert_pos = (
0
if len(mpd_client.playlistid()) == 0
else int(mpd_client.currentsong().get("pos", 0)) + 1
)
try:
mpd_client.addid(uri, insert_pos)
except mpd.CommandError:
self.log.exception("Cannot insert song %s", uri)
self.tmpcleaner.watch(uri.strip())
def play(self):
"""make sure that MPD is playing"""
mpd_client = self._get_mpd()
mpd_client.play()
class Controller(gevent.Greenlet):
def __init__(self, conf):
gevent.Greenlet.__init__(self)
self.log = logging.getLogger(self.__class__.__name__)
self.conf = conf
self.q = Queue()
self.player = Player(self.conf)
if "DB_URI" in self.conf:
self.monitor = Monitor(self.q, self.conf)
self.monitor.parent_greenlet = self
else:
self.monitor = None
def _run(self):
if self.monitor is not None:
self.monitor.start()
mw = MPDWatcher(self.q, self.conf, client=None)
mw.parent_greenlet = self
mw.start()
t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q)
t.parent_greenlet = self
t.start()
# at the very start, run a check!
gevent.Greenlet.spawn(self.player.check_playlist)
while True:
value = self.q.get()
# emitter = value['emitter']
kind = value["kind"]
args = value["args"]
if kind == "timer" or (
kind == "mpc" and args[0] in ("player", "playlist", "connect")
):
gevent.Greenlet.spawn(self.player.check_playlist)
if self.conf["MPD_ENFORCE_ALWAYS_PLAYING"]:
gevent.Greenlet.spawn(self.player.play)
try:
self.player.tmpcleaner.check_playlist()
except:
pass
elif kind == "mpc":
pass
elif kind == "uris_enqueue":
# TODO: uris_enqueue messages should be delivered directly to Player.enqueue
# probably we need a MPDEnqueuer that receives every uri we want to add
try:
self.player.enqueue(args[0])
except AssertionError:
raise
except Exception:
self.log.exception(
"Error while adding to queue; " "bad audiogen output?"
)
elif (
kind == "signal" and args[0] == signal.SIGALRM
) or kind == "refresh":
# it's a tick!
self.log.debug("Reload")
self.monitor.q.put(dict(kind="forcetick"))
gevent.Greenlet.spawn(self.player.check_playlist)
else:
self.log.warning("Unknown message: %s", str(value))