asterisk-misc/ring_mon/apply-state
2022-03-04 02:34:48 +01:00

158 lines
4.8 KiB
Python
Executable file

#!/usr/bin/env python3
"""
This script handles phone ringing and provide led notifications, blinking with different patterns.
It will handle multiple devices.
For maximum configurability without writing crazy configuration file, most things can be "plugged" as
commands.
To get the status of a device, it will just invoke a program. Write your code there (a simple SQL query?
reading a file? rasterisk?) and return one of ring/up/onair/off
The state is then applied running another command. Write your script, and make it do HTTP requests, mqtt set,
directly control a GPIO... you know it.
It does not provide any integration with inotify. However, it will behave nicely if you send it a SIGHUP, so
you can just run your favourite inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every
time you need it.
"""
import sys
import os
import logging
import time
import signal
from argparse import ArgumentParser
from pathlib import Path
from subprocess import Popen, check_output, CalledProcessError
from multiprocessing import Process, Pipe
import pyinotify
def rotate(lst: list, n: int) -> list:
return lst[n:] + lst[:n]
# all waveforms must have the same length
WAVEFORMS = {
"ring": ([True] * 4 + [False] * 4) * 5,
"up": ([True] * 1 + [False] * 9) * 4,
"onair": [True] * 40,
"off": [False] * 40,
}
assert len({len(waveform) for waveform in WAVEFORMS.values()}) == 1
for wf_name in list(WAVEFORMS.keys()):
WAVEFORMS[wf_name] = rotate(WAVEFORMS[wf_name], -2)
class LightManager:
"""Handles a single light."""
def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0):
self.args = args
self.waveforms = WAVEFORMS
self.tick_duration_s = tick_duration_s
self.device_name = device_name
self.device_number = device_number
self.log = logging.getLogger(
"%s-%s" % (self.__class__.__name__, self.device_name)
)
self.stop_pipe_r, self.stop_pipe_w = Pipe(False)
def set_light(self, on: bool):
self.log.debug('set %s', on)
env = os.environ.copy()
env["DEVICE_NUMBER"] = str(self.device_number)
Popen([str(self.args.bin_set_light.resolve()), "on" if on else "off"], env=env)
def do_waveform(self, status) -> tuple:
last_set = None
self.log.debug("status=%s", status)
sys.stderr.flush()
waveform = self.waveforms[status]
for i, elem in enumerate(waveform):
# if i % 10 == 0:
# print(" ", i, end="")
# sys.stdout.flush()
if last_set != elem:
self.set_light(elem)
last_set = elem
if self.stop_pipe_r.poll():
reason = self.stop_pipe_r.recv()
return reason
time.sleep(self.tick_duration_s)
return ("ok", None)
def get_status(self) -> str:
try:
status = check_output([str(self.args.bin_state.resolve()), self.device_name])
except CalledProcessError:
status = 'off'
status = status.lower().strip().decode('ascii', errors='ignore')
if status not in self.waveforms:
return "off"
return status
def run(self):
status = self.get_status()
while True:
# this will do a complete waveform, unless data is written to stop_pipe_w; in which case, it will
# return early
reason, _ = self.do_waveform(status)
if self.args.enable_polling or reason != "ok":
status = self.get_status()
def refresh(self, reason=("refresh", None)):
self.stop_pipe_w.send(reason)
EXE_DIR = Path(sys.argv[0]).parent
MANAGERS = {}
DEVICES = []
def on_sighup(signal, *args):
global MANAGERS
for dev in MANAGERS:
print("refreshing", dev)
MANAGERS[dev]["manager"].refresh(("signal", signal))
def parse_devices(s: str) -> list:
return s.split(",")
def main():
global MANAGERS
global DEVICES
p = ArgumentParser()
p.add_argument("--devices", type=parse_devices, default="1")
p.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status"))
p.add_argument("--bin-set-light", type=Path, default=(EXE_DIR / "set-light"))
p.add_argument("--disable-polling", dest='enable_polling', default=True, action='store_false')
p.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="INFO",
)
args = p.parse_args()
logging.basicConfig(level=args.log_level)
DEVICES += args.devices
signal.signal(signal.SIGHUP, on_sighup)
for i, dev in enumerate(DEVICES):
m = LightManager(args, device_name=dev, device_number=i)
MANAGERS[dev] = {"manager": m}
Process(target=m.run).start()
if __name__ == "__main__":
main()