formattazione con black

This commit is contained in:
boyska 2019-06-25 13:49:33 +02:00
parent 0a7cad0e2d
commit d19b18eb3c
40 changed files with 949 additions and 811 deletions

2
.flake8 Normal file
View file

@ -0,0 +1,2 @@
[flake8]
max-line-length=79

View file

@ -3,23 +3,24 @@ from wtforms import StringField, validators, SubmitField
class AudioForm(Form):
nick = StringField('Audio nick', validators=[validators.required()],
description='A simple name to recognize this audio')
urls = StringField('URLs',
validators=[validators.required()],
description='URL of the file to download')
submit = SubmitField('Submit')
nick = StringField(
"Audio nick",
validators=[validators.required()],
description="A simple name to recognize this audio",
)
urls = StringField(
"URLs",
validators=[validators.required()],
description="URL of the file to download",
)
submit = SubmitField("Submit")
def populate_from_audiospec(self, audiospec):
if 'nick' in audiospec:
self.nick.data = audiospec['nick']
if 'urls' in audiospec:
self.urls.data = ';'.join(audiospec['urls'])
if "nick" in audiospec:
self.nick.data = audiospec["nick"]
if "urls" in audiospec:
self.urls.data = ";".join(audiospec["urls"])
def audio_receive(form):
return {
'kind': 'http',
'nick': form.nick.data,
'urls': form.urls.data.split(';'),
}
return {"kind": "http", "nick": form.nick.data, "urls": form.urls.data.split(";")}

View file

@ -6,40 +6,49 @@ from larigira.formutils import AutocompleteStringField
class AudioForm(Form):
nick = StringField('Audio nick', validators=[validators.required()],
description='A simple name to recognize this audio')
path = AutocompleteStringField('dl-suggested-dirs',
'Path', validators=[validators.required()],
description='Directory to pick file from')
maxage = StringField('Max age',
validators=[validators.required()],
description='in seconds, or human-readable '
'(like 9w3d12h)')
submit = SubmitField('Submit')
nick = StringField(
"Audio nick",
validators=[validators.required()],
description="A simple name to recognize this audio",
)
path = AutocompleteStringField(
"dl-suggested-dirs",
"Path",
validators=[validators.required()],
description="Directory to pick file from",
)
maxage = StringField(
"Max age",
validators=[validators.required()],
description="in seconds, or human-readable " "(like 9w3d12h)",
)
submit = SubmitField("Submit")
def validate_maxage(self, field):
try:
int(field.data)
except ValueError:
if timeparse(field.data) is None:
raise ValidationError("maxage must either be a number "
"(in seconds) or a human-readable "
"string like '1h2m' or '1d12h'")
raise ValidationError(
"maxage must either be a number "
"(in seconds) or a human-readable "
"string like '1h2m' or '1d12h'"
)
def populate_from_audiospec(self, audiospec):
if 'nick' in audiospec:
self.nick.data = audiospec['nick']
if 'path' in audiospec:
self.path.data = audiospec['path']
if 'maxage' in audiospec:
self.maxage.data = audiospec['maxage']
if "nick" in audiospec:
self.nick.data = audiospec["nick"]
if "path" in audiospec:
self.path.data = audiospec["path"]
if "maxage" in audiospec:
self.maxage.data = audiospec["maxage"]
def audio_receive(form):
return {
'kind': 'mostrecent',
'nick': form.nick.data,
'path': form.path.data,
'maxage': form.maxage.data,
'howmany': 1
"kind": "mostrecent",
"nick": form.nick.data,
"path": form.path.data,
"maxage": form.maxage.data,
"howmany": 1,
}

View file

@ -5,33 +5,40 @@ from larigira.formutils import AutocompleteStringField
class Form(flask_wtf.Form):
nick = StringField('Audio nick', validators=[validators.required()],
description='A simple name to recognize this audio')
path = AutocompleteStringField('dl-suggested-dirs',
'Path', validators=[validators.required()],
description='Full path to source directory')
howmany = IntegerField('Number', validators=[validators.optional()],
default=1,
description='How many songs to be picked'
'from this dir; defaults to 1')
submit = SubmitField('Submit')
nick = StringField(
"Audio nick",
validators=[validators.required()],
description="A simple name to recognize this audio",
)
path = AutocompleteStringField(
"dl-suggested-dirs",
"Path",
validators=[validators.required()],
description="Full path to source directory",
)
howmany = IntegerField(
"Number",
validators=[validators.optional()],
default=1,
description="How many songs to be picked" "from this dir; defaults to 1",
)
submit = SubmitField("Submit")
def populate_from_audiospec(self, audiospec):
if 'nick' in audiospec:
self.nick.data = audiospec['nick']
if 'paths' in audiospec:
self.path.data = audiospec['paths'][0]
if 'howmany' in audiospec:
self.howmany.data = audiospec['howmany']
if "nick" in audiospec:
self.nick.data = audiospec["nick"]
if "paths" in audiospec:
self.path.data = audiospec["paths"][0]
if "howmany" in audiospec:
self.howmany.data = audiospec["howmany"]
else:
self.howmany.data = 1
def receive(form):
return {
'kind': 'randomdir',
'nick': form.nick.data,
'paths': [form.path.data],
'howmany': form.howmany.data or 1
"kind": "randomdir",
"nick": form.nick.data,
"paths": [form.path.data],
"howmany": form.howmany.data or 1,
}

View file

@ -3,38 +3,44 @@ from wtforms import StringField, validators, SubmitField, ValidationError
from larigira.formutils import AutocompleteStringField
class ScriptAudioForm(Form):
nick = StringField('Audio nick', validators=[validators.required()],
description='A simple name to recognize this audio')
nick = StringField(
"Audio nick",
validators=[validators.required()],
description="A simple name to recognize this audio",
)
name = AutocompleteStringField(
'dl-suggested-scripts',
'Name', validators=[validators.required()],
description='filename (NOT path) of the script')
args = StringField('Arguments',
description='arguments, separated by ";"')
submit = SubmitField('Submit')
"dl-suggested-scripts",
"Name",
validators=[validators.required()],
description="filename (NOT path) of the script",
)
args = StringField("Arguments", description='arguments, separated by ";"')
submit = SubmitField("Submit")
def populate_from_audiospec(self, audiospec):
if 'nick' in audiospec:
self.nick.data = audiospec['nick']
if 'name' in audiospec:
self.name.data = audiospec['name']
if 'args' in audiospec:
if type(audiospec['args']) is str: # legacy compatibility
self.args.data = audiospec['args'].replace(' ', ';')
if "nick" in audiospec:
self.nick.data = audiospec["nick"]
if "name" in audiospec:
self.name.data = audiospec["name"]
if "args" in audiospec:
if type(audiospec["args"]) is str: # legacy compatibility
self.args.data = audiospec["args"].replace(" ", ";")
else:
self.args.data = ';'.join(audiospec['args'])
self.args.data = ";".join(audiospec["args"])
def validate_name(self, field):
if '/' in field.data:
raise ValidationError("Name cannot have slashes: "
"it's a name, not a path")
if "/" in field.data:
raise ValidationError(
"Name cannot have slashes: " "it's a name, not a path"
)
def scriptaudio_receive(form):
return {
'kind': 'script',
'nick': form.nick.data,
'name': form.name.data,
'args': form.args.data.split(';')
"kind": "script",
"nick": form.nick.data,
"name": form.name.data,
"args": form.args.data.split(";"),
}

View file

@ -5,23 +5,25 @@ from larigira.formutils import AutocompleteStringField
class StaticAudioForm(Form):
nick = StringField('Audio nick', validators=[validators.required()],
description='A simple name to recognize this audio')
path = AutocompleteStringField('dl-suggested-files',
'Path', validators=[validators.required()],
description='Full path to audio file')
submit = SubmitField('Submit')
nick = StringField(
"Audio nick",
validators=[validators.required()],
description="A simple name to recognize this audio",
)
path = AutocompleteStringField(
"dl-suggested-files",
"Path",
validators=[validators.required()],
description="Full path to audio file",
)
submit = SubmitField("Submit")
def populate_from_audiospec(self, audiospec):
if 'nick' in audiospec:
self.nick.data = audiospec['nick']
if 'paths' in audiospec:
self.path.data = audiospec['paths'][0]
if "nick" in audiospec:
self.nick.data = audiospec["nick"]
if "paths" in audiospec:
self.path.data = audiospec["paths"][0]
def staticaudio_receive(form):
return {
'kind': 'static',
'nick': form.nick.data,
'paths': [form.path.data]
}
return {"kind": "static", "nick": form.nick.data, "paths": [form.path.data]}

View file

@ -4,25 +4,30 @@ import argparse
from .entrypoints_utils import get_one_entrypoint
import json
from logging import getLogger
log = getLogger('audiogen')
log = getLogger("audiogen")
def get_audiogenerator(kind):
'''Messes with entrypoints to return an audiogenerator function'''
return get_one_entrypoint('larigira.audiogenerators', kind)
"""Messes with entrypoints to return an audiogenerator function"""
return get_one_entrypoint("larigira.audiogenerators", kind)
def get_parser():
parser = argparse.ArgumentParser(
description='Generate audio and output paths')
parser.add_argument('audiospec', metavar='AUDIOSPEC', type=str, nargs=1,
help='filename for audiospec, formatted in json')
parser = argparse.ArgumentParser(description="Generate audio and output paths")
parser.add_argument(
"audiospec",
metavar="AUDIOSPEC",
type=str,
nargs=1,
help="filename for audiospec, formatted in json",
)
return parser
def read_spec(fname):
try:
if fname == '-':
if fname == "-":
return json.load(sys.stdin)
with open(fname) as buf:
return json.load(buf)
@ -32,28 +37,28 @@ def read_spec(fname):
def check_spec(spec):
if 'kind' not in spec:
if "kind" not in spec:
yield "Missing field 'kind'"
def audiogenerate(spec):
gen = get_audiogenerator(spec['kind'])
gen = get_audiogenerator(spec["kind"])
return tuple(gen(spec))
def main():
'''Main function for the "larigira-audiogen" executable'''
"""Main function for the "larigira-audiogen" executable"""
args = get_parser().parse_args()
spec = read_spec(args.audiospec[0])
errors = tuple(check_spec(spec))
if errors:
log.error("Errors in audiospec")
for err in errors:
sys.stderr.write('Error: {}\n'.format(err))
sys.stderr.write("Error: {}\n".format(err))
sys.exit(1)
for path in audiogenerate(spec):
print(path)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -9,40 +9,39 @@ log = logging.getLogger(__name__)
def put(url, destdir=None, copy=False):
if url.split(':')[0] not in ('http', 'https'):
log.warning('Not a valid URL: %s', url)
if url.split(":")[0] not in ("http", "https"):
log.warning("Not a valid URL: %s", url)
return None
ext = url.split('.')[-1]
if ext.lower() not in ('mp3', 'ogg', 'oga', 'wma', 'm4a'):
ext = url.split(".")[-1]
if ext.lower() not in ("mp3", "ogg", "oga", "wma", "m4a"):
log.warning('Invalid format (%s) for "%s"', ext, url)
return None
if not copy:
return url
fname = posixpath.basename(urlparse(url).path)
# sanitize
fname = "".join(c for c in fname
if c.isalnum() or c in list('._-')).rstrip()
tmp = mkstemp(suffix='.' + ext, prefix='http-%s-' % fname, dir=destdir)
fname = "".join(c for c in fname if c.isalnum() or c in list("._-")).rstrip()
tmp = mkstemp(suffix="." + ext, prefix="http-%s-" % fname, dir=destdir)
os.close(tmp[0])
log.info("downloading %s -> %s", url, tmp[1])
fname, headers = urllib.request.urlretrieve(url, tmp[1])
return 'file://%s' % os.path.realpath(tmp[1])
return "file://%s" % os.path.realpath(tmp[1])
def generate(spec):
'''
"""
resolves audiospec-static
Recognized argument is "paths" (list of static paths)
'''
if 'urls' not in spec:
"""
if "urls" not in spec:
raise ValueError("Malformed audiospec: missing 'paths'")
for url in spec['urls']:
for url in spec["urls"]:
ret = put(url, copy=True)
if ret is None:
continue
yield ret
generate.description = 'Fetch audio from an URL'
generate.description = "Fetch audio from an URL"

View file

@ -7,6 +7,7 @@ from tempfile import mkstemp
from pytimeparse.timeparse import timeparse
from larigira.fsutils import scan_dir, shortname
log = logging.getLogger(__name__)
@ -23,36 +24,40 @@ def recent_choose(paths, howmany, minepoch):
if os.path.isfile(path):
found_files[path] = get_mtime(path)
elif os.path.isdir(path):
found_files.update({fname: get_mtime(fname)
for fname in scan_dir(path)})
found_files = [(fname, mtime)
for (fname, mtime) in found_files.items()
if mtime >= minepoch]
found_files.update({fname: get_mtime(fname) for fname in scan_dir(path)})
found_files = [
(fname, mtime) for (fname, mtime) in found_files.items() if mtime >= minepoch
]
return [fname for fname, mtime in
sorted(found_files, key=lambda x: x[1], reverse=True)[:howmany]]
return [
fname
for fname, mtime in sorted(found_files, key=lambda x: x[1], reverse=True)[
:howmany
]
]
def generate(spec):
'''
"""
resolves audiospec-randomdir
Recognized arguments:
- path [mandatory] source dir
- maxage [default=ignored] max age of audio files to pick
'''
for attr in ('path', 'maxage'):
"""
for attr in ("path", "maxage"):
if attr not in spec:
raise ValueError("Malformed audiospec: missing '%s'" % attr)
if spec['maxage'].strip():
if spec["maxage"].strip():
try:
maxage = int(spec['maxage'])
maxage = int(spec["maxage"])
except ValueError:
maxage = timeparse(spec['maxage'])
maxage = timeparse(spec["maxage"])
if maxage is None:
raise ValueError("Unknown format for maxage: '{}'"
.format(spec['maxage']))
raise ValueError(
"Unknown format for maxage: '{}'".format(spec["maxage"])
)
assert type(maxage) is int
else:
maxage = None
@ -60,15 +65,16 @@ def generate(spec):
now = int(time.time())
minepoch = 0 if maxage is None else now - maxage
picked = recent_choose([spec['path']], 1, minepoch)
picked = recent_choose([spec["path"]], 1, minepoch)
for path in picked:
tmp = mkstemp(suffix=os.path.splitext(path)[-1],
prefix='randomdir-%s-' % shortname(path))
tmp = mkstemp(
suffix=os.path.splitext(path)[-1], prefix="randomdir-%s-" % shortname(path)
)
os.close(tmp[0])
shutil.copy(path, tmp[1])
log.info("copying %s -> %s", path, os.path.basename(tmp[1]))
yield 'file://{}'.format(tmp[1])
yield "file://{}".format(tmp[1])
generate.description = 'Select most recent file inside a directory'
generate.description = "Select most recent file inside a directory"

View file

@ -1,5 +1,6 @@
import logging
log = logging.getLogger('mpdrandom')
log = logging.getLogger("mpdrandom")
import random
from mpd import MPDClient
@ -8,17 +9,17 @@ from .config import get_conf
def generate_by_artist(spec):
'''choose HOWMANY random artists, and for each one choose a random song'''
spec.setdefault('howmany', 1)
log.info('generating')
"""choose HOWMANY random artists, and for each one choose a random song"""
spec.setdefault("howmany", 1)
log.info("generating")
conf = get_conf()
c = MPDClient(use_unicode=True)
c.connect(conf['MPD_HOST'], conf['MPD_PORT'])
c.connect(conf["MPD_HOST"], conf["MPD_PORT"])
artists = c.list('artist')
artists = c.list("artist")
log.debug("got %d artists", len(artists))
if not artists:
raise ValueError("no artists in your mpd database")
for _ in range(spec['howmany']):
for _ in range(spec["howmany"]):
artist = random.choice(artists)
yield random.choice(c.find('artist', artist))['file']
yield random.choice(c.find("artist", artist))["file"]

View file

@ -6,6 +6,7 @@ from tempfile import mkstemp
from pathlib import Path
from larigira.fsutils import scan_dir_audio, shortname, is_audio
log = logging.getLogger(__name__)
@ -23,36 +24,37 @@ def candidates(paths):
def generate(spec):
'''
"""
resolves audiospec-randomdir
Recognized arguments:
- paths [mandatory] list of source paths
- howmany [default=1] number of audio files to pick
'''
spec.setdefault('howmany', 1)
for attr in ('paths', ):
"""
spec.setdefault("howmany", 1)
for attr in ("paths",):
if attr not in spec:
raise ValueError("Malformed audiospec: missing '%s'" % attr)
found_files = candidates([Path(p) for p in spec['paths']])
found_files = candidates([Path(p) for p in spec["paths"]])
picked = random.sample(found_files, int(spec['howmany']))
picked = random.sample(found_files, int(spec["howmany"]))
nick = spec.get('nick', '')
nick = spec.get("nick", "")
if not nick:
if hasattr(spec, 'eid'):
if hasattr(spec, "eid"):
nick = spec.eid
else:
nick = 'NONICK'
nick = "NONICK"
for path in picked:
tmp = mkstemp(suffix=os.path.splitext(path)[-1],
prefix='randomdir-%s-%s-' % (shortname(nick),
shortname(path)))
tmp = mkstemp(
suffix=os.path.splitext(path)[-1],
prefix="randomdir-%s-%s-" % (shortname(nick), shortname(path)),
)
os.close(tmp[0])
shutil.copy(path, tmp[1])
log.info("copying %s -> %s", path, os.path.basename(tmp[1]))
yield "file://{}".format(tmp[1])
generate.description = 'Picks random files from a specified directory'
generate.description = "Picks random files from a specified directory"

View file

@ -1,4 +1,4 @@
'''
"""
script audiogenerator: uses an external program to generate audio URIs
a script can be any valid executable in
@ -14,64 +14,65 @@ The output MUST be UTF-8-encoded.
Empty lines will be skipped. stderr will be logged, so please be careful. any
non-zero exit code will result in no files being added.and an exception being
logged.
'''
"""
import logging
import os
import subprocess
from .config import get_conf
log = logging.getLogger(__name__)
def generate(spec):
'''
"""
Recognized arguments (fields in spec):
- name [mandatory] script name
- args [default=empty] arguments, colon-separated
'''
"""
conf = get_conf()
spec.setdefault('args', '')
if type(spec['args']) is str:
args = spec['args'].split(';')
args = list(spec['args'])
for attr in ('name', ):
spec.setdefault("args", "")
if type(spec["args"]) is str:
args = spec["args"].split(";")
args = list(spec["args"])
for attr in ("name",):
if attr not in spec:
raise ValueError("Malformed audiospec: missing '%s'" % attr)
if '/' in spec['name']:
raise ValueError("Script name is a filename, not a path ({} provided)"
.format(spec['name']))
scriptpath = os.path.join(conf['SCRIPTS_PATH'], spec['name'])
if "/" in spec["name"]:
raise ValueError(
"Script name is a filename, not a path ({} provided)".format(spec["name"])
)
scriptpath = os.path.join(conf["SCRIPTS_PATH"], spec["name"])
if not os.path.exists(scriptpath):
raise ValueError("Script %s not found", spec['name'])
raise ValueError("Script %s not found", spec["name"])
if not os.access(scriptpath, os.R_OK | os.X_OK):
raise ValueError("Insufficient privileges for script %s" % scriptpath)
if os.stat(scriptpath).st_uid != os.getuid():
raise ValueError("Script %s owned by %d, should be owned by %d"
% (spec['name'], os.stat(scriptpath).st_uid,
os.getuid()))
try:
log.info('Going to run %s', [scriptpath] + args)
env = dict(
HOME=os.environ['HOME'],
PATH=os.environ['PATH'],
MPD_HOST=conf['MPD_HOST'],
MPD_PORT=str(conf['MPD_PORT'])
raise ValueError(
"Script %s owned by %d, should be owned by %d"
% (spec["name"], os.stat(scriptpath).st_uid, os.getuid())
)
if 'TMPDIR' in os.environ:
env['TMPDIR'] = os.environ['TMPDIR']
out = subprocess.check_output([scriptpath] + args,
env=env,
cwd='/')
try:
log.info("Going to run %s", [scriptpath] + args)
env = dict(
HOME=os.environ["HOME"],
PATH=os.environ["PATH"],
MPD_HOST=conf["MPD_HOST"],
MPD_PORT=str(conf["MPD_PORT"]),
)
if "TMPDIR" in os.environ:
env["TMPDIR"] = os.environ["TMPDIR"]
out = subprocess.check_output([scriptpath] + args, env=env, cwd="/")
except subprocess.CalledProcessError as exc:
log.error("Error %d when running script %s",
exc.returncode, spec['name'])
log.error("Error %d when running script %s", exc.returncode, spec["name"])
return []
out = out.decode('utf-8')
out = [p for p in out.split('\n') if p]
logging.debug('Script %s produced %d files', spec['name'], len(out))
out = out.decode("utf-8")
out = [p for p in out.split("\n") if p]
logging.debug("Script %s produced %d files", spec["name"], len(out))
return out
generate.description = 'Generate audio through an external script. ' \
'Experts only.'
generate.description = "Generate audio through an external script. " "Experts only."

View file

@ -4,28 +4,30 @@ import shutil
from tempfile import mkstemp
from larigira.fsutils import shortname
log = logging.getLogger(__name__)
def generate(spec):
'''
"""
resolves audiospec-static
Recognized argument is "paths" (list of static paths)
'''
if 'paths' not in spec:
"""
if "paths" not in spec:
raise ValueError("Malformed audiospec: missing 'paths'")
for path in spec['paths']:
for path in spec["paths"]:
if not os.path.exists(path):
log.warning("Can't find requested path: %s", path)
continue
tmp = mkstemp(suffix=os.path.splitext(path)[-1],
prefix='static-%s-' % shortname(path))
tmp = mkstemp(
suffix=os.path.splitext(path)[-1], prefix="static-%s-" % shortname(path)
)
os.close(tmp[0])
log.info("copying %s -> %s", path, os.path.basename(tmp[1]))
shutil.copy(path, tmp[1])
yield 'file://{}'.format(tmp[1])
yield "file://{}".format(tmp[1])
generate.description = 'Picks always the same specified file'
generate.description = "Picks always the same specified file"

View file

@ -1,5 +1,6 @@
from tinydb import TinyDB
class EventModel(object):
def __init__(self, uri):
self.uri = uri
@ -10,8 +11,8 @@ class EventModel(object):
if self.db is not None:
self.db.close()
self.db = TinyDB(self.uri, indent=2)
self._actions = self.db.table('actions')
self._alarms = self.db.table('alarms')
self._actions = self.db.table("actions")
self._alarms = self.db.table("alarms")
def get_action_by_id(self, action_id):
return self._actions.get(eid=action_id)
@ -20,7 +21,7 @@ class EventModel(object):
return self._alarms.get(eid=alarm_id)
def get_actions_by_alarm(self, alarm):
for action_id in alarm.get('actions', []):
for action_id in alarm.get("actions", []):
action = self.get_action_by_id(action_id)
if action is None:
continue
@ -39,7 +40,7 @@ class EventModel(object):
def add_event(self, alarm, actions):
action_ids = [self.add_action(a) for a in actions]
alarm['actions'] = action_ids
alarm["actions"] = action_ids
return self._alarms.insert(alarm)
def add_action(self, action):

View file

@ -1,14 +1,23 @@
'''
"""
This module contains a flask blueprint for db administration stuff
Templates are self-contained in this directory
'''
"""
from __future__ import print_function
from datetime import datetime, timedelta, time
from collections import defaultdict
from flask import current_app, Blueprint, render_template, jsonify, abort, \
request, redirect, url_for, flash
from flask import (
current_app,
Blueprint,
render_template,
jsonify,
abort,
request,
redirect,
url_for,
flash,
)
from larigira.entrypoints_utils import get_avail_entrypoints
from larigira.audiogen import get_audiogenerator
@ -17,56 +26,63 @@ from larigira.timegen import get_timegenerator, timegenerate
from larigira import forms
from larigira.config import get_conf
from .suggestions import get_suggestions
db = Blueprint('db', __name__,
url_prefix=get_conf()['ROUTE_PREFIX'] + '/db',
template_folder='templates')
db = Blueprint(
"db",
__name__,
url_prefix=get_conf()["ROUTE_PREFIX"] + "/db",
template_folder="templates",
)
def request_wants_json():
best = request.accept_mimetypes \
.best_match(['application/json', 'text/html'])
return best == 'application/json' and \
request.accept_mimetypes[best] > \
request.accept_mimetypes['text/html']
best = request.accept_mimetypes.best_match(["application/json", "text/html"])
return (
best == "application/json"
and request.accept_mimetypes[best] > request.accept_mimetypes["text/html"]
)
def get_model():
return current_app.larigira.controller.monitor.model
@db.route('/')
@db.route("/")
def home():
return render_template('dbadmin_base.html')
return render_template("dbadmin_base.html")
@db.route('/list')
@db.route("/list")
def events_list():
model = current_app.larigira.controller.monitor.model
alarms = tuple(model.get_all_alarms())
events = [(alarm, model.get_actions_by_alarm(alarm))
for alarm in alarms]
return render_template('list.html', events=events)
events = [(alarm, model.get_actions_by_alarm(alarm)) for alarm in alarms]
return render_template("list.html", events=events)
@db.route('/calendar')
@db.route("/calendar")
def events_calendar():
model = current_app.larigira.controller.monitor.model
today = datetime.now().date()
maxdays = 30
# {date: {datetime: [(alarm1,actions1), (alarm2,actions2)]}}
days = defaultdict(lambda: defaultdict(list))
freq_threshold = get_conf()['UI_CALENDAR_FREQUENCY_THRESHOLD']
freq_threshold = get_conf()["UI_CALENDAR_FREQUENCY_THRESHOLD"]
for alarm in model.get_all_alarms():
if freq_threshold and alarm['kind'] == 'frequency' and \
FrequencyAlarm(alarm).interval < freq_threshold:
if (
freq_threshold
and alarm["kind"] == "frequency"
and FrequencyAlarm(alarm).interval < freq_threshold
):
continue
actions = tuple(model.get_actions_by_alarm(alarm))
if not actions:
continue
t = datetime.fromtimestamp(int(today.strftime('%s')))
t = datetime.fromtimestamp(int(today.strftime("%s")))
for t in timegenerate(alarm, now=t, howmany=maxdays):
if t is None or \
t > datetime.combine(today+timedelta(days=maxdays), time()):
if t is None or t > datetime.combine(
today + timedelta(days=maxdays), time()
):
break
days[t.date()][t].append((alarm, actions))
@ -75,105 +91,101 @@ def events_calendar():
for d in sorted(days.keys()):
weeks[d.isocalendar()[:2]].append(d)
return render_template('calendar.html', days=days, weeks=weeks)
return render_template("calendar.html", days=days, weeks=weeks)
@db.route('/add/time')
@db.route("/add/time")
def addtime():
kinds = get_avail_entrypoints('larigira.timeform_create')
kinds = get_avail_entrypoints("larigira.timeform_create")
def gen_info(gen):
return dict(description=getattr(gen, 'description', ''))
info = {kind: gen_info(get_timegenerator(kind))
for kind in kinds}
return render_template('add_time.html', kinds=kinds, info=info)
return dict(description=getattr(gen, "description", ""))
info = {kind: gen_info(get_timegenerator(kind)) for kind in kinds}
return render_template("add_time.html", kinds=kinds, info=info)
@db.route('/edit/time/<int:alarmid>', methods=['GET', 'POST'])
@db.route("/edit/time/<int:alarmid>", methods=["GET", "POST"])
def edit_time(alarmid):
model = get_model()
timespec = model.get_alarm_by_id(alarmid)
kind = timespec['kind']
kind = timespec["kind"]
Form, receiver = tuple(forms.get_timeform(kind))
form = Form()
if request.method == 'GET':
if request.method == "GET":
form.populate_from_timespec(timespec)
if request.method == 'POST' and form.validate():
if request.method == "POST" and form.validate():
data = receiver(form)
model.update_alarm(alarmid, data)
model.reload()
return redirect(url_for('db.events_list',
_anchor='event-%d' % alarmid))
return render_template('add_time_kind.html',
form=form,
kind=kind,
mode='edit',
alarmid=alarmid,
)
return redirect(url_for("db.events_list", _anchor="event-%d" % alarmid))
return render_template(
"add_time_kind.html", form=form, kind=kind, mode="edit", alarmid=alarmid
)
@db.route('/add/time/<kind>', methods=['GET', 'POST'])
@db.route("/add/time/<kind>", methods=["GET", "POST"])
def addtime_kind(kind):
Form, receiver = tuple(forms.get_timeform(kind))
form = Form()
if request.method == 'POST' and form.validate():
if request.method == "POST" and form.validate():
data = receiver(form)
eid = get_model().add_alarm(data)
return redirect(url_for('db.edit_event', alarmid=eid))
return redirect(url_for("db.edit_event", alarmid=eid))
return render_template('add_time_kind.html',
form=form, kind=kind, mode='add')
return render_template("add_time_kind.html", form=form, kind=kind, mode="add")
@db.route('/add/audio')
@db.route("/add/audio")
def addaudio():
kinds = get_avail_entrypoints('larigira.audioform_create')
kinds = get_avail_entrypoints("larigira.audioform_create")
def gen_info(gen):
return dict(description=getattr(gen, 'description', ''))
info = {kind: gen_info(get_audiogenerator(kind))
for kind in kinds}
return render_template('add_audio.html', kinds=kinds, info=info)
return dict(description=getattr(gen, "description", ""))
info = {kind: gen_info(get_audiogenerator(kind)) for kind in kinds}
return render_template("add_audio.html", kinds=kinds, info=info)
@db.route('/add/audio/<kind>', methods=['GET', 'POST'])
@db.route("/add/audio/<kind>", methods=["GET", "POST"])
def addaudio_kind(kind):
Form, receiver = tuple(forms.get_audioform(kind))
form = Form()
if request.method == 'POST' and form.validate():
if request.method == "POST" and form.validate():
data = receiver(form)
model = current_app.larigira.controller.monitor.model
eid = model.add_action(data)
return jsonify(dict(inserted=eid, data=data))
return render_template('add_audio_kind.html', form=form, kind=kind,
suggestions=get_suggestions()
)
return render_template(
"add_audio_kind.html", form=form, kind=kind, suggestions=get_suggestions()
)
@db.route('/edit/audio/<int:actionid>', methods=['GET', 'POST'])
@db.route("/edit/audio/<int:actionid>", methods=["GET", "POST"])
def edit_audio(actionid):
model = get_model()
audiospec = model.get_action_by_id(actionid)
kind = audiospec['kind']
kind = audiospec["kind"]
Form, receiver = tuple(forms.get_audioform(kind))
form = Form()
if request.method == 'GET':
if request.method == "GET":
form.populate_from_audiospec(audiospec)
if request.method == 'POST' and form.validate():
if request.method == "POST" and form.validate():
data = receiver(form)
model.update_action(actionid, data)
model.reload()
return redirect(url_for('db.events_list'))
return render_template('add_audio_kind.html',
form=form,
kind=kind,
mode='edit',
suggestions=get_suggestions()
)
return redirect(url_for("db.events_list"))
return render_template(
"add_audio_kind.html",
form=form,
kind=kind,
mode="edit",
suggestions=get_suggestions(),
)
@db.route('/edit/event/<alarmid>')
@db.route("/edit/event/<alarmid>")
def edit_event(alarmid):
model = current_app.larigira.controller.monitor.model
alarm = model.get_alarm_by_id(int(alarmid))
@ -181,35 +193,37 @@ def edit_event(alarmid):
abort(404)
allactions = model.get_all_actions()
actions = tuple(model.get_actions_by_alarm(alarm))
return render_template('edit_event.html',
alarm=alarm, all_actions=allactions,
actions=actions,
routeprefix=get_conf()['ROUTE_PREFIX']
)
return render_template(
"edit_event.html",
alarm=alarm,
all_actions=allactions,
actions=actions,
routeprefix=get_conf()["ROUTE_PREFIX"],
)
@db.route('/api/alarm/<alarmid>/actions', methods=['POST'])
@db.route("/api/alarm/<alarmid>/actions", methods=["POST"])
def change_actions(alarmid):
new_actions = request.form.getlist('actions[]')
new_actions = request.form.getlist("actions[]")
if new_actions is None:
new_actions = []
model = current_app.larigira.controller.monitor.model
ret = model.update_alarm(int(alarmid),
new_fields={'actions': [int(a) for a in
new_actions]})
ret = model.update_alarm(
int(alarmid), new_fields={"actions": [int(a) for a in new_actions]}
)
return jsonify(dict(updated=alarmid, ret=ret))
@db.route('/api/alarm/<int:alarmid>/delete', methods=['POST'])
@db.route("/api/alarm/<int:alarmid>/delete", methods=["POST"])
def delete_alarm(alarmid):
model = current_app.larigira.controller.monitor.model
try:
alarm = model.get_alarm_by_id(int(alarmid))
print(alarm['nick'])
print(alarm["nick"])
model.delete_alarm(alarmid)
except KeyError:
abort(404)
if request_wants_json():
return jsonify(dict(deleted=alarmid))
flash('Evento %d `%s` cancellato' % (alarmid, alarm['nick']) )
return redirect(url_for('db.events_list'))
flash("Evento %d `%s` cancellato" % (alarmid, alarm["nick"]))
return redirect(url_for("db.events_list"))

View file

@ -7,22 +7,23 @@ from larigira.fsutils import scan_dir_audio
def get_suggested_files():
if not get_conf()['FILE_PATH_SUGGESTION']:
if not get_conf()["FILE_PATH_SUGGESTION"]:
return []
if current_app.cache.has('dbadmin.get_suggested_files'):
return current_app.cache.get('dbadmin.get_suggested_files')
current_app.logger.debug('get_suggested_files MISS in cache')
if current_app.cache.has("dbadmin.get_suggested_files"):
return current_app.cache.get("dbadmin.get_suggested_files")
current_app.logger.debug("get_suggested_files MISS in cache")
files = []
for path in get_conf()['FILE_PATH_SUGGESTION']:
for path in get_conf()["FILE_PATH_SUGGESTION"]:
if not os.path.isdir(path):
current_app.logger.warning('Invalid suggestion path: %s' % path)
current_app.logger.warning("Invalid suggestion path: %s" % path)
continue
pathfiles = scan_dir_audio(path)
files.extend(pathfiles)
current_app.logger.debug('Suggested files: %s' % ', '.join(files))
current_app.logger.debug("Suggested files: %s" % ", ".join(files))
current_app.cache.set('dbadmin.get_suggested_files', files,
timeout=600) # ten minutes
current_app.cache.set(
"dbadmin.get_suggested_files", files, timeout=600
) # ten minutes
return files
@ -40,11 +41,14 @@ def get_suggested_dirs():
def get_suggested_scripts():
base = get_conf()['SCRIPTS_PATH']
base = get_conf()["SCRIPTS_PATH"]
if not base or not os.path.isdir(base):
return []
fnames = [f for f in os.listdir(base)
if os.access(os.path.join(base, f), os.R_OK | os.X_OK)]
fnames = [
f
for f in os.listdir(base)
if os.access(os.path.join(base, f), os.R_OK | os.X_OK)
]
return fnames
@ -53,10 +57,4 @@ def get_suggestions():
if len(files) > 200:
current_app.logger.warning("Too many suggested files, cropping")
files = files[:200]
return dict(
files=files,
dirs=get_suggested_dirs(),
scripts=get_suggested_scripts(),
)
return dict(files=files, dirs=get_suggested_dirs(), scripts=get_suggested_scripts())

View file

@ -1,13 +1,14 @@
from logging import getLogger
log = getLogger('entrypoints_utils')
log = getLogger("entrypoints_utils")
from pkg_resources import iter_entry_points
def get_one_entrypoint(group, kind):
'''Messes with entrypoints to return an entrypoint of a given group/kind'''
"""Messes with entrypoints to return an entrypoint of a given group/kind"""
points = tuple(iter_entry_points(group=group, name=kind))
if not points:
raise ValueError('cant find an entrypoint %s:%s' % (group, kind))
raise ValueError("cant find an entrypoint %s:%s" % (group, kind))
if len(points) > 1:
log.warning("Found more than one timeform for %s:%s", group, kind)
return points[0].load()

View file

@ -1,5 +1,6 @@
from __future__ import print_function
from gevent import monkey
monkey.patch_all(subprocess=True)
import logging
from datetime import datetime, timedelta
@ -12,10 +13,11 @@ from .timegen import timegenerate
from .audiogen import audiogenerate
from .db import EventModel
logging.getLogger('mpd').setLevel(logging.WARNING)
logging.getLogger("mpd").setLevel(logging.WARNING)
class Monitor(ParentedLet):
'''
"""
Manages timegenerators and audiogenerators for DB events
The mechanism is partially based on ticks, partially on scheduled actions.
@ -30,23 +32,25 @@ class Monitor(ParentedLet):
The scheduling mechanism allows for more precision, catching exactly the
right time. Being accurate only with ticks would have required very
frequent ticks, which is cpu-intensive.
'''
"""
def __init__(self, parent_queue, conf):
ParentedLet.__init__(self, parent_queue)
self.log = logging.getLogger(self.__class__.__name__)
self.running = {}
self.conf = conf
self.q = Queue()
self.model = EventModel(self.conf['DB_URI'])
self.ticker = Timer(int(self.conf['EVENT_TICK_SECS']) * 1000, self.q)
self.model = EventModel(self.conf["DB_URI"])
self.ticker = Timer(int(self.conf["EVENT_TICK_SECS"]) * 1000, self.q)
def _alarm_missing_time(self, timespec):
now = datetime.now() + timedelta(seconds=self.conf['CACHING_TIME'])
now = datetime.now() + timedelta(seconds=self.conf["CACHING_TIME"])
try:
when = next(timegenerate(timespec, now=now))
except:
logging.exception("Could not generate "
"an alarm from timespec %s", timespec)
logging.exception(
"Could not generate " "an alarm from timespec %s", timespec
)
if when is None:
# expired
return None
@ -55,12 +59,12 @@ class Monitor(ParentedLet):
return delta
def on_tick(self):
'''
"""
this is called every EVENT_TICK_SECS.
Checks every event in the DB (which might be slightly CPU-intensive, so
it is advisable to run it in its own greenlet); if the event is "near
enough", schedule it; if it is too far, or already expired, ignore it.
'''
"""
self.model.reload()
for alarm in self.model.get_all_alarms():
actions = list(self.model.get_actions_by_alarm(alarm))
@ -71,75 +75,84 @@ class Monitor(ParentedLet):
# but it is "tricky"; any small delay would cause the event to be
# missed
if delta is None:
self.log.debug('Skipping event %s: will never ring',
alarm.get('nick', alarm.eid))
elif delta <= 2*self.conf['EVENT_TICK_SECS']:
self.log.debug('Scheduling event %s (%ds) => %s',
alarm.get('nick', alarm.eid),
delta,
[a.get('nick', a.eid) for a in actions]
)
self.log.debug(
"Skipping event %s: will never ring", alarm.get("nick", alarm.eid)
)
elif delta <= 2 * self.conf["EVENT_TICK_SECS"]:
self.log.debug(
"Scheduling event %s (%ds) => %s",
alarm.get("nick", alarm.eid),
delta,
[a.get("nick", a.eid) for a in actions],
)
self.schedule(alarm, actions, delta)
else:
self.log.debug('Skipping event %s too far (%ds)',
alarm.get('nick', alarm.eid),
delta,
)
self.log.debug(
"Skipping event %s too far (%ds)",
alarm.get("nick", alarm.eid),
delta,
)
def schedule(self, timespec, audiospecs, delta=None):
'''
"""
prepare an event to be run at a specified time with the specified
actions; the DB won't be read anymore after this call.
This means that this call should not be done too early, or any update
to the DB will be ignored.
'''
"""
if delta is None:
delta = self._alarm_missing_time(timespec)
audiogen = gevent.spawn_later(delta,
self.process_action,
timespec, audiospecs)
audiogen = gevent.spawn_later(delta, self.process_action, timespec, audiospecs)
audiogen.parent_greenlet = self
audiogen.doc = 'Will wait {} seconds, then generate audio "{}"'.format(
delta,
','.join(aspec.get('nick', '') for aspec in audiospecs),
delta, ",".join(aspec.get("nick", "") for aspec in audiospecs)
)
self.running[timespec.eid] = {
'greenlet': audiogen,
'running_time': datetime.now() + timedelta(seconds=delta),
'timespec': timespec,
'audiospecs': audiospecs
"greenlet": audiogen,
"running_time": datetime.now() + timedelta(seconds=delta),
"timespec": timespec,
"audiospecs": audiospecs,
}
def process_action(self, timespec, audiospecs):
'''Generate audio and submit it to Controller'''
"""Generate audio and submit it to Controller"""
if timespec.eid in self.running:
del self.running[timespec.eid]
else:
self.log.warning('Timespec %s completed but not in running '
'registry; this is most likely a bug',
timespec.get('nick', timespec.eid))
self.log.warning(
"Timespec %s completed but not in running "
"registry; this is most likely a bug",
timespec.get("nick", timespec.eid),
)
uris = []
for audiospec in audiospecs:
try:
uris.extend(audiogenerate(audiospec))
except Exception as exc:
self.log.error('audiogenerate for <%s> failed; reason: %s',
str(audiospec), str(exc))
self.send_to_parent('uris_enqueue',
dict(uris=uris,
timespec=timespec,
audiospecs=audiospecs,
aids=[a.eid for a in audiospecs]))
self.log.error(
"audiogenerate for <%s> failed; reason: %s",
str(audiospec),
str(exc),
)
self.send_to_parent(
"uris_enqueue",
dict(
uris=uris,
timespec=timespec,
audiospecs=audiospecs,
aids=[a.eid for a in audiospecs],
),
)
def _run(self):
self.ticker.start()
gevent.spawn(self.on_tick)
while True:
value = self.q.get()
kind = value['kind']
if kind in ('forcetick', 'timer'):
kind = value["kind"]
if kind in ("forcetick", "timer"):
gevent.spawn(self.on_tick)
else:
self.log.warning("Unknown message: %s", str(value))

View file

@ -14,25 +14,28 @@ def main_list(args):
def main_add(args):
m = EventModel(args.file)
m.add_event(dict(kind='frequency', interval=args.interval, start=1),
[dict(kind='mpd', howmany=1)]
)
m.add_event(
dict(kind="frequency", interval=args.interval, start=1),
[dict(kind="mpd", howmany=1)],
)
def main():
conf = get_conf()
p = argparse.ArgumentParser()
p.add_argument('-f', '--file', help="Filepath for DB", required=False,
default=conf['DB_URI'])
p.add_argument(
"-f", "--file", help="Filepath for DB", required=False, default=conf["DB_URI"]
)
sub = p.add_subparsers()
sub_list = sub.add_parser('list')
sub_list = sub.add_parser("list")
sub_list.set_defaults(func=main_list)
sub_add = sub.add_parser('add')
sub_add.add_argument('--interval', type=int, default=3600)
sub_add = sub.add_parser("add")
sub_add.add_argument("--interval", type=int, default=3600)
sub_add.set_defaults(func=main_add)
args = p.parse_args()
args.func(args)
if __name__ == '__main__':
if __name__ == "__main__":
main()

View file

@ -2,13 +2,14 @@ import gevent
class ParentedLet(gevent.Greenlet):
'''
"""
ParentedLet is just a helper subclass that will help you when your
greenlet main duty is to "signal" things to a parent_queue.
It won't save you much code, but "standardize" messages and make explicit
the role of that greenlet
'''
"""
def __init__(self, queue):
gevent.Greenlet.__init__(self)
self.parent_queue = queue
@ -17,36 +18,38 @@ class ParentedLet(gevent.Greenlet):
def parent_msg(self, kind, *args):
return {
'emitter': self,
'class': self.__class__.__name__,
'tracker': self.tracker,
'kind': kind,
'args': args
"emitter": self,
"class": self.__class__.__name__,
"tracker": self.tracker,
"kind": kind,
"args": args,
}
def send_to_parent(self, kind, *args):
self.parent_queue.put(self.parent_msg(kind, *args))
def _run(self):
if not hasattr(self, 'do_business'):
raise Exception("do_business method not implemented by %s" %
self.__class__.__name__)
if not hasattr(self, "do_business"):
raise Exception(
"do_business method not implemented by %s" % self.__class__.__name__
)
for msg in self.do_business():
self.send_to_parent(*msg)
class Timer(ParentedLet):
'''continously sleeps some time, then send a "timer" message to parent'''
"""continously sleeps some time, then send a "timer" message to parent"""
def __init__(self, milliseconds, queue):
ParentedLet.__init__(self, queue)
self.ms = milliseconds
def parent_msg(self, kind, *args):
msg = ParentedLet.parent_msg(self, kind, *args)
msg['period'] = self.ms
msg["period"] = self.ms
return msg
def do_business(self):
while True:
gevent.sleep(self.ms / 1000.0)
yield ('timer', )
yield ("timer",)

View file

@ -2,22 +2,22 @@ import wave
def maxwait(songs, context, conf):
wait = int(conf.get('EF_MAXWAIT_SEC', 0))
wait = int(conf.get("EF_MAXWAIT_SEC", 0))
if wait == 0:
return True
if 'time' not in context['status']:
return True, 'no song playing?'
curpos, duration = map(int, context['status']['time'].split(':'))
if "time" not in context["status"]:
return True, "no song playing?"
curpos, duration = map(int, context["status"]["time"].split(":"))
remaining = duration - curpos
if remaining > wait:
return False, 'remaining %d max allowed %d' % (remaining, wait)
return False, "remaining %d max allowed %d" % (remaining, wait)
return True
def get_duration(path):
'''get track duration in seconds'''
if path.lower().endswith('.wav'):
with wave.open(path, 'r') as f:
"""get track duration in seconds"""
if path.lower().endswith(".wav"):
with wave.open(path, "r") as f:
frames = f.getnframes()
rate = f.getframerate()
duration = frames / rate
@ -34,7 +34,7 @@ def get_duration(path):
def percentwait(songs, context, conf, getdur=get_duration):
'''
"""
Similar to maxwait, but the maximum waiting time is proportional to the
duration of the audio we're going to add.
@ -46,25 +46,25 @@ def percentwait(songs, context, conf, getdur=get_duration):
are adding a jingle of 40seconds, then if EF_MAXWAIT_PERC==200 the audio
will be added (40s*200% = 1m20s) while if EF_MAXWAIT_PERC==100 it will be
filtered out.
'''
percentwait = int(conf.get('EF_MAXWAIT_PERC', 0))
"""
percentwait = int(conf.get("EF_MAXWAIT_PERC", 0))
if percentwait == 0:
return True
if 'time' not in context['status']:
return True, 'no song playing?'
curpos, duration = map(int, context['status']['time'].split(':'))
if "time" not in context["status"]:
return True, "no song playing?"
curpos, duration = map(int, context["status"]["time"].split(":"))
remaining = duration - curpos
eventduration = 0
for uri in songs['uris']:
if not uri.startswith('file://'):
return True, '%s is not a file' % uri
path = uri[len('file://'):] # strips file://
for uri in songs["uris"]:
if not uri.startswith("file://"):
return True, "%s is not a file" % uri
path = uri[len("file://") :] # strips file://
songduration = getdur(path)
if songduration is None:
continue
eventduration += songduration
wait = eventduration * (percentwait/100.)
wait = eventduration * (percentwait / 100.0)
if remaining > wait:
return False, 'remaining %d max allowed %d' % (remaining, wait)
return False, "remaining %d max allowed %d" % (remaining, wait)
return True

View file

@ -9,17 +9,26 @@ def matchval(d):
if k in input_: # string matching
return d[k]
raise Exception("This test case is bugged! No value for %s" % input_)
return mocked
durations = dict(one=60, two=120, three=180, four=240, ten=600, twenty=1200,
thirty=1800, nonexist=None)
durations = dict(
one=60,
two=120,
three=180,
four=240,
ten=600,
twenty=1200,
thirty=1800,
nonexist=None,
)
dur = matchval(durations)
def normalize_ret(ret):
if type(ret) is bool:
return ret, ''
return ret, ""
return ret
@ -28,7 +37,7 @@ def mw(*args, **kwargs):
def pw(*args, **kwargs):
kwargs['getdur'] = dur
kwargs["getdur"] = dur
return normalize_ret(percentwait(*args, **kwargs))
@ -38,26 +47,26 @@ def test_maxwait_nonpresent_disabled():
def test_maxwait_explicitly_disabled():
ret = mw([], {}, {'EF_MAXWAIT_SEC': 0})
ret = mw([], {}, {"EF_MAXWAIT_SEC": 0})
assert ret[0] is True
def test_maxwait_ok():
ret = mw([], {'status': {'time': '250:300'}}, {'EF_MAXWAIT_SEC': 100})
ret = mw([], {"status": {"time": "250:300"}}, {"EF_MAXWAIT_SEC": 100})
assert ret[0] is True
def test_maxwait_exceeded():
ret = mw([], {'status': {'time': '100:300'}}, {'EF_MAXWAIT_SEC': 100})
ret = mw([], {"status": {"time": "100:300"}}, {"EF_MAXWAIT_SEC": 100})
assert ret[0] is False
def test_maxwait_limit():
ret = mw([], {'status': {'time': '199:300'}}, {'EF_MAXWAIT_SEC': 100})
ret = mw([], {"status": {"time": "199:300"}}, {"EF_MAXWAIT_SEC": 100})
assert ret[0] is False
ret = mw([], {'status': {'time': '200:300'}}, {'EF_MAXWAIT_SEC': 100})
ret = mw([], {"status": {"time": "200:300"}}, {"EF_MAXWAIT_SEC": 100})
assert ret[0] is True
ret = mw([], {'status': {'time': '201:300'}}, {'EF_MAXWAIT_SEC': 100})
ret = mw([], {"status": {"time": "201:300"}}, {"EF_MAXWAIT_SEC": 100})
assert ret[0] is True
@ -67,32 +76,40 @@ def test_percentwait_nonpresent_disabled():
def test_percentwait_explicitly_disabled():
ret = pw([], {}, {'EF_MAXWAIT_PERC': 0})
ret = pw([], {}, {"EF_MAXWAIT_PERC": 0})
assert ret[0] is True
def test_percentwait_ok():
# less than one minute missing
ret = pw(dict(uris=['file:///oneminute.ogg']),
{'status': {'time': '250:300'}},
{'EF_MAXWAIT_PERC': 100})
ret = pw(
dict(uris=["file:///oneminute.ogg"]),
{"status": {"time": "250:300"}},
{"EF_MAXWAIT_PERC": 100},
)
assert ret[0] is True
# more than one minute missing
ret = pw(dict(uris=['file:///oneminute.ogg']),
{'status': {'time': '220:300'}},
{'EF_MAXWAIT_PERC': 100})
ret = pw(
dict(uris=["file:///oneminute.ogg"]),
{"status": {"time": "220:300"}},
{"EF_MAXWAIT_PERC": 100},
)
assert ret[0] is False
def test_percentwait_morethan100():
# requiring 5*10 = 50mins = 3000sec
ret = pw(dict(uris=['file:///tenminute.ogg']),
{'status': {'time': '4800:6000'}},
{'EF_MAXWAIT_PERC': 500})
ret = pw(
dict(uris=["file:///tenminute.ogg"]),
{"status": {"time": "4800:6000"}},
{"EF_MAXWAIT_PERC": 500},
)
assert ret[0] is True
ret = pw(dict(uris=['file:///oneminute.ogg']),
{'status': {'time': '2000:6000'}},
{'EF_MAXWAIT_PERC': 500})
ret = pw(
dict(uris=["file:///oneminute.ogg"]),
{"status": {"time": "2000:6000"}},
{"EF_MAXWAIT_PERC": 500},
)
assert ret[0] is False

View file

@ -1,15 +1,16 @@
from logging import getLogger
log = getLogger('timeform')
log = getLogger("timeform")
from .entrypoints_utils import get_one_entrypoint
def get_timeform(kind):
'''Messes with entrypoints to return a TimeForm'''
for group in ('larigira.timeform_create', 'larigira.timeform_receive'):
"""Messes with entrypoints to return a TimeForm"""
for group in ("larigira.timeform_create", "larigira.timeform_receive"):
yield get_one_entrypoint(group, kind)
def get_audioform(kind):
'''Messes with entrypoints to return a AudioForm'''
for group in ('larigira.audioform_create', 'larigira.audioform_receive'):
"""Messes with entrypoints to return a AudioForm"""
for group in ("larigira.audioform_create", "larigira.audioform_receive"):
yield get_one_entrypoint(group, kind)

View file

@ -11,17 +11,16 @@ log = logging.getLogger(__name__)
class AutocompleteTextInput(wtforms.widgets.Input):
def __init__(self, datalist=None):
super().__init__('text')
super().__init__("text")
self.datalist = datalist
def __call__(self, field, **kwargs):
# every second can be specified
if self.datalist is not None:
return super(AutocompleteTextInput, self).__call__(
field, list=self.datalist, autocomplete="autocomplete",
**kwargs)
return super(AutocompleteTextInput, self).__call__(
field, **kwargs)
field, list=self.datalist, autocomplete="autocomplete", **kwargs
)
return super(AutocompleteTextInput, self).__call__(field, **kwargs)
class AutocompleteStringField(StringField):
@ -31,15 +30,15 @@ class AutocompleteStringField(StringField):
class DateTimeInput(wtforms.widgets.Input):
input_type = 'datetime-local'
input_type = "datetime-local"
def __call__(self, field, **kwargs):
# every second can be specified
return super(DateTimeInput, self).__call__(field, step='1', **kwargs)
return super(DateTimeInput, self).__call__(field, step="1", **kwargs)
class EasyDateTimeField(Field):
'''
"""
a "fork" of DateTimeField which uses HTML5 datetime-local
The format is not customizable, because it is imposed by the HTML5
@ -47,27 +46,28 @@ class EasyDateTimeField(Field):
This field does not ensure that browser actually supports datetime-local
input type, nor does it provide polyfills.
'''
"""
widget = DateTimeInput()
formats = ('%Y-%m-%dT%H:%M:%S', '%Y-%m-%dT%H:%M')
formats = ("%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M")
def __init__(self, label=None, validators=None, **kwargs):
super(EasyDateTimeField, self).__init__(label, validators, **kwargs)
def _value(self):
if self.raw_data:
return ' '.join(self.raw_data)
return self.data and self.data.strftime(self.formats[0]) or ''
return " ".join(self.raw_data)
return self.data and self.data.strftime(self.formats[0]) or ""
def process_formdata(self, valuelist):
if valuelist:
date_str = ' '.join(valuelist)
date_str = " ".join(valuelist)
for fmt in self.formats:
try:
self.data = datetime.strptime(date_str, fmt)
return
except ValueError:
log.debug('Format `%s` not valid for `%s`',
fmt, date_str)
raise ValueError(self.gettext(
'Not a valid datetime value <tt>{}</tt>').format(date_str))
log.debug("Format `%s` not valid for `%s`", fmt, date_str)
raise ValueError(
self.gettext("Not a valid datetime value <tt>{}</tt>").format(date_str)
)

View file

@ -5,7 +5,7 @@ import mimetypes
def scan_dir(dirname, extension=None):
if extension is None:
extension = '*'
extension = "*"
for root, dirnames, filenames in os.walk(dirname):
for fname in fnmatch.filter(filenames, extension):
yield os.path.join(root, fname)
@ -13,7 +13,7 @@ def scan_dir(dirname, extension=None):
def multi_fnmatch(fname, extensions):
for ext in extensions:
if fnmatch.fnmatch(fname, '*.' + ext):
if fnmatch.fnmatch(fname, "*." + ext):
return True
return False
@ -22,10 +22,10 @@ def is_audio(fname):
mimetype = mimetypes.guess_type(fname)[0]
if mimetype is None:
return False
return mimetype.split('/')[0] == 'audio'
return mimetype.split("/")[0] == "audio"
def scan_dir_audio(dirname, extensions=('mp3', 'oga', 'wav', 'ogg')):
def scan_dir_audio(dirname, extensions=("mp3", "oga", "wav", "ogg")):
for root, dirnames, filenames in os.walk(dirname):
for fname in filenames:
if is_audio(fname):
@ -34,6 +34,6 @@ def scan_dir_audio(dirname, extensions=('mp3', 'oga', 'wav', 'ogg')):
def shortname(path):
name = os.path.basename(path) # filename
name = name.rsplit('.', 1)[0] # no extension
name = ''.join(c for c in name if c.isalnum()) # no strange chars
name = name.rsplit(".", 1)[0] # no extension
name = "".join(c for c in name if c.isalnum()) # no strange chars
return name

View file

@ -16,15 +16,16 @@ from .entrypoints_utils import get_avail_entrypoints
def get_mpd_client(conf):
client = mpd.MPDClient(use_unicode=True)
client.connect(conf['MPD_HOST'], conf['MPD_PORT'])
client.connect(conf["MPD_HOST"], conf["MPD_PORT"])
return client
class MPDWatcher(ParentedLet):
'''
"""
MPDWatcher notifies parent about any mpd event
'''
"""
def __init__(self, queue, conf, client=None):
ParentedLet.__init__(self, queue)
self.log = logging.getLogger(self.__class__.__name__)
@ -41,46 +42,51 @@ class MPDWatcher(ParentedLet):
if self.client is None:
self.refresh_client()
if first_after_connection:
yield('mpc', 'connect')
yield ("mpc", "connect")
status = self.client.idle()[0]
except (mpd.ConnectionError, ConnectionRefusedError,
FileNotFoundError) as exc:
self.log.warning('Connection to MPD failed (%s: %s)',
exc.__class__.__name__, exc)
except (
mpd.ConnectionError,
ConnectionRefusedError,
FileNotFoundError,
) as exc:
self.log.warning(
"Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc
)
self.client = None
first_after_connection = True
gevent.sleep(5)
continue
else:
first_after_connection = False
yield ('mpc', status)
yield ("mpc", status)
class Player:
'''
"""
The player contains different mpd-related methods
check_playlist determines whether the playlist is long enough and run audiogenerator accordingly
enqueue receive audios that have been generated by Monitor and (if filters allow it) enqueue it to MPD playlist
'''
"""
def __init__(self, conf):
self.conf = conf
self.log = logging.getLogger(self.__class__.__name__)
self.min_playlist_length = 10
self.tmpcleaner = UnusedCleaner(conf)
self._continous_audiospec = self.conf['CONTINOUS_AUDIOSPEC']
self._continous_audiospec = self.conf["CONTINOUS_AUDIOSPEC"]
self.events_enabled = True
def _get_mpd(self):
mpd_client = mpd.MPDClient(use_unicode=True)
try:
mpd_client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT'])
except (mpd.ConnectionError, ConnectionRefusedError,
FileNotFoundError) as exc:
self.log.warning('Connection to MPD failed (%s: %s)',
exc.__class__.__name__, exc)
mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"])
except (mpd.ConnectionError, ConnectionRefusedError, FileNotFoundError) as exc:
self.log.warning(
"Connection to MPD failed (%s: %s)", exc.__class__.__name__, exc
)
raise gevent.GreenletExit()
return mpd_client
@ -90,16 +96,17 @@ class Player:
@continous_audiospec.setter
def continous_audiospec(self, spec):
self._continous_audiospec = self.conf['CONTINOUS_AUDIOSPEC'] \
if spec is None else spec
self._continous_audiospec = (
self.conf["CONTINOUS_AUDIOSPEC"] if spec is None else spec
)
def clear_everything_but_current_song():
mpdc = self._get_mpd()
current = mpdc.currentsong()
pos = int(current.get('pos', 0))
pos = int(current.get("pos", 0))
for song in mpdc.playlistid():
if int(song['pos']) != pos:
mpdc.deleteid(song['id'])
if int(song["pos"]) != pos:
mpdc.deleteid(song["id"])
gevent.Greenlet.spawn(clear_everything_but_current_song)
@ -107,12 +114,11 @@ class Player:
mpd_client = self._get_mpd()
songs = mpd_client.playlist()
current = mpd_client.currentsong()
pos = int(current.get('pos', 0)) + 1
pos = int(current.get("pos", 0)) + 1
if (len(songs) - pos) >= self.min_playlist_length:
return
self.log.info('need to add new songs')
picker = gevent.Greenlet(audiogenerate,
self.continous_audiospec)
self.log.info("need to add new songs")
picker = gevent.Greenlet(audiogenerate, self.continous_audiospec)
def add(greenlet):
uris = greenlet.value
@ -120,69 +126,71 @@ class Player:
assert type(uri) is str, type(uri)
self.tmpcleaner.watch(uri.strip())
mpd_client.add(uri.strip())
picker.link_value(add)
picker.start()
def enqueue_filter(self, songs):
eventfilters = self.conf['EVENT_FILTERS']
eventfilters = self.conf["EVENT_FILTERS"]
if not eventfilters:
return True, ''
availfilters = get_avail_entrypoints('larigira.eventfilter')
return True, ""
availfilters = get_avail_entrypoints("larigira.eventfilter")
if len([ef for ef in eventfilters if ef in availfilters]) == 0:
return True, ''
return True, ""
mpdc = self._get_mpd()
status = mpdc.status()
ctx = {
'playlist': mpdc.playlist(),
'status': status,
'durations': []
}
for entrypoint in iter_entry_points('larigira.eventfilter'):
ctx = {"playlist": mpdc.playlist(), "status": status, "durations": []}
for entrypoint in iter_entry_points("larigira.eventfilter"):
if entrypoint.name in eventfilters:
ef = entrypoint.load()
try:
ret = ef(songs=songs, context=ctx, conf=self.conf)
except ImportError as exc:
self.log.warn("Filter %s skipped: %s" % (entrypoint.name,
exc))
self.log.warn("Filter %s skipped: %s" % (entrypoint.name, exc))
continue
if ret is None: # bad behavior!
continue
if type(ret) is bool:
reason = ''
reason = ""
else:
ret, reason = ret
reason = 'Filtered by %s (%s)' % (entrypoint.name, reason)
reason = "Filtered by %s (%s)" % (entrypoint.name, reason)
if ret is False:
return ret, reason
return True, 'Passed through %s' % ','.join(availfilters)
return True, "Passed through %s" % ",".join(availfilters)
def enqueue(self, songs):
assert type(songs) is dict
assert 'uris' in songs
spec = [aspec.get('nick', aspec.eid) for aspec in songs['audiospecs']]
nicks = ','.join((aspec.get('nick', aspec.eid)
for aspec in songs['audiospecs']))
assert "uris" in songs
spec = [aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"]]
nicks = ",".join(
(aspec.get("nick", aspec.eid) for aspec in songs["audiospecs"])
)
if not self.events_enabled:
self.log.debug('Ignoring <%s> (events disabled)', nicks
)
self.log.debug("Ignoring <%s> (events disabled)", nicks)
return
filterok, reason = self.enqueue_filter(songs)
if not filterok:
self.log.debug('Ignoring <%s>, filtered: %s', nicks, reason)
self.log.debug("Ignoring <%s>, filtered: %s", nicks, reason)
# delete those files
for uri in reversed(songs['uris']):
for uri in reversed(songs["uris"]):
self.tmpcleaner.watch(uri.strip())
return
mpd_client = self._get_mpd()
for uri in reversed(songs['uris']):
for uri in reversed(songs["uris"]):
assert type(uri) is str
self.log.info('Adding %s to playlist (from <%s>:%s=%s)',
uri,
songs['timespec'].get('nick', ''),
songs['aids'], spec)
insert_pos = 0 if len(mpd_client.playlistid()) == 0 else \
int(mpd_client.currentsong().get('pos', 0)) + 1
self.log.info(
"Adding %s to playlist (from <%s>:%s=%s)",
uri,
songs["timespec"].get("nick", ""),
songs["aids"],
spec,
)
insert_pos = (
0
if len(mpd_client.playlistid()) == 0
else int(mpd_client.currentsong().get("pos", 0)) + 1
)
try:
mpd_client.addid(uri, insert_pos)
except mpd.CommandError:
@ -197,7 +205,7 @@ class Controller(gevent.Greenlet):
self.conf = conf
self.q = Queue()
self.player = Player(self.conf)
if 'DB_URI' in self.conf:
if "DB_URI" in self.conf:
self.monitor = Monitor(self.q, self.conf)
self.monitor.parent_greenlet = self
else:
@ -209,28 +217,28 @@ class Controller(gevent.Greenlet):
mw = MPDWatcher(self.q, self.conf, client=None)
mw.parent_greenlet = self
mw.start()
t = Timer(int(self.conf['CHECK_SECS']) * 1000, self.q)
t = Timer(int(self.conf["CHECK_SECS"]) * 1000, self.q)
t.parent_greenlet = self
t.start()
# at the very start, run a check!
gevent.Greenlet.spawn(self.player.check_playlist)
while True:
value = self.q.get()
self.log.debug('<- %s', str(value))
self.log.debug("<- %s", str(value))
# emitter = value['emitter']
kind = value['kind']
args = value['args']
if kind == 'timer' or (kind == 'mpc' and
args[0] in ('player', 'playlist',
'connect')):
kind = value["kind"]
args = value["args"]
if kind == "timer" or (
kind == "mpc" and args[0] in ("player", "playlist", "connect")
):
gevent.Greenlet.spawn(self.player.check_playlist)
try:
self.player.tmpcleaner.check_playlist()
except:
pass
elif kind == 'mpc':
elif kind == "mpc":
pass
elif kind == 'uris_enqueue':
elif kind == "uris_enqueue":
# TODO: uris_enqueue messages should be delivered directly to Player.enqueue
# probably we need a MPDEnqueuer that receives every uri we want to add
try:
@ -238,13 +246,13 @@ class Controller(gevent.Greenlet):
except AssertionError:
raise
except Exception:
self.log.exception("Error while adding to queue; "
"bad audiogen output?")
elif (kind == 'signal' and args[0] == signal.SIGALRM) or \
kind == 'refresh':
self.log.exception(
"Error while adding to queue; " "bad audiogen output?"
)
elif (kind == "signal" and args[0] == signal.SIGALRM) or kind == "refresh":
# it's a tick!
self.log.debug("Reload")
self.monitor.q.put(dict(kind='forcetick'))
self.monitor.q.put(dict(kind="forcetick"))
gevent.Greenlet.spawn(self.player.check_playlist)
else:
self.log.warning("Unknown message: %s", str(value))

View file

@ -6,26 +6,27 @@ from .config import get_conf
@pytest.fixture
def unusedcleaner():
return UnusedCleaner(get_conf(prefix='LARIGIRATEST_'))
return UnusedCleaner(get_conf(prefix="LARIGIRATEST_"))
# this test suite heavily assumes that TMPDIR == /tmp/, which is the default
# indeed. However, the code does not rely on this assumption.
def test_watch_file(unusedcleaner):
# despite not existing, the file is added
unusedcleaner.watch('file:///tmp/gnam')
unusedcleaner.watch("file:///tmp/gnam")
assert len(unusedcleaner.waiting_removal_files) == 1
assert list(unusedcleaner.waiting_removal_files)[0] == '/tmp/gnam'
assert list(unusedcleaner.waiting_removal_files)[0] == "/tmp/gnam"
def test_watch_path_error(unusedcleaner):
'''paths are not valid thing to watch. URIs only, thanks'''
unusedcleaner.watch('/tmp/foo')
"""paths are not valid thing to watch. URIs only, thanks"""
unusedcleaner.watch("/tmp/foo")
assert len(unusedcleaner.waiting_removal_files) == 0
def test_watch_notmp_error(unusedcleaner):
'''Files not in TMPDIR are not added'''
unusedcleaner.watch('file:///not/in/tmp')
"""Files not in TMPDIR are not added"""
unusedcleaner.watch("file:///not/in/tmp")
assert len(unusedcleaner.waiting_removal_files) == 0

View file

@ -14,19 +14,19 @@ def now(request):
@pytest.fixture
def yesterday(request):
return int(time.time()) - 24*60*60
return int(time.time()) - 24 * 60 * 60
@pytest.fixture
def empty_dir():
dirpath = tempfile.mkdtemp(prefix='mostrecent.')
dirpath = tempfile.mkdtemp(prefix="mostrecent.")
yield dirpath
os.removedirs(dirpath)
@pytest.fixture
def dir_with_old_file(empty_dir):
fd, fname = tempfile.mkstemp(prefix='old.', dir=empty_dir)
fd, fname = tempfile.mkstemp(prefix="old.", dir=empty_dir)
os.close(fd)
os.utime(fname, times=(0, 0))
yield empty_dir
@ -35,7 +35,7 @@ def dir_with_old_file(empty_dir):
@pytest.fixture
def dir_with_yesterday_file(empty_dir, yesterday):
fd, fname = tempfile.mkstemp(prefix='yesterday.', dir=empty_dir)
fd, fname = tempfile.mkstemp(prefix="yesterday.", dir=empty_dir)
os.close(fd)
os.utime(fname, times=(yesterday, yesterday))
yield empty_dir
@ -44,7 +44,7 @@ def dir_with_yesterday_file(empty_dir, yesterday):
@pytest.fixture
def dir_with_new_file(dir_with_old_file, now):
fd, fname = tempfile.mkstemp(prefix='new.', dir=dir_with_old_file)
fd, fname = tempfile.mkstemp(prefix="new.", dir=dir_with_old_file)
os.close(fd)
os.utime(fname, times=(now, now))
yield dir_with_old_file
@ -53,7 +53,7 @@ def dir_with_new_file(dir_with_old_file, now):
@pytest.fixture
def dir_with_two_recent_files(dir_with_yesterday_file, now):
fd, fname = tempfile.mkstemp(prefix='new.', dir=dir_with_yesterday_file)
fd, fname = tempfile.mkstemp(prefix="new.", dir=dir_with_yesterday_file)
os.close(fd)
os.utime(fname, times=(now, now))
yield dir_with_yesterday_file
@ -61,7 +61,7 @@ def dir_with_two_recent_files(dir_with_yesterday_file, now):
def test_empty_is_empty(empty_dir, now):
'''nothing can be picked from a empty dir'''
"""nothing can be picked from a empty dir"""
picked = recent_choose([empty_dir], 1, now)
assert len(picked) == 0
@ -74,17 +74,17 @@ def test_old_files(dir_with_old_file, now):
def test_new_files_found(dir_with_new_file):
picked = recent_choose([dir_with_new_file], 1, 1)
assert len(picked) == 1
assert os.path.basename(picked[0]).startswith('new.')
assert os.path.basename(picked[0]).startswith("new.")
def test_only_new_files_found(dir_with_new_file):
picked = recent_choose([dir_with_new_file], 2, 1)
assert len(picked) == 1
assert os.path.basename(picked[0]).startswith('new.')
assert os.path.basename(picked[0]).startswith("new.")
def test_correct_sorting(dir_with_two_recent_files):
picked = recent_choose([dir_with_two_recent_files], 1, 1)
assert len(picked) == 1
assert not os.path.basename(picked[0]).startswith('yesterday.')
assert os.path.basename(picked[0]).startswith('new.')
assert not os.path.basename(picked[0]).startswith("yesterday.")
assert os.path.basename(picked[0]).startswith("new.")

View file

@ -1,5 +1,6 @@
from __future__ import print_function
from gevent import monkey
monkey.patch_all(subprocess=True)
import pytest
@ -9,10 +10,9 @@ from larigira.audiogen_mpdrandom import generate_by_artist
@pytest.fixture
def simplerandom():
return {
}
return {}
def test_accepted_syntax(simplerandom):
'''Check the minimal needed configuration for mpdrandom'''
"""Check the minimal needed configuration for mpdrandom"""
generate_by_artist(simplerandom)

View file

@ -9,51 +9,51 @@ def P(pypathlocal):
def test_txt_files_are_excluded(tmpdir):
p = tmpdir.join("foo.txt")
p.write('')
p.write("")
assert len(candidates([P(p)])) == 0
assert len(candidates([P(tmpdir)])) == 0
def test_nested_txt_files_are_excluded(tmpdir):
p = tmpdir.mkdir('one').mkdir('two').join("foo.txt")
p.write('')
p = tmpdir.mkdir("one").mkdir("two").join("foo.txt")
p.write("")
assert len(candidates([P(p)])) == 0
assert len(candidates([P(tmpdir)])) == 0
def test_mp3_files_are_considered(tmpdir):
p = tmpdir.join("foo.mp3")
p.write('')
p.write("")
assert len(candidates([P(p)])) == 1
assert len(candidates([P(tmpdir)])) == 1
def test_nested_mp3_files_are_considered(tmpdir):
p = tmpdir.mkdir('one').mkdir('two').join("foo.mp3")
p.write('')
p = tmpdir.mkdir("one").mkdir("two").join("foo.mp3")
p.write("")
assert len(candidates([P(p)])) == 1
assert len(candidates([P(tmpdir)])) == 1
def test_same_name(tmpdir):
'''file with same name on different dir should not be confused'''
p = tmpdir.mkdir('one').mkdir('two').join("foo.mp3")
p.write('')
"""file with same name on different dir should not be confused"""
p = tmpdir.mkdir("one").mkdir("two").join("foo.mp3")
p.write("")
p = tmpdir.join("foo.mp3")
p.write('')
p.write("")
assert len(candidates([P(tmpdir)])) == 2
def test_unknown_mime_ignore(tmpdir):
p = tmpdir.join("foo.???")
p.write('')
p.write("")
assert len(candidates([P(tmpdir)])) == 0
def test_unknown_mime_nocrash(tmpdir):
p = tmpdir.join("foo.???")
p.write('')
p.write("")
p = tmpdir.join("foo.ogg")
p.write('')
p.write("")
assert len(candidates([P(tmpdir)])) == 1

View file

@ -4,14 +4,14 @@ from larigira.unused import old_commonpath
def test_same():
assert old_commonpath(['/foo/bar', '/foo/bar/']) == '/foo/bar'
assert old_commonpath(["/foo/bar", "/foo/bar/"]) == "/foo/bar"
def test_prefix():
assert old_commonpath(['/foo/bar', '/foo/zap/']) == '/foo'
assert old_commonpath(['/foo/bar/', '/foo/zap/']) == '/foo'
assert old_commonpath(['/foo/bar/', '/foo/zap']) == '/foo'
assert old_commonpath(['/foo/bar', '/foo/zap']) == '/foo'
assert old_commonpath(["/foo/bar", "/foo/zap/"]) == "/foo"
assert old_commonpath(["/foo/bar/", "/foo/zap/"]) == "/foo"
assert old_commonpath(["/foo/bar/", "/foo/zap"]) == "/foo"
assert old_commonpath(["/foo/bar", "/foo/zap"]) == "/foo"
try:
@ -22,17 +22,18 @@ else:
# these tests are only available on python >= 3.5. That's fine though, as
# our CI will perform validation of those cases to see if they match python
# behavior
@pytest.fixture(params=['a', 'a/', 'a/b', 'a/b/', 'a/b/c', ])
@pytest.fixture(params=["a", "a/", "a/b", "a/b/", "a/b/c"])
def relpath(request):
return request.param
@pytest.fixture
def abspath(relpath):
return '/' + relpath
return "/" + relpath
@pytest.fixture(params=['', '/'])
@pytest.fixture(params=["", "/"])
def slashed_abspath(abspath, request):
return '%s%s' % (abspath, request.param)
return "%s%s" % (abspath, request.param)
slashed_abspath_b = slashed_abspath
@pytest.fixture
@ -42,11 +43,12 @@ else:
def test_abspath_match(abspath_couple):
assert commonpath(abspath_couple) == old_commonpath(abspath_couple)
@pytest.fixture(params=['', '/'])
@pytest.fixture(params=["", "/"])
def slashed_relpath(relpath, request):
s = '%s%s' % (relpath, request.param)
s = "%s%s" % (relpath, request.param)
if s:
return s
slashed_relpath_b = slashed_relpath
@pytest.fixture

View file

@ -2,6 +2,7 @@ import tempfile
import os
from gevent import monkey
monkey.patch_all(subprocess=True)
import pytest
@ -11,7 +12,7 @@ from larigira.db import EventModel
@pytest.yield_fixture
def db():
fname = tempfile.mktemp(suffix='.json', prefix='larigira-test')
fname = tempfile.mktemp(suffix=".json", prefix="larigira-test")
yield EventModel(uri=fname)
os.unlink(fname)
@ -22,36 +23,39 @@ def test_empty(db):
def test_add_basic(db):
assert len(db.get_all_alarms()) == 0
alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1),
[dict(kind='mpd', paths=['foo.mp3'], howmany=1)])
alarm_id = db.add_event(
dict(kind="frequency", interval=60 * 3, start=1),
[dict(kind="mpd", paths=["foo.mp3"], howmany=1)],
)
assert len(db.get_all_alarms()) == 1
assert db.get_alarm_by_id(alarm_id) is not None
assert len(tuple(db.get_actions_by_alarm(
db.get_alarm_by_id(alarm_id)))) == 1
assert len(tuple(db.get_actions_by_alarm(db.get_alarm_by_id(alarm_id)))) == 1
def test_add_multiple_alarms(db):
assert len(db.get_all_alarms()) == 0
alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1),
[dict(kind='mpd', paths=['foo.mp3'], howmany=1),
dict(kind='foo', a=3)])
alarm_id = db.add_event(
dict(kind="frequency", interval=60 * 3, start=1),
[dict(kind="mpd", paths=["foo.mp3"], howmany=1), dict(kind="foo", a=3)],
)
assert len(db.get_all_alarms()) == 1
assert db.get_alarm_by_id(alarm_id) is not None
assert len(db.get_all_actions()) == 2
assert len(tuple(db.get_actions_by_alarm(
db.get_alarm_by_id(alarm_id)))) == 2
assert len(tuple(db.get_actions_by_alarm(db.get_alarm_by_id(alarm_id)))) == 2
def test_delete_alarm(db):
assert len(db.get_all_alarms()) == 0
alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1),
[dict(kind='mpd', paths=['foo.mp3'], howmany=1)])
alarm_id = db.add_event(
dict(kind="frequency", interval=60 * 3, start=1),
[dict(kind="mpd", paths=["foo.mp3"], howmany=1)],
)
action_id = next(db.get_actions_by_alarm(db.get_alarm_by_id(alarm_id))).eid
assert len(db.get_all_alarms()) == 1
db.delete_alarm(alarm_id)
assert len(db.get_all_alarms()) == 0 # alarm deleted
assert db.get_action_by_id(action_id) is not None
assert 'kind' in db.get_action_by_id(action_id) # action still there
assert "kind" in db.get_action_by_id(action_id) # action still there
def test_delete_alarm_nonexisting(db):
@ -60,8 +64,10 @@ def test_delete_alarm_nonexisting(db):
def test_delete_action(db):
alarm_id = db.add_event(dict(kind='frequency', interval=60*3, start=1),
[dict(kind='mpd', paths=['foo.mp3'], howmany=1)])
alarm_id = db.add_event(
dict(kind="frequency", interval=60 * 3, start=1),
[dict(kind="mpd", paths=["foo.mp3"], howmany=1)],
)
alarm = db.get_alarm_by_id(alarm_id)
assert len(tuple(db.get_actions_by_alarm(alarm))) == 1
action = next(db.get_actions_by_alarm(alarm))

View file

@ -2,15 +2,15 @@ from larigira.fsutils import shortname
def test_shortname_self():
'''sometimes, shortname is just filename without extension'''
assert shortname('/tmp/asd/foo.bar') == 'foo'
"""sometimes, shortname is just filename without extension"""
assert shortname("/tmp/asd/foo.bar") == "foo"
def test_shortname_has_numbers():
'''shortname will preserve numbers'''
assert shortname('/tmp/asd/foo1.bar') == 'foo1'
"""shortname will preserve numbers"""
assert shortname("/tmp/asd/foo1.bar") == "foo1"
def test_shortname_has_no_hyphen():
'''shortname will not preserve hyphens'''
assert shortname('/tmp/asd/foo-1.bar') == 'foo1'
"""shortname will not preserve hyphens"""
assert shortname("/tmp/asd/foo-1.bar") == "foo1"

View file

@ -1,5 +1,6 @@
from __future__ import print_function
from gevent import monkey
monkey.patch_all(subprocess=True)
import pytest
@ -21,7 +22,8 @@ def range_parentlet():
def do_business(self):
for i in range(self.howmany):
yield('range', i)
yield ("range", i)
return RangeLet
@ -33,7 +35,8 @@ def single_value_parentlet():
self.val = val
def do_business(self):
yield('single', self.val)
yield ("single", self.val)
return SingleLet
@ -44,7 +47,7 @@ def test_parented_range(queue, range_parentlet):
assert queue.qsize() == 5
while not queue.empty():
msg = queue.get()
assert msg['kind'] == 'range'
assert msg["kind"] == "range"
def test_parented_single(queue, single_value_parentlet):
@ -52,25 +55,25 @@ def test_parented_single(queue, single_value_parentlet):
t.start()
gevent.sleep(0.01)
msg = queue.get_nowait()
assert msg['args'][0] == 123
assert msg["args"][0] == 123
def test_timer_finally(queue):
'''at somepoint, it will get results'''
"""at somepoint, it will get results"""
period = 10
t = Timer(period, queue)
t.start()
gevent.sleep(period*3/1000.0)
gevent.sleep(period * 3 / 1000.0)
queue.get_nowait()
def test_timer_righttime(queue):
'''not too early, not too late'''
"""not too early, not too late"""
period = 500
t = Timer(period, queue)
t.start()
gevent.sleep(period/(10*1000.0))
gevent.sleep(period / (10 * 1000.0))
assert queue.empty() is True
gevent.sleep(period*(2/1000.0))
gevent.sleep(period * (2 / 1000.0))
assert not queue.empty()
queue.get_nowait()

View file

@ -8,7 +8,7 @@ from larigira.timegen import timegenerate
def eq_(a, b, reason=None):
'''migrating tests from nose'''
"""migrating tests from nose"""
if reason is not None:
assert a == b, reason
else:
@ -20,63 +20,55 @@ def now():
return datetime.now()
@pytest.fixture(params=['seconds', 'human', 'humanlong', 'coloned'])
@pytest.fixture(params=["seconds", "human", "humanlong", "coloned"])
def onehour(now, request):
'''a FrequencyAlarm: every hour for one day'''
intervals = dict(seconds=3600, human='1h', humanlong='30m 1800s',
coloned='01:00:00')
return FrequencyAlarm({
'start': now - timedelta(days=1),
'interval': intervals[request.param],
'end': now + days(1)})
"""a FrequencyAlarm: every hour for one day"""
intervals = dict(
seconds=3600, human="1h", humanlong="30m 1800s", coloned="01:00:00"
)
return FrequencyAlarm(
{
"start": now - timedelta(days=1),
"interval": intervals[request.param],
"end": now + days(1),
}
)
@pytest.fixture(params=[1, '1'])
@pytest.fixture(params=[1, "1"])
def onehour_monday(request):
weekday = request.param
yield FrequencyAlarm({
'interval': 3600*12,
'weekdays': [weekday],
'start': 0
})
yield FrequencyAlarm({"interval": 3600 * 12, "weekdays": [weekday], "start": 0})
@pytest.fixture(params=[7, '7'])
@pytest.fixture(params=[7, "7"])
def onehour_sunday(request):
weekday = request.param
yield FrequencyAlarm({
'interval': 3600*12,
'weekdays': [weekday],
'start': 0
})
yield FrequencyAlarm({"interval": 3600 * 12, "weekdays": [weekday], "start": 0})
@pytest.fixture(params=[1, 2, 3, 4, 5, 6, 7])
def singledow(request):
weekday = request.param
yield FrequencyAlarm({
'interval': 3600*24,
'weekdays': [weekday],
'start': 0
})
yield FrequencyAlarm({"interval": 3600 * 24, "weekdays": [weekday], "start": 0})
@pytest.fixture(params=['seconds', 'human', 'coloned'])
@pytest.fixture(params=["seconds", "human", "coloned"])
def tenseconds(now, request):
'''a FrequencyAlarm: every 10 seconds for one day'''
intervals = dict(seconds=10, human='10s', coloned='00:10')
return FrequencyAlarm({
'start': now - timedelta(days=1),
'interval': intervals[request.param],
'end': now + days(1)})
"""a FrequencyAlarm: every 10 seconds for one day"""
intervals = dict(seconds=10, human="10s", coloned="00:10")
return FrequencyAlarm(
{
"start": now - timedelta(days=1),
"interval": intervals[request.param],
"end": now + days(1),
}
)
@pytest.fixture(params=[1, 2, 3, 4, 5, 6, 7, 8])
def manyweeks(request):
yield FrequencyAlarm({
'interval': '{}w'.format(request.param),
'start': 0
})
yield FrequencyAlarm({"interval": "{}w".format(request.param), "start": 0})
def days(n):
@ -84,24 +76,21 @@ def days(n):
def test_single_creations(now):
return SingleAlarm({
'timestamp': now
})
return SingleAlarm({"timestamp": now})
def test_freq_creations(now):
return FrequencyAlarm({
'start': now - timedelta(days=1),
'interval': 3600,
'end': now})
return FrequencyAlarm(
{"start": now - timedelta(days=1), "interval": 3600, "end": now}
)
@pytest.mark.timeout(1)
def test_single_ring(now):
dt = now + days(1)
s = SingleAlarm({'timestamp': dt})
eq_(s.next_ring(), dt)
eq_(s.next_ring(now), dt)
s = SingleAlarm({"timestamp": dt})
eq_(s.next_ring(), dt)
eq_(s.next_ring(now), dt)
assert s.next_ring(dt) is None, "%s - %s" % (str(s.next_ring(dt)), str(dt))
assert s.next_ring(now + days(2)) is None
assert s.has_ring(dt)
@ -112,10 +101,10 @@ def test_single_ring(now):
@pytest.mark.timeout(1)
def test_single_all(now):
dt = now + timedelta(days=1)
s = SingleAlarm({'timestamp': dt})
eq_(list(s.all_rings()), [dt])
eq_(list(s.all_rings(now)), [dt])
eq_(list(s.all_rings(now + days(2))), [])
s = SingleAlarm({"timestamp": dt})
eq_(list(s.all_rings()), [dt])
eq_(list(s.all_rings(now)), [dt])
eq_(list(s.all_rings(now + days(2))), [])
def test_freq_short(now, tenseconds):
@ -165,12 +154,8 @@ def test_weekday_skip_2(onehour_sunday):
def test_sunday_is_not_0():
with pytest.raises(ValueError) as excinfo:
FrequencyAlarm({
'interval': 3600*12,
'weekdays': [0],
'start': 0
})
assert 'Not a valid weekday:' in excinfo.value.args[0]
FrequencyAlarm({"interval": 3600 * 12, "weekdays": [0], "start": 0})
assert "Not a valid weekday:" in excinfo.value.args[0]
def test_long_interval(manyweeks):
@ -178,7 +163,7 @@ def test_long_interval(manyweeks):
expected = manyweeks.interval
got = manyweeks.next_ring(t)
assert got is not None
assert int(got.strftime('%s')) == expected
assert int(got.strftime("%s")) == expected
assert manyweeks.next_ring(got) is not None
@ -194,15 +179,8 @@ def test_singledow(singledow):
def test_single_registered():
timegenerate({
'kind': 'single',
'timestamp': 1234567890
})
timegenerate({"kind": "single", "timestamp": 1234567890})
def test_frequency_registered():
timegenerate({
'kind': 'frequency',
'start': 1234567890,
'interval': 60*15
})
timegenerate({"kind": "frequency", "start": 1234567890, "interval": 60 * 15})

View file

@ -1,5 +1,6 @@
from __future__ import print_function
from gevent import monkey
monkey.patch_all(subprocess=True)
import pytest
@ -15,5 +16,5 @@ def app(queue):
def test_refresh(app):
assert app.queue.empty()
app.test_client().get('/api/refresh')
app.test_client().get("/api/refresh")
assert not app.queue.empty()

View file

@ -4,101 +4,125 @@ from datetime import datetime
from pytimeparse.timeparse import timeparse
from flask_wtf import Form
from wtforms import StringField, validators, SubmitField, \
SelectMultipleField, ValidationError
from wtforms import (
StringField,
validators,
SubmitField,
SelectMultipleField,
ValidationError,
)
from larigira.formutils import EasyDateTimeField
log = logging.getLogger(__name__)
class SingleAlarmForm(Form):
nick = StringField('Alarm nick', validators=[validators.required()],
description='A simple name to recognize this alarm')
dt = EasyDateTimeField('Date and time', validators=[validators.required()],
description='Date to ring on, expressed as '
'2000-12-31T13:42:00')
submit = SubmitField('Submit')
nick = StringField(
"Alarm nick",
validators=[validators.required()],
description="A simple name to recognize this alarm",
)
dt = EasyDateTimeField(
"Date and time",
validators=[validators.required()],
description="Date to ring on, expressed as " "2000-12-31T13:42:00",
)
submit = SubmitField("Submit")
def populate_from_timespec(self, timespec):
if 'nick' in timespec:
self.nick.data = timespec['nick']
if 'timestamp' in timespec:
self.dt.data = datetime.fromtimestamp(timespec['timestamp'])
if "nick" in timespec:
self.nick.data = timespec["nick"]
if "timestamp" in timespec:
self.dt.data = datetime.fromtimestamp(timespec["timestamp"])
def singlealarm_receive(form):
return {
'kind': 'single',
'nick': form.nick.data,
'timestamp': int(form.dt.data.strftime('%s'))
"kind": "single",
"nick": form.nick.data,
"timestamp": int(form.dt.data.strftime("%s")),
}
class FrequencyAlarmForm(Form):
nick = StringField('Alarm nick', validators=[validators.required()],
description='A simple name to recognize this alarm')
interval = StringField('Frequency',
validators=[validators.required()],
description='in seconds, or human-readable '
'(like 9w3d12h)')
start = EasyDateTimeField('Start date and time',
validators=[validators.optional()],
description='Before this, no alarm will ring. '
'Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, '
'the alarm will always ring')
end = EasyDateTimeField('End date and time',
validators=[validators.optional()],
description='After this, no alarm will ring. '
'Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, '
'the alarm will always ring')
weekdays = SelectMultipleField('Days on which the alarm should be played',
choices=[('1', 'Monday'),
('2', 'Tuesday'),
('3', 'Wednesday'),
('4', 'Thursday'),
('5', 'Friday'),
('6', 'Saturday'),
('7', 'Sunday')],
default=list('1234567'),
validators=[validators.required()],
description='The alarm will ring only on '
'selected weekdays')
submit = SubmitField('Submit')
nick = StringField(
"Alarm nick",
validators=[validators.required()],
description="A simple name to recognize this alarm",
)
interval = StringField(
"Frequency",
validators=[validators.required()],
description="in seconds, or human-readable " "(like 9w3d12h)",
)
start = EasyDateTimeField(
"Start date and time",
validators=[validators.optional()],
description="Before this, no alarm will ring. "
"Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, "
"the alarm will always ring",
)
end = EasyDateTimeField(
"End date and time",
validators=[validators.optional()],
description="After this, no alarm will ring. "
"Expressed as YYYY-MM-DDTHH:MM:SS. If omitted, "
"the alarm will always ring",
)
weekdays = SelectMultipleField(
"Days on which the alarm should be played",
choices=[
("1", "Monday"),
("2", "Tuesday"),
("3", "Wednesday"),
("4", "Thursday"),
("5", "Friday"),
("6", "Saturday"),
("7", "Sunday"),
],
default=list("1234567"),
validators=[validators.required()],
description="The alarm will ring only on " "selected weekdays",
)
submit = SubmitField("Submit")
def populate_from_timespec(self, timespec):
if 'nick' in timespec:
self.nick.data = timespec['nick']
if 'start' in timespec:
self.start.data = datetime.fromtimestamp(timespec['start'])
if 'end' in timespec:
self.end.data = datetime.fromtimestamp(timespec['end'])
if 'weekdays' in timespec:
self.weekdays.data = timespec['weekdays']
if "nick" in timespec:
self.nick.data = timespec["nick"]
if "start" in timespec:
self.start.data = datetime.fromtimestamp(timespec["start"])
if "end" in timespec:
self.end.data = datetime.fromtimestamp(timespec["end"])
if "weekdays" in timespec:
self.weekdays.data = timespec["weekdays"]
else:
self.weekdays.data = list('1234567')
self.interval.data = timespec['interval']
self.weekdays.data = list("1234567")
self.interval.data = timespec["interval"]
def validate_interval(self, field):
try:
int(field.data)
except ValueError:
if timeparse(field.data) is None:
raise ValidationError("interval must either be a number "
"(in seconds) or a human-readable "
"string like '1h2m' or '1d12h'")
raise ValidationError(
"interval must either be a number "
"(in seconds) or a human-readable "
"string like '1h2m' or '1d12h'"
)
def frequencyalarm_receive(form):
obj = {
'kind': 'frequency',
'nick': form.nick.data,
'interval': form.interval.data,
'weekdays': form.weekdays.data,
"kind": "frequency",
"nick": form.nick.data,
"interval": form.interval.data,
"weekdays": form.weekdays.data,
}
if form.start.data:
obj['start'] = int(form.start.data.strftime('%s'))
obj["start"] = int(form.start.data.strftime("%s"))
else:
obj['start'] = 0
obj["start"] = 0
if form.end.data:
obj['end'] = int(form.end.data.strftime('%s'))
obj["end"] = int(form.end.data.strftime("%s"))
return obj

View file

@ -1,6 +1,6 @@
'''
"""
main module to read and get informations about alarms
'''
"""
from __future__ import print_function
import sys
from datetime import datetime
@ -8,31 +8,48 @@ import argparse
import json
from .entrypoints_utils import get_one_entrypoint
from logging import getLogger
log = getLogger('timegen')
log = getLogger("timegen")
def get_timegenerator(kind):
'''Messes with entrypoints to return an timegenerator function'''
return get_one_entrypoint('larigira.timegenerators', kind)
"""Messes with entrypoints to return an timegenerator function"""
return get_one_entrypoint("larigira.timegenerators", kind)
def get_parser():
parser = argparse.ArgumentParser(
description='Generate "ring times" from a timespec')
parser.add_argument('timespec', metavar='TIMESPEC', type=str, nargs=1,
help='filename for timespec, formatted in json')
parser.add_argument('--now', metavar='NOW', type=int, nargs=1,
default=None,
help='Set a different "time", in unix epoch')
parser.add_argument('--howmany', metavar='N', type=int, nargs=1,
default=[1],
help='Set a different "time", in unix epoch')
description='Generate "ring times" from a timespec'
)
parser.add_argument(
"timespec",
metavar="TIMESPEC",
type=str,
nargs=1,
help="filename for timespec, formatted in json",
)
parser.add_argument(
"--now",
metavar="NOW",
type=int,
nargs=1,
default=None,
help='Set a different "time", in unix epoch',
)
parser.add_argument(
"--howmany",
metavar="N",
type=int,
nargs=1,
default=[1],
help='Set a different "time", in unix epoch',
)
return parser
def read_spec(fname):
try:
if fname == '-':
if fname == "-":
return json.load(sys.stdin)
with open(fname) as buf:
return json.load(buf)
@ -42,12 +59,12 @@ def read_spec(fname):
def check_spec(spec):
if 'kind' not in spec:
if "kind" not in spec:
yield "Missing field 'kind'"
def timegenerate(spec, now=None, howmany=1):
Alarm = get_timegenerator(spec['kind'])
Alarm = get_timegenerator(spec["kind"])
generator = Alarm(spec)
if now is not None:
if type(now) is not datetime:
@ -58,14 +75,14 @@ def timegenerate(spec, now=None, howmany=1):
def main():
'''Main function for the "larigira-timegen" executable'''
"""Main function for the "larigira-timegen" executable"""
args = get_parser().parse_args()
spec = read_spec(args.timespec[0])
errors = tuple(check_spec(spec))
if errors:
log.error("Errors in timespec")
for err in errors:
sys.stderr.write('Error: {}\n'.format(err))
sys.stderr.write("Error: {}\n".format(err))
sys.exit(1)
now = None if args.now is None else args.now.pop()
howmany = None if args.howmany is None else args.howmany.pop()

View file

@ -1,6 +1,7 @@
from __future__ import print_function
import logging
log = logging.getLogger('time-every')
log = logging.getLogger("time-every")
from datetime import datetime, timedelta
from pytimeparse.timeparse import timeparse
@ -17,21 +18,21 @@ class Alarm(object):
pass
def next_ring(self, current_time=None):
'''if current_time is None, it is now()
"""if current_time is None, it is now()
returns the next time it will ring; or None if it will not anymore
'''
"""
raise NotImplementedError()
def has_ring(self, time=None):
'''returns True IFF the alarm will ring exactly at ``time``'''
"""returns True IFF the alarm will ring exactly at ``time``"""
raise NotImplementedError()
def all_rings(self, current_time=None):
'''
"""
all future rings
this, of course, is an iterator (they could be infinite)
'''
"""
ring = self.next_ring(current_time)
while ring is not None:
yield ring
@ -39,17 +40,18 @@ class Alarm(object):
class SingleAlarm(Alarm):
'''
"""
rings a single time
'''
description = 'Only once, at a specified date and time'
"""
description = "Only once, at a specified date and time"
def __init__(self, obj):
super().__init__()
self.dt = getdate(obj['timestamp'])
self.dt = getdate(obj["timestamp"])
def next_ring(self, current_time=None):
'''if current_time is None, it is now()'''
"""if current_time is None, it is now()"""
if current_time is None:
current_time = datetime.now()
if current_time >= self.dt:
@ -63,29 +65,28 @@ class SingleAlarm(Alarm):
class FrequencyAlarm(Alarm):
'''
"""
rings on {t | exists a k integer >= 0 s.t. t = start+k*t, start<t<end}
'''
description = 'Events at a specified frequency. Example: every 30minutes'
"""
description = "Events at a specified frequency. Example: every 30minutes"
def __init__(self, obj):
self.start = getdate(obj['start'])
self.start = getdate(obj["start"])
try:
self.interval = int(obj['interval'])
self.interval = int(obj["interval"])
except ValueError:
self.interval = timeparse(obj['interval'])
self.interval = timeparse(obj["interval"])
assert type(self.interval) is int
self.end = getdate(obj['end']) if 'end' in obj else None
self.weekdays = [int(x) for x in obj['weekdays']] if \
'weekdays' in obj else None
self.end = getdate(obj["end"]) if "end" in obj else None
self.weekdays = [int(x) for x in obj["weekdays"]] if "weekdays" in obj else None
if self.weekdays is not None:
for weekday in self.weekdays:
if not 1 <= weekday <= 7:
raise ValueError('Not a valid weekday: {}'
.format(weekday))
raise ValueError("Not a valid weekday: {}".format(weekday))
def next_ring(self, current_time=None):
'''if current_time is None, it is now()'''
"""if current_time is None, it is now()"""
if current_time is None:
current_time = datetime.now()
if self.end is not None and current_time > self.end:
@ -100,22 +101,22 @@ class FrequencyAlarm(Alarm):
# fact, it is necessary to retry until a valid event/weekday is
# found. a "while True" might have been more elegant (and maybe
# fast), but this gives a clear upper bound to the cycle.
for _ in range(max(60*60*24*7 // self.interval, 1)):
for _ in range(max(60 * 60 * 24 * 7 // self.interval, 1)):
n_interval = (
(current_time - self.start).total_seconds() // self.interval
) + 1
) + 1
ring = self.start + timedelta(seconds=self.interval * n_interval)
if ring == current_time:
ring += timedelta(seconds=self.interval)
if self.end is not None and ring > self.end:
return None
if self.weekdays is not None \
and ring.isoweekday() not in self.weekdays:
if self.weekdays is not None and ring.isoweekday() not in self.weekdays:
current_time = ring
continue
return ring
log.warning("Can't find a valid time for event %s; "
"something went wrong", str(self))
log.warning(
"Can't find a valid time for event %s; " "something went wrong", str(self)
)
return None
def has_ring(self, current_time=None):
@ -124,11 +125,9 @@ class FrequencyAlarm(Alarm):
if not self.start >= current_time >= self.end:
return False
n_interval = (current_time - self.start).total_seconds() // \
self.interval
expected_time = self.start + \
timedelta(seconds=self.interval * n_interval)
n_interval = (current_time - self.start).total_seconds() // self.interval
expected_time = self.start + timedelta(seconds=self.interval * n_interval)
return expected_time == current_time
def __str__(self):
return 'FrequencyAlarm(every %ds)' % self.interval
return "FrequencyAlarm(every %ds)" % self.interval

View file

@ -1,9 +1,9 @@
'''
"""
This component will look for files to be removed. There are some assumptions:
* Only files in $TMPDIR are removed. Please remember that larigira has its
own specific TMPDIR
* MPD URIs are parsed, and only file:/// is supported
'''
"""
import os
from os.path import normpath
import logging
@ -11,13 +11,14 @@ import mpd
def old_commonpath(directories):
if any(p for p in directories if p.startswith('/')) and \
any(p for p in directories if not p.startswith('/')):
if any(p for p in directories if p.startswith("/")) and any(
p for p in directories if not p.startswith("/")
):
raise ValueError("Can't mix absolute and relative paths")
norm_paths = [normpath(p) + os.path.sep for p in directories]
ret = os.path.dirname(os.path.commonprefix(norm_paths))
if len(ret) > 0 and ret == '/' * len(ret):
return '/'
if len(ret) > 0 and ret == "/" * len(ret):
return "/"
return ret
@ -35,35 +36,39 @@ class UnusedCleaner:
def _get_mpd(self):
mpd_client = mpd.MPDClient(use_unicode=True)
mpd_client.connect(self.conf['MPD_HOST'], self.conf['MPD_PORT'])
mpd_client.connect(self.conf["MPD_HOST"], self.conf["MPD_PORT"])
return mpd_client
def watch(self, uri):
'''
"""
adds fpath to the list of "watched" file
as soon as it leaves the mpc playlist, it is removed
'''
if not uri.startswith('file:///'):
"""
if not uri.startswith("file:///"):
return # not a file URI
fpath = uri[len('file://'):]
if 'TMPDIR' in self.conf and self.conf['TMPDIR'] \
and commonpath([self.conf['TMPDIR'], fpath]) != \
normpath(self.conf['TMPDIR']):
self.log.info('Not watching file %s: not in TMPDIR', fpath)
fpath = uri[len("file://") :]
if (
"TMPDIR" in self.conf
and self.conf["TMPDIR"]
and commonpath([self.conf["TMPDIR"], fpath])
!= normpath(self.conf["TMPDIR"])
):
self.log.info("Not watching file %s: not in TMPDIR", fpath)
return
if not os.path.exists(fpath):
self.log.warning('a path that does not exist is being monitored')
self.log.warning("a path that does not exist is being monitored")
self.waiting_removal_files.add(fpath)
def check_playlist(self):
'''check playlist + internal watchlist to see what can be removed'''
"""check playlist + internal watchlist to see what can be removed"""
mpdc = self._get_mpd()
files_in_playlist = {song['file'] for song in mpdc.playlistid()
if song['file'].startswith('/')}
files_in_playlist = {
song["file"] for song in mpdc.playlistid() if song["file"].startswith("/")
}
for fpath in self.waiting_removal_files - files_in_playlist:
# we can remove it!
self.log.debug('removing unused: %s', fpath)
self.log.debug("removing unused: %s", fpath)
self.waiting_removal_files.remove(fpath)
if os.path.exists(fpath):
os.unlink(fpath)