123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- #!/usr/bin/env python3
- """
- This script handles phone ringing and provides LED notifications, blinking with different patterns.
- It will handle multiple devices.
- For maximum configurability without writing crazy configuration file, most things can be "plugged" as
- commands.
- To get the status of a device, it will just invoke a program. Write your code there (a simple SQL query?
- reading a file? rasterisk?) and return one of ring/up/onair/off
- The state is then applied running another command. Write your script, and make it do HTTP requests, mqtt set,
- directly control a GPIO... you know it.
- It does not provide any integration with inotify. However, it will behave nicely if you send it a SIGHUP, so
- you can just run your favourite inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every
- time you need it.
- """
- import logging
- import os
- import signal
- import sys
- import time
- from argparse import ArgumentParser
- from multiprocessing import Pipe, Process
- from pathlib import Path
- from subprocess import CalledProcessError, Popen, check_output
def rotate(lst: list, n: int) -> list:
    """Return a new list with the elements of *lst* rotated left by *n*.

    A negative *n* rotates to the right. The offset wraps around the list
    length, so ``rotate(lst, len(lst) + 1)`` equals ``rotate(lst, 1)``.
    The input list is never modified.
    """
    if not lst:
        # Nothing to rotate; also avoids a modulo by zero below.
        return list(lst)
    offset = n % len(lst)
    return lst[offset:] + lst[:offset]
# Blink patterns keyed by status name: one boolean per tick, True meaning the
# light is switched on. All waveforms must have the same length.
WAVEFORMS = {
    "ring": ([True] * 4 + [False] * 4) * 5,
    "up": ([True] * 1 + [False] * 9) * 4,
    "onair": [True] * 40,
    "off": [False] * 40,
}
assert len(set(map(len, WAVEFORMS.values()))) == 1

# Replace every pattern with a copy rotated by -2 (last two ticks move to the
# front). Iterate over a snapshot because the dict is updated in the loop.
for wf_name, wf_pattern in list(WAVEFORMS.items()):
    WAVEFORMS[wf_name] = rotate(wf_pattern, -2)
class LightManager:
    """Handles a single light.

    The device status is obtained by running the ``args.bin_state`` program
    with the device name as its only argument. The light is switched by
    running ``args.bin_set_light`` with ``on`` or ``off`` as argument and
    ``DEVICE_NUMBER`` set in its environment.
    """

    def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0):
        """Store the configuration and create the pipe used to interrupt a waveform.

        Args:
            args: parsed command-line namespace; ``bin_state``,
                ``bin_set_light`` and ``enable_polling`` are read from it.
            tick_duration_s: seconds to sleep after each waveform element.
            device_name: name passed to the status program.
            device_number: value exported as ``DEVICE_NUMBER`` to the
                set-light program.
        """
        self.args = args
        self.waveforms = WAVEFORMS
        self.tick_duration_s = tick_duration_s
        self.device_name = device_name
        self.device_number = device_number
        self.log = logging.getLogger(
            "%s-%s" % (self.__class__.__name__, self.device_name)
        )
        # One-way pipe: refresh() writes to it, do_waveform() polls it.
        self.stop_pipe_r, self.stop_pipe_w = Pipe(False)

    def set_light(self, on: bool):
        """Run the set-light program with ``on``/``off`` and wait for it to exit."""
        self.log.debug("set %s", on)
        env = os.environ.copy()
        env["DEVICE_NUMBER"] = str(self.device_number)
        p = Popen(
            [str(self.args.bin_set_light.resolve()), "on" if on else "off"], env=env
        )
        p.communicate()

    def do_waveform(self, status) -> tuple:
        """Play the waveform registered for *status*, one element per tick.

        The set-light program is only invoked when the wanted light state
        differs from the previously applied one.

        Returns:
            ``("ok", None)`` when the whole waveform was played, otherwise
            the object received from the stop pipe (see :meth:`refresh`).
        """
        last_set = None
        self.log.debug("status=%s", status)
        sys.stderr.flush()
        for elem in self.waveforms[status]:
            if last_set != elem:
                self.set_light(elem)
                last_set = elem
            # A pending message on the stop pipe interrupts the waveform.
            if self.stop_pipe_r.poll():
                return self.stop_pipe_r.recv()
            time.sleep(self.tick_duration_s)
        return ("ok", None)

    def get_status(self) -> str:
        """Run the status program and return a key of ``self.waveforms``.

        Returns ``"off"`` when the program exits with a non-zero code or
        prints something that is not a known status.
        """
        try:
            raw = check_output(
                [str(self.args.bin_state.resolve()), self.device_name]
            )
        except CalledProcessError as exc:
            self.log.warning("status command failed: %s", exc)
            return "off"
        # check_output() returns bytes; decode before comparing with the str
        # keys of the waveform table, otherwise the lookup can never match.
        status = raw.decode("utf-8", errors="replace").lower().strip()
        if status not in self.waveforms:
            self.log.debug("unknown status %r, using 'off'", status)
            return "off"
        return status

    def run(self):
        """Loop forever, playing the waveform of the current status.

        The status is queried again after every waveform when
        ``args.enable_polling`` is set, and in any case whenever the waveform
        was interrupted through the stop pipe.
        """
        status = self.get_status()
        while True:
            # this will do a complete waveform, unless data is written to
            # stop_pipe_w; in which case, it will return early
            reason, _ = self.do_waveform(status)
            if self.args.enable_polling or reason != "ok":
                status = self.get_status()

    def refresh(self, reason=("refresh", None)):
        """Interrupt the running waveform by sending *reason* on the stop pipe."""
        self.stop_pipe_w.send(reason)
# Directory of the running script; main() looks up its default helper
# programs relative to it.
EXE_DIR: Path = Path(sys.argv[0]).parent
# Device name -> {"manager": LightManager}; populated by main().
MANAGERS: dict = {}
# Device names taken from the command line; populated by main().
DEVICES: list = []
def on_sighup(signal, *args):
    """Signal handler: ask every registered manager to refresh.

    *signal* is the received signal number and is forwarded to each manager
    as part of the refresh reason; any further arguments are ignored.
    """
    for device, entry in MANAGERS.items():
        print("refreshing", device)
        entry["manager"].refresh(("signal", signal))
def parse_devices(s: str) -> list:
    """Split a comma-separated list of device names.

    Whitespace around each name is removed and empty entries (for example
    from a trailing comma or an empty string) are dropped, so they never end
    up as bogus device names.
    """
    stripped = (part.strip() for part in s.split(","))
    return [name for name in stripped if name]
def main():
    """Parse the command line and start one worker process per device.

    Each device gets its own LightManager, registered in MANAGERS under the
    device name, and a process running that manager's ``run`` loop. The
    SIGHUP handler is installed before the processes are started.
    """
    parser = ArgumentParser()
    parser.add_argument("--devices", type=parse_devices, default="1")
    parser.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status"))
    parser.add_argument(
        "--bin-set-light", type=Path, default=(EXE_DIR / "set-light")
    )
    parser.add_argument(
        "--disable-polling", dest="enable_polling", default=True, action="store_false"
    )
    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
    )
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level)

    # In-place extension of the module-level list, so no rebinding is needed.
    DEVICES.extend(args.devices)
    signal.signal(signal.SIGHUP, on_sighup)

    for index, device in enumerate(DEVICES):
        manager = LightManager(args, device_name=device, device_number=index)
        MANAGERS[device] = {"manager": manager}
        worker = Process(target=manager.run)
        worker.start()


if __name__ == "__main__":
    main()
|