sito-hackit-20/plugins/talks/talks.py
2020-01-15 17:10:28 +01:00

577 lines
18 KiB
Python

# -*- coding: utf-8 -*-
"""
Manage talks scheduling in a semantic way
"""
from __future__ import print_function
import os
import io
from functools import wraps
import logging
import re
import datetime
import shutil
from copy import copy
import locale
from contextlib import contextmanager
import inspect
from babel.dates import format_date, format_datetime, format_time
from markdown import markdown
from docutils import nodes
from docutils.parsers.rst import directives, Directive
import six
from pelican import signals, generators
import jinja2
try:
import ics
except ImportError:
ICS_ENABLED = False
else:
ICS_ENABLED = True
import unidecode
import dateutil
pelican = None  # Pelican instance; assigned by pelican_init() when the `initialized` signal fires
def memoize(function):
    """Cache the results of ``function``, keyed by its positional arguments.

    The cache lives as long as the decorated function and is never
    invalidated.  Only positional arguments are accepted, and they must be
    hashable because the argument tuple is used as a dictionary key.
    """
    cache = {}

    @wraps(function)
    def cached_call(*args):
        try:
            return cache[args]
        except KeyError:
            pass
        # First call with these arguments: compute and remember the result.
        result = function(*args)
        cache[args] = result
        return result

    return cached_call
@contextmanager
def setlocale(name):
    """Temporarily switch ``LC_ALL`` to ``name``.

    The value yielded is whatever ``locale.setlocale`` returns for the new
    locale.  The previous locale is restored on exit, even if the body of
    the ``with`` block raises.
    """
    previous = locale.setlocale(locale.LC_ALL)
    try:
        activated = locale.setlocale(locale.LC_ALL, name)
        yield activated
    finally:
        locale.setlocale(locale.LC_ALL, previous)
@memoize
def get_talk_names():
    """Return the sorted names of all talk directories under ``TALKS_PATH``.

    Entries whose name starts with ``_`` are skipped, as are entries for
    which ``get_talk_data`` returns ``None``.
    """
    found = []
    for entry in os.listdir(pelican.settings["TALKS_PATH"]):
        if entry.startswith("_"):
            continue
        if get_talk_data(entry) is None:
            continue
        found.append(entry)
    return sorted(found)
def all_talks():
    """Return the data of every talk, in the order given by ``get_talk_names``."""
    talks = []
    for talk_name in get_talk_names():
        talks.append(get_talk_data(talk_name))
    return talks
def unique_attr(iterable, attr):
    """Return the set of distinct values stored under key ``attr``.

    Items of ``iterable`` that do not contain ``attr`` are ignored.
    """
    values = set()
    for item in iterable:
        if attr in item:
            values.add(item[attr])
    return values
@memoize
def get_global_data():
    """Load the event-wide settings from ``meta.yaml`` inside ``TALKS_PATH``.

    Returns ``None`` when the file is missing, empty, or cannot be parsed.
    Otherwise returns the parsed mapping, guaranteed to contain:

    - ``startdate``: taken from the file, or the current time (with an
      error logged) when the file does not provide one;
    - ``rooms``: a mapping with at least ``names`` (dict) and ``order``
      (list).
    """
    fname = os.path.join(pelican.settings["TALKS_PATH"], "meta.yaml")
    if not os.path.isfile(fname):
        return None
    with io.open(fname, encoding="utf8") as buf:
        try:
            # safe_load instead of load: load() without an explicit Loader
            # can build arbitrary Python objects and is rejected outright by
            # recent PyYAML, which this handler would silently swallow.
            data = yaml.safe_load(buf)
        except Exception:
            logging.exception("Syntax error reading %s; skipping", fname)
            return None
    if data is None:
        return None
    if "startdate" not in data:
        logging.error("Missing startdate in global data")
        data["startdate"] = datetime.datetime.now()
    # An empty ``rooms:`` key parses as None; treat it like a missing key.
    if data.get("rooms") is None:
        data["rooms"] = {}
    if data["rooms"].get("names") is None:
        data["rooms"]["names"] = {}
    if data["rooms"].get("order") is None:
        data["rooms"]["order"] = []
    return data
def _get_time_shift(timestring):
""" Il problema che abbiamo è che vogliamo dire che le 2 di notte del sabato sono in realtà "parte" del
venerdì. Per farlo accettiamo orari che superano le 24, ad esempio 25.30 vuol dire 1.30.
Questa funzione ritorna una timedelta in base alla stringa passata
"""
timeparts = re.findall(r"\d+", timestring)
if not timeparts or len(timeparts) > 2:
raise ValueError("Malformed time %s" % timestring)
timeparts += [0, 0] # "padding" per essere sicuro ci siano anche [1] e [2]
duration = datetime.timedelta(
hours=int(timeparts[0]), minutes=int(timeparts[1]), seconds=int(timeparts[2])
)
if duration.total_seconds() > 3600 * 31 or duration.total_seconds() < 0:
raise ValueError("Sforamento eccessivo: %d" % duration.hours)
return duration
@memoize
def get_talk_data(talkname):
    """Load and normalize ``<TALKS_PATH>/<talkname>/meta.yaml``.

    Returns ``None`` when the file is missing, empty, or cannot be parsed.
    Otherwise returns the parsed mapping after these normalizations:

    - ``nooverlap``, ``links``, ``contacts``, ``needs`` default to ``[]``;
    - ``title`` (default: the talk name) and ``text`` (default: ``""``)
      are coerced to text;
    - ``duration`` is an int, defaulting to 50 and raised to
      ``TALKS_GRID_STEP`` when shorter;
    - ``room`` is translated through the global ``rooms.names`` mapping
      (element by element when it is a list);
    - when both ``day`` and a well-formed ``time`` are present, ``day``
      becomes ``startdate + day``, ``delta`` the offset from midnight and
      ``time`` a Europe/Rome datetime; otherwise both keys are removed;
    - ``id`` is the talk name, and ``resources`` is the attachment
      directory when it exists and is not empty.

    Any unexpected error during normalization is logged and re-raised.
    """
    fname = os.path.join(pelican.settings["TALKS_PATH"], talkname, "meta.yaml")
    if not os.path.isfile(fname):
        return None
    with io.open(fname, encoding="utf8") as buf:
        try:
            # safe_load instead of load: load() without an explicit Loader
            # can build arbitrary Python objects and is rejected outright by
            # recent PyYAML, which this handler would silently swallow.
            data = yaml.safe_load(buf)
        except Exception:
            logging.exception("Syntax error reading %s; skipping", fname)
            return None
    if data is None:
        return None
    try:
        gridstep = pelican.settings["TALKS_GRID_STEP"]
        data.setdefault("nooverlap", [])
        if "title" not in data:
            logging.warning("Talk <{}> has no `title` field".format(talkname))
            data["title"] = six.text_type(talkname)
        else:
            data["title"] = six.text_type(data["title"])
        if "text" not in data:
            logging.warning("Talk <{}> has no `text` field".format(talkname))
            data["text"] = ""
        else:
            data["text"] = six.text_type(data["text"])
        if "duration" not in data:
            logging.info(
                "Talk <{}> has no `duration` field (50min used)".format(talkname)
            )
            data["duration"] = 50
        data["duration"] = int(data["duration"])
        if data["duration"] < gridstep:
            logging.info(
                "Talk <{}> lasts only {} minutes; changing to {}".format(
                    talkname, data["duration"], gridstep
                )
            )
            data["duration"] = gridstep
        # Missing or empty (None, "") list fields all become an empty list.
        for list_field in ("links", "contacts", "needs"):
            if not data.get(list_field):
                data[list_field] = []
        if "room" not in data:
            logging.warning("Talk <{}> has no `room` field".format(talkname))
        else:
            room_names = get_global_data()["rooms"]["names"]
            if isinstance(data["room"], list):
                # A talk may span several rooms; a list is unhashable, so
                # translate each element instead of the list itself.
                data["room"] = [room_names.get(r, r) for r in data["room"]]
            elif data["room"] in room_names:
                data["room"] = room_names[data["room"]]
        if "time" not in data or "day" not in data:
            logging.warning("Talk <{}> has no `time` or `day`".format(talkname))
            if "time" in data:
                del data["time"]
            if "day" in data:
                del data["day"]
        else:
            try:
                shift = _get_time_shift(str(data["time"]))
            except ValueError:
                # Treat the talk as unscheduled rather than failing later on
                # an unbound ``shift``.
                logging.error("Talk <%s> has malformed `time`", talkname)
                del data["time"]
                del data["day"]
            else:
                data["day"] = get_global_data()["startdate"] + datetime.timedelta(
                    days=data["day"]
                )
                data["delta"] = shift
                # Start from midnight of the talk's day, then add the offset
                # (which may exceed 24h for after-midnight talks).
                data["time"] = datetime.datetime.combine(
                    data["day"], datetime.time(0, 0, 0)
                )
                data["time"] += shift
                data["time"] = data["time"].replace(
                    tzinfo=dateutil.tz.gettz("Europe/Rome")
                )
        data["id"] = talkname
        resdir = os.path.join(
            pelican.settings["TALKS_PATH"],
            talkname,
            pelican.settings["TALKS_ATTACHMENT_PATH"],
        )
        if os.path.isdir(resdir) and os.listdir(resdir):
            data["resources"] = resdir
        return data
    except Exception:
        logging.exception("Error on talk %s", talkname)
        raise
def overlap(interval_a, interval_b):
    """Return the length of the intersection of two ``(start, end)`` intervals.

    The result is in the same unit as the interval bounds and is 0 when the
    intervals do not intersect.
    """
    latest_start = max(interval_a[0], interval_b[0])
    earliest_end = min(interval_a[1], interval_b[1])
    shared = earliest_end - latest_start
    return shared if shared > 0 else 0
def get_talk_overlaps(name):
    """Return the names of the talks whose time span intersects talk ``name``.

    Talks without a ``time`` are never reported, and an unscheduled ``name``
    yields an empty set.  Spans are compared in seconds since the epoch and
    cover ``duration`` minutes from the talk's start.
    """
    # Local import: ``calendar`` is not among the module-level imports.
    import calendar

    def _span(talk):
        # timegm(utctimetuple()) is portable and honours tzinfo, unlike the
        # glibc-only strftime("%s").
        start = calendar.timegm(talk["time"].utctimetuple())
        return (start, start + talk["duration"] * 60)

    data = get_talk_data(name)
    overlapping_talks = set()
    if "time" not in data:
        return overlapping_talks
    own_span = _span(data)
    for other in get_talk_names():
        if other == name:
            continue
        other_data = get_talk_data(other)
        if "time" not in other_data:
            continue
        if overlap(own_span, _span(other_data)) > 0:
            overlapping_talks.add(other)
    return overlapping_talks
@memoize
def check_overlaps():
    """Log a warning for every pair of overlapping talks that should not overlap.

    Two overlapping talks are reported when the second is listed in the
    first's ``nooverlap`` or when they share at least one contact.
    """
    for talk_id in get_talk_names():
        clashes = get_talk_overlaps(talk_id)
        forbidden = get_talk_data(talk_id)["nooverlap"]
        speakers = set(get_talk_data(talk_id)["contacts"])
        for clash in clashes:
            if clash not in forbidden:
                # Not explicitly forbidden: only a shared contact matters.
                shared = set(get_talk_data(clash)["contacts"]).intersection(speakers)
                if not shared:
                    continue
            logging.warning("Talk %s overlaps with %s" % (talk_id, clash))
@memoize
def jinja_env():
    """Build the Jinja2 environment used to render talks.

    Templates are loaded from the ``_templates`` directory inside
    ``TALKS_PATH`` with autoescaping on.  The filters ``markdown``,
    ``dateformat``, ``datetimeformat`` and ``timeformat`` are registered.
    """
    template_dir = os.path.join(pelican.settings["TALKS_PATH"], "_templates")
    loader = jinja2.FileSystemLoader(template_dir)
    env = jinja2.Environment(loader=loader, autoescape=True)

    def _markdown_to_markup(text):
        # Wrapped in Markup so the generated HTML is not escaped again.
        return jinja2.Markup(markdown(text))

    env.filters["markdown"] = _markdown_to_markup
    for filter_name, formatter in (
        ("dateformat", format_date),
        ("datetimeformat", format_datetime),
        ("timeformat", format_time),
    ):
        env.filters[filter_name] = formatter
    return env
@memoize
def get_css():
    """Return the content of the ``style.css`` file located next to this module."""
    # The current frame's file is this module, wherever it is installed.
    this_file = os.path.abspath(inspect.getfile(inspect.currentframe()))
    css_path = os.path.join(os.path.dirname(this_file), "style.css")
    with open(css_path) as stylesheet:
        return stylesheet.read()
class TalkListDirective(Directive):
    """Directive rendering every talk with the ``talk.html`` template."""

    final_argument_whitespace = True
    has_content = True
    option_spec = {"lang": directives.unchanged}

    def run(self):
        """Return one raw HTML node per talk, ordered by time, room and title."""
        lang = self.options.get("lang", "C")
        template = jinja_env().get_template("talk.html")

        def _chronological(name):
            """
            Sort key: (start time, room, title).
            Talks without a start time get the earliest possible date, so
            they end up at the beginning.
            """
            talk = get_talk_data(name)
            if "time" in talk:
                when = talk["time"]
            else:
                when = datetime.datetime(1, 1, 1).replace(
                    tzinfo=dateutil.tz.gettz("Europe/Rome")
                )
            return (when, talk.get("room", ""), talk.get("title", ""))

        rendered = []
        for name in sorted(get_talk_names(), key=_chronological):
            html = template.render(lang=lang, **get_talk_data(name))
            rendered.append(nodes.raw("", html, format="html"))
        return rendered
class TalkDirective(Directive):
    """Directive rendering the single talk named by its argument."""

    required_arguments = 1
    final_argument_whitespace = True
    has_content = True
    option_spec = {"lang": directives.unchanged}

    def run(self):
        """Return a raw HTML node for the talk, or nothing if it has no data."""
        lang = self.options.get("lang", "C")
        template = jinja_env().get_template("talk.html")
        talk = get_talk_data(self.arguments[0])
        if talk is not None:
            html = template.render(lang=lang, **talk)
            return [nodes.raw("", html, format="html")]
        return []
def _delta_to_position(delta):
    """Encode a timedelta as an ``HHMM`` integer (e.g. 9h30 becomes 930).

    The number of seconds is first floored to a multiple of
    ``TALKS_GRID_STEP``.
    """
    step = pelican.settings["TALKS_GRID_STEP"]
    floored = delta.total_seconds() // step * step
    hours, remainder = divmod(floored, 3600)
    return int("%2d%02d" % (hours, remainder // 60))
def _delta_inc_position(delta, i):
    """Return the ``HHMM`` position of ``delta`` moved forward by ``i`` grid steps."""
    step = pelican.settings["TALKS_GRID_STEP"]
    shifted = delta + datetime.timedelta(minutes=i * step)
    # Same encoding as for an unshifted delta.
    return _delta_to_position(shifted)
def _approx_timestr(timestr):
    """Floor the minutes of an ``HHMM`` value to a multiple of ``TALKS_GRID_STEP``.

    The last two digits are the minutes; everything before them is kept
    untouched as the hours.
    """
    step = pelican.settings["TALKS_GRID_STEP"]
    digits = str(timestr)
    hour_part, minute_part = digits[:-2], digits[-2:]
    snapped = int(minute_part) // step * step
    return int(hour_part + "%02d" % snapped)
class TalkGridDirective(Directive):
    """A complete grid

    For each day that has scheduled talks, renders a heading followed by a
    time/room table built with the ``grid.html`` template.
    """

    final_argument_whitespace = True
    has_content = True
    option_spec = {"lang": directives.unchanged}

    def run(self):
        """Return the raw HTML nodes of the grid.

        Any error while building the grid is logged and an empty list is
        returned, so one broken grid does not abort the whole build.
        """
        try:
            lang = self.options.get("lang", "C")
            tmpl = jinja_env().get_template("grid.html")
            output = []
            days = unique_attr(all_talks(), "day")
            gridstep = pelican.settings["TALKS_GRID_STEP"]
            for day in sorted(days):
                # Kept in all_talks() order (sorted by name) rather than
                # going through a set, so that the outcome of slot
                # collisions does not depend on hash ordering.
                talks = [
                    talk
                    for talk in all_talks()
                    if talk.get("day", None) == day
                    and "time" in talk
                    and "room" in talk
                ]
                if not talks:
                    continue
                rooms = set()
                for t in talks:
                    if isinstance(t["room"], list):
                        rooms.update(t["room"])
                    else:
                        rooms.add(t["room"])

                def _room_sort_key(r):
                    """Sort rooms by their position in the global ``order`` list.

                    Rooms that are not a value of the global ``names``
                    mapping fall back to the code point of their first
                    character (strings) or to the value itself.
                    """
                    order = get_global_data()["rooms"]["order"]
                    base = None
                    for k, v in get_global_data()["rooms"]["names"].items():
                        if v == r:
                            base = k
                            break
                    else:
                        if isinstance(r, six.string_types):
                            return ord(r[0])
                        # int?
                        return r
                    return order.index(base)

                rooms = sorted(rooms, key=_room_sort_key)
                # room=* is not a real room.
                # Remove it unless that day only has special rooms
                if "*" in rooms and len(rooms) > 1:
                    rooms.remove("*")
                mintimedelta = min({talk["delta"] for talk in talks})
                maxtimedelta = max(
                    {
                        talk["delta"] + datetime.timedelta(minutes=talk["duration"])
                        for talk in talks
                    }
                )
                mintime = _delta_to_position(mintimedelta)
                maxtime = _delta_to_position(maxtimedelta)
                # One row per grid step, one cell per room.
                times = {}
                t = mintimedelta
                while t <= maxtimedelta:
                    times[_delta_to_position(t)] = [None] * len(rooms)
                    t += datetime.timedelta(minutes=gridstep)
                for talk in sorted(talks, key=lambda x: x["delta"]):
                    talktime = _delta_to_position(talk["delta"])
                    position = _approx_timestr(talktime)
                    assert position in times, "pos=%d,time=%d" % (position, talktime)
                    if talk["room"] == "*":
                        # Special room: the talk spans every column.
                        roomnums = range(len(rooms))
                    elif isinstance(talk["room"], list):
                        roomnums = [rooms.index(r) for r in talk["room"]]
                    else:
                        roomnums = [rooms.index(talk["room"])]
                    for roomnum in roomnums:
                        if times[position][roomnum] is not None:
                            logging.error(
                                "Talk %s and %s overlap! (room %s)",
                                times[position][roomnum]["id"],
                                talk["id"],
                                rooms[roomnum],
                            )
                            continue
                        times[position][roomnum] = copy(talk)
                        times[position][roomnum]["skip"] = False
                        # Following slots covered by the same talk are
                        # flagged with skip=True.
                        for i in range(1, talk["duration"] // gridstep):
                            p = _approx_timestr(_delta_inc_position(talk["delta"], i))
                            times[p][roomnum] = copy(talk)
                            times[p][roomnum]["skip"] = True
                render = tmpl.render(
                    times=times,
                    rooms=rooms,
                    mintime=mintime,
                    maxtime=maxtime,
                    timestep=gridstep,
                    lang=lang,
                )
                output.append(
                    nodes.raw(
                        "",
                        u"<h4>%s</h4>" % format_date(day, format="full", locale=lang),
                        format="html",
                    )
                )
                output.append(nodes.raw("", render, format="html"))
        except Exception:
            # Not a bare ``except``: KeyboardInterrupt and SystemExit must
            # still be able to stop the build.
            logging.exception("Error on talk grid")
            import traceback

            traceback.print_exc()
            return []
        css = get_css()
        if css:
            output.insert(
                0,
                nodes.raw("", '<style type="text/css">%s</style>' % css, format="html"),
            )
        return output
def talks_to_ics():
    """Return the text of an iCalendar holding every talk that converts to an event."""
    schedule = ics.Calendar()
    schedule.creator = u"pelican"
    for talk in all_talks():
        event = talk_to_ics(talk)
        if event is None:
            # talk_to_ics returned no event for this talk.
            continue
        schedule.events.add(event)
    return six.text_type(schedule)
def talk_to_ics(talk):
    """Convert one talk into an ``ics.Event``.

    Returns ``None`` when the talk lacks ``time`` or ``duration``.
    """
    if not ("time" in talk and "duration" in talk):
        return None

    def _ascii_on_py2(text):
        # ics.py has some problems with unicode
        # unidecode replaces letters with their most similar ASCII
        # counterparts (ie: accents get stripped)
        return unidecode.unidecode(text) if six.PY2 else text

    year = get_global_data()["startdate"].year
    event = ics.Event(
        uid="%s@%d.hackmeeting.org" % (talk["id"], year),
        name=_ascii_on_py2(talk["title"]),
        begin=talk["time"],
        duration=datetime.timedelta(minutes=talk["duration"]),
        transparent=True,
    )
    if "text" in talk:
        event.description = _ascii_on_py2(talk["text"])
    event.url = pelican.settings["SCHEDULEURL"] + "#talk-" + talk["id"]
    if "room" in talk:
        event.location = talk["room"]
    return event
class TalksGenerator(generators.Generator):
    """Generator exposing talks to the context and writing their output files."""

    def __init__(self, *args, **kwargs):
        self.talks = []
        super(TalksGenerator, self).__init__(*args, **kwargs)

    def generate_context(self):
        """Load every talk, publish ``talks`` in the context and check overlaps."""
        self.talks = dict((name, get_talk_data(name)) for name in get_talk_names())
        self._update_context(("talks",))
        check_overlaps()

    def generate_output(self, writer=None):
        """Copy talk attachments to the output and write the ICS calendar."""
        for talkname in sorted(self.talks):
            talk = self.talks[talkname]
            if "resources" not in talk:
                continue
            outdir = os.path.join(
                self.output_path,
                pelican.settings["TALKS_PATH"],
                talkname,
                pelican.settings["TALKS_ATTACHMENT_PATH"],
            )
            # copytree needs a non-existing destination.
            if os.path.isdir(outdir):
                shutil.rmtree(outdir)
            shutil.copytree(talk["resources"], outdir)
        if not ICS_ENABLED:
            logging.warning(
                "module `ics` not found. " "ICS calendar will not be generated"
            )
            return
        ics_path = os.path.join(self.output_path, pelican.settings.get("TALKS_ICS"))
        with io.open(ics_path, "w", encoding="utf8") as buf:
            buf.write(talks_to_ics())
def add_talks_option_defaults(pelican):
    """Fill in the plugin's default settings without overriding existing values."""
    defaults = (
        ("TALKS_PATH", "talks"),
        ("TALKS_ATTACHMENT_PATH", "res"),
        ("TALKS_ICS", "schedule.ics"),
        ("TALKS_GRID_STEP", 30),
    )
    for key, value in defaults:
        pelican.settings.setdefault(key, value)
def get_generators(gen):
    """Signal handler: return the generator class provided by this plugin.

    The ``gen`` argument is not used.
    """
    generator_class = TalksGenerator
    return generator_class
def pelican_init(pelicanobj):
    """Signal handler: store the Pelican instance in the module-level ``pelican``."""
    globals()["pelican"] = pelicanobj
try:
    import yaml
except ImportError:
    print("ERROR: yaml not found. Talks plugins will be disabled")

    def register():
        """Do nothing: without yaml the plugin stays disabled."""
        return None

else:

    def register():
        """Connect the signal handlers and register the reST directives."""
        signals.initialized.connect(pelican_init)
        signals.get_generators.connect(get_generators)
        signals.initialized.connect(add_talks_option_defaults)
        for directive_name, directive_class in (
            ("talklist", TalkListDirective),
            ("talk", TalkDirective),
            ("talkgrid", TalkGridDirective),
        ):
            directives.register_directive(directive_name, directive_class)