#!/usr/bin/env python3
"""
This script handles phone ringing and provides LED notifications, blinking
with different patterns. It can handle multiple devices.

For maximum configurability without writing a complicated configuration file,
most things can be "plugged in" as commands.

To get the status of a device, it just invokes a program. Write your code
there (a simple SQL query? reading a file? rasterisk?) and print one of
ring/up/onair/off.

The state is then applied by running another command. Write your script, and
make it do HTTP requests, MQTT set, directly control a GPIO... you know it.

It does not provide any integration with inotify. However, it will behave
nicely if you send it a SIGHUP, so you can just run your favourite
inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every
time you need it.
"""

import logging
import os
import signal
import sys
import time
from argparse import ArgumentParser
from multiprocessing import Pipe, Process
from pathlib import Path
from subprocess import CalledProcessError, Popen, check_output


def rotate(lst: list, n: int) -> list:
    """Return a copy of *lst* rotated left by *n* items (right when *n* < 0)."""
    return lst[n:] + lst[:n]


# One boolean per tick (True = light on). All waveforms must have the same
# length so that every status takes the same number of ticks to play.
WAVEFORMS = {
    "ring": ([True] * 4 + [False] * 4) * 5,
    "up": ([True] * 1 + [False] * 9) * 4,
    "onair": [True] * 40,
    "off": [False] * 40,
}
assert len({len(waveform) for waveform in WAVEFORMS.values()}) == 1
# Shift every pattern right by two ticks.
WAVEFORMS = {name: rotate(waveform, -2) for name, waveform in WAVEFORMS.items()}


class LightManager:
    """Handles a single light.

    The manager asks an external command for the device status, then plays
    the matching waveform by calling a second external command whenever the
    light has to change state.
    """

    def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0):
        """Create a manager for one device.

        Args:
            args: parsed command-line options; ``bin_state``,
                ``bin_set_light`` and ``enable_polling`` are read from it.
            tick_duration_s: pause, in seconds, after each waveform step.
            device_name: name passed as argument to the status command.
            device_number: index exported to the set-light command through
                the ``DEVICE_NUMBER`` environment variable.
        """
        self.args = args
        self.waveforms = WAVEFORMS
        self.tick_duration_s = tick_duration_s
        self.device_name = device_name
        self.device_number = device_number
        self.log = logging.getLogger(
            f"{self.__class__.__name__}-{self.device_name}"
        )
        # One-way pipe: refresh() writes to it, do_waveform() polls it to
        # know when the current waveform must be interrupted.
        self.stop_pipe_r, self.stop_pipe_w = Pipe(False)

    def set_light(self, on: bool):
        """Run the set-light command with ``on`` or ``off`` and wait for it.

        The command's exit status is not checked.
        """
        self.log.debug("set %s", on)
        env = os.environ.copy()
        env["DEVICE_NUMBER"] = str(self.device_number)
        command = [str(self.args.bin_set_light.resolve()), "on" if on else "off"]
        with Popen(command, env=env) as proc:
            proc.wait()

    def do_waveform(self, status) -> tuple:
        """Play the waveform associated with *status* once.

        Returns:
            ``("ok", None)`` when the whole waveform was played, or the tuple
            received on the stop pipe when playback was interrupted.
        """
        last_set = None
        self.log.debug("status=%s", status)
        sys.stderr.flush()
        for elem in self.waveforms[status]:
            # Only invoke the external command when the light actually changes.
            if last_set != elem:
                self.set_light(elem)
                last_set = elem
            if self.stop_pipe_r.poll():
                return self.stop_pipe_r.recv()
            time.sleep(self.tick_duration_s)
        return ("ok", None)

    def get_status(self) -> str:
        """Ask the status command for the device status.

        Returns:
            One of the keys of ``self.waveforms``. ``"off"`` is returned when
            the command exits with a non-zero status or prints something that
            is not a known status.
        """
        command = [str(self.args.bin_state.resolve()), self.device_name]
        try:
            raw = check_output(command)
        except CalledProcessError as exc:
            self.log.warning(
                "status command exited with code %s; assuming off", exc.returncode
            )
            return "off"
        # check_output() returns bytes: decode before comparing against the
        # (str) waveform names, otherwise no status would ever match.
        status = raw.decode("utf-8", errors="replace").strip().lower()
        if status not in self.waveforms:
            self.log.debug("unknown status %r; assuming off", status)
            return "off"
        return status

    def run(self):
        """Play waveforms forever, re-reading the status when appropriate."""
        status = self.get_status()
        while True:
            # this will do a complete waveform, unless data is written to
            # stop_pipe_w; in which case, it will return early
            reason, _ = self.do_waveform(status)
            if self.args.enable_polling or reason != "ok":
                status = self.get_status()

    def refresh(self, reason=("refresh", None)):
        """Interrupt the current waveform, sending *reason* on the stop pipe."""
        self.stop_pipe_w.send(reason)


EXE_DIR = Path(sys.argv[0]).parent
MANAGERS = {}
DEVICES = []


def on_sighup(signal, *args):
    """SIGHUP handler: ask every registered manager to refresh its status.

    Note that the ``signal`` parameter (the signal number) shadows the
    ``signal`` module inside this function.
    """
    for dev in MANAGERS:
        print("refreshing", dev)
        MANAGERS[dev]["manager"].refresh(("signal", signal))


def parse_devices(s: str) -> list:
    """Split a comma-separated list of device names."""
    return s.split(",")


def main():
    """Parse the command line and start one manager process per device."""
    p = ArgumentParser()
    p.add_argument("--devices", type=parse_devices, default="1")
    p.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status"))
    p.add_argument("--bin-set-light", type=Path, default=(EXE_DIR / "set-light"))
    p.add_argument(
        "--disable-polling", dest="enable_polling", default=True, action="store_false"
    )
    p.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
    )
    args = p.parse_args()
    logging.basicConfig(level=args.log_level)
    DEVICES.extend(args.devices)

    signal.signal(signal.SIGHUP, on_sighup)

    for i, dev in enumerate(DEVICES):
        m = LightManager(args, device_name=dev, device_number=i)
        MANAGERS[dev] = {"manager": m}
        Process(target=m.run).start()


if __name__ == "__main__":
    main()