__init__.py 8.3 KB


  1. """
This module contains a Flask blueprint for the database administration UI.
The templates it uses are self-contained in this directory.
  4. """
  5. from __future__ import print_function
  6. import mimetypes
  7. import os
  8. from collections import defaultdict
  9. from datetime import datetime, time, timedelta
  10. from flask import (Blueprint, Response, abort, current_app, flash, jsonify,
  11. redirect, render_template, request, url_for)
  12. from larigira import forms
  13. from larigira.audiogen import get_audiogenerator
  14. from larigira.config import get_conf
  15. from larigira.entrypoints_utils import get_avail_entrypoints
  16. from larigira.timegen import get_timegenerator, timegenerate
  17. from larigira.timegen_every import FrequencyAlarm
  18. from .suggestions import get_suggestions
  19. db = Blueprint(
  20. "db",
  21. __name__,
  22. url_prefix=get_conf()["ROUTE_PREFIX"] + "/db",
  23. template_folder="templates",
  24. )
  25. def request_wants_json():
  26. best = request.accept_mimetypes.best_match(
  27. ["application/json", "text/html"]
  28. )
  29. return (
  30. best == "application/json"
  31. and request.accept_mimetypes[best]
  32. > request.accept_mimetypes["text/html"]
  33. )
  34. def get_model():
  35. return current_app.larigira.controller.monitor.model
  36. @db.route("/")
  37. def home():
  38. return render_template("dbadmin_base.html")
  39. @db.route("/list")
  40. def events_list():
  41. model = current_app.larigira.controller.monitor.model
  42. alarms = tuple(model.get_all_alarms())
  43. events = [(alarm, model.get_actions_by_alarm(alarm)) for alarm in alarms]
  44. return render_template("list.html", events=events)
  45. @db.route("/calendar")
  46. def events_calendar():
  47. model = current_app.larigira.controller.monitor.model
  48. today = datetime.now().date()
  49. max_days = 30
  50. max_occurrences = get_conf()["UI_CALENDAR_OCCURRENCIES_THRESHOLD"]
  51. # {date: {datetime: [(alarm1,actions1), (alarm2,actions2)]}}
  52. days = defaultdict(lambda: defaultdict(list))
  53. show_all = request.args.get("all", "0") == "1"
  54. for alarm in model.get_all_alarms():
  55. actions = tuple(model.get_actions_by_alarm(alarm))
  56. if not actions:
  57. continue
  58. today_dt = datetime.fromtimestamp(int(today.strftime("%s")))
  59. max_dt = datetime.combine(today_dt + timedelta(days=max_days), time())
  60. occurrences = []
  61. for t in timegenerate(
  62. alarm, now=today_dt, howmany=max_occurrences + 1
  63. ):
  64. if t is None:
  65. break
  66. if t > max_dt:
  67. break
  68. occurrences.append(t)
  69. if not occurrences:
  70. continue
  71. if not show_all and len(occurrences) > max_occurrences:
  72. continue
  73. for t in occurrences:
  74. days[t.date()][t].append((alarm, actions))
  75. # { weeknum: [day1, day2, day3] }
  76. weeks = defaultdict(list)
  77. for d in sorted(days.keys()):
  78. weeks[d.isocalendar()[:2]].append(d)
  79. return render_template(
  80. "calendar.html", days=days, weeks=weeks, show_all=show_all
  81. )
  82. @db.route("/add/time")
  83. def addtime():
  84. kinds = get_avail_entrypoints("larigira.timeform_create")
  85. def gen_info(gen):
  86. return dict(description=getattr(gen, "description", ""))
  87. info = {kind: gen_info(get_timegenerator(kind)) for kind in kinds}
  88. return render_template("add_time.html", kinds=kinds, info=info)
  89. @db.route("/edit/time/<int:alarmid>", methods=["GET", "POST"])
  90. def edit_time(alarmid):
  91. model = get_model()
  92. timespec = model.get_alarm_by_id(alarmid)
  93. kind = timespec["kind"]
  94. Form, receiver = tuple(forms.get_timeform(kind))
  95. form = Form()
  96. if request.method == "GET":
  97. form.populate_from_timespec(timespec)
  98. if request.method == "POST" and form.validate():
  99. data = receiver(form)
  100. model.update_alarm(alarmid, data)
  101. model.reload()
  102. return redirect(
  103. url_for("db.events_calendar", highlight="%d" % alarmid)
  104. )
  105. return render_template(
  106. "add_time_kind.html",
  107. form=form,
  108. kind=kind,
  109. mode="edit",
  110. alarmid=alarmid,
  111. )
  112. def is_xhr():
  113. return request.headers.get("x-requested-with") is not None
  114. @db.route("/add/time/<kind>", methods=["GET", "POST"])
  115. def addtime_kind(kind):
  116. Form, receiver = tuple(forms.get_timeform(kind))
  117. form = Form(csrf_enabled=(not is_xhr()))
  118. if request.method == "POST":
  119. if form.validate():
  120. data = receiver(form)
  121. eid = get_model().add_alarm(data)
  122. if not is_xhr():
  123. return redirect(url_for("db.edit_event", alarmid=eid))
  124. else:
  125. resp = jsonify(alarmid=eid)
  126. resp.status_code = 201
  127. return resp
  128. elif is_xhr():
  129. resp = jsonify(errors=form.errors)
  130. resp.status_code = 400
  131. return resp
  132. return render_template(
  133. "add_time_kind.html", form=form, kind=kind, mode="add"
  134. )
  135. @db.route("/add/audio")
  136. def addaudio():
  137. kinds = get_avail_entrypoints("larigira.audioform_create")
  138. def gen_info(gen):
  139. return dict(description=getattr(gen, "description", ""))
  140. info = {kind: gen_info(get_audiogenerator(kind)) for kind in kinds}
  141. return render_template("add_audio.html", kinds=kinds, info=info)
  142. @db.route("/add/audio/<kind>", methods=["GET", "POST"])
  143. def addaudio_kind(kind):
  144. Form, receiver = tuple(forms.get_audioform(kind))
  145. form = Form(csrf_enabled=(not is_xhr()))
  146. if request.method == "POST":
  147. if form.validate():
  148. data = receiver(form)
  149. model = current_app.larigira.controller.monitor.model
  150. eid = model.add_action(data)
  151. return jsonify(dict(inserted=eid, data=data))
  152. else:
  153. resp = jsonify(errors=form.errors)
  154. resp.status_code = 400
  155. return resp
  156. return render_template(
  157. "add_audio_kind.html",
  158. form=form,
  159. kind=kind,
  160. suggestions=get_suggestions(),
  161. )
  162. @db.route("/edit/audio/<int:actionid>", methods=["GET", "POST"])
  163. def edit_audio(actionid):
  164. model = get_model()
  165. audiospec = model.get_action_by_id(actionid)
  166. kind = audiospec["kind"]
  167. Form, receiver = tuple(forms.get_audioform(kind))
  168. form = Form()
  169. if request.method == "GET":
  170. form.populate_from_audiospec(audiospec)
  171. if request.method == "POST" and form.validate():
  172. data = receiver(form)
  173. model.update_action(actionid, data)
  174. model.reload()
  175. return redirect(url_for("db.events_list"))
  176. return render_template(
  177. "add_audio_kind.html",
  178. form=form,
  179. kind=kind,
  180. mode="edit",
  181. suggestions=get_suggestions(),
  182. )
  183. @db.route("/edit/event/<alarmid>")
  184. def edit_event(alarmid):
  185. model = current_app.larigira.controller.monitor.model
  186. alarm = model.get_alarm_by_id(int(alarmid))
  187. if alarm is None:
  188. abort(404)
  189. allactions = model.get_all_actions()
  190. actions = tuple(model.get_actions_by_alarm(alarm))
  191. return render_template(
  192. "edit_event.html",
  193. alarm=alarm,
  194. all_actions=allactions,
  195. actions=actions,
  196. routeprefix=get_conf()["ROUTE_PREFIX"],
  197. )
  198. @db.route("/api/alarm/<alarmid>/actions", methods=["POST"])
  199. def change_actions(alarmid):
  200. new_actions = request.form.getlist("actions[]")
  201. if new_actions is None:
  202. new_actions = []
  203. model = current_app.larigira.controller.monitor.model
  204. ret = model.update_alarm(
  205. int(alarmid), new_fields={"actions": [int(a) for a in new_actions]}
  206. )
  207. return jsonify(dict(updated=alarmid, ret=ret))
  208. @db.route("/api/alarm/<int:alarmid>/delete", methods=["POST"])
  209. def delete_alarm(alarmid):
  210. model = current_app.larigira.controller.monitor.model
  211. try:
  212. alarm = model.get_alarm_by_id(int(alarmid))
  213. print(alarm["nick"])
  214. model.delete_alarm(alarmid)
  215. except KeyError:
  216. abort(404)
  217. if request_wants_json():
  218. return jsonify(dict(deleted=alarmid))
  219. flash("Evento %d `%s` cancellato" % (alarmid, alarm["nick"]))
  220. return redirect(url_for("db.events_list"))
  221. @db.route("/extra/<path:relpath>")
  222. def static_custom(relpath):
  223. basepath = get_conf()["EXTRA_STATIC_PATH"]
  224. if not basepath:
  225. abort(405)
  226. fpath = os.path.join(basepath, relpath)
  227. print(basepath, fpath)
  228. if not os.path.isfile(fpath):
  229. abort(404, "File non trovato")
  230. mime, _encoding = mimetypes.guess_type(fpath)
  231. return Response(open(fpath, "rb").read(), mimetype=mime)