diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..b7de0e0 --- /dev/null +++ b/.flake8 @@ -0,0 +1,2 @@ +[flake8] +max-line-length=79 diff --git a/larigira/audioform_http.py b/larigira/audioform_http.py index 6fa175f..46d1335 100644 --- a/larigira/audioform_http.py +++ b/larigira/audioform_http.py @@ -3,23 +3,24 @@ from wtforms import StringField, validators, SubmitField class AudioForm(Form): - nick = StringField('Audio nick', validators=[validators.required()], - description='A simple name to recognize this audio') - urls = StringField('URLs', - validators=[validators.required()], - description='URL of the file to download') - submit = SubmitField('Submit') + nick = StringField( + "Audio nick", + validators=[validators.required()], + description="A simple name to recognize this audio", + ) + urls = StringField( + "URLs", + validators=[validators.required()], + description="URL of the file to download", + ) + submit = SubmitField("Submit") def populate_from_audiospec(self, audiospec): - if 'nick' in audiospec: - self.nick.data = audiospec['nick'] - if 'urls' in audiospec: - self.urls.data = ';'.join(audiospec['urls']) + if "nick" in audiospec: + self.nick.data = audiospec["nick"] + if "urls" in audiospec: + self.urls.data = ";".join(audiospec["urls"]) def audio_receive(form): - return { - 'kind': 'http', - 'nick': form.nick.data, - 'urls': form.urls.data.split(';'), - } + return {"kind": "http", "nick": form.nick.data, "urls": form.urls.data.split(";")} diff --git a/larigira/audioform_mostrecent.py b/larigira/audioform_mostrecent.py index 5faf781..e3417c1 100644 --- a/larigira/audioform_mostrecent.py +++ b/larigira/audioform_mostrecent.py @@ -6,40 +6,49 @@ from larigira.formutils import AutocompleteStringField class AudioForm(Form): - nick = StringField('Audio nick', validators=[validators.required()], - description='A simple name to recognize this audio') - path = AutocompleteStringField('dl-suggested-dirs', - 'Path', validators=[validators.required()], - description='Directory to pick file from') - maxage = StringField('Max age', - validators=[validators.required()], - description='in seconds, or human-readable ' - '(like 9w3d12h)') - submit = SubmitField('Submit') + nick = StringField( + "Audio nick", + validators=[validators.required()], + description="A simple name to recognize this audio", + ) + path = AutocompleteStringField( + "dl-suggested-dirs", + "Path", + validators=[validators.required()], + description="Directory to pick file from", + ) + maxage = StringField( + "Max age", + validators=[validators.required()], + description="in seconds, or human-readable " "(like 9w3d12h)", + ) + submit = SubmitField("Submit") def validate_maxage(self, field): try: int(field.data) except ValueError: if timeparse(field.data) is None: - raise ValidationError("maxage must either be a number " - "(in seconds) or a human-readable " - "string like '1h2m' or '1d12h'") + raise ValidationError( + "maxage must either be a number " + "(in seconds) or a human-readable " + "string like '1h2m' or '1d12h'" + ) def populate_from_audiospec(self, audiospec): - if 'nick' in audiospec: - self.nick.data = audiospec['nick'] - if 'path' in audiospec: - self.path.data = audiospec['path'] - if 'maxage' in audiospec: - self.maxage.data = audiospec['maxage'] + if "nick" in audiospec: + self.nick.data = audiospec["nick"] + if "path" in audiospec: + self.path.data = audiospec["path"] + if "maxage" in audiospec: + self.maxage.data = audiospec["maxage"] def audio_receive(form): return { - 'kind': 'mostrecent', - 'nick': form.nick.data, - 'path': form.path.data, - 'maxage': form.maxage.data, - 'howmany': 1 + "kind": "mostrecent", + "nick": form.nick.data, + "path": form.path.data, + "maxage": form.maxage.data, + "howmany": 1, } diff --git a/larigira/audioform_randomdir.py b/larigira/audioform_randomdir.py index 40a4c37..233b104 100644 --- a/larigira/audioform_randomdir.py +++ b/larigira/audioform_randomdir.py @@ -5,33 +5,40 @@ from larigira.formutils import AutocompleteStringField class Form(flask_wtf.Form): - nick = StringField('Audio nick', validators=[validators.required()], - description='A simple name to recognize this audio') - path = AutocompleteStringField('dl-suggested-dirs', - 'Path', validators=[validators.required()], - description='Full path to source directory') - howmany = IntegerField('Number', validators=[validators.optional()], - default=1, - description='How many songs to be picked' - 'from this dir; defaults to 1') - submit = SubmitField('Submit') + nick = StringField( + "Audio nick", + validators=[validators.required()], + description="A simple name to recognize this audio", + ) + path = AutocompleteStringField( + "dl-suggested-dirs", + "Path", + validators=[validators.required()], + description="Full path to source directory", + ) + howmany = IntegerField( + "Number", + validators=[validators.optional()], + default=1, + description="How many songs to be picked" "from this dir; defaults to 1", + ) + submit = SubmitField("Submit") def populate_from_audiospec(self, audiospec): - if 'nick' in audiospec: - self.nick.data = audiospec['nick'] - if 'paths' in audiospec: - self.path.data = audiospec['paths'][0] - if 'howmany' in audiospec: - self.howmany.data = audiospec['howmany'] + if "nick" in audiospec: + self.nick.data = audiospec["nick"] + if "paths" in audiospec: + self.path.data = audiospec["paths"][0] + if "howmany" in audiospec: + self.howmany.data = audiospec["howmany"] else: self.howmany.data = 1 def receive(form): return { - 'kind': 'randomdir', - 'nick': form.nick.data, - 'paths': [form.path.data], - 'howmany': form.howmany.data or 1 + "kind": "randomdir", + "nick": form.nick.data, + "paths": [form.path.data], + "howmany": form.howmany.data or 1, } - diff --git a/larigira/audioform_script.py b/larigira/audioform_script.py index 1e9caf5..7ab62ce 100644 --- a/larigira/audioform_script.py +++ b/larigira/audioform_script.py @@ -3,38 +3,44 @@ from wtforms import StringField, validators, SubmitField, ValidationError from larigira.formutils import AutocompleteStringField + class ScriptAudioForm(Form): - nick = StringField('Audio nick', validators=[validators.required()], - description='A simple name to recognize this audio') + nick = StringField( + "Audio nick", + validators=[validators.required()], + description="A simple name to recognize this audio", + ) name = AutocompleteStringField( - 'dl-suggested-scripts', - 'Name', validators=[validators.required()], - description='filename (NOT path) of the script') - args = StringField('Arguments', - description='arguments, separated by ";"') - submit = SubmitField('Submit') + "dl-suggested-scripts", + "Name", + validators=[validators.required()], + description="filename (NOT path) of the script", + ) + args = StringField("Arguments", description='arguments, separated by ";"') + submit = SubmitField("Submit") def populate_from_audiospec(self, audiospec): - if 'nick' in audiospec: - self.nick.data = audiospec['nick'] - if 'name' in audiospec: - self.name.data = audiospec['name'] - if 'args' in audiospec: - if type(audiospec['args']) is str: # legacy compatibility - self.args.data = audiospec['args'].replace(' ', ';') + if "nick" in audiospec: + self.nick.data = audiospec["nick"] + if "name" in audiospec: + self.name.data = audiospec["name"] + if "args" in audiospec: + if type(audiospec["args"]) is str: # legacy compatibility + self.args.data = audiospec["args"].replace(" ", ";") else: - self.args.data = ';'.join(audiospec['args']) + self.args.data = ";".join(audiospec["args"]) def validate_name(self, field): - if '/' in field.data: - raise ValidationError("Name cannot have slashes: " - "it's a name, not a path") + if "/" in field.data: + raise ValidationError( + "Name cannot have slashes: " "it's a name, not a path" + ) def scriptaudio_receive(form): return { - 'kind': 'script', - 'nick': form.nick.data, - 'name': form.name.data, - 'args': form.args.data.split(';') + "kind": "script", + "nick": form.nick.data, + "name": form.name.data, + "args": form.args.data.split(";"), } diff --git a/larigira/audioform_static.py b/larigira/audioform_static.py index e0b98a0..e176958 100644 --- a/larigira/audioform_static.py +++ b/larigira/audioform_static.py @@ -5,23 +5,25 @@ from larigira.formutils import AutocompleteStringField class StaticAudioForm(Form): - nick = StringField('Audio nick', validators=[validators.required()], - description='A simple name to recognize this audio') - path = AutocompleteStringField('dl-suggested-files', - 'Path', validators=[validators.required()], - description='Full path to audio file') - submit = SubmitField('Submit') + nick = StringField( + "Audio nick", + validators=[validators.required()], + description="A simple name to recognize this audio", + ) + path = AutocompleteStringField( + "dl-suggested-files", + "Path", + validators=[validators.required()], + description="Full path to audio file", + ) + submit = SubmitField("Submit") def populate_from_audiospec(self, audiospec): - if 'nick' in audiospec: - self.nick.data = audiospec['nick'] - if 'paths' in audiospec: - self.path.data = audiospec['paths'][0] + if "nick" in audiospec: + self.nick.data = audiospec["nick"] + if "paths" in audiospec: + self.path.data = audiospec["paths"][0] def staticaudio_receive(form): - return { - 'kind': 'static', - 'nick': form.nick.data, - 'paths': [form.path.data] - } + return {"kind": "static", "nick": form.nick.data, "paths": [form.path.data]} diff --git a/larigira/audiogen.py b/larigira/audiogen.py index 80ba8b0..e9c111f 100644 --- a/larigira/audiogen.py +++ b/larigira/audiogen.py @@ -4,25 +4,30 @@ import argparse from .entrypoints_utils import get_one_entrypoint import json from logging import getLogger -log = getLogger('audiogen') + +log = getLogger("audiogen") def get_audiogenerator(kind): - '''Messes with entrypoints to return an audiogenerator function''' - return get_one_entrypoint('larigira.audiogenerators', kind) + """Messes with entrypoints to return an audiogenerator function""" + return get_one_entrypoint("larigira.audiogenerators", kind) def get_parser(): - parser = argparse.ArgumentParser( - description='Generate audio and output paths') - parser.add_argument('audiospec', metavar='AUDIOSPEC', type=str, nargs=1, - help='filename for audiospec, formatted in json') + parser = argparse.ArgumentParser(description="Generate audio and output paths") + parser.add_argument( + "audiospec", + metavar="AUDIOSPEC", + type=str, + nargs=1, + help="filename for audiospec, formatted in json", + ) return parser def read_spec(fname): try: - if fname == '-': + if fname == "-": return json.load(sys.stdin) with open(fname) as buf: return json.load(buf) @@ -32,28 +37,28 @@ def read_spec(fname): def check_spec(spec): - if 'kind' not in spec: + if "kind" not in spec: yield "Missing field 'kind'" def audiogenerate(spec): - gen = get_audiogenerator(spec['kind']) + gen = get_audiogenerator(spec["kind"]) return tuple(gen(spec)) def main(): - '''Main function for the "larigira-audiogen" executable''' + """Main function for the "larigira-audiogen" executable""" args = get_parser().parse_args() spec = read_spec(args.audiospec[0]) errors = tuple(check_spec(spec)) if errors: log.error("Errors in audiospec") for err in errors: - sys.stderr.write('Error: {}\n'.format(err)) + sys.stderr.write("Error: {}\n".format(err)) sys.exit(1) for path in audiogenerate(spec): print(path) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/larigira/audiogen_http.py b/larigira/audiogen_http.py index b3efff6..3d35ccd 100644 --- a/larigira/audiogen_http.py +++ b/larigira/audiogen_http.py @@ -9,40 +9,39 @@ log = logging.getLogger(__name__) def put(url, destdir=None, copy=False): - if url.split(':')[0] not in ('http', 'https'): - log.warning('Not a valid URL: %s', url) + if url.split(":")[0] not in ("http", "https"): + log.warning("Not a valid URL: %s", url) return None - ext = url.split('.')[-1] - if ext.lower() not in ('mp3', 'ogg', 'oga', 'wma', 'm4a'): + ext = url.split(".")[-1] + if ext.lower() not in ("mp3", "ogg", "oga", "wma", "m4a"): log.warning('Invalid format (%s) for "%s"', ext, url) return None if not copy: return url fname = posixpath.basename(urlparse(url).path) # sanitize - fname = "".join(c for c in fname - if c.isalnum() or c in list('._-')).rstrip() - tmp = mkstemp(suffix='.' + ext, prefix='http-%s-' % fname, dir=destdir) + fname = "".join(c for c in fname if c.isalnum() or c in list("._-")).rstrip() + tmp = mkstemp(suffix="." + ext, prefix="http-%s-" % fname, dir=destdir) os.close(tmp[0]) log.info("downloading %s -> %s", url, tmp[1]) fname, headers = urllib.request.urlretrieve(url, tmp[1]) - return 'file://%s' % os.path.realpath(tmp[1]) + return "file://%s" % os.path.realpath(tmp[1]) def generate(spec): - ''' + """ resolves audiospec-static Recognized argument is "paths" (list of static paths) - ''' - if 'urls' not in spec: + """ + if "urls" not in spec: raise ValueError("Malformed audiospec: missing 'paths'") - for url in spec['urls']: + for url in spec["urls"]: ret = put(url, copy=True) if ret is None: continue yield ret -generate.description = 'Fetch audio from an URL' +generate.description = "Fetch audio from an URL" diff --git a/larigira/audiogen_mostrecent.py b/larigira/audiogen_mostrecent.py index 0c598aa..cefb41d 100644 --- a/larigira/audiogen_mostrecent.py +++ b/larigira/audiogen_mostrecent.py @@ -7,6 +7,7 @@ from tempfile import mkstemp from pytimeparse.timeparse import timeparse from larigira.fsutils import scan_dir, shortname + log = logging.getLogger(__name__) @@ -23,36 +24,40 @@ def recent_choose(paths, howmany, minepoch): if os.path.isfile(path): found_files[path] = get_mtime(path) elif os.path.isdir(path): - found_files.update({fname: get_mtime(fname) - for fname in scan_dir(path)}) - found_files = [(fname, mtime) - for (fname, mtime) in found_files.items() - if mtime >= minepoch] + found_files.update({fname: get_mtime(fname) for fname in scan_dir(path)}) + found_files = [ + (fname, mtime) for (fname, mtime) in found_files.items() if mtime >= minepoch + ] - return [fname for fname, mtime in - sorted(found_files, key=lambda x: x[1], reverse=True)[:howmany]] + return [ + fname + for fname, mtime in sorted(found_files, key=lambda x: x[1], reverse=True)[ + :howmany + ] + ] def generate(spec): - ''' + """ resolves audiospec-randomdir Recognized arguments: - path [mandatory] source dir - maxage [default=ignored] max age of audio files to pick - ''' - for attr in ('path', 'maxage'): + """ + for attr in ("path", "maxage"): if attr not in spec: raise ValueError("Malformed audiospec: missing '%s'" % attr) - if spec['maxage'].strip(): + if spec["maxage"].strip(): try: - maxage = int(spec['maxage']) + maxage = int(spec["maxage"]) except ValueError: - maxage = timeparse(spec['maxage']) + maxage = timeparse(spec["maxage"]) if maxage is None: - raise ValueError("Unknown format for maxage: '{}'" - .format(spec['maxage'])) + raise ValueError( + "Unknown format for maxage: '{}'".format(spec["maxage"]) + ) assert type(maxage) is int else: maxage = None @@ -60,15 +65,16 @@ def generate(spec): now = int(time.time()) minepoch = 0 if maxage is None else now - maxage - picked = recent_choose([spec['path']], 1, minepoch) + picked = recent_choose([spec["path"]], 1, minepoch) for path in picked: - tmp = mkstemp(suffix=os.path.splitext(path)[-1], - prefix='randomdir-%s-' % shortname(path)) + tmp = mkstemp( + suffix=os.path.splitext(path)[-1], prefix="randomdir-%s-" % shortname(path) + ) os.close(tmp[0]) shutil.copy(path, tmp[1]) log.info("copying %s -> %s", path, os.path.basename(tmp[1])) - yield 'file://{}'.format(tmp[1]) + yield "file://{}".format(tmp[1]) -generate.description = 'Select most recent file inside a directory' +generate.description = "Select most recent file inside a directory" diff --git a/larigira/audiogen_mpdrandom.py b/larigira/audiogen_mpdrandom.py index 3fea534..14eb9b3 100644 --- a/larigira/audiogen_mpdrandom.py +++ b/larigira/audiogen_mpdrandom.py @@ -1,5 +1,6 @@ import logging -log = logging.getLogger('mpdrandom') + +log = logging.getLogger("mpdrandom") import random from mpd import MPDClient @@ -8,17 +9,17 @@ from .config import get_conf def generate_by_artist(spec): - '''choose HOWMANY random artists, and for each one choose a random song''' - spec.setdefault('howmany', 1) - log.info('generating') + """choose HOWMANY random artists, and for each one choose a random song""" + spec.setdefault("howmany", 1) + log.info("generating") conf = get_conf() c = MPDClient(use_unicode=True) - c.connect(conf['MPD_HOST'], conf['MPD_PORT']) + c.connect(conf["MPD_HOST"], conf["MPD_PORT"]) - artists = c.list('artist') + artists = c.list("artist") log.debug("got %d artists", len(artists)) if not artists: raise ValueError("no artists in your mpd database") - for _ in range(spec['howmany']): + for _ in range(spec["howmany"]): artist = random.choice(artists) - yield random.choice(c.find('artist', artist))['file'] + yield random.choice(c.find("artist", artist))["file"] diff --git a/larigira/audiogen_randomdir.py b/larigira/audiogen_randomdir.py index effcd04..3bb67e1 100644 --- a/larigira/audiogen_randomdir.py +++ b/larigira/audiogen_randomdir.py @@ -6,6 +6,7 @@ from tempfile import mkstemp from pathlib import Path from larigira.fsutils import scan_dir_audio, shortname, is_audio + log = logging.getLogger(__name__) @@ -23,36 +24,37 @@ def candidates(paths): def generate(spec): - ''' + """ resolves audiospec-randomdir Recognized arguments: - paths [mandatory] list of source paths - howmany [default=1] number of audio files to pick - ''' - spec.setdefault('howmany', 1) - for attr in ('paths', ): + """ + spec.setdefault("howmany", 1) + for attr in ("paths",): if attr not in spec: raise ValueError("Malformed audiospec: missing '%s'" % attr) - found_files = candidates([Path(p) for p in spec['paths']]) + found_files = candidates([Path(p) for p in spec["paths"]]) - picked = random.sample(found_files, int(spec['howmany'])) + picked = random.sample(found_files, int(spec["howmany"])) - nick = spec.get('nick', '') + nick = spec.get("nick", "") if not nick: - if hasattr(spec, 'eid'): + if hasattr(spec, "eid"): nick = spec.eid else: - nick = 'NONICK' + nick = "NONICK" for path in picked: - tmp = mkstemp(suffix=os.path.splitext(path)[-1], - prefix='randomdir-%s-%s-' % (shortname(nick), - shortname(path))) + tmp = mkstemp( + suffix=os.path.splitext(path)[-1], + prefix="randomdir-%s-%s-" % (shortname(nick), shortname(path)), + ) os.close(tmp[0]) shutil.copy(path, tmp[1]) log.info("copying %s -> %s", path, os.path.basename(tmp[1])) yield "file://{}".format(tmp[1]) -generate.description = 'Picks random files from a specified directory' +generate.description = "Picks random files from a specified directory" diff --git a/larigira/audiogen_script.py b/larigira/audiogen_script.py index 8e078dd..7e05404 100644 --- a/larigira/audiogen_script.py +++ b/larigira/audiogen_script.py @@ -1,4 +1,4 @@ -''' +""" script audiogenerator: uses an external program to generate audio URIs a script can be any valid executable in @@ -14,64 +14,65 @@ The output MUST be UTF-8-encoded. Empty lines will be skipped. stderr will be logged, so please be careful. any non-zero exit code will result in no files being added.and an exception being logged. -''' +""" import logging import os import subprocess from .config import get_conf + log = logging.getLogger(__name__) def generate(spec): - ''' + """ Recognized arguments (fields in spec): - name [mandatory] script name - args [default=empty] arguments, colon-separated - ''' + """ conf = get_conf() - spec.setdefault('args', '') - if type(spec['args']) is str: - args = spec['args'].split(';') - args = list(spec['args']) - for attr in ('name', ): + spec.setdefault("args", "") + if type(spec["args"]) is str: + args = spec["args"].split(";") + args = list(spec["args"]) + for attr in ("name",): if attr not in spec: raise ValueError("Malformed audiospec: missing '%s'" % attr) - if '/' in spec['name']: - raise ValueError("Script name is a filename, not a path ({} provided)" - .format(spec['name'])) - scriptpath = os.path.join(conf['SCRIPTS_PATH'], spec['name']) + if "/" in spec["name"]: + raise ValueError( + "Script name is a filename, not a path ({} provided)".format(spec["name"]) + ) + scriptpath = os.path.join(conf["SCRIPTS_PATH"], spec["name"]) if not os.path.exists(scriptpath): - raise ValueError("Script %s not found", spec['name']) + raise ValueError("Script %s not found", spec["name"]) if not os.access(scriptpath, os.R_OK | os.X_OK): raise ValueError("Insufficient privileges for script %s" % scriptpath) if os.stat(scriptpath).st_uid != os.getuid(): - raise ValueError("Script %s owned by %d, should be owned by %d" - % (spec['name'], os.stat(scriptpath).st_uid, - os.getuid())) - try: - log.info('Going to run %s', [scriptpath] + args) - env = dict( - HOME=os.environ['HOME'], - PATH=os.environ['PATH'], - MPD_HOST=conf['MPD_HOST'], - MPD_PORT=str(conf['MPD_PORT']) + raise ValueError( + "Script %s owned by %d, should be owned by %d" + % (spec["name"], os.stat(scriptpath).st_uid, os.getuid()) ) - if 'TMPDIR' in os.environ: - env['TMPDIR'] = os.environ['TMPDIR'] - out = subprocess.check_output([scriptpath] + args, - env=env, - cwd='/') + try: + log.info("Going to run %s", [scriptpath] + args) + env = dict( + HOME=os.environ["HOME"], + PATH=os.environ["PATH"], + MPD_HOST=conf["MPD_HOST"], + MPD_PORT=str(conf["MPD_PORT"]), + ) + if "TMPDIR" in os.environ: + env["TMPDIR"] = os.environ["TMPDIR"] + out = subprocess.check_output([scriptpath] + args, env=env, cwd="/") except subprocess.CalledProcessError as exc: - log.error("Error %d when running script %s", - exc.returncode, spec['name']) + log.error("Error %d when running script %s", exc.returncode, spec["name"]) return [] - out = out.decode('utf-8') - out = [p for p in out.split('\n') if p] - logging.debug('Script %s produced %d files', spec['name'], len(out)) + out = out.decode("utf-8") + out = [p for p in out.split("\n") if p] + logging.debug("Script %s produced %d files", spec["name"], len(out)) return out -generate.description = 'Generate audio through an external script. ' \ -'Experts only.' + + +generate.description = "Generate audio through an external script. " "Experts only." diff --git a/larigira/audiogen_static.py b/larigira/audiogen_static.py index 8d801ff..37b084b 100644 --- a/larigira/audiogen_static.py +++ b/larigira/audiogen_static.py @@ -4,28 +4,30 @@ import shutil from tempfile import mkstemp from larigira.fsutils import shortname + log = logging.getLogger(__name__) def generate(spec): - ''' + """ resolves audiospec-static Recognized argument is "paths" (list of static paths) - ''' - if 'paths' not in spec: + """ + if "paths" not in spec: raise ValueError("Malformed audiospec: missing 'paths'") - for path in spec['paths']: + for path in spec["paths"]: if not os.path.exists(path): log.warning("Can't find requested path: %s", path) continue - tmp = mkstemp(suffix=os.path.splitext(path)[-1], - prefix='static-%s-' % shortname(path)) + tmp = mkstemp( + suffix=os.path.splitext(path)[-1], prefix="static-%s-" % shortname(path) + ) os.close(tmp[0]) log.info("copying %s -> %s", path, os.path.basename(tmp[1])) shutil.copy(path, tmp[1]) - yield 'file://{}'.format(tmp[1]) + yield "file://{}".format(tmp[1]) -generate.description = 'Picks always the same specified file' +generate.description = "Picks always the same specified file" diff --git a/larigira/db.py b/larigira/db.py index e60a458..6111856 100644 --- a/larigira/db.py +++ b/larigira/db.py @@ -1,5 +1,6 @@ from tinydb import TinyDB + class EventModel(object): def __init__(self, uri): self.uri = uri @@ -10,8 +11,8 @@ class EventModel(object): if self.db is not None: self.db.close() self.db = TinyDB(self.uri, indent=2) - self._actions = self.db.table('actions') - self._alarms = self.db.table('alarms') + self._actions = self.db.table("actions") + self._alarms = self.db.table("alarms") def get_action_by_id(self, action_id): return self._actions.get(eid=action_id) @@ -20,7 +21,7 @@ class EventModel(object): return self._alarms.get(eid=alarm_id) def get_actions_by_alarm(self, alarm): - for action_id in alarm.get('actions', []): + for action_id in alarm.get("actions", []): action = self.get_action_by_id(action_id) if action is None: continue @@ -39,7 +40,7 @@ class EventModel(object): def add_event(self, alarm, actions): action_ids = [self.add_action(a) for a in actions] - alarm['actions'] = action_ids + alarm["actions"] = action_ids return self._alarms.insert(alarm) def add_action(self, action): diff --git a/larigira/dbadmin/__init__.py b/larigira/dbadmin/__init__.py index 00b950c..1361235 100644 --- a/larigira/dbadmin/__init__.py +++ b/larigira/dbadmin/__init__.py @@ -1,14 +1,23 @@ -''' +""" This module contains a flask blueprint for db administration stuff Templates are self-contained in this directory -''' +""" from __future__ import print_function from datetime import datetime, timedelta, time from collections import defaultdict -from flask import current_app, Blueprint, render_template, jsonify, abort, \ - request, redirect, url_for, flash +from flask import ( + current_app, + Blueprint, + render_template, + jsonify, + abort, + request, + redirect, + url_for, + flash, +) from larigira.entrypoints_utils import get_avail_entrypoints from larigira.audiogen import get_audiogenerator @@ -17,56 +26,63 @@ from larigira.timegen import get_timegenerator, timegenerate from larigira import forms from larigira.config import get_conf from .suggestions import get_suggestions -db = Blueprint('db', __name__, - url_prefix=get_conf()['ROUTE_PREFIX'] + '/db', - template_folder='templates') + +db = Blueprint( + "db", + __name__, + url_prefix=get_conf()["ROUTE_PREFIX"] + "/db", + template_folder="templates", +) def request_wants_json(): - best = request.accept_mimetypes \ - .best_match(['application/json', 'text/html']) - return best == 'application/json' and \ - request.accept_mimetypes[best] > \ - request.accept_mimetypes['text/html'] + best = request.accept_mimetypes.best_match(["application/json", "text/html"]) + return ( + best == "application/json" + and request.accept_mimetypes[best] > request.accept_mimetypes["text/html"] + ) def get_model(): return current_app.larigira.controller.monitor.model -@db.route('/') +@db.route("/") def home(): - return render_template('dbadmin_base.html') + return render_template("dbadmin_base.html") -@db.route('/list') +@db.route("/list") def events_list(): model = current_app.larigira.controller.monitor.model alarms = tuple(model.get_all_alarms()) - events = [(alarm, model.get_actions_by_alarm(alarm)) - for alarm in alarms] - return render_template('list.html', events=events) + events = [(alarm, model.get_actions_by_alarm(alarm)) for alarm in alarms] + return render_template("list.html", events=events) -@db.route('/calendar') +@db.route("/calendar") def events_calendar(): model = current_app.larigira.controller.monitor.model today = datetime.now().date() maxdays = 30 # {date: {datetime: [(alarm1,actions1), (alarm2,actions2)]}} days = defaultdict(lambda: defaultdict(list)) - freq_threshold = get_conf()['UI_CALENDAR_FREQUENCY_THRESHOLD'] + freq_threshold = get_conf()["UI_CALENDAR_FREQUENCY_THRESHOLD"] for alarm in model.get_all_alarms(): - if freq_threshold and alarm['kind'] == 'frequency' and \ - FrequencyAlarm(alarm).interval < freq_threshold: + if ( + freq_threshold + and alarm["kind"] == "frequency" + and FrequencyAlarm(alarm).interval < freq_threshold + ): continue actions = tuple(model.get_actions_by_alarm(alarm)) if not actions: continue - t = datetime.fromtimestamp(int(today.strftime('%s'))) + t = datetime.fromtimestamp(int(today.strftime("%s"))) for t in timegenerate(alarm, now=t, howmany=maxdays): - if t is None or \ - t > datetime.combine(today+timedelta(days=maxdays), time()): + if t is None or t > datetime.combine( + today + timedelta(days=maxdays), time() + ): break days[t.date()][t].append((alarm, actions)) @@ -75,105 +91,101 @@ def events_calendar(): for d in sorted(days.keys()): weeks[d.isocalendar()[:2]].append(d) - return render_template('calendar.html', days=days, weeks=weeks) + return render_template("calendar.html", days=days, weeks=weeks) -@db.route('/add/time') +@db.route("/add/time") def addtime(): - kinds = get_avail_entrypoints('larigira.timeform_create') + kinds = get_avail_entrypoints("larigira.timeform_create") def gen_info(gen): - return dict(description=getattr(gen, 'description', '')) - info = {kind: gen_info(get_timegenerator(kind)) - for kind in kinds} - return render_template('add_time.html', kinds=kinds, info=info) + return dict(description=getattr(gen, "description", "")) + + info = {kind: gen_info(get_timegenerator(kind)) for kind in kinds} + return render_template("add_time.html", kinds=kinds, info=info) -@db.route('/edit/time/', methods=['GET', 'POST']) +@db.route("/edit/time/", methods=["GET", "POST"]) def edit_time(alarmid): model = get_model() timespec = model.get_alarm_by_id(alarmid) - kind = timespec['kind'] + kind = timespec["kind"] Form, receiver = tuple(forms.get_timeform(kind)) form = Form() - if request.method == 'GET': + if request.method == "GET": form.populate_from_timespec(timespec) - if request.method == 'POST' and form.validate(): + if request.method == "POST" and form.validate(): data = receiver(form) model.update_alarm(alarmid, data) model.reload() - return redirect(url_for('db.events_list', - _anchor='event-%d' % alarmid)) - return render_template('add_time_kind.html', - form=form, - kind=kind, - mode='edit', - alarmid=alarmid, - ) + return redirect(url_for("db.events_list", _anchor="event-%d" % alarmid)) + return render_template( + "add_time_kind.html", form=form, kind=kind, mode="edit", alarmid=alarmid + ) -@db.route('/add/time/', methods=['GET', 'POST']) +@db.route("/add/time/", methods=["GET", "POST"]) def addtime_kind(kind): Form, receiver = tuple(forms.get_timeform(kind)) form = Form() - if request.method == 'POST' and form.validate(): + if request.method == "POST" and form.validate(): data = receiver(form) eid = get_model().add_alarm(data) - return redirect(url_for('db.edit_event', alarmid=eid)) + return redirect(url_for("db.edit_event", alarmid=eid)) - return render_template('add_time_kind.html', - form=form, kind=kind, mode='add') + return render_template("add_time_kind.html", form=form, kind=kind, mode="add") -@db.route('/add/audio') +@db.route("/add/audio") def addaudio(): - kinds = get_avail_entrypoints('larigira.audioform_create') + kinds = get_avail_entrypoints("larigira.audioform_create") def gen_info(gen): - return dict(description=getattr(gen, 'description', '')) - info = {kind: gen_info(get_audiogenerator(kind)) - for kind in kinds} - return render_template('add_audio.html', kinds=kinds, info=info) + return dict(description=getattr(gen, "description", "")) + + info = {kind: gen_info(get_audiogenerator(kind)) for kind in kinds} + return render_template("add_audio.html", kinds=kinds, info=info) -@db.route('/add/audio/', methods=['GET', 'POST']) +@db.route("/add/audio/", methods=["GET", "POST"]) def addaudio_kind(kind): Form, receiver = tuple(forms.get_audioform(kind)) form = Form() - if request.method == 'POST' and form.validate(): + if request.method == "POST" and form.validate(): data = receiver(form) model = current_app.larigira.controller.monitor.model eid = model.add_action(data) return jsonify(dict(inserted=eid, data=data)) - return render_template('add_audio_kind.html', form=form, kind=kind, - suggestions=get_suggestions() - ) + return render_template( + "add_audio_kind.html", form=form, kind=kind, suggestions=get_suggestions() + ) -@db.route('/edit/audio/', methods=['GET', 'POST']) +@db.route("/edit/audio/", methods=["GET", "POST"]) def edit_audio(actionid): model = get_model() audiospec = model.get_action_by_id(actionid) - kind = audiospec['kind'] + kind = audiospec["kind"] Form, receiver = tuple(forms.get_audioform(kind)) form = Form() - if request.method == 'GET': + if request.method == "GET": form.populate_from_audiospec(audiospec) - if request.method == 'POST' and form.validate(): + if request.method == "POST" and form.validate(): data = receiver(form) model.update_action(actionid, data) model.reload() - return redirect(url_for('db.events_list')) - return render_template('add_audio_kind.html', - form=form, - kind=kind, - mode='edit', - suggestions=get_suggestions() - ) + return redirect(url_for("db.events_list")) + return render_template( + "add_audio_kind.html", + form=form, + kind=kind, + mode="edit", + suggestions=get_suggestions(), + ) -@db.route('/edit/event/') +@db.route("/edit/event/") def edit_event(alarmid): model = current_app.larigira.controller.monitor.model alarm = model.get_alarm_by_id(int(alarmid)) @@ -181,35 +193,37 @@ def edit_event(alarmid): abort(404) allactions = model.get_all_actions() actions = tuple(model.get_actions_by_alarm(alarm)) - return render_template('edit_event.html', - alarm=alarm, all_actions=allactions, - actions=actions, - routeprefix=get_conf()['ROUTE_PREFIX'] - ) + return render_template( + "edit_event.html", + alarm=alarm, + all_actions=allactions, + actions=actions, + routeprefix=get_conf()["ROUTE_PREFIX"], + ) -@db.route('/api/alarm//actions', methods=['POST']) +@db.route("/api/alarm//actions", methods=["POST"]) def change_actions(alarmid): - new_actions = request.form.getlist('actions[]') + new_actions = request.form.getlist("actions[]") if new_actions is None: new_actions = [] model = current_app.larigira.controller.monitor.model - ret = model.update_alarm(int(alarmid), - new_fields={'actions': [int(a) for a in - new_actions]}) + ret = model.update_alarm( + int(alarmid), new_fields={"actions": [int(a) for a in new_actions]} + ) return jsonify(dict(updated=alarmid, ret=ret)) -@db.route('/api/alarm//delete', methods=['POST']) +@db.route("/api/alarm//delete", methods=["POST"]) def delete_alarm(alarmid): model = current_app.larigira.controller.monitor.model try: alarm = model.get_alarm_by_id(int(alarmid)) - print(alarm['nick']) + print(alarm["nick"]) model.delete_alarm(alarmid) except KeyError: abort(404) if request_wants_json(): return jsonify(dict(deleted=alarmid)) - flash('Evento %d `%s` cancellato' % (alarmid, alarm['nick']) ) - return redirect(url_for('db.events_list')) + flash("Evento %d `%s` cancellato" % (alarmid, alarm["nick"])) + return redirect(url_for("db.events_list")) diff --git a/larigira/dbadmin/suggestions.py b/larigira/dbadmin/suggestions.py index e07b1c1..9f9e4c1 100644 --- a/larigira/dbadmin/suggestions.py +++ b/larigira/dbadmin/suggestions.py @@ -7,22 +7,23 @@ from larigira.fsutils import scan_dir_audio def get_suggested_files(): - if not get_conf()['FILE_PATH_SUGGESTION']: + if not get_conf()["FILE_PATH_SUGGESTION"]: return [] - if current_app.cache.has('dbadmin.get_suggested_files'): - return current_app.cache.get('dbadmin.get_suggested_files') - current_app.logger.debug('get_suggested_files MISS in cache') + if current_app.cache.has("dbadmin.get_suggested_files"): + return current_app.cache.get("dbadmin.get_suggested_files") + current_app.logger.debug("get_suggested_files MISS in cache") files = [] - for path in get_conf()['FILE_PATH_SUGGESTION']: + for path in get_conf()["FILE_PATH_SUGGESTION"]: if not os.path.isdir(path): - current_app.logger.warning('Invalid suggestion path: %s' % path) + current_app.logger.warning("Invalid suggestion path: %s" % path) continue pathfiles = scan_dir_audio(path) files.extend(pathfiles) - current_app.logger.debug('Suggested files: %s' % ', '.join(files)) + current_app.logger.debug("Suggested files: %s" % ", ".join(files)) - current_app.cache.set('dbadmin.get_suggested_files', files, - timeout=600) # ten minutes + current_app.cache.set( + "dbadmin.get_suggested_files", files, timeout=600 + ) # ten minutes return files @@ -40,11 +41,14 @@ def get_suggested_dirs(): def get_suggested_scripts(): - base = get_conf()['SCRIPTS_PATH'] + base = get_conf()["SCRIPTS_PATH"] if not base or not os.path.isdir(base): return [] - fnames = [f for f in os.listdir(base) - if os.access(os.path.join(base, f), os.R_OK | os.X_OK)] + fnames = [ + f + for f in os.listdir(base) + if os.access(os.path.join(base, f), os.R_OK | os.X_OK) + ] return fnames @@ -53,10 +57,4 @@ def get_suggestions(): if len(files) > 200: current_app.logger.warning("Too many suggested files, cropping") files = files[:200] - return dict( - files=files, - dirs=get_suggested_dirs(), - scripts=get_suggested_scripts(), - ) - - + return dict(files=files, dirs=get_suggested_dirs(), scripts=get_suggested_scripts()) diff --git a/larigira/entrypoints_utils.py b/larigira/entrypoints_utils.py index 3f97f3e..001c863 100644 --- a/larigira/entrypoints_utils.py +++ b/larigira/entrypoints_utils.py @@ -1,13 +1,14 @@ from logging import getLogger -log = getLogger('entrypoints_utils') + +log = getLogger("entrypoints_utils") from pkg_resources import iter_entry_points def get_one_entrypoint(group, kind): - '''Messes with entrypoints to return an entrypoint of a given group/kind''' + """Messes with entrypoints to return an entrypoint of a given group/kind""" points = tuple(iter_entry_points(group=group, name=kind)) if not points: - raise ValueError('cant find an entrypoint %s:%s' % (group, kind)) + raise ValueError("cant find an entrypoint %s:%s" % (group, kind)) if len(points) > 1: log.warning("Found more than one timeform for %s:%s", group, kind) return points[0].load() diff --git a/larigira/event.py b/larigira/event.py index 3912ec3..f8141fc 100644 --- a/larigira/event.py +++ b/larigira/event.py @@ -1,5 +1,6 @@ from __future__ import print_function from gevent import monkey + monkey.patch_all(subprocess=True) import logging from datetime import datetime, timedelta @@ -12,10 +13,11 @@ from .timegen import timegenerate from .audiogen import audiogenerate from .db import EventModel -logging.getLogger('mpd').setLevel(logging.WARNING) +logging.getLogger("mpd").setLevel(logging.WARNING) + class Monitor(ParentedLet): - ''' + """ Manages timegenerators and audiogenerators for DB events The mechanism is partially based on ticks, partially on scheduled actions. @@ -30,23 +32,25 @@ class Monitor(ParentedLet): The scheduling mechanism allows for more precision, catching exactly the right time. Being accurate only with ticks would have required very frequent ticks, which is cpu-intensive. - ''' + """ + def __init__(self, parent_queue, conf): ParentedLet.__init__(self, parent_queue) self.log = logging.getLogger(self.__class__.__name__) self.running = {} self.conf = conf self.q = Queue() - self.model = EventModel(self.conf['DB_URI']) - self.ticker = Timer(int(self.conf['EVENT_TICK_SECS']) * 1000, self.q) + self.model = EventModel(self.conf["DB_URI"]) + self.ticker = Timer(int(self.conf["EVENT_TICK_SECS"]) * 1000, self.q) def _alarm_missing_time(self, timespec): - now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME']) + now = datetime.now() + timedelta(seconds=self.conf["CACHING_TIME"]) try: when = next(timegenerate(timespec, now=now)) except: - logging.exception("Could not generate " - "an alarm from timespec %s", timespec) + logging.exception( + "Could not generate " "an alarm from timespec %s", timespec + ) if when is None: # expired return None @@ -55,12 +59,12 @@ class Monitor(ParentedLet): return delta def on_tick(self): - ''' + """ this is called every EVENT_TICK_SECS. Checks every event in the DB (which might be slightly CPU-intensive, so it is advisable to run it in its own greenlet); if the event is "near enough", schedule it; if it is too far, or already expired, ignore it. - ''' + """ self.model.reload() for alarm in self.model.get_all_alarms(): actions = list(self.model.get_actions_by_alarm(alarm)) @@ -71,75 +75,84 @@ class Monitor(ParentedLet): # but it is "tricky"; any small delay would cause the event to be # missed if delta is None: - self.log.debug('Skipping event %s: will never ring', - alarm.get('nick', alarm.eid)) - elif delta <= 2*self.conf['EVENT_TICK_SECS']: - self.log.debug('Scheduling event %s (%ds) => %s', - alarm.get('nick', alarm.eid), - delta, - [a.get('nick', a.eid) for a in actions] - ) + self.log.debug( + "Skipping event %s: will never ring", alarm.get("nick", alarm.eid) + ) + elif delta <= 2 * self.conf["EVENT_TICK_SECS"]: + self.log.debug( + "Scheduling event %s (%ds) => %s", + alarm.get("nick", alarm.eid), + delta, + [a.get("nick", a.eid) for a in actions], + ) self.schedule(alarm, actions, delta) else: - self.log.debug('Skipping event %s too far (%ds)', - alarm.get('nick', alarm.eid), - delta, - ) + self.log.debug( + "Skipping event %s too far (%ds)", + alarm.get("nick", alarm.eid), + delta, + ) def schedule(self, timespec, audiospecs, delta=None): - ''' + """ prepare an event to be run at a specified time with the specified actions; the DB won't be read anymore after this call. This means that this call should not be done too early, or any update to the DB will be ignored. - ''' + """ if delta is None: delta = self._alarm_missing_time(timespec) - audiogen = gevent.spawn_later(delta, - self.process_action, - timespec, audiospecs) + audiogen = gevent.spawn_later(delta, self.process_action, timespec, audiospecs) audiogen.parent_greenlet = self audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format( - delta, - ','.join(aspec.get('nick', '') for aspec in audiospecs), + delta, ",".join(aspec.get("nick", "") for aspec in audiospecs) ) self.running[timespec.eid] = { - 'greenlet': audiogen, - 'running_time': datetime.now() + timedelta(seconds=delta), - 'timespec': timespec, - 'audiospecs': audiospecs + "greenlet": audiogen, + "running_time": datetime.now() + timedelta(seconds=delta), + "timespec": timespec, + "audiospecs": audiospecs, } def process_action(self, timespec, audiospecs): - '''Generate audio and submit it to Controller''' + """Generate audio and submit it to Controller""" if timespec.eid in self.running: del self.running[timespec.eid] else: - self.log.warning('Timespec %s completed but not in running ' - 'registry; this is most likely a bug', - timespec.get('nick', timespec.eid)) + self.log.warning( + "Timespec %s completed but not in running " + "registry; this is most likely a bug", + timespec.get("nick", timespec.eid), + ) uris = [] for audiospec in audiospecs: try: uris.extend(audiogenerate(audiospec)) except Exception as exc: - self.log.error('audiogenerate for <%s> failed; reason: %s', - str(audiospec), str(exc)) - self.send_to_parent('uris_enqueue', - dict(uris=uris, - timespec=timespec, - audiospecs=audiospecs, - aids=[a.eid for a in audiospecs])) + self.log.error( + "audiogenerate for <%s> failed; reason: %s", + str(audiospec), + str(exc), + ) + self.send_to_parent( + "uris_enqueue", + dict( + uris=uris, + timespec=timespec, + audiospecs=audiospecs, + aids=[a.eid for a in audiospecs], + ), + ) def _run(self): self.ticker.start() gevent.spawn(self.on_tick) while True: value = self.q.get() - kind = value['kind'] - if kind in ('forcetick', 'timer'): + kind = value["kind"] + if kind in ("forcetick", "timer"): gevent.spawn(self.on_tick) else: self.log.warning("Unknown message: %s", str(value)) diff --git a/larigira/event_manage.py b/larigira/event_manage.py index be54621..cd49b68 100644 --- a/larigira/event_manage.py +++ b/larigira/event_manage.py @@ -14,25 +14,28 @@ def main_list(args): def main_add(args): m = EventModel(args.file) - m.add_event(dict(kind='frequency', interval=args.interval, start=1), - [dict(kind='mpd', howmany=1)] - ) + m.add_event( + dict(kind="frequency", interval=args.interval, start=1), + [dict(kind="mpd", howmany=1)], + ) def main(): conf = get_conf() p = argparse.ArgumentParser() - p.add_argument('-f', '--file', help="Filepath for DB", required=False, - default=conf['DB_URI']) + p.add_argument( + "-f", "--file", help="Filepath for DB", required=False, default=conf["DB_URI"] + ) sub = p.add_subparsers() - sub_list = sub.add_parser('list') + sub_list = sub.add_parser("list") sub_list.set_defaults(func=main_list) - sub_add = sub.add_parser('add') - sub_add.add_argument('--interval', type=int, default=3600) + sub_add = sub.add_parser("add") + sub_add.add_argument("--interval", type=int, default=3600) sub_add.set_defaults(func=main_add) args = p.parse_args() args.func(args) -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/larigira/eventutils.py b/larigira/eventutils.py index 1f7d472..9526b95 100644 --- a/larigira/eventutils.py +++ b/larigira/eventutils.py @@ -2,13 +2,14 @@ import gevent class ParentedLet(gevent.Greenlet): - ''' + """ ParentedLet is just a helper subclass that will help you when your greenlet main duty is to "signal" things to a parent_queue. It won't save you much code, but "standardize" messages and make explicit the role of that greenlet - ''' + """ + def __init__(self, queue): gevent.Greenlet.__init__(self) self.parent_queue = queue @@ -17,36 +18,38 @@ class ParentedLet(gevent.Greenlet): def parent_msg(self, kind, *args): return { - 'emitter': self, - 'class': self.__class__.__name__, - 'tracker': self.tracker, - 'kind': kind, - 'args': args + "emitter": self, + "class": self.__class__.__name__, + "tracker": self.tracker, + "kind": kind, + "args": args, } def send_to_parent(self, kind, *args): self.parent_queue.put(self.parent_msg(kind, *args)) def _run(self): - if not hasattr(self, 'do_business'): - raise Exception("do_business method not implemented by %s" % - self.__class__.__name__) + if not hasattr(self, "do_business"): + raise Exception( + "do_business method not implemented by %s" % self.__class__.__name__ + ) for msg in self.do_business(): self.send_to_parent(*msg) class Timer(ParentedLet): - '''continously sleeps some time, then send a "timer" message to parent''' + """continously sleeps some time, then send a "timer" message to parent""" + def __init__(self, milliseconds, queue): ParentedLet.__init__(self, queue) self.ms = milliseconds def parent_msg(self, kind, *args): msg = ParentedLet.parent_msg(self, kind, *args) - msg['period'] = self.ms + msg["period"] = self.ms return msg def do_business(self): while True: gevent.sleep(self.ms / 1000.0) - yield ('timer', ) + yield ("timer",) diff --git a/larigira/filters/basic.py b/larigira/filters/basic.py index a2b9bd5..5c1ac62 100644 --- a/larigira/filters/basic.py +++ b/larigira/filters/basic.py @@ -2,22 +2,22 @@ import wave def maxwait(songs, context, conf): - wait = int(conf.get('EF_MAXWAIT_SEC', 0)) + wait = int(conf.get("EF_MAXWAIT_SEC", 0)) if wait == 0: return True - if 'time' not in context['status']: - return True, 'no song playing?' - curpos, duration = map(int, context['status']['time'].split(':')) + if "time" not in context["status"]: + return True, "no song playing?" + curpos, duration = map(int, context["status"]["time"].split(":")) remaining = duration - curpos if remaining > wait: - return False, 'remaining %d max allowed %d' % (remaining, wait) + return False, "remaining %d max allowed %d" % (remaining, wait) return True def get_duration(path): - '''get track duration in seconds''' - if path.lower().endswith('.wav'): - with wave.open(path, 'r') as f: + """get track duration in seconds""" + if path.lower().endswith(".wav"): + with wave.open(path, "r") as f: frames = f.getnframes() rate = f.getframerate() duration = frames / rate @@ -34,7 +34,7 @@ def get_duration(path): def percentwait(songs, context, conf, getdur=get_duration): - ''' + """ Similar to maxwait, but the maximum waiting time is proportional to the duration of the audio we're going to add. @@ -46,25 +46,25 @@ def percentwait(songs, context, conf, getdur=get_duration): are adding a jingle of 40seconds, then if EF_MAXWAIT_PERC==200 the audio will be added (40s*200% = 1m20s) while if EF_MAXWAIT_PERC==100 it will be filtered out. - ''' - percentwait = int(conf.get('EF_MAXWAIT_PERC', 0)) + """ + percentwait = int(conf.get("EF_MAXWAIT_PERC", 0)) if percentwait == 0: return True - if 'time' not in context['status']: - return True, 'no song playing?' - curpos, duration = map(int, context['status']['time'].split(':')) + if "time" not in context["status"]: + return True, "no song playing?" + curpos, duration = map(int, context["status"]["time"].split(":")) remaining = duration - curpos eventduration = 0 - for uri in songs['uris']: - if not uri.startswith('file://'): - return True, '%s is not a file' % uri - path = uri[len('file://'):] # strips file:// + for uri in songs["uris"]: + if not uri.startswith("file://"): + return True, "%s is not a file" % uri + path = uri[len("file://") :] # strips file:// songduration = getdur(path) if songduration is None: continue eventduration += songduration - wait = eventduration * (percentwait/100.) + wait = eventduration * (percentwait / 100.0) if remaining > wait: - return False, 'remaining %d max allowed %d' % (remaining, wait) + return False, "remaining %d max allowed %d" % (remaining, wait) return True diff --git a/larigira/filters/tests/test_basic.py b/larigira/filters/tests/test_basic.py index 12c86d5..7ee8228 100644 --- a/larigira/filters/tests/test_basic.py +++ b/larigira/filters/tests/test_basic.py @@ -9,17 +9,26 @@ def matchval(d): if k in input_: # string matching return d[k] raise Exception("This test case is bugged! No value for %s" % input_) + return mocked -durations = dict(one=60, two=120, three=180, four=240, ten=600, twenty=1200, - thirty=1800, nonexist=None) +durations = dict( + one=60, + two=120, + three=180, + four=240, + ten=600, + twenty=1200, + thirty=1800, + nonexist=None, +) dur = matchval(durations) def normalize_ret(ret): if type(ret) is bool: - return ret, '' + return ret, "" return ret @@ -28,7 +37,7 @@ def mw(*args, **kwargs): def pw(*args, **kwargs): - kwargs['getdur'] = dur + kwargs["getdur"] = dur return normalize_ret(percentwait(*args, **kwargs)) @@ -38,26 +47,26 @@ def test_maxwait_nonpresent_disabled(): def test_maxwait_explicitly_disabled(): - ret = mw([], {}, {'EF_MAXWAIT_SEC': 0}) + ret = mw([], {}, {"EF_MAXWAIT_SEC": 0}) assert ret[0] is True def test_maxwait_ok(): - ret = mw([], {'status': {'time': '250:300'}}, {'EF_MAXWAIT_SEC': 100}) + ret = mw([], {"status": {"time": "250:300"}}, {"EF_MAXWAIT_SEC": 100}) assert ret[0] is True def test_maxwait_exceeded(): - ret = mw([], {'status': {'time': '100:300'}}, {'EF_MAXWAIT_SEC': 100}) + ret = mw([], {"status": {"time": "100:300"}}, {"EF_MAXWAIT_SEC": 100}) assert ret[0] is False def test_maxwait_limit(): - ret = mw([], {'status': {'time': '199:300'}}, {'EF_MAXWAIT_SEC': 100}) + ret = mw([], {"status": {"time": "199:300"}}, {"EF_MAXWAIT_SEC": 100}) assert ret[0] is False - ret = mw([], {'status': {'time': '200:300'}}, {'EF_MAXWAIT_SEC': 100}) + ret = mw([], {"status": {"time": "200:300"}}, {"EF_MAXWAIT_SEC": 100}) assert ret[0] is True - ret = mw([], {'status': {'time': '201:300'}}, {'EF_MAXWAIT_SEC': 100}) + ret = mw([], {"status": {"time": "201:300"}}, {"EF_MAXWAIT_SEC": 100}) assert ret[0] is True @@ -67,32 +76,40 @@ def test_percentwait_nonpresent_disabled(): def test_percentwait_explicitly_disabled(): - ret = pw([], {}, {'EF_MAXWAIT_PERC': 0}) + ret = pw([], {}, {"EF_MAXWAIT_PERC": 0}) assert ret[0] is True def test_percentwait_ok(): # less than one minute missing - ret = pw(dict(uris=['file:///oneminute.ogg']), - {'status': {'time': '250:300'}}, - {'EF_MAXWAIT_PERC': 100}) + ret = pw( + dict(uris=["file:///oneminute.ogg"]), + {"status": {"time": "250:300"}}, + {"EF_MAXWAIT_PERC": 100}, + ) assert ret[0] is True # more than one minute missing - ret = pw(dict(uris=['file:///oneminute.ogg']), - {'status': {'time': '220:300'}}, - {'EF_MAXWAIT_PERC': 100}) + ret = pw( + dict(uris=["file:///oneminute.ogg"]), + {"status": {"time": "220:300"}}, + {"EF_MAXWAIT_PERC": 100}, + ) assert ret[0] is False def test_percentwait_morethan100(): # requiring 5*10 = 50mins = 3000sec - ret = pw(dict(uris=['file:///tenminute.ogg']), - {'status': {'time': '4800:6000'}}, - {'EF_MAXWAIT_PERC': 500}) + ret = pw( + dict(uris=["file:///tenminute.ogg"]), + {"status": {"time": "4800:6000"}}, + {"EF_MAXWAIT_PERC": 500}, + ) assert ret[0] is True - ret = pw(dict(uris=['file:///oneminute.ogg']), - {'status': {'time': '2000:6000'}}, - {'EF_MAXWAIT_PERC': 500}) + ret = pw( + dict(uris=["file:///oneminute.ogg"]), + {"status": {"time": "2000:6000"}}, + {"EF_MAXWAIT_PERC": 500}, + ) assert ret[0] is False diff --git a/larigira/forms.py b/larigira/forms.py index 8e8c54f..c75f6b3 100644 --- a/larigira/forms.py +++ b/larigira/forms.py @@ -1,15 +1,16 @@ from logging import getLogger -log = getLogger('timeform') + +log = getLogger("timeform") from .entrypoints_utils import get_one_entrypoint def get_timeform(kind): - '''Messes with entrypoints to return a TimeForm''' - for group in ('larigira.timeform_create', 'larigira.timeform_receive'): + """Messes with entrypoints to return a TimeForm""" + for group in ("larigira.timeform_create", "larigira.timeform_receive"): yield get_one_entrypoint(group, kind) def get_audioform(kind): - '''Messes with entrypoints to return a AudioForm''' - for group in ('larigira.audioform_create', 'larigira.audioform_receive'): + """Messes with entrypoints to return a AudioForm""" + for group in ("larigira.audioform_create", "larigira.audioform_receive"): yield get_one_entrypoint(group, kind) diff --git a/larigira/formutils.py b/larigira/formutils.py index f80dce6..f3a6f0a 100644 --- a/larigira/formutils.py +++ b/larigira/formutils.py @@ -11,17 +11,16 @@ log = logging.getLogger(__name__) class AutocompleteTextInput(wtforms.widgets.Input): def __init__(self, datalist=None): - super().__init__('text') + super().__init__("text") self.datalist = datalist def __call__(self, field, **kwargs): # every second can be specified if self.datalist is not None: return super(AutocompleteTextInput, self).__call__( - field, list=self.datalist, autocomplete="autocomplete", - **kwargs) - return super(AutocompleteTextInput, self).__call__( - field, **kwargs) + field, list=self.datalist, autocomplete="autocomplete", **kwargs + ) + return super(AutocompleteTextInput, self).__call__(field, **kwargs) class AutocompleteStringField(StringField): @@ -31,15 +30,15 @@ class AutocompleteStringField(StringField): class DateTimeInput(wtforms.widgets.Input): - input_type = 'datetime-local' + input_type = "datetime-local" def __call__(self, field, **kwargs): # every second can be specified - return super(DateTimeInput, self).__call__(field, step='1', **kwargs) + return super(DateTimeInput, self).__call__(field, step="1", **kwargs) class EasyDateTimeField(Field): - ''' + """ a "fork" of DateTimeField which uses HTML5 datetime-local The format is not customizable, because it is imposed by the HTML5 @@ -47,27 +46,28 @@ class EasyDateTimeField(Field): This field does not ensure that browser actually supports datetime-local input type, nor does it provide polyfills. - ''' + """ + widget = DateTimeInput() - formats = ('%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M') + formats = ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M") def __init__(self, label=None, validators=None, **kwargs): super(EasyDateTimeField, self).__init__(label, validators, **kwargs) def _value(self): if self.raw_data: - return ' '.join(self.raw_data) - return self.data and self.data.strftime(self.formats[0]) or '' + return " ".join(self.raw_data) + return self.data and self.data.strftime(self.formats[0]) or "" def process_formdata(self, valuelist): if valuelist: - date_str = ' '.join(valuelist) + date_str = " ".join(valuelist) for fmt in self.formats: try: self.data = datetime.strptime(date_str, fmt) return except ValueError: - log.debug('Format `%s` not valid for `%s`', - fmt, date_str) - raise ValueError(self.gettext( - 'Not a valid datetime value {}').format(date_str)) + log.debug("Format `%s` not valid for `%s`", fmt, date_str) + raise ValueError( + self.gettext("Not a valid datetime value {}").format(date_str) + ) diff --git a/larigira/fsutils.py b/larigira/fsutils.py index 2d5e9ab..9eae11c 100644 --- a/larigira/fsutils.py +++ b/larigira/fsutils.py @@ -5,7 +5,7 @@ import mimetypes def scan_dir(dirname, extension=None): if extension is None: - extension = '*' + extension = "*" for root, dirnames, filenames in os.walk(dirname): for fname in fnmatch.filter(filenames, extension): yield os.path.join(root, fname) @@ -13,7 +13,7 @@ def scan_dir(dirname, extension=None): def multi_fnmatch(fname, extensions): for ext in extensions: - if fnmatch.fnmatch(fname, '*.' + ext): + if fnmatch.fnmatch(fname, "*." + ext): return True return False @@ -22,10 +22,10 @@ def is_audio(fname): mimetype = mimetypes.guess_type(fname)[0] if mimetype is None: return False - return mimetype.split('/')[0] == 'audio' + return mimetype.split("/")[0] == "audio" -def scan_dir_audio(dirname, extensions=('mp3', 'oga', 'wav', 'ogg')): +def scan_dir_audio(dirname, extensions=("mp3", "oga", "wav", "ogg")): for root, dirnames, filenames in os.walk(dirname): for fname in filenames: if is_audio(fname): @@ -34,6 +34,6 @@ def scan_dir_audio(dirname, extensions=('mp3', 'oga', 'wav', 'ogg')): def shortname(path): name = os.path.basename(path) # filename - name = name.rsplit('.', 1)[0] # no extension - name = ''.join(c for c in name if c.isalnum()) # no strange chars + name = name.rsplit(".", 1)[0] # no extension + name = "".join(c for c in name if c.isalnum()) # no strange chars return name diff --git a/larigira/mpc.py b/larigira/mpc.py index cb56dc6..456081c 100644 --- a/larigira/mpc.py +++ b/larigira/mpc.py @@ -16,15 +16,16 @@ from .entrypoints_utils import get_avail_entrypoints def get_mpd_client(conf): client = mpd.MPDClient(use_unicode=True) - client.connect(conf['MPD_HOST'], conf['MPD_PORT']) + client.connect(conf["MPD_HOST"], conf["MPD_PORT"]) return client class MPDWatcher(ParentedLet): - ''' + """ MPDWatcher notifies parent about any mpd event - ''' + """ + def __init__(self, queue, conf, client=None): ParentedLet.__init__(self, queue) self.log = logging.getLogger(self.__class__.__name__) @@ -41,46 +42,51 @@ class MPDWatcher(ParentedLet): if self.client is None: self.refresh_client() if first_after_connection: - yield('mpc', 'connect') + yield ("mpc", "connect") status = self.client.idle()[0] - except (mpd.ConnectionError, ConnectionRefusedError, - FileNotFoundError) as exc: - self.log.warning('Connection to MPD failed (%s: %s)', - exc.__class__.__name__, exc) + except ( + mpd.ConnectionError, + ConnectionRefusedError, + FileNotFoundError, + ) as exc: + self.log.warning( + "Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc + ) self.client = None first_after_connection = True gevent.sleep(5) continue else: first_after_connection = False - yield ('mpc', status) + yield ("mpc", status) class Player: - ''' + """ The player contains different mpd-related methods check_playlist determines whether the playlist is long enough and run audiogenerator accordingly enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist - ''' + """ + def __init__(self, conf): self.conf = conf self.log = logging.getLogger(self.__class__.__name__) self.min_playlist_length = 10 self.tmpcleaner = UnusedCleaner(conf) - self._continous_audiospec = self.conf['CONTINOUS_AUDIOSPEC'] + self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"] self.events_enabled = True def _get_mpd(self): mpd_client = mpd.MPDClient(use_unicode=True) try: - mpd_client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT']) - except (mpd.ConnectionError, ConnectionRefusedError, - FileNotFoundError) as exc: - self.log.warning('Connection to MPD failed (%s: %s)', - exc.__class__.__name__, exc) + mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"]) + except (mpd.ConnectionError, ConnectionRefusedError, FileNotFoundError) as exc: + self.log.warning( + "Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc + ) raise gevent.GreenletExit() return mpd_client @@ -90,16 +96,17 @@ class Player: @continous_audiospec.setter def continous_audiospec(self, spec): - self._continous_audiospec = self.conf['CONTINOUS_AUDIOSPEC'] \ - if spec is None else spec + self._continous_audiospec = ( + self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec + ) def clear_everything_but_current_song(): mpdc = self._get_mpd() current = mpdc.currentsong() - pos = int(current.get('pos', 0)) + pos = int(current.get("pos", 0)) for song in mpdc.playlistid(): - if int(song['pos']) != pos: - mpdc.deleteid(song['id']) + if int(song["pos"]) != pos: + mpdc.deleteid(song["id"]) gevent.Greenlet.spawn(clear_everything_but_current_song) @@ -107,12 +114,11 @@ class Player: mpd_client = self._get_mpd() songs = mpd_client.playlist() current = mpd_client.currentsong() - pos = int(current.get('pos', 0)) + 1 + pos = int(current.get("pos", 0)) + 1 if (len(songs) - pos) >= self.min_playlist_length: return - self.log.info('need to add new songs') - picker = gevent.Greenlet(audiogenerate, - self.continous_audiospec) + self.log.info("need to add new songs") + picker = gevent.Greenlet(audiogenerate, self.continous_audiospec) def add(greenlet): uris = greenlet.value @@ -120,69 +126,71 @@ class Player: assert type(uri) is str, type(uri) self.tmpcleaner.watch(uri.strip()) mpd_client.add(uri.strip()) + picker.link_value(add) picker.start() def enqueue_filter(self, songs): - eventfilters = self.conf['EVENT_FILTERS'] + eventfilters = self.conf["EVENT_FILTERS"] if not eventfilters: - return True, '' - availfilters = get_avail_entrypoints('larigira.eventfilter') + return True, "" + availfilters = get_avail_entrypoints("larigira.eventfilter") if len([ef for ef in eventfilters if ef in availfilters]) == 0: - return True, '' + return True, "" mpdc = self._get_mpd() status = mpdc.status() - ctx = { - 'playlist': mpdc.playlist(), - 'status': status, - 'durations': [] - } - for entrypoint in iter_entry_points('larigira.eventfilter'): + ctx = {"playlist": mpdc.playlist(), "status": status, "durations": []} + for entrypoint in iter_entry_points("larigira.eventfilter"): if entrypoint.name in eventfilters: ef = entrypoint.load() try: ret = ef(songs=songs, context=ctx, conf=self.conf) except ImportError as exc: - self.log.warn("Filter %s skipped: %s" % (entrypoint.name, - exc)) + self.log.warn("Filter %s skipped: %s" % (entrypoint.name, exc)) continue if ret is None: # bad behavior! continue if type(ret) is bool: - reason = '' + reason = "" else: ret, reason = ret - reason = 'Filtered by %s (%s)' % (entrypoint.name, reason) + reason = "Filtered by %s (%s)" % (entrypoint.name, reason) if ret is False: return ret, reason - return True, 'Passed through %s' % ','.join(availfilters) + return True, "Passed through %s" % ",".join(availfilters) def enqueue(self, songs): assert type(songs) is dict - assert 'uris' in songs - spec = [aspec.get('nick', aspec.eid) for aspec in songs['audiospecs']] - nicks = ','.join((aspec.get('nick', aspec.eid) - for aspec in songs['audiospecs'])) + assert "uris" in songs + spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]] + nicks = ",".join( + (aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]) + ) if not self.events_enabled: - self.log.debug('Ignoring <%s> (events disabled)', nicks - ) + self.log.debug("Ignoring <%s> (events disabled)", nicks) return filterok, reason = self.enqueue_filter(songs) if not filterok: - self.log.debug('Ignoring <%s>, filtered: %s', nicks, reason) + self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason) # delete those files - for uri in reversed(songs['uris']): + for uri in reversed(songs["uris"]): self.tmpcleaner.watch(uri.strip()) return mpd_client = self._get_mpd() - for uri in reversed(songs['uris']): + for uri in reversed(songs["uris"]): assert type(uri) is str - self.log.info('Adding %s to playlist (from <%s>:%s=%s)', - uri, - songs['timespec'].get('nick', ''), - songs['aids'], spec) - insert_pos = 0 if len(mpd_client.playlistid()) == 0 else \ - int(mpd_client.currentsong().get('pos', 0)) + 1 + self.log.info( + "Adding %s to playlist (from <%s>:%s=%s)", + uri, + songs["timespec"].get("nick", ""), + songs["aids"], + spec, + ) + insert_pos = ( + 0 + if len(mpd_client.playlistid()) == 0 + else int(mpd_client.currentsong().get("pos", 0)) + 1 + ) try: mpd_client.addid(uri, insert_pos) except mpd.CommandError: @@ -197,7 +205,7 @@ class Controller(gevent.Greenlet): self.conf = conf self.q = Queue() self.player = Player(self.conf) - if 'DB_URI' in self.conf: + if "DB_URI" in self.conf: self.monitor = Monitor(self.q, self.conf) self.monitor.parent_greenlet = self else: @@ -209,28 +217,28 @@ class Controller(gevent.Greenlet): mw = MPDWatcher(self.q, self.conf, client=None) mw.parent_greenlet = self mw.start() - t = Timer(int(self.conf['CHECK_SECS']) * 1000, self.q) + t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q) t.parent_greenlet = self t.start() # at the very start, run a check! gevent.Greenlet.spawn(self.player.check_playlist) while True: value = self.q.get() - self.log.debug('<- %s', str(value)) + self.log.debug("<- %s", str(value)) # emitter = value['emitter'] - kind = value['kind'] - args = value['args'] - if kind == 'timer' or (kind == 'mpc' and - args[0] in ('player', 'playlist', - 'connect')): + kind = value["kind"] + args = value["args"] + if kind == "timer" or ( + kind == "mpc" and args[0] in ("player", "playlist", "connect") + ): gevent.Greenlet.spawn(self.player.check_playlist) try: self.player.tmpcleaner.check_playlist() except: pass - elif kind == 'mpc': + elif kind == "mpc": pass - elif kind == 'uris_enqueue': + elif kind == "uris_enqueue": # TODO: uris_enqueue messages should be delivered directly to Player.enqueue # probably we need a MPDEnqueuer that receives every uri we want to add try: @@ -238,13 +246,13 @@ class Controller(gevent.Greenlet): except AssertionError: raise except Exception: - self.log.exception("Error while adding to queue; " - "bad audiogen output?") - elif (kind == 'signal' and args[0] == signal.SIGALRM) or \ - kind == 'refresh': + self.log.exception( + "Error while adding to queue; " "bad audiogen output?" + ) + elif (kind == "signal" and args[0] == signal.SIGALRM) or kind == "refresh": # it's a tick! self.log.debug("Reload") - self.monitor.q.put(dict(kind='forcetick')) + self.monitor.q.put(dict(kind="forcetick")) gevent.Greenlet.spawn(self.player.check_playlist) else: self.log.warning("Unknown message: %s", str(value)) diff --git a/larigira/test_unused.py b/larigira/test_unused.py index b08a70e..afd2c3a 100644 --- a/larigira/test_unused.py +++ b/larigira/test_unused.py @@ -6,26 +6,27 @@ from .config import get_conf @pytest.fixture def unusedcleaner(): - return UnusedCleaner(get_conf(prefix='LARIGIRATEST_')) + return UnusedCleaner(get_conf(prefix="LARIGIRATEST_")) # this test suite heavily assumes that TMPDIR == /tmp/, which is the default # indeed. However, the code does not rely on this assumption. + def test_watch_file(unusedcleaner): # despite not existing, the file is added - unusedcleaner.watch('file:///tmp/gnam') + unusedcleaner.watch("file:///tmp/gnam") assert len(unusedcleaner.waiting_removal_files) == 1 - assert list(unusedcleaner.waiting_removal_files)[0] == '/tmp/gnam' + assert list(unusedcleaner.waiting_removal_files)[0] == "/tmp/gnam" def test_watch_path_error(unusedcleaner): - '''paths are not valid thing to watch. URIs only, thanks''' - unusedcleaner.watch('/tmp/foo') + """paths are not valid thing to watch. URIs only, thanks""" + unusedcleaner.watch("/tmp/foo") assert len(unusedcleaner.waiting_removal_files) == 0 def test_watch_notmp_error(unusedcleaner): - '''Files not in TMPDIR are not added''' - unusedcleaner.watch('file:///not/in/tmp') + """Files not in TMPDIR are not added""" + unusedcleaner.watch("file:///not/in/tmp") assert len(unusedcleaner.waiting_removal_files) == 0 diff --git a/larigira/tests/test_audiogen_mostrecent.py b/larigira/tests/test_audiogen_mostrecent.py index 09394ef..469d318 100644 --- a/larigira/tests/test_audiogen_mostrecent.py +++ b/larigira/tests/test_audiogen_mostrecent.py @@ -14,19 +14,19 @@ def now(request): @pytest.fixture def yesterday(request): - return int(time.time()) - 24*60*60 + return int(time.time()) - 24 * 60 * 60 @pytest.fixture def empty_dir(): - dirpath = tempfile.mkdtemp(prefix='mostrecent.') + dirpath = tempfile.mkdtemp(prefix="mostrecent.") yield dirpath os.removedirs(dirpath) @pytest.fixture def dir_with_old_file(empty_dir): - fd, fname = tempfile.mkstemp(prefix='old.', dir=empty_dir) + fd, fname = tempfile.mkstemp(prefix="old.", dir=empty_dir) os.close(fd) os.utime(fname, times=(0, 0)) yield empty_dir @@ -35,7 +35,7 @@ def dir_with_old_file(empty_dir): @pytest.fixture def dir_with_yesterday_file(empty_dir, yesterday): - fd, fname = tempfile.mkstemp(prefix='yesterday.', dir=empty_dir) + fd, fname = tempfile.mkstemp(prefix="yesterday.", dir=empty_dir) os.close(fd) os.utime(fname, times=(yesterday, yesterday)) yield empty_dir @@ -44,7 +44,7 @@ def dir_with_yesterday_file(empty_dir, yesterday): @pytest.fixture def dir_with_new_file(dir_with_old_file, now): - fd, fname = tempfile.mkstemp(prefix='new.', dir=dir_with_old_file) + fd, fname = tempfile.mkstemp(prefix="new.", dir=dir_with_old_file) os.close(fd) os.utime(fname, times=(now, now)) yield dir_with_old_file @@ -53,7 +53,7 @@ def dir_with_new_file(dir_with_old_file, now): @pytest.fixture def dir_with_two_recent_files(dir_with_yesterday_file, now): - fd, fname = tempfile.mkstemp(prefix='new.', dir=dir_with_yesterday_file) + fd, fname = tempfile.mkstemp(prefix="new.", dir=dir_with_yesterday_file) os.close(fd) os.utime(fname, times=(now, now)) yield dir_with_yesterday_file @@ -61,7 +61,7 @@ def dir_with_two_recent_files(dir_with_yesterday_file, now): def test_empty_is_empty(empty_dir, now): - '''nothing can be picked from a empty dir''' + """nothing can be picked from a empty dir""" picked = recent_choose([empty_dir], 1, now) assert len(picked) == 0 @@ -74,17 +74,17 @@ def test_old_files(dir_with_old_file, now): def test_new_files_found(dir_with_new_file): picked = recent_choose([dir_with_new_file], 1, 1) assert len(picked) == 1 - assert os.path.basename(picked[0]).startswith('new.') + assert os.path.basename(picked[0]).startswith("new.") def test_only_new_files_found(dir_with_new_file): picked = recent_choose([dir_with_new_file], 2, 1) assert len(picked) == 1 - assert os.path.basename(picked[0]).startswith('new.') + assert os.path.basename(picked[0]).startswith("new.") def test_correct_sorting(dir_with_two_recent_files): picked = recent_choose([dir_with_two_recent_files], 1, 1) assert len(picked) == 1 - assert not os.path.basename(picked[0]).startswith('yesterday.') - assert os.path.basename(picked[0]).startswith('new.') + assert not os.path.basename(picked[0]).startswith("yesterday.") + assert os.path.basename(picked[0]).startswith("new.") diff --git a/larigira/tests/test_audiogen_mpdrandom.py b/larigira/tests/test_audiogen_mpdrandom.py index 4301ea4..49de777 100644 --- a/larigira/tests/test_audiogen_mpdrandom.py +++ b/larigira/tests/test_audiogen_mpdrandom.py @@ -1,5 +1,6 @@ from __future__ import print_function from gevent import monkey + monkey.patch_all(subprocess=True) import pytest @@ -9,10 +10,9 @@ from larigira.audiogen_mpdrandom import generate_by_artist @pytest.fixture def simplerandom(): - return { - } + return {} def test_accepted_syntax(simplerandom): - '''Check the minimal needed configuration for mpdrandom''' + """Check the minimal needed configuration for mpdrandom""" generate_by_artist(simplerandom) diff --git a/larigira/tests/test_audiogen_randomdir.py b/larigira/tests/test_audiogen_randomdir.py index 89faf6f..8955048 100644 --- a/larigira/tests/test_audiogen_randomdir.py +++ b/larigira/tests/test_audiogen_randomdir.py @@ -9,51 +9,51 @@ def P(pypathlocal): def test_txt_files_are_excluded(tmpdir): p = tmpdir.join("foo.txt") - p.write('') + p.write("") assert len(candidates([P(p)])) == 0 assert len(candidates([P(tmpdir)])) == 0 def test_nested_txt_files_are_excluded(tmpdir): - p = tmpdir.mkdir('one').mkdir('two').join("foo.txt") - p.write('') + p = tmpdir.mkdir("one").mkdir("two").join("foo.txt") + p.write("") assert len(candidates([P(p)])) == 0 assert len(candidates([P(tmpdir)])) == 0 def test_mp3_files_are_considered(tmpdir): p = tmpdir.join("foo.mp3") - p.write('') + p.write("") assert len(candidates([P(p)])) == 1 assert len(candidates([P(tmpdir)])) == 1 def test_nested_mp3_files_are_considered(tmpdir): - p = tmpdir.mkdir('one').mkdir('two').join("foo.mp3") - p.write('') + p = tmpdir.mkdir("one").mkdir("two").join("foo.mp3") + p.write("") assert len(candidates([P(p)])) == 1 assert len(candidates([P(tmpdir)])) == 1 def test_same_name(tmpdir): - '''file with same name on different dir should not be confused''' - p = tmpdir.mkdir('one').mkdir('two').join("foo.mp3") - p.write('') + """file with same name on different dir should not be confused""" + p = tmpdir.mkdir("one").mkdir("two").join("foo.mp3") + p.write("") p = tmpdir.join("foo.mp3") - p.write('') + p.write("") assert len(candidates([P(tmpdir)])) == 2 def test_unknown_mime_ignore(tmpdir): p = tmpdir.join("foo.???") - p.write('') + p.write("") assert len(candidates([P(tmpdir)])) == 0 def test_unknown_mime_nocrash(tmpdir): p = tmpdir.join("foo.???") - p.write('') + p.write("") p = tmpdir.join("foo.ogg") - p.write('') + p.write("") assert len(candidates([P(tmpdir)])) == 1 diff --git a/larigira/tests/test_commonpath.py b/larigira/tests/test_commonpath.py index b3eb105..d8feb83 100644 --- a/larigira/tests/test_commonpath.py +++ b/larigira/tests/test_commonpath.py @@ -4,14 +4,14 @@ from larigira.unused import old_commonpath def test_same(): - assert old_commonpath(['/foo/bar', '/foo/bar/']) == '/foo/bar' + assert old_commonpath(["/foo/bar", "/foo/bar/"]) == "/foo/bar" def test_prefix(): - assert old_commonpath(['/foo/bar', '/foo/zap/']) == '/foo' - assert old_commonpath(['/foo/bar/', '/foo/zap/']) == '/foo' - assert old_commonpath(['/foo/bar/', '/foo/zap']) == '/foo' - assert old_commonpath(['/foo/bar', '/foo/zap']) == '/foo' + assert old_commonpath(["/foo/bar", "/foo/zap/"]) == "/foo" + assert old_commonpath(["/foo/bar/", "/foo/zap/"]) == "/foo" + assert old_commonpath(["/foo/bar/", "/foo/zap"]) == "/foo" + assert old_commonpath(["/foo/bar", "/foo/zap"]) == "/foo" try: @@ -22,17 +22,18 @@ else: # these tests are only available on python >= 3.5. That's fine though, as # our CI will perform validation of those cases to see if they match python # behavior - @pytest.fixture(params=['a', 'a/', 'a/b', 'a/b/', 'a/b/c', ]) + @pytest.fixture(params=["a", "a/", "a/b", "a/b/", "a/b/c"]) def relpath(request): return request.param @pytest.fixture def abspath(relpath): - return '/' + relpath + return "/" + relpath - @pytest.fixture(params=['', '/']) + @pytest.fixture(params=["", "/"]) def slashed_abspath(abspath, request): - return '%s%s' % (abspath, request.param) + return "%s%s" % (abspath, request.param) + slashed_abspath_b = slashed_abspath @pytest.fixture @@ -42,11 +43,12 @@ else: def test_abspath_match(abspath_couple): assert commonpath(abspath_couple) == old_commonpath(abspath_couple) - @pytest.fixture(params=['', '/']) + @pytest.fixture(params=["", "/"]) def slashed_relpath(relpath, request): - s = '%s%s' % (relpath, request.param) + s = "%s%s" % (relpath, request.param) if s: return s + slashed_relpath_b = slashed_relpath @pytest.fixture diff --git a/larigira/tests/test_db.py b/larigira/tests/test_db.py index 876faa0..29b1328 100644 --- a/larigira/tests/test_db.py +++ b/larigira/tests/test_db.py @@ -2,6 +2,7 @@ import tempfile import os from gevent import monkey + monkey.patch_all(subprocess=True) import pytest @@ -11,7 +12,7 @@ from larigira.db import EventModel @pytest.yield_fixture def db(): - fname = tempfile.mktemp(suffix='.json', prefix='larigira-test') + fname = tempfile.mktemp(suffix=".json", prefix="larigira-test") yield EventModel(uri=fname) os.unlink(fname) @@ -22,36 +23,39 @@ def test_empty(db): def test_add_basic(db): assert len(db.get_all_alarms()) == 0 - alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1), - [dict(kind='mpd', paths=['foo.mp3'], howmany=1)]) + alarm_id = db.add_event( + dict(kind="frequency", interval=60 * 3, start=1), + [dict(kind="mpd", paths=["foo.mp3"], howmany=1)], + ) assert len(db.get_all_alarms()) == 1 assert db.get_alarm_by_id(alarm_id) is not None - assert len(tuple(db.get_actions_by_alarm( - db.get_alarm_by_id(alarm_id)))) == 1 + assert len(tuple(db.get_actions_by_alarm(db.get_alarm_by_id(alarm_id)))) == 1 def test_add_multiple_alarms(db): assert len(db.get_all_alarms()) == 0 - alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1), - [dict(kind='mpd', paths=['foo.mp3'], howmany=1), - dict(kind='foo', a=3)]) + alarm_id = db.add_event( + dict(kind="frequency", interval=60 * 3, start=1), + [dict(kind="mpd", paths=["foo.mp3"], howmany=1), dict(kind="foo", a=3)], + ) assert len(db.get_all_alarms()) == 1 assert db.get_alarm_by_id(alarm_id) is not None assert len(db.get_all_actions()) == 2 - assert len(tuple(db.get_actions_by_alarm( - db.get_alarm_by_id(alarm_id)))) == 2 + assert len(tuple(db.get_actions_by_alarm(db.get_alarm_by_id(alarm_id)))) == 2 def test_delete_alarm(db): assert len(db.get_all_alarms()) == 0 - alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1), - [dict(kind='mpd', paths=['foo.mp3'], howmany=1)]) + alarm_id = db.add_event( + dict(kind="frequency", interval=60 * 3, start=1), + [dict(kind="mpd", paths=["foo.mp3"], howmany=1)], + ) action_id = next(db.get_actions_by_alarm(db.get_alarm_by_id(alarm_id))).eid assert len(db.get_all_alarms()) == 1 db.delete_alarm(alarm_id) assert len(db.get_all_alarms()) == 0 # alarm deleted assert db.get_action_by_id(action_id) is not None - assert 'kind' in db.get_action_by_id(action_id) # action still there + assert "kind" in db.get_action_by_id(action_id) # action still there def test_delete_alarm_nonexisting(db): @@ -60,8 +64,10 @@ def test_delete_alarm_nonexisting(db): def test_delete_action(db): - alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1), - [dict(kind='mpd', paths=['foo.mp3'], howmany=1)]) + alarm_id = db.add_event( + dict(kind="frequency", interval=60 * 3, start=1), + [dict(kind="mpd", paths=["foo.mp3"], howmany=1)], + ) alarm = db.get_alarm_by_id(alarm_id) assert len(tuple(db.get_actions_by_alarm(alarm))) == 1 action = next(db.get_actions_by_alarm(alarm)) diff --git a/larigira/tests/test_fsutils.py b/larigira/tests/test_fsutils.py index 4a5d8bd..c879376 100644 --- a/larigira/tests/test_fsutils.py +++ b/larigira/tests/test_fsutils.py @@ -2,15 +2,15 @@ from larigira.fsutils import shortname def test_shortname_self(): - '''sometimes, shortname is just filename without extension''' - assert shortname('/tmp/asd/foo.bar') == 'foo' + """sometimes, shortname is just filename without extension""" + assert shortname("/tmp/asd/foo.bar") == "foo" def test_shortname_has_numbers(): - '''shortname will preserve numbers''' - assert shortname('/tmp/asd/foo1.bar') == 'foo1' + """shortname will preserve numbers""" + assert shortname("/tmp/asd/foo1.bar") == "foo1" def test_shortname_has_no_hyphen(): - '''shortname will not preserve hyphens''' - assert shortname('/tmp/asd/foo-1.bar') == 'foo1' + """shortname will not preserve hyphens""" + assert shortname("/tmp/asd/foo-1.bar") == "foo1" diff --git a/larigira/tests/test_parented.py b/larigira/tests/test_parented.py index eb82bb0..25f4b0a 100644 --- a/larigira/tests/test_parented.py +++ b/larigira/tests/test_parented.py @@ -1,5 +1,6 @@ from __future__ import print_function from gevent import monkey + monkey.patch_all(subprocess=True) import pytest @@ -21,7 +22,8 @@ def range_parentlet(): def do_business(self): for i in range(self.howmany): - yield('range', i) + yield ("range", i) + return RangeLet @@ -33,7 +35,8 @@ def single_value_parentlet(): self.val = val def do_business(self): - yield('single', self.val) + yield ("single", self.val) + return SingleLet @@ -44,7 +47,7 @@ def test_parented_range(queue, range_parentlet): assert queue.qsize() == 5 while not queue.empty(): msg = queue.get() - assert msg['kind'] == 'range' + assert msg["kind"] == "range" def test_parented_single(queue, single_value_parentlet): @@ -52,25 +55,25 @@ def test_parented_single(queue, single_value_parentlet): t.start() gevent.sleep(0.01) msg = queue.get_nowait() - assert msg['args'][0] == 123 + assert msg["args"][0] == 123 def test_timer_finally(queue): - '''at somepoint, it will get results''' + """at somepoint, it will get results""" period = 10 t = Timer(period, queue) t.start() - gevent.sleep(period*3/1000.0) + gevent.sleep(period * 3 / 1000.0) queue.get_nowait() def test_timer_righttime(queue): - '''not too early, not too late''' + """not too early, not too late""" period = 500 t = Timer(period, queue) t.start() - gevent.sleep(period/(10*1000.0)) + gevent.sleep(period / (10 * 1000.0)) assert queue.empty() is True - gevent.sleep(period*(2/1000.0)) + gevent.sleep(period * (2 / 1000.0)) assert not queue.empty() queue.get_nowait() diff --git a/larigira/tests/test_time_every.py b/larigira/tests/test_time_every.py index 08226aa..c2cf295 100644 --- a/larigira/tests/test_time_every.py +++ b/larigira/tests/test_time_every.py @@ -8,7 +8,7 @@ from larigira.timegen import timegenerate def eq_(a, b, reason=None): - '''migrating tests from nose''' + """migrating tests from nose""" if reason is not None: assert a == b, reason else: @@ -20,63 +20,55 @@ def now(): return datetime.now() -@pytest.fixture(params=['seconds', 'human', 'humanlong', 'coloned']) +@pytest.fixture(params=["seconds", "human", "humanlong", "coloned"]) def onehour(now, request): - '''a FrequencyAlarm: every hour for one day''' - intervals = dict(seconds=3600, human='1h', humanlong='30m 1800s', - coloned='01:00:00') - return FrequencyAlarm({ - 'start': now - timedelta(days=1), - 'interval': intervals[request.param], - 'end': now + days(1)}) + """a FrequencyAlarm: every hour for one day""" + intervals = dict( + seconds=3600, human="1h", humanlong="30m 1800s", coloned="01:00:00" + ) + return FrequencyAlarm( + { + "start": now - timedelta(days=1), + "interval": intervals[request.param], + "end": now + days(1), + } + ) -@pytest.fixture(params=[1, '1']) +@pytest.fixture(params=[1, "1"]) def onehour_monday(request): weekday = request.param - yield FrequencyAlarm({ - 'interval': 3600*12, - 'weekdays': [weekday], - 'start': 0 - }) + yield FrequencyAlarm({"interval": 3600 * 12, "weekdays": [weekday], "start": 0}) -@pytest.fixture(params=[7, '7']) +@pytest.fixture(params=[7, "7"]) def onehour_sunday(request): weekday = request.param - yield FrequencyAlarm({ - 'interval': 3600*12, - 'weekdays': [weekday], - 'start': 0 - }) + yield FrequencyAlarm({"interval": 3600 * 12, "weekdays": [weekday], "start": 0}) @pytest.fixture(params=[1, 2, 3, 4, 5, 6, 7]) def singledow(request): weekday = request.param - yield FrequencyAlarm({ - 'interval': 3600*24, - 'weekdays': [weekday], - 'start': 0 - }) + yield FrequencyAlarm({"interval": 3600 * 24, "weekdays": [weekday], "start": 0}) -@pytest.fixture(params=['seconds', 'human', 'coloned']) +@pytest.fixture(params=["seconds", "human", "coloned"]) def tenseconds(now, request): - '''a FrequencyAlarm: every 10 seconds for one day''' - intervals = dict(seconds=10, human='10s', coloned='00:10') - return FrequencyAlarm({ - 'start': now - timedelta(days=1), - 'interval': intervals[request.param], - 'end': now + days(1)}) + """a FrequencyAlarm: every 10 seconds for one day""" + intervals = dict(seconds=10, human="10s", coloned="00:10") + return FrequencyAlarm( + { + "start": now - timedelta(days=1), + "interval": intervals[request.param], + "end": now + days(1), + } + ) @pytest.fixture(params=[1, 2, 3, 4, 5, 6, 7, 8]) def manyweeks(request): - yield FrequencyAlarm({ - 'interval': '{}w'.format(request.param), - 'start': 0 - }) + yield FrequencyAlarm({"interval": "{}w".format(request.param), "start": 0}) def days(n): @@ -84,24 +76,21 @@ def days(n): def test_single_creations(now): - return SingleAlarm({ - 'timestamp': now - }) + return SingleAlarm({"timestamp": now}) def test_freq_creations(now): - return FrequencyAlarm({ - 'start': now - timedelta(days=1), - 'interval': 3600, - 'end': now}) + return FrequencyAlarm( + {"start": now - timedelta(days=1), "interval": 3600, "end": now} + ) @pytest.mark.timeout(1) def test_single_ring(now): dt = now + days(1) - s = SingleAlarm({'timestamp': dt}) - eq_(s.next_ring(), dt) - eq_(s.next_ring(now), dt) + s = SingleAlarm({"timestamp": dt}) + eq_(s.next_ring(), dt) + eq_(s.next_ring(now), dt) assert s.next_ring(dt) is None, "%s - %s" % (str(s.next_ring(dt)), str(dt)) assert s.next_ring(now + days(2)) is None assert s.has_ring(dt) @@ -112,10 +101,10 @@ def test_single_ring(now): @pytest.mark.timeout(1) def test_single_all(now): dt = now + timedelta(days=1) - s = SingleAlarm({'timestamp': dt}) - eq_(list(s.all_rings()), [dt]) - eq_(list(s.all_rings(now)), [dt]) - eq_(list(s.all_rings(now + days(2))), []) + s = SingleAlarm({"timestamp": dt}) + eq_(list(s.all_rings()), [dt]) + eq_(list(s.all_rings(now)), [dt]) + eq_(list(s.all_rings(now + days(2))), []) def test_freq_short(now, tenseconds): @@ -165,12 +154,8 @@ def test_weekday_skip_2(onehour_sunday): def test_sunday_is_not_0(): with pytest.raises(ValueError) as excinfo: - FrequencyAlarm({ - 'interval': 3600*12, - 'weekdays': [0], - 'start': 0 - }) - assert 'Not a valid weekday:' in excinfo.value.args[0] + FrequencyAlarm({"interval": 3600 * 12, "weekdays": [0], "start": 0}) + assert "Not a valid weekday:" in excinfo.value.args[0] def test_long_interval(manyweeks): @@ -178,7 +163,7 @@ def test_long_interval(manyweeks): expected = manyweeks.interval got = manyweeks.next_ring(t) assert got is not None - assert int(got.strftime('%s')) == expected + assert int(got.strftime("%s")) == expected assert manyweeks.next_ring(got) is not None @@ -194,15 +179,8 @@ def test_singledow(singledow): def test_single_registered(): - timegenerate({ - 'kind': 'single', - 'timestamp': 1234567890 - }) + timegenerate({"kind": "single", "timestamp": 1234567890}) def test_frequency_registered(): - timegenerate({ - 'kind': 'frequency', - 'start': 1234567890, - 'interval': 60*15 - }) + timegenerate({"kind": "frequency", "start": 1234567890, "interval": 60 * 15}) diff --git a/larigira/tests/test_web.py b/larigira/tests/test_web.py index 1bbf20c..d3c39cf 100644 --- a/larigira/tests/test_web.py +++ b/larigira/tests/test_web.py @@ -1,5 +1,6 @@ from __future__ import print_function from gevent import monkey + monkey.patch_all(subprocess=True) import pytest @@ -15,5 +16,5 @@ def app(queue): def test_refresh(app): assert app.queue.empty() - app.test_client().get('/api/refresh') + app.test_client().get("/api/refresh") assert not app.queue.empty() diff --git a/larigira/timeform_base.py b/larigira/timeform_base.py index 02f0c71..06b5a75 100644 --- a/larigira/timeform_base.py +++ b/larigira/timeform_base.py @@ -4,101 +4,125 @@ from datetime import datetime from pytimeparse.timeparse import timeparse from flask_wtf import Form -from wtforms import StringField, validators, SubmitField, \ - SelectMultipleField, ValidationError +from wtforms import ( + StringField, + validators, + SubmitField, + SelectMultipleField, + ValidationError, +) from larigira.formutils import EasyDateTimeField + log = logging.getLogger(__name__) class SingleAlarmForm(Form): - nick = StringField('Alarm nick', validators=[validators.required()], - description='A simple name to recognize this alarm') - dt = EasyDateTimeField('Date and time', validators=[validators.required()], - description='Date to ring on, expressed as ' - '2000-12-31T13:42:00') - submit = SubmitField('Submit') + nick = StringField( + "Alarm nick", + validators=[validators.required()], + description="A simple name to recognize this alarm", + ) + dt = EasyDateTimeField( + "Date and time", + validators=[validators.required()], + description="Date to ring on, expressed as " "2000-12-31T13:42:00", + ) + submit = SubmitField("Submit") def populate_from_timespec(self, timespec): - if 'nick' in timespec: - self.nick.data = timespec['nick'] - if 'timestamp' in timespec: - self.dt.data = datetime.fromtimestamp(timespec['timestamp']) + if "nick" in timespec: + self.nick.data = timespec["nick"] + if "timestamp" in timespec: + self.dt.data = datetime.fromtimestamp(timespec["timestamp"]) def singlealarm_receive(form): return { - 'kind': 'single', - 'nick': form.nick.data, - 'timestamp': int(form.dt.data.strftime('%s')) + "kind": "single", + "nick": form.nick.data, + "timestamp": int(form.dt.data.strftime("%s")), } class FrequencyAlarmForm(Form): - nick = StringField('Alarm nick', validators=[validators.required()], - description='A simple name to recognize this alarm') - interval = StringField('Frequency', - validators=[validators.required()], - description='in seconds, or human-readable ' - '(like 9w3d12h)') - start = EasyDateTimeField('Start date and time', - validators=[validators.optional()], - description='Before this, no alarm will ring. ' - 'Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, ' - 'the alarm will always ring') - end = EasyDateTimeField('End date and time', - validators=[validators.optional()], - description='After this, no alarm will ring. ' - 'Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, ' - 'the alarm will always ring') - weekdays = SelectMultipleField('Days on which the alarm should be played', - choices=[('1', 'Monday'), - ('2', 'Tuesday'), - ('3', 'Wednesday'), - ('4', 'Thursday'), - ('5', 'Friday'), - ('6', 'Saturday'), - ('7', 'Sunday')], - default=list('1234567'), - validators=[validators.required()], - description='The alarm will ring only on ' - 'selected weekdays') - submit = SubmitField('Submit') + nick = StringField( + "Alarm nick", + validators=[validators.required()], + description="A simple name to recognize this alarm", + ) + interval = StringField( + "Frequency", + validators=[validators.required()], + description="in seconds, or human-readable " "(like 9w3d12h)", + ) + start = EasyDateTimeField( + "Start date and time", + validators=[validators.optional()], + description="Before this, no alarm will ring. " + "Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, " + "the alarm will always ring", + ) + end = EasyDateTimeField( + "End date and time", + validators=[validators.optional()], + description="After this, no alarm will ring. " + "Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, " + "the alarm will always ring", + ) + weekdays = SelectMultipleField( + "Days on which the alarm should be played", + choices=[ + ("1", "Monday"), + ("2", "Tuesday"), + ("3", "Wednesday"), + ("4", "Thursday"), + ("5", "Friday"), + ("6", "Saturday"), + ("7", "Sunday"), + ], + default=list("1234567"), + validators=[validators.required()], + description="The alarm will ring only on " "selected weekdays", + ) + submit = SubmitField("Submit") def populate_from_timespec(self, timespec): - if 'nick' in timespec: - self.nick.data = timespec['nick'] - if 'start' in timespec: - self.start.data = datetime.fromtimestamp(timespec['start']) - if 'end' in timespec: - self.end.data = datetime.fromtimestamp(timespec['end']) - if 'weekdays' in timespec: - self.weekdays.data = timespec['weekdays'] + if "nick" in timespec: + self.nick.data = timespec["nick"] + if "start" in timespec: + self.start.data = datetime.fromtimestamp(timespec["start"]) + if "end" in timespec: + self.end.data = datetime.fromtimestamp(timespec["end"]) + if "weekdays" in timespec: + self.weekdays.data = timespec["weekdays"] else: - self.weekdays.data = list('1234567') - self.interval.data = timespec['interval'] + self.weekdays.data = list("1234567") + self.interval.data = timespec["interval"] def validate_interval(self, field): try: int(field.data) except ValueError: if timeparse(field.data) is None: - raise ValidationError("interval must either be a number " - "(in seconds) or a human-readable " - "string like '1h2m' or '1d12h'") + raise ValidationError( + "interval must either be a number " + "(in seconds) or a human-readable " + "string like '1h2m' or '1d12h'" + ) def frequencyalarm_receive(form): obj = { - 'kind': 'frequency', - 'nick': form.nick.data, - 'interval': form.interval.data, - 'weekdays': form.weekdays.data, + "kind": "frequency", + "nick": form.nick.data, + "interval": form.interval.data, + "weekdays": form.weekdays.data, } if form.start.data: - obj['start'] = int(form.start.data.strftime('%s')) + obj["start"] = int(form.start.data.strftime("%s")) else: - obj['start'] = 0 + obj["start"] = 0 if form.end.data: - obj['end'] = int(form.end.data.strftime('%s')) + obj["end"] = int(form.end.data.strftime("%s")) return obj diff --git a/larigira/timegen.py b/larigira/timegen.py index 4d5b7a5..4c69224 100644 --- a/larigira/timegen.py +++ b/larigira/timegen.py @@ -1,6 +1,6 @@ -''' +""" main module to read and get informations about alarms -''' +""" from __future__ import print_function import sys from datetime import datetime @@ -8,31 +8,48 @@ import argparse import json from .entrypoints_utils import get_one_entrypoint from logging import getLogger -log = getLogger('timegen') + +log = getLogger("timegen") def get_timegenerator(kind): - '''Messes with entrypoints to return an timegenerator function''' - return get_one_entrypoint('larigira.timegenerators', kind) + """Messes with entrypoints to return an timegenerator function""" + return get_one_entrypoint("larigira.timegenerators", kind) def get_parser(): parser = argparse.ArgumentParser( - description='Generate "ring times" from a timespec') - parser.add_argument('timespec', metavar='TIMESPEC', type=str, nargs=1, - help='filename for timespec, formatted in json') - parser.add_argument('--now', metavar='NOW', type=int, nargs=1, - default=None, - help='Set a different "time", in unix epoch') - parser.add_argument('--howmany', metavar='N', type=int, nargs=1, - default=[1], - help='Set a different "time", in unix epoch') + description='Generate "ring times" from a timespec' + ) + parser.add_argument( + "timespec", + metavar="TIMESPEC", + type=str, + nargs=1, + help="filename for timespec, formatted in json", + ) + parser.add_argument( + "--now", + metavar="NOW", + type=int, + nargs=1, + default=None, + help='Set a different "time", in unix epoch', + ) + parser.add_argument( + "--howmany", + metavar="N", + type=int, + nargs=1, + default=[1], + help='Set a different "time", in unix epoch', + ) return parser def read_spec(fname): try: - if fname == '-': + if fname == "-": return json.load(sys.stdin) with open(fname) as buf: return json.load(buf) @@ -42,12 +59,12 @@ def read_spec(fname): def check_spec(spec): - if 'kind' not in spec: + if "kind" not in spec: yield "Missing field 'kind'" def timegenerate(spec, now=None, howmany=1): - Alarm = get_timegenerator(spec['kind']) + Alarm = get_timegenerator(spec["kind"]) generator = Alarm(spec) if now is not None: if type(now) is not datetime: @@ -58,14 +75,14 @@ def timegenerate(spec, now=None, howmany=1): def main(): - '''Main function for the "larigira-timegen" executable''' + """Main function for the "larigira-timegen" executable""" args = get_parser().parse_args() spec = read_spec(args.timespec[0]) errors = tuple(check_spec(spec)) if errors: log.error("Errors in timespec") for err in errors: - sys.stderr.write('Error: {}\n'.format(err)) + sys.stderr.write("Error: {}\n".format(err)) sys.exit(1) now = None if args.now is None else args.now.pop() howmany = None if args.howmany is None else args.howmany.pop() diff --git a/larigira/timegen_every.py b/larigira/timegen_every.py index bd4f5dd..ed034de 100644 --- a/larigira/timegen_every.py +++ b/larigira/timegen_every.py @@ -1,6 +1,7 @@ from __future__ import print_function import logging -log = logging.getLogger('time-every') + +log = logging.getLogger("time-every") from datetime import datetime, timedelta from pytimeparse.timeparse import timeparse @@ -17,21 +18,21 @@ class Alarm(object): pass def next_ring(self, current_time=None): - '''if current_time is None, it is now() + """if current_time is None, it is now() returns the next time it will ring; or None if it will not anymore - ''' + """ raise NotImplementedError() def has_ring(self, time=None): - '''returns True IFF the alarm will ring exactly at ``time``''' + """returns True IFF the alarm will ring exactly at ``time``""" raise NotImplementedError() def all_rings(self, current_time=None): - ''' + """ all future rings this, of course, is an iterator (they could be infinite) - ''' + """ ring = self.next_ring(current_time) while ring is not None: yield ring @@ -39,17 +40,18 @@ class Alarm(object): class SingleAlarm(Alarm): - ''' + """ rings a single time - ''' - description = 'Only once, at a specified date and time' + """ + + description = "Only once, at a specified date and time" def __init__(self, obj): super().__init__() - self.dt = getdate(obj['timestamp']) + self.dt = getdate(obj["timestamp"]) def next_ring(self, current_time=None): - '''if current_time is None, it is now()''' + """if current_time is None, it is now()""" if current_time is None: current_time = datetime.now() if current_time >= self.dt: @@ -63,29 +65,28 @@ class SingleAlarm(Alarm): class FrequencyAlarm(Alarm): - ''' + """ rings on {t | exists a k integer >= 0 s.t. t = start+k*t, start self.end: @@ -100,22 +101,22 @@ class FrequencyAlarm(Alarm): # fact, it is necessary to retry until a valid event/weekday is # found. a "while True" might have been more elegant (and maybe # fast), but this gives a clear upper bound to the cycle. - for _ in range(max(60*60*24*7 // self.interval, 1)): + for _ in range(max(60 * 60 * 24 * 7 // self.interval, 1)): n_interval = ( (current_time - self.start).total_seconds() // self.interval - ) + 1 + ) + 1 ring = self.start + timedelta(seconds=self.interval * n_interval) if ring == current_time: ring += timedelta(seconds=self.interval) if self.end is not None and ring > self.end: return None - if self.weekdays is not None \ - and ring.isoweekday() not in self.weekdays: + if self.weekdays is not None and ring.isoweekday() not in self.weekdays: current_time = ring continue return ring - log.warning("Can't find a valid time for event %s; " - "something went wrong", str(self)) + log.warning( + "Can't find a valid time for event %s; " "something went wrong", str(self) + ) return None def has_ring(self, current_time=None): @@ -124,11 +125,9 @@ class FrequencyAlarm(Alarm): if not self.start >= current_time >= self.end: return False - n_interval = (current_time - self.start).total_seconds() // \ - self.interval - expected_time = self.start + \ - timedelta(seconds=self.interval * n_interval) + n_interval = (current_time - self.start).total_seconds() // self.interval + expected_time = self.start + timedelta(seconds=self.interval * n_interval) return expected_time == current_time def __str__(self): - return 'FrequencyAlarm(every %ds)' % self.interval + return "FrequencyAlarm(every %ds)" % self.interval diff --git a/larigira/unused.py b/larigira/unused.py index a82fa76..d7bb055 100644 --- a/larigira/unused.py +++ b/larigira/unused.py @@ -1,9 +1,9 @@ -''' +""" This component will look for files to be removed. There are some assumptions: * Only files in $TMPDIR are removed. Please remember that larigira has its own specific TMPDIR * MPD URIs are parsed, and only file:/// is supported -''' +""" import os from os.path import normpath import logging @@ -11,13 +11,14 @@ import mpd def old_commonpath(directories): - if any(p for p in directories if p.startswith('/')) and \ - any(p for p in directories if not p.startswith('/')): + if any(p for p in directories if p.startswith("/")) and any( + p for p in directories if not p.startswith("/") + ): raise ValueError("Can't mix absolute and relative paths") norm_paths = [normpath(p) + os.path.sep for p in directories] ret = os.path.dirname(os.path.commonprefix(norm_paths)) - if len(ret) > 0 and ret == '/' * len(ret): - return '/' + if len(ret) > 0 and ret == "/" * len(ret): + return "/" return ret @@ -35,35 +36,39 @@ class UnusedCleaner: def _get_mpd(self): mpd_client = mpd.MPDClient(use_unicode=True) - mpd_client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT']) + mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"]) return mpd_client def watch(self, uri): - ''' + """ adds fpath to the list of "watched" file as soon as it leaves the mpc playlist, it is removed - ''' - if not uri.startswith('file:///'): + """ + if not uri.startswith("file:///"): return # not a file URI - fpath = uri[len('file://'):] - if 'TMPDIR' in self.conf and self.conf['TMPDIR'] \ - and commonpath([self.conf['TMPDIR'], fpath]) != \ - normpath(self.conf['TMPDIR']): - self.log.info('Not watching file %s: not in TMPDIR', fpath) + fpath = uri[len("file://") :] + if ( + "TMPDIR" in self.conf + and self.conf["TMPDIR"] + and commonpath([self.conf["TMPDIR"], fpath]) + != normpath(self.conf["TMPDIR"]) + ): + self.log.info("Not watching file %s: not in TMPDIR", fpath) return if not os.path.exists(fpath): - self.log.warning('a path that does not exist is being monitored') + self.log.warning("a path that does not exist is being monitored") self.waiting_removal_files.add(fpath) def check_playlist(self): - '''check playlist + internal watchlist to see what can be removed''' + """check playlist + internal watchlist to see what can be removed""" mpdc = self._get_mpd() - files_in_playlist = {song['file'] for song in mpdc.playlistid() - if song['file'].startswith('/')} + files_in_playlist = { + song["file"] for song in mpdc.playlistid() if song["file"].startswith("/") + } for fpath in self.waiting_removal_files - files_in_playlist: # we can remove it! - self.log.debug('removing unused: %s', fpath) + self.log.debug("removing unused: %s", fpath) self.waiting_removal_files.remove(fpath) if os.path.exists(fpath): os.unlink(fpath)