''' Manage talks scheduling in a semantic way '''

from __future__ import print_function

import calendar
import datetime
import io
import locale
import logging
import os
import re
import shutil
from contextlib import contextmanager
from copy import copy
from functools import wraps

from babel.dates import format_date, format_datetime, format_time
from markdown import markdown
from docutils import nodes
from docutils.parsers.rst import directives, Directive
import six
from pelican import signals, generators
import jinja2
try:
    import ics
except ImportError:
    ICS_ENABLED = False
else:
    ICS_ENABLED = True
import unidecode
import dateutil
# `import dateutil` alone does not guarantee that the `tz` submodule is
# loaded; import it explicitly since `dateutil.tz.gettz` is used below.
import dateutil.tz

# Set by pelican_init() when pelican emits its `initialized` signal.
pelican = None

# Timezone attached to every talk start time.
TALKS_TIMEZONE = 'Europe/Rome'


def memoize(function):
    '''Decorator caching the results of `function`.

    The cache is keyed on the positional arguments, which must therefore be
    hashable. Keyword arguments are not supported by the wrapper.
    '''
    memo = {}

    @wraps(function)
    def wrapper(*args):
        if args not in memo:
            memo[args] = function(*args)
        return memo[args]
    return wrapper


@contextmanager
def setlocale(name):
    '''Temporarily switch LC_ALL to `name`, restoring the old value on exit.

    Yields whatever `locale.setlocale` returns for the new locale.
    '''
    saved = locale.setlocale(locale.LC_ALL)
    try:
        yield locale.setlocale(locale.LC_ALL, name)
    finally:
        locale.setlocale(locale.LC_ALL, saved)


def _talks_tz():
    '''Return the tzinfo object used for talk start times.'''
    return dateutil.tz.gettz(TALKS_TIMEZONE)


def _epoch_seconds(moment):
    '''Return `moment` as integer seconds since the Unix epoch.

    This replaces `strftime('%s')`, which is a non-portable platform
    extension and ignores the datetime's tzinfo.
    '''
    return calendar.timegm(moment.utctimetuple())


def _load_yaml(fname):
    '''Parse the YAML file `fname`.

    Returns the parsed document, or None when the file does not exist, is
    empty, or cannot be parsed (the parse error is logged).
    '''
    if not os.path.isfile(fname):
        return None
    with io.open(fname, encoding='utf8') as buf:
        try:
            # yaml.load() without an explicit Loader is rejected by
            # PyYAML >= 6 and can build arbitrary objects on older
            # versions; safe_load handles plain data on every version.
            return yaml.safe_load(buf)
        except Exception:
            logging.exception("Syntax error reading %s; skipping", fname)
            return None


@memoize
def get_talk_names():
    '''Return the sorted names of the entries in TALKS_PATH that are talks.

    Entries starting with `_` are skipped, as are entries for which
    get_talk_data() returns None.
    '''
    names = [name
             for name in os.listdir(pelican.settings['TALKS_PATH'])
             if not name.startswith('_')
             and get_talk_data(name) is not None]
    names.sort()
    return names


def all_talks():
    '''Return the data dict of every talk, ordered by talk name.'''
    return [get_talk_data(tn) for tn in get_talk_names()]


def unique_attr(iterable, attr):
    '''Return the set of values stored under key `attr` in the mappings of
    `iterable`; mappings lacking the key are ignored.'''
    return {x[attr] for x in iterable if attr in x}


@memoize
def get_global_data():
    '''Load `<TALKS_PATH>/meta.yaml`, the event-wide metadata.

    Returns None when the file is missing, empty or unparsable. Otherwise
    returns the parsed mapping, in which `startdate` defaults to the current
    time (an error is logged) and `rooms` defaults to an empty dict.
    '''
    fname = os.path.join(pelican.settings['TALKS_PATH'], 'meta.yaml')
    data = _load_yaml(fname)
    if data is None:
        return None
    if 'startdate' not in data:
        logging.error("Missing startdate in global data")
        data['startdate'] = datetime.datetime.now()
    if 'rooms' not in data:
        data['rooms'] = {}
    return data


def _resolve_room(data, talkname):
    '''Replace the talk's `room` with its entry in the global `rooms`
    mapping, when there is one.'''
    if 'room' not in data:
        logging.warning("Talk <%s> has no `room` field", talkname)
        return
    room = data['room']
    known_rooms = get_global_data()['rooms']
    # A list/dict room is unhashable, so it cannot be a key of the mapping
    # and looking it up would raise TypeError.
    if not isinstance(room, (list, dict)) and room in known_rooms:
        data['room'] = known_rooms[room]


def _resolve_schedule(data, talkname):
    '''Turn the raw `day` offset and `time` value into date/datetime values.

    When either field is missing both are removed. When `time` cannot be
    parsed it is removed, so that a `time` key, when present, always holds a
    timezone-aware datetime.
    '''
    if 'time' not in data or 'day' not in data:
        logging.warning("Talk <%s> has no `time` or `day`", talkname)
        data.pop('time', None)
        data.pop('day', None)
        return

    # `day` is an offset, in days, from the global start date.
    data['day'] = get_global_data()['startdate'] + \
        datetime.timedelta(days=data['day'])
    # Any run of digits counts: hours[, minutes[, seconds]].
    timeparts = [int(p) for p in re.findall(r'\d+', str(data['time']))]
    start = None
    if 0 < len(timeparts) < 4:
        try:
            start = datetime.datetime.combine(
                data['day'], datetime.time(*timeparts))
        except ValueError:
            # Out-of-range value such as hour 25.
            start = None
    if start is None:
        logging.error("Talk <%s> has malformed `time`", talkname)
        del data['time']
        return
    data['time'] = start.replace(tzinfo=_talks_tz())


@memoize
def get_talk_data(talkname):
    '''Load and normalize `<TALKS_PATH>/<talkname>/meta.yaml`.

    Returns None when the file is missing, empty or unparsable. Otherwise
    returns the parsed mapping with these adjustments:

    - `nooverlap`, `links`, `contacts`, `needs` default to empty lists;
    - `title` (default: the talk name) and `text` (default: '') are text;
    - `duration` is an int of minutes, default 50, at least the grid step;
    - `room` is translated through the global `rooms` mapping;
    - `day` becomes startdate + offset and `time` an aware datetime, or
      they are removed when missing/invalid;
    - `id` is the talk name;
    - `resources` is the attachment directory, when it is non-empty.

    Any unexpected error is logged and re-raised.
    '''
    fname = os.path.join(pelican.settings['TALKS_PATH'],
                         talkname, 'meta.yaml')
    data = _load_yaml(fname)
    if data is None:
        return None
    try:
        gridstep = pelican.settings['TALKS_GRID_STEP']
        data.setdefault('nooverlap', [])

        if 'title' not in data:
            logging.warning("Talk <%s> has no `title` field", talkname)
            data['title'] = six.text_type(talkname)
        else:
            data['title'] = six.text_type(data['title'])

        if 'text' not in data:
            logging.warning("Talk <%s> has no `text` field", talkname)
            data['text'] = ''
        else:
            data['text'] = six.text_type(data['text'])

        if 'duration' not in data:
            logging.info("Talk <%s> has no `duration` field (50min used)",
                         talkname)
            data['duration'] = 50
        data['duration'] = int(data['duration'])
        if data['duration'] < gridstep:
            logging.info("Talk <%s> lasts only %s minutes; changing to %s",
                         talkname, data['duration'], gridstep)
            data['duration'] = gridstep

        # Missing and empty/null values both become an empty list.
        for key in ('links', 'contacts', 'needs'):
            if not data.get(key):
                data[key] = []

        _resolve_room(data, talkname)
        _resolve_schedule(data, talkname)

        data['id'] = talkname
        resdir = os.path.join(pelican.settings['TALKS_PATH'], talkname,
                              pelican.settings['TALKS_ATTACHMENT_PATH'])
        if os.path.isdir(resdir) and os.listdir(resdir):
            data['resources'] = resdir
        return data
    except Exception:
        logging.exception("Error on talk %s", talkname)
        raise


def overlap(interval_a, interval_b):
    '''Return the length of the intersection of two (start, end) intervals.

    The result is expressed in the same unit as the endpoints and is 0 when
    the intervals do not intersect.
    '''
    return max(0, min(interval_a[1], interval_b[1])
               - max(interval_a[0], interval_b[0]))


def get_talk_overlaps(name):
    '''Return the set of names of the talks whose time span intersects the
    one of talk `name`. Talks without a `time` never overlap anything.'''
    data = get_talk_data(name)
    overlapping_talks = set()
    if 'time' not in data:
        return overlapping_talks
    start = _epoch_seconds(data['time'])
    end = start + data['duration'] * 60
    for other in get_talk_names():
        if other == name:
            continue
        other_data = get_talk_data(other)
        if 'time' not in other_data:
            continue
        other_start = _epoch_seconds(other_data['time'])
        other_end = other_start + other_data['duration'] * 60
        if overlap((start, end), (other_start, other_end)) > 0:
            overlapping_talks.add(other)
    return overlapping_talks


@memoize
def check_overlaps():
    '''Log a warning for every pair of overlapping talks that should not
    overlap: either one lists the other in `nooverlap`, or they share at
    least one contact. Memoized, so the check runs only once.'''
    for t in get_talk_names():
        talk = get_talk_data(t)
        noover = talk['nooverlap']
        contacts = set(talk['contacts'])
        for overlapping in get_talk_overlaps(t):
            shared = contacts.intersection(
                get_talk_data(overlapping)['contacts'])
            if overlapping in noover or shared:
                logging.warning('Talk %s overlaps with %s', t, overlapping)


@memoize
def jinja_env():
    '''Return the (cached) jinja2 environment loading templates from
    `<TALKS_PATH>/_templates`, with markdown and date formatting filters.'''
    env = jinja2.Environment(
        loader=jinja2.FileSystemLoader(os.path.join(
            pelican.settings['TALKS_PATH'], '_templates')),
        autoescape=True,
    )
    # NOTE(review): `jinja2.Markup` was removed in Jinja2 3.1 (it lives in
    # `markupsafe`); confirm the Jinja2 version this plugin is pinned to.
    env.filters['markdown'] = lambda text: jinja2.Markup(markdown(text))
    env.filters['dateformat'] = format_date
    env.filters['datetimeformat'] = format_datetime
    env.filters['timeformat'] = format_time
    return env


def _talk_sort_key(name):
    '''Sort key ordering talks by start time, then room, then title.

    Talks without a start time get the earliest representable one, so they
    are placed at the beginning.
    '''
    d = get_talk_data(name)
    time = d.get('time',
                 datetime.datetime(1, 1, 1).replace(tzinfo=_talks_tz()))
    return (time, d.get('room', ''), d.get('title', ''))


class TalkListDirective(Directive):
    '''Render every talk through `talk.html`, ordered by start time.'''
    final_argument_whitespace = True
    has_content = True
    option_spec = {
        'lang': directives.unchanged
    }

    def run(self):
        lang = self.options.get('lang', 'C')
        tmpl = jinja_env().get_template('talk.html')
        return [
            nodes.raw('', tmpl.render(lang=lang, **get_talk_data(n)),
                      format='html')
            for n in sorted(get_talk_names(), key=_talk_sort_key)
        ]


class TalkDirective(Directive):
    '''Render the single talk named by the directive argument through
    `talk.html`; renders nothing when the talk has no data.'''
    required_arguments = 1
    final_argument_whitespace = True
    has_content = True
    option_spec = {
        'lang': directives.unchanged
    }

    def run(self):
        lang = self.options.get('lang', 'C')
        tmpl = jinja_env().get_template('talk.html')
        data = get_talk_data(self.arguments[0])
        if data is None:
            return []
        return [
            nodes.raw('', tmpl.render(lang=lang, **data), format='html')
        ]


def _minute_of_day(moment):
    '''Return the number of minutes between midnight and `moment`.'''
    return moment.hour * 60 + moment.minute


class TalkGridDirective(Directive):
    '''A complete grid'''
    final_argument_whitespace = True
    has_content = True
    option_spec = {
        'lang': directives.unchanged
    }

    def run(self):
        '''Emit, for each day having scheduled talks, a heading followed by
        `grid.html` rendered with a time-slot x room table.

        `times` maps the start minute of each slot to a list with one cell
        per room: None, or a copy of the talk occupying it. `skip` is False
        on the first slot of a talk and True on its continuation slots.
        '''
        lang = self.options.get('lang', 'C')
        tmpl = jinja_env().get_template('grid.html')
        output = []
        every_talk = all_talks()
        days = unique_attr(every_talk, 'day')
        gridstep = pelican.settings['TALKS_GRID_STEP']
        for day in sorted(days):
            # Only fully scheduled talks can be placed on the grid. The
            # list keeps the (name-sorted) order of all_talks(), so talks
            # with equal start times are handled deterministically.
            talks = [talk for talk in every_talk
                     if talk.get('day', None) == day
                     and 'time' in talk and 'room' in talk]
            if not talks:
                continue

            rooms = set()
            for t in talks:
                if isinstance(t['room'], list):
                    rooms.update(t['room'])
                else:
                    rooms.add(t['room'])
            # TODO: sort according to something in the global meta.yaml
            rooms = sorted(rooms)
            # room=* is not a real room.
            # Remove it unless that day only has special rooms
            if '*' in rooms and len(rooms) > 1:
                rooms.remove('*')

            # The first slot is aligned down to a multiple of the grid step.
            mintime = min(_minute_of_day(talk['time'])
                          for talk in talks) // gridstep * gridstep
            maxtime = max(_minute_of_day(talk['time']) + talk['duration']
                          for talk in talks)
            times = {}
            for t in range(mintime, maxtime, gridstep):
                times[t] = [None] * len(rooms)

            for talk in sorted(talks, key=lambda x: x['time']):
                talktime = _minute_of_day(talk['time'])
                position = talktime // gridstep * gridstep  # round
                assert position in times
                if talk['room'] == '*':
                    # A `*` talk takes every column of the day.
                    roomnums = range(len(rooms))
                elif isinstance(talk['room'], list):
                    roomnums = [rooms.index(r) for r in talk['room']]
                else:
                    roomnums = [rooms.index(talk['room'])]
                for roomnum in roomnums:
                    if times[position][roomnum] is not None:
                        logging.error("Talk %s and %s overlap! (room %s)",
                                      times[position][roomnum]['id'],
                                      talk['id'],
                                      rooms[roomnum]
                                      )
                        continue
                    times[position][roomnum] = copy(talk)
                    times[position][roomnum]['skip'] = False
                    for i in range(1, talk['duration'] // gridstep):
                        slot = position + i * gridstep
                        times[slot][roomnum] = copy(talk)
                        times[slot][roomnum]['skip'] = True

            render = tmpl.render(times=times,
                                 rooms=rooms,
                                 mintime=mintime, maxtime=maxtime,
                                 timestep=gridstep,
                                 lang=lang,
                                 )
            # NOTE(review): the day heading is emitted as a bare string
            # between blank lines; wrapping markup may be intended - confirm.
            output.append(nodes.raw(
                '', u'\n\n%s\n\n' % format_date(day, format='full',
                                                locale=lang),
                format='html'))
            output.append(nodes.raw('', render, format='html'))
        return output


def talks_to_ics():
    '''Return the text of an iCalendar holding every scheduled talk.'''
    c = ics.Calendar()
    c.creator = u'pelican'
    for t in all_talks():
        e = talk_to_ics(t)
        if e is not None:
            c.events.add(e)
    return six.text_type(c)


def talk_to_ics(talk):
    '''Build the `ics.Event` for `talk`.

    Returns None when the talk has no `time` or no `duration`.
    '''
    def _decode(s):
        # ics.py has some problems with unicode
        # unidecode replaces letters with their most similar ASCII
        # counterparts (ie: accents get stripped)
        if six.PY2:
            return unidecode.unidecode(s)
        return s

    if 'time' not in talk or 'duration' not in talk:
        return None
    e = ics.Event(
        uid="%s@%d.hackmeeting.org" % (talk['id'],
                                       get_global_data()['startdate'].year),
        name=_decode(talk['title']),
        begin=talk['time'],
        duration=datetime.timedelta(minutes=talk['duration']),
        transparent=True,
    )
    if 'text' in talk:
        e.description = _decode(talk['text'])
    e.url = pelican.settings['SCHEDULEURL'] + '#talk-' + talk['id']
    if 'room' in talk:
        room = talk['room']
        if isinstance(room, list):
            # A talk may span several rooms; the event location is a
            # single string.
            room = u', '.join(six.text_type(r) for r in room)
        e.location = room
    return e


class TalksGenerator(generators.Generator):
    '''Pelican generator exposing the talks to the templates, copying the
    talk attachments and writing the ICS calendar.'''

    def __init__(self, *args, **kwargs):
        # Maps talk name -> talk data; filled in by generate_context().
        self.talks = {}
        super(TalksGenerator, self).__init__(*args, **kwargs)

    def generate_context(self):
        '''Load every talk, publish it in the context as `talks` and warn
        about unwanted overlaps.'''
        self.talks = {n: get_talk_data(n) for n in get_talk_names()}
        self._update_context(('talks',))
        check_overlaps()

    def generate_output(self, writer=None):
        '''Copy each talk's attachment directory into the output tree
        (replacing any previous copy) and write the ICS calendar, when the
        `ics` module is available.'''
        for talkname in sorted(self.talks):
            if 'resources' in self.talks[talkname]:
                outdir = os.path.join(
                    self.output_path,
                    pelican.settings['TALKS_PATH'],
                    talkname,
                    pelican.settings['TALKS_ATTACHMENT_PATH'],
                )
                # copytree() refuses to write into an existing directory.
                if os.path.isdir(outdir):
                    shutil.rmtree(outdir)
                shutil.copytree(self.talks[talkname]['resources'], outdir)
        if ICS_ENABLED:
            with io.open(os.path.join(self.output_path,
                                      pelican.settings.get('TALKS_ICS')),
                         'w',
                         encoding='utf8') as buf:
                buf.write(talks_to_ics())
        else:
            logging.warning('module `ics` not found. '
                            'ICS calendar will not be generated')


def add_talks_option_defaults(pelican):
    '''Fill in the default value of every TALKS_* setting not set by the
    user.'''
    pelican.settings.setdefault('TALKS_PATH', 'talks')
    pelican.settings.setdefault('TALKS_ATTACHMENT_PATH', 'res')
    pelican.settings.setdefault('TALKS_ICS', 'schedule.ics')
    pelican.settings.setdefault('TALKS_GRID_STEP', 30)


def get_generators(gen):
    '''Handler for pelican's `get_generators` signal.'''
    return TalksGenerator


def pelican_init(pelicanobj):
    '''Handler for pelican's `initialized` signal: keep a module-level
    reference to the pelican object.'''
    global pelican
    pelican = pelicanobj


# yaml is imported last so that a missing module disables the plugin
# (register() becomes a no-op) instead of breaking the import.
try:
    import yaml
except ImportError:
    print('ERROR: yaml not found. Talks plugins will be disabled')

    def register():
        '''No-op: the plugin is disabled because yaml is unavailable.'''
        pass
else:

    def register():
        '''Connect the plugin to pelican's signals and register the
        `talklist`, `talk` and `talkgrid` reST directives.'''
        signals.initialized.connect(pelican_init)
        signals.get_generators.connect(get_generators)
        signals.initialized.connect(add_talks_option_defaults)
        directives.register_directive('talklist', TalkListDirective)
        directives.register_directive('talk', TalkDirective)
        directives.register_directive('talkgrid', TalkGridDirective)