sito-hackit-17/plugins/talks.py

336 lines
11 KiB
Python

'''
Manage talks scheduling in a semantic way
'''
from __future__ import print_function
import os
import io
from functools import wraps
import logging
import re
import datetime
import shutil
import time
from copy import copy
import locale
from contextlib import contextmanager
from babel.dates import format_date, format_datetime, format_time
import markdown
from docutils import nodes
from docutils.parsers.rst import directives, Directive
from pelican import signals, generators
import jinja2
TALKS_PATH = 'talks'
TALK_ATTACHMENT_PATH = 'res'
TALK_ICS = 'schedule.ics'
GRID_STEP = 15
def memoize(function):
'''decorators to cache'''
memo = {}
@wraps(function)
def wrapper(*args):
if args in memo:
return memo[args]
else:
rv = function(*args)
memo[args] = rv
return rv
return wrapper
@contextmanager
def setlocale(name):
saved = locale.setlocale(locale.LC_ALL)
try:
yield locale.setlocale(locale.LC_ALL, name)
finally:
locale.setlocale(locale.LC_ALL, saved)
@memoize
def get_talk_names():
return [name for name in os.listdir(TALKS_PATH)
if not name.startswith('_') and
get_talk_data(name) is not None
]
def all_talks():
return [get_talk_data(tn) for tn in get_talk_names()]
def unique_attr(iterable, attr):
return {x[attr] for x in iterable
if attr in x}
@memoize
def get_global_data():
fname = os.path.join(TALKS_PATH, 'meta.yaml')
if not os.path.isfile(fname):
return None
with io.open(fname, encoding='utf8') as buf:
try:
data = yaml.load(buf)
except Exception:
logging.exception("Syntax error reading %s; skipping", fname)
return None
if data is None:
return None
if 'startdate' not in data:
logging.error("Missing startdate in global data")
data['startdate'] = datetime.datetime.now()
return data
@memoize
def get_talk_data(talkname):
fname = os.path.join(TALKS_PATH, talkname, 'meta.yaml')
if not os.path.isfile(fname):
return None
with io.open(fname, encoding='utf8') as buf:
try:
data = yaml.load(buf)
except Exception as exc:
logging.exception("Syntax error reading %s; skipping", fname)
return None
if data is None:
return None
if 'title' not in data:
logging.warn("Talk <{}> has no `title` field".format(talkname))
data['title'] = talkname
if 'text' not in data:
logging.warn("Talk <{}> has no `text` field".format(talkname))
data['text'] = ''
if 'duration' not in data:
logging.info("Talk <{}> has no `duration` field (50min used)"
.format(talkname))
data['duration'] = 50
data['duration'] = int(data['duration'])
if data['duration'] < GRID_STEP:
logging.info("Talk <{}> lasts only {} minutes; changing to {}"
.format(talkname, data['duration'], GRID_STEP))
data['duration'] = GRID_STEP
if 'links' not in data or not data['links']:
data['links'] = []
if 'contacts' not in data or not data['contacts']:
data['contacts'] = []
if 'needs' not in data or not data['needs']:
data['needs'] = []
if 'room' not in data:
logging.warn("Talk <{}> has no `room` field".format(talkname))
if 'time' not in data or 'day' not in data:
logging.warn("Talk <{}> has no `time` or `day`".format(talkname))
if 'time' in data:
del data['time']
if 'day' in data:
del data['day']
if 'day' in data:
data['day'] = get_global_data()['startdate'] + datetime.timedelta(days=data['day'])
if 'time' in data and 'day' in data:
timeparts = re.findall(r'\d+', str(data['time']))
if 4 > len(timeparts) > 0:
timeparts = [int(p) for p in timeparts]
data['time'] = datetime.datetime.combine(data['day'],
datetime.time(*timeparts))
else:
logging.error("Talk <{}> has malformed `time`".format(talkname))
data['id'] = talkname
resdir = os.path.join(TALKS_PATH, talkname, TALK_ATTACHMENT_PATH)
if os.path.isdir(resdir) and os.listdir(resdir):
data['resources'] = resdir
return data
jinja_env = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.join(TALKS_PATH, '_templates')),
autoescape=True,
)
jinja_env.filters['markdown'] = lambda text: \
jinja2.Markup(markdown.Markdown(extensions=['meta']).
convert(text))
jinja_env.filters['dateformat'] = format_date
jinja_env.filters['datetimeformat'] = format_datetime
jinja_env.filters['timeformat'] = format_time
class TalkListDirective(Directive):
final_argument_whitespace = True
has_content = True
option_spec = {
'lang': directives.unchanged
}
def run(self):
lang = self.options.get('lang', 'C')
tmpl = jinja_env.get_template('talk.html')
return [
nodes.raw('', tmpl.render(lang=lang, **get_talk_data(n)),
format='html')
for n in get_talk_names()
]
class TalkDirective(Directive):
required_arguments = 1
final_argument_whitespace = True
has_content = True
option_spec = {
'lang': directives.unchanged
}
def run(self):
lang = self.options.get('lang', 'C')
tmpl = jinja_env.get_template('talk.html')
data = get_talk_data(self.arguments[0])
if data is None:
return []
return [
nodes.raw('', tmpl.render(lang=lang, **data),
format='html')
]
class TalkGridDirective(Directive):
'''A complete grid'''
final_argument_whitespace = True
has_content = True
option_spec = {
'lang': directives.unchanged
}
def run(self):
lang = self.options.get('lang', 'C')
tmpl = jinja_env.get_template('grid.html')
output = []
days = unique_attr(all_talks(), 'day')
for day in sorted(days):
talks = {talk['id'] for talk in all_talks()
if talk.get('day', None) == day
and 'time' in talk
and 'room' in talk}
if not talks:
continue
talks = [get_talk_data(t) for t in talks]
rooms = tuple(sorted(unique_attr(talks, 'room')))
mintime = min({talk['time'].hour * 60 +
talk['time'].minute
for talk in talks}) // GRID_STEP * GRID_STEP
maxtime = max({talk['time'].hour * 60 +
talk['time'].minute +
talk['duration']
for talk in talks})
times = {}
for t in range(mintime, maxtime, GRID_STEP):
times[t] = [None] * len(rooms)
for talk in sorted(talks, key=lambda x: x['time']):
talktime = talk['time'].hour * 60 + talk['time'].minute
position = talktime // GRID_STEP * GRID_STEP # round
assert position in times
roomnum = rooms.index(talk['room'])
if times[position][roomnum] is not None:
logging.error("Talk {} and {} overlap! "
.format(times[position][roomnum]['id'],
talk['id']))
continue
times[position][roomnum] = copy(talk)
times[position][roomnum]['skip'] = False
for i in range(1, talk['duration'] // GRID_STEP):
times[position + i*GRID_STEP][roomnum] = copy(talk)
times[position + i*GRID_STEP][roomnum]['skip'] = True
#with setlocale(locale.normalize(lang)):
render = tmpl.render(times=times,
rooms=rooms,
mintime=mintime, maxtime=maxtime,
timestep=GRID_STEP,
lang=lang,
)
output.append(nodes.raw('', u'<h4>%s</h4>' %
format_date(day, format='full', locale=lang),
format='html'))
output.append(nodes.raw('', render, format='html'))
return output
def talks_to_ics():
content = 'BEGIN:VCALENDAR\nVERSION:2.0\nPRODID:pelican\n'
for t in all_talks():
content += talk_to_ics(t)
content += 'END:VCALENDAR\n'
return content
def talk_to_ics(talk):
if 'time' not in talk or 'duration' not in talk:
return ''
start = talk['time']
end = start + datetime.timedelta(minutes=talk['duration'])
content = 'BEGIN:VEVENT\n'
content += "UID:%s@%d.hackmeeting.org\n" % (talk['id'], talk['day'].year)
content += "SUMMARY:%s\n" % talk['title']
content += "DTSTAMP:%s\n" % time.strftime('%Y%m%dT%H%M%SZ',
time.gmtime(float(start.strftime('%s'))))
content += "DTSTART:%s\n" % time.strftime('%Y%m%dT%H%M%SZ',
time.gmtime(float(
start.strftime('%s'))))
content += "DTEND:%s\n" % time.strftime('%Y%m%dT%H%M%SZ',
time.gmtime(float(
end.strftime('%s'))))
content += "LOCATION:%s\n" % talk['room']
content += 'END:VEVENT\n'
return content
class TalksGenerator(generators.Generator):
def __init__(self, *args, **kwargs):
self.talks = []
super(TalksGenerator, self).__init__(*args, **kwargs)
def generate_context(self):
self.talks = {n: get_talk_data(n) for n in get_talk_names()}
self._update_context(('talks',))
def generate_output(self, writer=None):
for talkname in self.talks:
if 'resources' in self.talks[talkname]:
outdir = os.path.join(self.output_path, TALKS_PATH, talkname,
TALK_ATTACHMENT_PATH)
if os.path.isdir(outdir):
shutil.rmtree(outdir)
shutil.copytree(self.talks[talkname]['resources'], outdir)
with open(os.path.join(self.output_path, TALK_ICS), 'w') as buf:
buf.write(talks_to_ics())
def add_talks_option_defaults(pelican):
pelican.settings.setdefault('TALKS_PATH', TALKS_PATH)
def get_generators(gen):
return TalksGenerator
try:
import yaml
except ImportError:
print('ERROR: yaml not found. Talks plugins will be disabled')
def register():
pass
else:
def register():
signals.get_generators.connect(get_generators)
signals.initialized.connect(add_talks_option_defaults)
directives.register_directive('talklist', TalkListDirective)
directives.register_directive('talk', TalkDirective)
directives.register_directive('talkgrid', TalkGridDirective)