''' Manage talks scheduling in a semantic way ''' from __future__ import print_function import os import io from functools import wraps import logging import re import datetime import shutil import time from copy import copy import markdown from docutils import nodes from docutils.parsers.rst import directives, Directive from pelican import signals, generators import jinja2 TALKS_PATH = 'talks' TALK_ATTACHMENT_PATH = 'res' TALK_ICS = 'schedule.ics' GRID_STEP = 15 def memoize(function): '''decorators to cache''' memo = {} @wraps(function) def wrapper(*args): if args in memo: return memo[args] else: rv = function(*args) memo[args] = rv return rv return wrapper @memoize def get_talk_names(): return [name for name in os.listdir(TALKS_PATH) if not name.startswith('_') and get_talk_data(name) is not None ] def all_talks(): return [get_talk_data(tn) for tn in get_talk_names()] def unique_attr(iterable, attr): return {x[attr] for x in iterable if attr in x} @memoize def get_global_data(): fname = os.path.join(TALKS_PATH, 'meta.yaml') if not os.path.isfile(fname): return None with io.open(fname, encoding='utf8') as buf: try: data = yaml.load(buf) except Exception: logging.exception("Syntax error reading %s; skipping", fname) return None if data is None: return None if 'startdate' not in data: logging.error("Missing startdate in global data") data['startdate'] = datetime.datetime.now() return data @memoize def get_talk_data(talkname): fname = os.path.join(TALKS_PATH, talkname, 'meta.yaml') if not os.path.isfile(fname): return None with io.open(fname, encoding='utf8') as buf: try: data = yaml.load(buf) except Exception as exc: logging.exception("Syntax error reading %s; skipping", fname) return None if data is None: return None if 'title' not in data: logging.warn("Talk <{}> has no `title` field".format(talkname)) data['title'] = talkname if 'text' not in data: logging.warn("Talk <{}> has no `text` field".format(talkname)) data['text'] = '' if 'duration' not in data: logging.info("Talk <{}> has no `duration` field (50min used)" .format(talkname)) data['duration'] = 50 data['duration'] = int(data['duration']) if data['duration'] < GRID_STEP: logging.info("Talk <{}> lasts only {} minutes; changing to {}" .format(talkname, data['duration'], GRID_STEP)) data['duration'] = GRID_STEP if 'links' not in data or not data['links']: data['links'] = [] if 'contacts' not in data or not data['contacts']: data['contacts'] = [] if 'needs' not in data or not data['needs']: data['needs'] = [] if 'room' not in data: logging.warn("Talk <{}> has no `room` field".format(talkname)) if 'time' not in data or 'day' not in data: logging.warn("Talk <{}> has no `time` or `day`".format(talkname)) if 'day' in data: data['day'] = get_global_data()['startdate'] + datetime.timedelta(days=data['day']) if 'time' in data and 'day' in data: timeparts = re.findall(r'\d+', str(data['time'])) if 4 > len(timeparts) > 0: timeparts = [int(p) for p in timeparts] data['time'] = datetime.datetime.combine(data['day'], datetime.time(*timeparts)) else: logging.error("Talk <{}> has malformed `time`".format(talkname)) data['id'] = talkname resdir = os.path.join(TALKS_PATH, talkname, TALK_ATTACHMENT_PATH) if os.path.isdir(resdir) and os.listdir(resdir): data['resources'] = resdir return data jinja_env = jinja2.Environment( loader=jinja2.FileSystemLoader(os.path.join(TALKS_PATH, '_templates')), autoescape=True, ) jinja_env.filters['markdown'] = lambda text: \ jinja2.Markup(markdown.Markdown(extensions=['meta']). convert(text)) class TalkListDirective(Directive): required_arguments = 0 optional_arguments = 0 final_argument_whitespace = True has_content = True def run(self): tmpl = jinja_env.get_template('talk.html') return [ nodes.raw('', tmpl.render(**get_talk_data(n)), format='html') for n in get_talk_names() ] class TalkDirective(Directive): required_arguments = 1 optional_arguments = 0 final_argument_whitespace = True has_content = True def run(self): tmpl = jinja_env.get_template('talk.html') data = get_talk_data(self.arguments[0]) if data is None: return [] return [ nodes.raw('', tmpl.render(**data), format='html') ] class TalkGridDirective(Directive): '''A complete grid''' required_arguments = 0 optional_arguments = 0 final_argument_whitespace = True has_content = True def run(self): tmpl = jinja_env.get_template('grid.html') output = [] days = unique_attr(all_talks(), 'day') for day in sorted(days): talks = {talk['id'] for talk in all_talks() if talk.get('day', None) == day and 'time' in talk and 'room' in talk} if not talks: continue talks = [get_talk_data(t) for t in talks] rooms = tuple(sorted(unique_attr(talks, 'room'))) mintime = min({talk['time'].hour * 60 + talk['time'].minute for talk in talks}) // GRID_STEP * GRID_STEP maxtime = max({talk['time'].hour * 60 + talk['time'].minute + talk['duration'] for talk in talks}) times = {} for t in range(mintime, maxtime, GRID_STEP): times[t] = [None] * len(rooms) for talk in sorted(talks, key=lambda x: x['time']): talktime = talk['time'].hour * 60 + talk['time'].minute position = talktime // GRID_STEP * GRID_STEP # round assert position in times roomnum = rooms.index(talk['room']) if times[position][roomnum] is not None: logging.error("Talk {} and {} overlap! " .format(times[position][roomnum]['id'], talk['id'])) continue times[position][roomnum] = copy(talk) times[position][roomnum]['skip'] = False for i in range(1, talk['duration'] // GRID_STEP): times[position + i*GRID_STEP][roomnum] = copy(talk) times[position + i*GRID_STEP][roomnum]['skip'] = True render = tmpl.render(times=times, rooms=rooms, mintime=mintime, maxtime=maxtime, timestep=GRID_STEP, ) output.append(nodes.raw('', u'

%s

' % day.strftime('%A %d').decode('utf8').title(), format='html')) output.append(nodes.raw('', render, format='html')) return output def talks_to_ics(): content = 'BEGIN:VCALENDAR\nVERSION:2.0\nPRODID:pelican\n' for t in all_talks(): content += talk_to_ics(t) content += 'END:VCALENDAR\n' return content def talk_to_ics(talk): if 'time' not in talk: return '' start = talk['time'] end = start + datetime.timedelta(minutes=talk['duration']) content = 'BEGIN:VEVENT\n' content += "UID:%s@%d.hackmeeting.org\n" % (talk['id'], talk['day'].year) content += "SUMMARY:%s\n" % talk['title'] content += "DTSTAMP:%s\n" % time.strftime('%Y%m%dT%H%M%SZ', time.gmtime(float(start.strftime('%s')))) content += "DTSTART:%s\n" % time.strftime('%Y%m%dT%H%M%SZ', time.gmtime(float( start.strftime('%s')))) content += "DTEND:%s\n" % time.strftime('%Y%m%dT%H%M%SZ', time.gmtime(float( end.strftime('%s')))) content += "LOCATION:%s\n" % talk['room'] content += 'END:VEVENT\n' return content class TalksGenerator(generators.Generator): def __init__(self, *args, **kwargs): self.talks = [] super(TalksGenerator, self).__init__(*args, **kwargs) def generate_context(self): self.talks = {n: get_talk_data(n) for n in get_talk_names()} self._update_context(('talks',)) def generate_output(self, writer=None): for talkname in self.talks: if 'resources' in self.talks[talkname]: outdir = os.path.join(self.output_path, TALKS_PATH, talkname, TALK_ATTACHMENT_PATH) if os.path.isdir(outdir): shutil.rmtree(outdir) shutil.copytree(self.talks[talkname]['resources'], outdir) with open(os.path.join(self.output_path, TALK_ICS), 'w') as buf: buf.write(talks_to_ics()) def add_talks_option_defaults(pelican): pelican.settings.setdefault('TALKS_PATH', TALKS_PATH) def get_generators(gen): return TalksGenerator try: import yaml except ImportError: print('ERROR: yaml not found. Talks plugins will be disabled') def register(): pass else: def register(): signals.get_generators.connect(get_generators) signals.initialized.connect(add_talks_option_defaults) directives.register_directive('talklist', TalkListDirective) directives.register_directive('talk', TalkDirective) directives.register_directive('talkgrid', TalkGridDirective)