Browse Source

set lights blinking appropriately

boyska 2 years ago
parent
commit
5ab34f1eb6
1 changed files with 158 additions and 0 deletions
  1. 158 0
      ring_mon/apply-state

+ 158 - 0
ring_mon/apply-state

@@ -0,0 +1,158 @@
+#!/usr/bin/env python3
+"""
+This script handles phone ringing and provide led notifications, blinking with different patterns.
+
+It will handle multiple devices.
+
+For maximum configurability without writing crazy configuration file, most things can be "plugged" as
+commands.
+To get the status of a device, it will just invoke a program. Write your code there (a simple SQL query?
+reading a file? rasterisk?) and return one of ring/up/onair/off
+
+The state is then applied running another command. Write your script, and make it do HTTP requests, mqtt set,
+directly control a GPIO... you know it.
+
+It does not provide any integration with inotify. However, it will behave nicely if you send it a SIGHUP, so
+you can just run your favourite inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every
+time you need it.
+"""
+
+import sys
+import os
+import logging
+import time
+import signal
+from argparse import ArgumentParser
+from pathlib import Path
+from subprocess import Popen, check_output, CalledProcessError
+from multiprocessing import Process, Pipe
+
+import pyinotify
+
+
+def rotate(lst: list, n: int) -> list:
+    return lst[n:] + lst[:n]
+
+
+# all waveforms must have the same length
+WAVEFORMS = {
+    "ring": ([True] * 4 + [False] * 4) * 5,
+    "up": ([True] * 1 + [False] * 9) * 4,
+    "onair": [True] * 40,
+    "off": [False] * 40,
+}
+assert len({len(waveform) for waveform in WAVEFORMS.values()}) == 1
+for wf_name in list(WAVEFORMS.keys()):
+    WAVEFORMS[wf_name] = rotate(WAVEFORMS[wf_name], -2)
+
+
+class LightManager:
+    """Handles a single light."""
+
+    def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0):
+        self.args = args
+        self.waveforms = WAVEFORMS
+        self.tick_duration_s = tick_duration_s
+        self.device_name = device_name
+        self.device_number = device_number
+
+        self.log = logging.getLogger(
+            "%s-%s" % (self.__class__.__name__, self.device_name)
+        )
+
+        self.stop_pipe_r, self.stop_pipe_w = Pipe(False)
+
+    def set_light(self, on: bool):
+        self.log.debug('set %s', on)
+        env = os.environ.copy()
+        env["DEVICE_NUMBER"] = str(self.device_number)
+
+        Popen([str(self.args.bin_set_light.resolve()), "on" if on else "off"], env=env)
+
+    def do_waveform(self, status) -> tuple:
+        last_set = None
+        self.log.debug("status=%s", status)
+        sys.stderr.flush()
+        waveform = self.waveforms[status]
+        for i, elem in enumerate(waveform):
+            # if i % 10 == 0:
+            #     print("  ", i, end="")
+            #     sys.stdout.flush()
+            if last_set != elem:
+                self.set_light(elem)
+                last_set = elem
+            if self.stop_pipe_r.poll():
+                reason = self.stop_pipe_r.recv()
+                return reason
+            time.sleep(self.tick_duration_s)
+        return ("ok", None)
+
+    def get_status(self) -> str:
+        try:
+            status = check_output([str(self.args.bin_state.resolve()), self.device_name])
+        except CalledProcessError:
+            status = 'off'
+
+        status = status.lower().strip().decode('ascii', errors='ignore')
+        if status not in self.waveforms:
+            return "off"
+        return status
+
+    def run(self):
+        status = self.get_status()
+        while True:
+            # this will do a complete waveform, unless data is written to stop_pipe_w; in which case, it will
+            # return early
+            reason, _ = self.do_waveform(status)
+            if self.args.enable_polling or reason != "ok":
+                status = self.get_status()
+
+    def refresh(self, reason=("refresh", None)):
+        self.stop_pipe_w.send(reason)
+
+
+EXE_DIR = Path(sys.argv[0]).parent
+
+MANAGERS = {}
+DEVICES = []
+
+
+def on_sighup(signal, *args):
+    global MANAGERS
+    for dev in MANAGERS:
+        print("refreshing", dev)
+        MANAGERS[dev]["manager"].refresh(("signal", signal))
+
+
+def parse_devices(s: str) -> list:
+    return s.split(",")
+
+
+def main():
+    global MANAGERS
+    global DEVICES
+
+    p = ArgumentParser()
+    p.add_argument("--devices", type=parse_devices, default="1")
+    p.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status"))
+    p.add_argument("--bin-set-light", type=Path, default=(EXE_DIR / "set-light"))
+    p.add_argument("--disable-polling", dest='enable_polling', default=True, action='store_false')
+    p.add_argument(
+        "--log-level",
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+        default="INFO",
+    )
+    args = p.parse_args()
+
+    logging.basicConfig(level=args.log_level)
+
+    DEVICES += args.devices
+    signal.signal(signal.SIGHUP, on_sighup)
+    for i, dev in enumerate(DEVICES):
+        m = LightManager(args, device_name=dev, device_number=i)
+        MANAGERS[dev] = {"manager": m}
+        Process(target=m.run).start()
+
+
+if __name__ == "__main__":
+    main()