__init__.py 8.1 KB


  1. """
  2. This module contains a flask blueprint for db administration stuff
  3. Templates are self-contained in this directory
  4. """
  5. from __future__ import print_function
  6. import os
  7. from datetime import datetime, timedelta, time
  8. from collections import defaultdict
  9. import mimetypes
  10. from flask import (
  11. current_app,
  12. Blueprint,
  13. Response,
  14. render_template,
  15. jsonify,
  16. abort,
  17. request,
  18. redirect,
  19. url_for,
  20. flash,
  21. )
  22. from larigira.entrypoints_utils import get_avail_entrypoints
  23. from larigira.audiogen import get_audiogenerator
  24. from larigira.timegen_every import FrequencyAlarm
  25. from larigira.timegen import get_timegenerator, timegenerate
  26. from larigira import forms
  27. from larigira.config import get_conf
  28. from .suggestions import get_suggestions
  29. db = Blueprint(
  30. "db",
  31. __name__,
  32. url_prefix=get_conf()["ROUTE_PREFIX"] + "/db",
  33. template_folder="templates",
  34. )
  35. def request_wants_json():
  36. best = request.accept_mimetypes.best_match(["application/json", "text/html"])
  37. return (
  38. best == "application/json"
  39. and request.accept_mimetypes[best] > request.accept_mimetypes["text/html"]
  40. )
  41. def get_model():
  42. return current_app.larigira.controller.monitor.model
  43. @db.route("/")
  44. def home():
  45. return render_template("dbadmin_base.html")
  46. @db.route("/list")
  47. def events_list():
  48. model = current_app.larigira.controller.monitor.model
  49. alarms = tuple(model.get_all_alarms())
  50. events = [(alarm, model.get_actions_by_alarm(alarm)) for alarm in alarms]
  51. return render_template("list.html", events=events)
  52. @db.route("/calendar")
  53. def events_calendar():
  54. model = current_app.larigira.controller.monitor.model
  55. today = datetime.now().date()
  56. max_days = 30
  57. max_occurrences = get_conf()["UI_CALENDAR_OCCURRENCIES_THRESHOLD"]
  58. # {date: {datetime: [(alarm1,actions1), (alarm2,actions2)]}}
  59. days = defaultdict(lambda: defaultdict(list))
  60. show_all = (request.args.get('all', '0') == '1')
  61. for alarm in model.get_all_alarms():
  62. actions = tuple(model.get_actions_by_alarm(alarm))
  63. if not actions:
  64. continue
  65. t = datetime.fromtimestamp(int(today.strftime("%s")))
  66. occurrences = [ t for t in timegenerate(alarm, now=t, howmany=max_occurrences +
  67. 1) if t is not None and t <= datetime.combine(
  68. today + timedelta(days=max_days), time()
  69. )]
  70. if not occurrences:
  71. continue
  72. if not show_all and len(occurrences) > max_occurrences:
  73. continue
  74. for t in occurrences:
  75. days[t.date()][t].append((alarm, actions))
  76. # { weeknum: [day1, day2, day3] }
  77. weeks = defaultdict(list)
  78. for d in sorted(days.keys()):
  79. weeks[d.isocalendar()[:2]].append(d)
  80. return render_template("calendar.html", days=days, weeks=weeks,
  81. show_all=show_all)
  82. @db.route("/add/time")
  83. def addtime():
  84. kinds = get_avail_entrypoints("larigira.timeform_create")
  85. def gen_info(gen):
  86. return dict(description=getattr(gen, "description", ""))
  87. info = {kind: gen_info(get_timegenerator(kind)) for kind in kinds}
  88. return render_template("add_time.html", kinds=kinds, info=info)
  89. @db.route("/edit/time/<int:alarmid>", methods=["GET", "POST"])
  90. def edit_time(alarmid):
  91. model = get_model()
  92. timespec = model.get_alarm_by_id(alarmid)
  93. kind = timespec["kind"]
  94. Form, receiver = tuple(forms.get_timeform(kind))
  95. form = Form()
  96. if request.method == "GET":
  97. form.populate_from_timespec(timespec)
  98. if request.method == "POST" and form.validate():
  99. data = receiver(form)
  100. model.update_alarm(alarmid, data)
  101. model.reload()
  102. return redirect(url_for("db.events_calendar", highlight="%d" % alarmid))
  103. return render_template(
  104. "add_time_kind.html", form=form, kind=kind, mode="edit", alarmid=alarmid
  105. )
  106. def is_xhr():
  107. return request.headers.get("x-requested-with") is not None
  108. @db.route("/add/time/<kind>", methods=["GET", "POST"])
  109. def addtime_kind(kind):
  110. Form, receiver = tuple(forms.get_timeform(kind))
  111. form = Form(csrf_enabled=(not is_xhr()))
  112. if request.method == "POST":
  113. if form.validate():
  114. data = receiver(form)
  115. eid = get_model().add_alarm(data)
  116. if not is_xhr():
  117. return redirect(url_for("db.edit_event", alarmid=eid))
  118. else:
  119. resp = jsonify(alarmid=eid)
  120. resp.status_code = 201
  121. return resp
  122. elif is_xhr():
  123. resp = jsonify(errors=form.errors)
  124. resp.status_code = 400
  125. return resp
  126. return render_template("add_time_kind.html", form=form, kind=kind, mode="add")
  127. @db.route("/add/audio")
  128. def addaudio():
  129. kinds = get_avail_entrypoints("larigira.audioform_create")
  130. def gen_info(gen):
  131. return dict(description=getattr(gen, "description", ""))
  132. info = {kind: gen_info(get_audiogenerator(kind)) for kind in kinds}
  133. return render_template("add_audio.html", kinds=kinds, info=info)
  134. @db.route("/add/audio/<kind>", methods=["GET", "POST"])
  135. def addaudio_kind(kind):
  136. Form, receiver = tuple(forms.get_audioform(kind))
  137. form = Form(csrf_enabled=(not is_xhr()))
  138. if request.method == "POST":
  139. if form.validate():
  140. data = receiver(form)
  141. model = current_app.larigira.controller.monitor.model
  142. eid = model.add_action(data)
  143. return jsonify(dict(inserted=eid, data=data))
  144. else:
  145. resp = jsonify(errors=form.errors)
  146. resp.status_code = 400
  147. return resp
  148. return render_template(
  149. "add_audio_kind.html", form=form, kind=kind, suggestions=get_suggestions()
  150. )
  151. @db.route("/edit/audio/<int:actionid>", methods=["GET", "POST"])
  152. def edit_audio(actionid):
  153. model = get_model()
  154. audiospec = model.get_action_by_id(actionid)
  155. kind = audiospec["kind"]
  156. Form, receiver = tuple(forms.get_audioform(kind))
  157. form = Form()
  158. if request.method == "GET":
  159. form.populate_from_audiospec(audiospec)
  160. if request.method == "POST" and form.validate():
  161. data = receiver(form)
  162. model.update_action(actionid, data)
  163. model.reload()
  164. return redirect(url_for("db.events_list"))
  165. return render_template(
  166. "add_audio_kind.html",
  167. form=form,
  168. kind=kind,
  169. mode="edit",
  170. suggestions=get_suggestions(),
  171. )
  172. @db.route("/edit/event/<alarmid>")
  173. def edit_event(alarmid):
  174. model = current_app.larigira.controller.monitor.model
  175. alarm = model.get_alarm_by_id(int(alarmid))
  176. if alarm is None:
  177. abort(404)
  178. allactions = model.get_all_actions()
  179. actions = tuple(model.get_actions_by_alarm(alarm))
  180. return render_template(
  181. "edit_event.html",
  182. alarm=alarm,
  183. all_actions=allactions,
  184. actions=actions,
  185. routeprefix=get_conf()["ROUTE_PREFIX"],
  186. )
  187. @db.route("/api/alarm/<alarmid>/actions", methods=["POST"])
  188. def change_actions(alarmid):
  189. new_actions = request.form.getlist("actions[]")
  190. if new_actions is None:
  191. new_actions = []
  192. model = current_app.larigira.controller.monitor.model
  193. ret = model.update_alarm(
  194. int(alarmid), new_fields={"actions": [int(a) for a in new_actions]}
  195. )
  196. return jsonify(dict(updated=alarmid, ret=ret))
  197. @db.route("/api/alarm/<int:alarmid>/delete", methods=["POST"])
  198. def delete_alarm(alarmid):
  199. model = current_app.larigira.controller.monitor.model
  200. try:
  201. alarm = model.get_alarm_by_id(int(alarmid))
  202. print(alarm["nick"])
  203. model.delete_alarm(alarmid)
  204. except KeyError:
  205. abort(404)
  206. if request_wants_json():
  207. return jsonify(dict(deleted=alarmid))
  208. flash("Evento %d `%s` cancellato" % (alarmid, alarm["nick"]))
  209. return redirect(url_for("db.events_list"))
  210. @db.route("/extra/<path:relpath>")
  211. def static_custom(relpath):
  212. basepath = get_conf()["EXTRA_STATIC_PATH"]
  213. if not basepath:
  214. abort(405)
  215. fpath = os.path.join(basepath, relpath)
  216. print(basepath, fpath)
  217. if not os.path.isfile(fpath):
  218. abort(404, "File non trovato")
  219. mime, _encoding = mimetypes.guess_type(fpath)
  220. return Response(open(fpath, "rb").read(), mimetype=mime)