asterisk-misc/ring_mon/apply-state

164 lines
4.8 KiB
Text
Raw Permalink Normal View History

2022-03-04 02:34:48 +01:00
#!/usr/bin/env python3
"""
This script handles phone ringing and provide led notifications, blinking with different patterns.
It will handle multiple devices.
For maximum configurability without writing crazy configuration file, most things can be "plugged" as
commands.
To get the status of a device, it will just invoke a program. Write your code there (a simple SQL query?
reading a file? rasterisk?) and return one of ring/up/onair/off
The state is then applied running another command. Write your script, and make it do HTTP requests, mqtt set,
directly control a GPIO... you know it.
It does not provide any integration with inotify. However, it will behave nicely if you send it a SIGHUP, so
you can just run your favourite inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every
time you need it.
"""
import logging
2022-03-15 19:09:16 +01:00
import os
2022-03-04 02:34:48 +01:00
import signal
2022-03-15 19:09:16 +01:00
import sys
import time
2022-03-04 02:34:48 +01:00
from argparse import ArgumentParser
2022-03-15 19:09:16 +01:00
from multiprocessing import Pipe, Process
2022-03-04 02:34:48 +01:00
from pathlib import Path
2022-03-15 19:09:16 +01:00
from subprocess import CalledProcessError, Popen, check_output
2022-03-04 02:34:48 +01:00
def rotate(lst: list, n: int) -> list:
return lst[n:] + lst[:n]
# all waveforms must have the same length
WAVEFORMS = {
"ring": ([True] * 4 + [False] * 4) * 5,
"up": ([True] * 1 + [False] * 9) * 4,
"onair": [True] * 40,
"off": [False] * 40,
}
assert len({len(waveform) for waveform in WAVEFORMS.values()}) == 1
for wf_name in list(WAVEFORMS.keys()):
WAVEFORMS[wf_name] = rotate(WAVEFORMS[wf_name], -2)
class LightManager:
"""Handles a single light."""
def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0):
self.args = args
self.waveforms = WAVEFORMS
self.tick_duration_s = tick_duration_s
self.device_name = device_name
self.device_number = device_number
self.log = logging.getLogger(
"%s-%s" % (self.__class__.__name__, self.device_name)
)
self.stop_pipe_r, self.stop_pipe_w = Pipe(False)
def set_light(self, on: bool):
2022-03-15 19:09:16 +01:00
self.log.debug("set %s", on)
2022-03-04 02:34:48 +01:00
env = os.environ.copy()
env["DEVICE_NUMBER"] = str(self.device_number)
2022-03-15 19:12:59 +01:00
p = Popen(
[str(self.args.bin_set_light.resolve()), "on" if on else "off"], env=env
)
p.communicate()
2022-03-04 02:34:48 +01:00
def do_waveform(self, status) -> tuple:
last_set = None
self.log.debug("status=%s", status)
sys.stderr.flush()
waveform = self.waveforms[status]
for i, elem in enumerate(waveform):
# if i % 10 == 0:
# print(" ", i, end="")
# sys.stdout.flush()
if last_set != elem:
self.set_light(elem)
last_set = elem
if self.stop_pipe_r.poll():
reason = self.stop_pipe_r.recv()
return reason
time.sleep(self.tick_duration_s)
return ("ok", None)
def get_status(self) -> str:
try:
2022-03-15 19:09:16 +01:00
status = check_output(
[str(self.args.bin_state.resolve()), self.device_name]
)
2022-03-04 02:34:48 +01:00
except CalledProcessError:
2022-03-15 19:09:16 +01:00
status = "off"
2022-03-04 02:34:48 +01:00
2022-03-15 19:09:16 +01:00
status = status.lower().strip()
2022-03-04 02:34:48 +01:00
if status not in self.waveforms:
return "off"
return status
def run(self):
status = self.get_status()
while True:
# this will do a complete waveform, unless data is written to stop_pipe_w; in which case, it will
# return early
reason, _ = self.do_waveform(status)
if self.args.enable_polling or reason != "ok":
status = self.get_status()
def refresh(self, reason=("refresh", None)):
self.stop_pipe_w.send(reason)
EXE_DIR = Path(sys.argv[0]).parent
MANAGERS = {}
DEVICES = []
def on_sighup(signal, *args):
global MANAGERS
for dev in MANAGERS:
print("refreshing", dev)
MANAGERS[dev]["manager"].refresh(("signal", signal))
def parse_devices(s: str) -> list:
return s.split(",")
def main():
global MANAGERS
global DEVICES
p = ArgumentParser()
p.add_argument("--devices", type=parse_devices, default="1")
p.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status"))
p.add_argument("--bin-set-light", type=Path, default=(EXE_DIR / "set-light"))
2022-03-15 19:09:16 +01:00
p.add_argument(
"--disable-polling", dest="enable_polling", default=True, action="store_false"
)
2022-03-04 02:34:48 +01:00
p.add_argument(
"--log-level",
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default="INFO",
)
args = p.parse_args()
logging.basicConfig(level=args.log_level)
DEVICES += args.devices
signal.signal(signal.SIGHUP, on_sighup)
for i, dev in enumerate(DEVICES):
m = LightManager(args, device_name=dev, device_number=i)
MANAGERS[dev] = {"manager": m}
Process(target=m.run).start()
if __name__ == "__main__":
main()