123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577 |
- # -*- coding: utf-8 -*-
- """
- Manage talks scheduling in a semantic way
- """
- from __future__ import print_function
- import os
- import io
- from functools import wraps
- import logging
- import re
- import datetime
- import shutil
- from copy import copy
- import locale
- from contextlib import contextmanager
- import inspect
- from babel.dates import format_date, format_datetime, format_time
- from markdown import markdown
- from docutils import nodes
- from docutils.parsers.rst import directives, Directive
- import six
- from pelican import signals, generators
- import jinja2
- try:
- import ics
- except ImportError:
- ICS_ENABLED = False
- else:
- ICS_ENABLED = True
- import unidecode
- import dateutil
- pelican = None # This will be set during register()
- def memoize(function):
- """decorators to cache"""
- memo = {}
- @wraps(function)
- def wrapper(*args):
- if args in memo:
- return memo[args]
- else:
- rv = function(*args)
- memo[args] = rv
- return rv
- return wrapper
- @contextmanager
- def setlocale(name):
- saved = locale.setlocale(locale.LC_ALL)
- try:
- yield locale.setlocale(locale.LC_ALL, name)
- finally:
- locale.setlocale(locale.LC_ALL, saved)
- @memoize
- def get_talk_names():
- names = [
- name
- for name in os.listdir(pelican.settings["TALKS_PATH"])
- if not name.startswith("_") and get_talk_data(name) is not None
- ]
- names.sort()
- return names
- def all_talks():
- return [get_talk_data(tn) for tn in get_talk_names()]
- def unique_attr(iterable, attr):
- return {x[attr] for x in iterable if attr in x}
- @memoize
- def get_global_data():
- fname = os.path.join(pelican.settings["TALKS_PATH"], "meta.yaml")
- if not os.path.isfile(fname):
- return None
- with io.open(fname, encoding="utf8") as buf:
- try:
- data = yaml.load(buf)
- except Exception:
- logging.exception("Syntax error reading %s; skipping", fname)
- return None
- if data is None:
- return None
- if "startdate" not in data:
- logging.error("Missing startdate in global data")
- data["startdate"] = datetime.datetime.now()
- if "rooms" not in data:
- data["rooms"] = {}
- if "names" not in data["rooms"]:
- data["rooms"]["names"] = {}
- if "order" not in data["rooms"]:
- data["rooms"]["order"] = []
- return data
- def _get_time_shift(timestring):
- """ Il problema che abbiamo è che vogliamo dire che le 2 di notte del sabato sono in realtà "parte" del
- venerdì. Per farlo accettiamo orari che superano le 24, ad esempio 25.30 vuol dire 1.30.
- Questa funzione ritorna una timedelta in base alla stringa passata
- """
- timeparts = re.findall(r"\d+", timestring)
- if not timeparts or len(timeparts) > 2:
- raise ValueError("Malformed time %s" % timestring)
- timeparts += [0, 0] # "padding" per essere sicuro ci siano anche [1] e [2]
- duration = datetime.timedelta(
- hours=int(timeparts[0]), minutes=int(timeparts[1]), seconds=int(timeparts[2])
- )
- if duration.total_seconds() > 3600 * 31 or duration.total_seconds() < 0:
- raise ValueError("Sforamento eccessivo: %d" % duration.hours)
- return duration
- @memoize
- def get_talk_data(talkname):
- fname = os.path.join(pelican.settings["TALKS_PATH"], talkname, "meta.yaml")
- if not os.path.isfile(fname):
- return None
- with io.open(fname, encoding="utf8") as buf:
- try:
- data = yaml.load(buf)
- except Exception:
- logging.exception("Syntax error reading %s; skipping", fname)
- return None
- if data is None:
- return None
- try:
- gridstep = pelican.settings["TALKS_GRID_STEP"]
- data.setdefault("nooverlap", [])
- if "title" not in data:
- logging.warn("Talk <{}> has no `title` field".format(talkname))
- data["title"] = six.text_type(talkname)
- else:
- data["title"] = six.text_type(data["title"])
- if "text" not in data:
- logging.warn("Talk <{}> has no `text` field".format(talkname))
- data["text"] = ""
- else:
- data["text"] = six.text_type(data["text"])
- if "duration" not in data:
- logging.info(
- "Talk <{}> has no `duration` field (50min used)".format(talkname)
- )
- data["duration"] = 50
- data["duration"] = int(data["duration"])
- if data["duration"] < gridstep:
- logging.info(
- "Talk <{}> lasts only {} minutes; changing to {}".format(
- talkname, data["duration"], gridstep
- )
- )
- data["duration"] = gridstep
- if "links" not in data or not data["links"]:
- data["links"] = []
- if "contacts" not in data or not data["contacts"]:
- data["contacts"] = []
- if "needs" not in data or not data["needs"]:
- data["needs"] = []
- if "room" not in data:
- logging.warn("Talk <{}> has no `room` field".format(talkname))
- else:
- if data["room"] in get_global_data()["rooms"]["names"]:
- data["room"] = get_global_data()["rooms"]["names"][data["room"]]
- if "time" not in data or "day" not in data:
- logging.warn("Talk <{}> has no `time` or `day`".format(talkname))
- if "time" in data:
- del data["time"]
- if "day" in data:
- del data["day"]
- else:
- data["day"] = get_global_data()["startdate"] + datetime.timedelta(
- days=data["day"]
- )
- try:
- shift = _get_time_shift(str(data["time"]))
- except ValueError:
- logging.error("Talk <%s> has malformed `time`", talkname)
- data["delta"] = shift
- data["time"] = datetime.datetime.combine(
- data["day"], datetime.time(0, 0, 0)
- )
- data["time"] += shift
- data["time"] = data["time"].replace(tzinfo=dateutil.tz.gettz("Europe/Rome"))
- data["id"] = talkname
- resdir = os.path.join(
- pelican.settings["TALKS_PATH"],
- talkname,
- pelican.settings["TALKS_ATTACHMENT_PATH"],
- )
- if os.path.isdir(resdir) and os.listdir(resdir):
- data["resources"] = resdir
- return data
- except Exception:
- logging.exception("Error on talk %s", talkname)
- raise
- def overlap(interval_a, interval_b):
- """how many minutes do they overlap?"""
- return max(0, min(interval_a[1], interval_b[1]) - max(interval_a[0], interval_b[0]))
- def get_talk_overlaps(name):
- data = get_talk_data(name)
- overlapping_talks = set()
- if "time" not in data:
- return overlapping_talks
- start = int(data["time"].strftime("%s"))
- end = start + data["duration"] * 60
- for other in get_talk_names():
- if other == name:
- continue
- if "time" not in get_talk_data(other):
- continue
- other_start = int(get_talk_data(other)["time"].strftime("%s"))
- other_end = other_start + get_talk_data(other)["duration"] * 60
- minutes = overlap((start, end), (other_start, other_end))
- if minutes > 0:
- overlapping_talks.add(other)
- return overlapping_talks
- @memoize
- def check_overlaps():
- for t in get_talk_names():
- over = get_talk_overlaps(t)
- noover = get_talk_data(t)["nooverlap"]
- contacts = set(get_talk_data(t)["contacts"])
- for overlapping in over:
- if overlapping in noover or set(
- get_talk_data(overlapping)["contacts"]
- ).intersection(contacts):
- logging.warning("Talk %s overlaps with %s" % (t, overlapping))
- @memoize
- def jinja_env():
- env = jinja2.Environment(
- loader=jinja2.FileSystemLoader(
- os.path.join(pelican.settings["TALKS_PATH"], "_templates")
- ),
- autoescape=True,
- )
- env.filters["markdown"] = lambda text: jinja2.Markup(markdown(text))
- env.filters["dateformat"] = format_date
- env.filters["datetimeformat"] = format_datetime
- env.filters["timeformat"] = format_time
- return env
- @memoize
- def get_css():
- plugindir = os.path.dirname(
- os.path.abspath(inspect.getfile(inspect.currentframe()))
- )
- with open(os.path.join(plugindir, "style.css")) as buf:
- return buf.read()
- class TalkListDirective(Directive):
- final_argument_whitespace = True
- has_content = True
- option_spec = {"lang": directives.unchanged}
- def run(self):
- lang = self.options.get("lang", "C")
- tmpl = jinja_env().get_template("talk.html")
- def _sort_date(name):
- """
- This function is a helper to sort talks by start date
- When no date is available, put at the beginning
- """
- d = get_talk_data(name)
- room = d.get("room", "")
- time = d.get(
- "time",
- datetime.datetime(1, 1, 1).replace(
- tzinfo=dateutil.tz.gettz("Europe/Rome")
- ),
- )
- title = d.get("title", "")
- return (time, room, title)
- return [
- nodes.raw("", tmpl.render(lang=lang, **get_talk_data(n)), format="html")
- for n in sorted(get_talk_names(), key=_sort_date)
- ]
- class TalkDirective(Directive):
- required_arguments = 1
- final_argument_whitespace = True
- has_content = True
- option_spec = {"lang": directives.unchanged}
- def run(self):
- lang = self.options.get("lang", "C")
- tmpl = jinja_env().get_template("talk.html")
- data = get_talk_data(self.arguments[0])
- if data is None:
- return []
- return [nodes.raw("", tmpl.render(lang=lang, **data), format="html")]
- def _delta_to_position(delta):
- gridstep = pelican.settings["TALKS_GRID_STEP"]
- sec = delta.total_seconds() // gridstep * gridstep
- return int("%2d%02d" % (sec // 3600, (sec % 3600) // 60))
- def _delta_inc_position(delta, i):
- gridstep = pelican.settings["TALKS_GRID_STEP"]
- delta = delta + datetime.timedelta(minutes=i * gridstep)
- sec = delta.total_seconds() // gridstep * gridstep
- return int("%2d%02d" % (sec // 3600, (sec % 3600) // 60))
- def _approx_timestr(timestr):
- gridstep = pelican.settings["TALKS_GRID_STEP"]
- t = str(timestr)
- minutes = int(t[-2:])
- hours = t[:-2]
- minutes = minutes // gridstep * gridstep
- return int("%s%02d" % (hours, minutes))
- class TalkGridDirective(Directive):
- """A complete grid"""
- final_argument_whitespace = True
- has_content = True
- option_spec = {"lang": directives.unchanged}
- def run(self):
- try:
- lang = self.options.get("lang", "C")
- tmpl = jinja_env().get_template("grid.html")
- output = []
- days = unique_attr(all_talks(), "day")
- gridstep = pelican.settings["TALKS_GRID_STEP"]
- for day in sorted(days):
- talks = {
- talk["id"]
- for talk in all_talks()
- if talk.get("day", None) == day
- and "time" in talk
- and "room" in talk
- }
- if not talks:
- continue
- talks = [get_talk_data(t) for t in talks]
- rooms = set()
- for t in talks:
- if type(t["room"]) is list:
- for r in t["room"]:
- rooms.add(r)
- else:
- rooms.add(t["room"])
- def _room_sort_key(r):
- order = get_global_data()["rooms"]["order"]
- base = None
- for k, v in get_global_data()["rooms"]["names"].items():
- if v == r:
- base = k
- break
- else:
- if type(r) is str:
- return ord(r[0])
- # int?
- return r
- return order.index(base)
- rooms = list(sorted(rooms, key=_room_sort_key))
- # room=* is not a real room.
- # Remove it unless that day only has special rooms
- if "*" in rooms and len(rooms) > 1:
- del rooms[rooms.index("*")]
- mintimedelta = min({talk["delta"] for talk in talks})
- maxtimedelta = max(
- {
- talk["delta"] + datetime.timedelta(minutes=talk["duration"])
- for talk in talks
- }
- )
- mintime = _delta_to_position(mintimedelta)
- maxtime = _delta_to_position(maxtimedelta)
- times = {}
- t = mintimedelta
- while t <= maxtimedelta:
- times[_delta_to_position(t)] = [None] * len(rooms)
- t += datetime.timedelta(minutes=gridstep)
- for talk in sorted(talks, key=lambda x: x["delta"]):
- talktime = _delta_to_position(talk["delta"])
- position = _approx_timestr(talktime)
- assert position in times, "pos=%d,time=%d" % (position, talktime)
- if talk["room"] == "*":
- roomnums = range(len(rooms))
- elif type(talk["room"]) is list:
- roomnums = [rooms.index(r) for r in talk["room"]]
- else:
- roomnums = [rooms.index(talk["room"])]
- for roomnum in roomnums:
- if times[position][roomnum] is not None:
- logging.error(
- "Talk %s and %s overlap! (room %s)",
- times[position][roomnum]["id"],
- talk["id"],
- rooms[roomnum],
- )
- continue
- times[position][roomnum] = copy(talk)
- times[position][roomnum]["skip"] = False
- for i in range(1, talk["duration"] // gridstep):
- p = _approx_timestr(_delta_inc_position(talk["delta"], i))
- times[p][roomnum] = copy(talk)
- times[p][roomnum]["skip"] = True
- render = tmpl.render(
- times=times,
- rooms=rooms,
- mintime=mintime,
- maxtime=maxtime,
- timestep=gridstep,
- lang=lang,
- )
- output.append(
- nodes.raw(
- "",
- u"<h4>%s</h4>" % format_date(day, format="full", locale=lang),
- format="html",
- )
- )
- output.append(nodes.raw("", render, format="html"))
- except:
- logging.exception("Error on talk grid")
- import traceback
- traceback.print_exc()
- return []
- css = get_css()
- if css:
- output.insert(
- 0,
- nodes.raw("", '<style type="text/css">%s</style>' % css, format="html"),
- )
- return output
- def talks_to_ics():
- c = ics.Calendar()
- c.creator = u"pelican"
- for t in all_talks():
- e = talk_to_ics(t)
- if e is not None:
- c.events.add(e)
- return six.text_type(c)
- def talk_to_ics(talk):
- def _decode(s):
- if six.PY2:
- return unidecode.unidecode(s)
- else:
- return s
- if "time" not in talk or "duration" not in talk:
- return None
- e = ics.Event(
- uid="%s@%d.hackmeeting.org" % (talk["id"], get_global_data()["startdate"].year),
- name=_decode(talk["title"]),
- begin=talk["time"],
- duration=datetime.timedelta(minutes=talk["duration"]),
- transparent=True,
- )
- # ics.py has some problems with unicode
- # unidecode replaces letters with their most similar ASCII counterparts
- # (ie: accents get stripped)
- if "text" in talk:
- e.description = _decode(talk["text"])
- e.url = pelican.settings["SCHEDULEURL"] + "#talk-" + talk["id"]
- if "room" in talk:
- e.location = talk["room"]
- return e
- class TalksGenerator(generators.Generator):
- def __init__(self, *args, **kwargs):
- self.talks = []
- super(TalksGenerator, self).__init__(*args, **kwargs)
- def generate_context(self):
- self.talks = {n: get_talk_data(n) for n in get_talk_names()}
- self._update_context(("talks",))
- check_overlaps()
- def generate_output(self, writer=None):
- for talkname in sorted(self.talks):
- if "resources" in self.talks[talkname]:
- outdir = os.path.join(
- self.output_path,
- pelican.settings["TALKS_PATH"],
- talkname,
- pelican.settings["TALKS_ATTACHMENT_PATH"],
- )
- if os.path.isdir(outdir):
- shutil.rmtree(outdir)
- shutil.copytree(self.talks[talkname]["resources"], outdir)
- if ICS_ENABLED:
- with io.open(
- os.path.join(self.output_path, pelican.settings.get("TALKS_ICS")),
- "w",
- encoding="utf8",
- ) as buf:
- buf.write(talks_to_ics())
- else:
- logging.warning(
- "module `ics` not found. " "ICS calendar will not be generated"
- )
- def add_talks_option_defaults(pelican):
- pelican.settings.setdefault("TALKS_PATH", "talks")
- pelican.settings.setdefault("TALKS_ATTACHMENT_PATH", "res")
- pelican.settings.setdefault("TALKS_ICS", "schedule.ics")
- pelican.settings.setdefault("TALKS_GRID_STEP", 30)
- def get_generators(gen):
- return TalksGenerator
- def pelican_init(pelicanobj):
- global pelican
- pelican = pelicanobj
- try:
- import yaml
- except ImportError:
- print("ERROR: yaml not found. Talks plugins will be disabled")
- def register():
- pass
- else:
- def register():
- signals.initialized.connect(pelican_init)
- signals.get_generators.connect(get_generators)
- signals.initialized.connect(add_talks_option_defaults)
- directives.register_directive("talklist", TalkListDirective)
- directives.register_directive("talk", TalkDirective)
- directives.register_directive("talkgrid", TalkGridDirective)
|