diff --git a/ring_mon/apply-state b/ring_mon/apply-state new file mode 100755 index 0000000..680b078 --- /dev/null +++ b/ring_mon/apply-state @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 +""" +This script handles phone ringing and provide led notifications, blinking with different patterns. + +It will handle multiple devices. + +For maximum configurability without writing crazy configuration file, most things can be "plugged" as +commands. +To get the status of a device, it will just invoke a program. Write your code there (a simple SQL query? +reading a file? rasterisk?) and return one of ring/up/onair/off + +The state is then applied running another command. Write your script, and make it do HTTP requests, mqtt set, +directly control a GPIO... you know it. + +It does not provide any integration with inotify. However, it will behave nicely if you send it a SIGHUP, so +you can just run your favourite inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every +time you need it. +""" + +import sys +import os +import logging +import time +import signal +from argparse import ArgumentParser +from pathlib import Path +from subprocess import Popen, check_output, CalledProcessError +from multiprocessing import Process, Pipe + +import pyinotify + + +def rotate(lst: list, n: int) -> list: + return lst[n:] + lst[:n] + + +# all waveforms must have the same length +WAVEFORMS = { + "ring": ([True] * 4 + [False] * 4) * 5, + "up": ([True] * 1 + [False] * 9) * 4, + "onair": [True] * 40, + "off": [False] * 40, +} +assert len({len(waveform) for waveform in WAVEFORMS.values()}) == 1 +for wf_name in list(WAVEFORMS.keys()): + WAVEFORMS[wf_name] = rotate(WAVEFORMS[wf_name], -2) + + +class LightManager: + """Handles a single light.""" + + def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0): + self.args = args + self.waveforms = WAVEFORMS + self.tick_duration_s = tick_duration_s + self.device_name = device_name + self.device_number = device_number + + self.log = logging.getLogger( + "%s-%s" % (self.__class__.__name__, self.device_name) + ) + + self.stop_pipe_r, self.stop_pipe_w = Pipe(False) + + def set_light(self, on: bool): + self.log.debug('set %s', on) + env = os.environ.copy() + env["DEVICE_NUMBER"] = str(self.device_number) + + Popen([str(self.args.bin_set_light.resolve()), "on" if on else "off"], env=env) + + def do_waveform(self, status) -> tuple: + last_set = None + self.log.debug("status=%s", status) + sys.stderr.flush() + waveform = self.waveforms[status] + for i, elem in enumerate(waveform): + # if i % 10 == 0: + # print(" ", i, end="") + # sys.stdout.flush() + if last_set != elem: + self.set_light(elem) + last_set = elem + if self.stop_pipe_r.poll(): + reason = self.stop_pipe_r.recv() + return reason + time.sleep(self.tick_duration_s) + return ("ok", None) + + def get_status(self) -> str: + try: + status = check_output([str(self.args.bin_state.resolve()), self.device_name]) + except CalledProcessError: + status = 'off' + + status = status.lower().strip().decode('ascii', errors='ignore') + if status not in self.waveforms: + return "off" + return status + + def run(self): + status = self.get_status() + while True: + # this will do a complete waveform, unless data is written to stop_pipe_w; in which case, it will + # return early + reason, _ = self.do_waveform(status) + if self.args.enable_polling or reason != "ok": + status = self.get_status() + + def refresh(self, reason=("refresh", None)): + self.stop_pipe_w.send(reason) + + +EXE_DIR = Path(sys.argv[0]).parent + +MANAGERS = {} +DEVICES = [] + + +def on_sighup(signal, *args): + global MANAGERS + for dev in MANAGERS: + print("refreshing", dev) + MANAGERS[dev]["manager"].refresh(("signal", signal)) + + +def parse_devices(s: str) -> list: + return s.split(",") + + +def main(): + global MANAGERS + global DEVICES + + p = ArgumentParser() + p.add_argument("--devices", type=parse_devices, default="1") + p.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status")) + p.add_argument("--bin-set-light", type=Path, default=(EXE_DIR / "set-light")) + p.add_argument("--disable-polling", dest='enable_polling', default=True, action='store_false') + p.add_argument( + "--log-level", + choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + default="INFO", + ) + args = p.parse_args() + + logging.basicConfig(level=args.log_level) + + DEVICES += args.devices + signal.signal(signal.SIGHUP, on_sighup) + for i, dev in enumerate(DEVICES): + m = LightManager(args, device_name=dev, device_number=i) + MANAGERS[dev] = {"manager": m} + Process(target=m.run).start() + + +if __name__ == "__main__": + main()