apply-state 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #!/usr/bin/env python3
  2. """
  3. This script handles phone ringing and provide led notifications, blinking with different patterns.
  4. It will handle multiple devices.
  5. For maximum configurability without writing crazy configuration file, most things can be "plugged" as
  6. commands.
  7. To get the status of a device, it will just invoke a program. Write your code there (a simple SQL query?
  8. reading a file? rasterisk?) and return one of ring/up/onair/off
  9. The state is then applied running another command. Write your script, and make it do HTTP requests, mqtt set,
  10. directly control a GPIO... you know it.
  11. It does not provide any integration with inotify. However, it will behave nicely if you send it a SIGHUP, so
  12. you can just run your favourite inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every
  13. time you need it.
  14. """
  15. import logging
  16. import os
  17. import signal
  18. import sys
  19. import time
  20. from argparse import ArgumentParser
  21. from multiprocessing import Pipe, Process
  22. from pathlib import Path
  23. from subprocess import CalledProcessError, Popen, check_output
  24. def rotate(lst: list, n: int) -> list:
  25. return lst[n:] + lst[:n]
  26. # all waveforms must have the same length
  27. WAVEFORMS = {
  28. "ring": ([True] * 4 + [False] * 4) * 5,
  29. "up": ([True] * 1 + [False] * 9) * 4,
  30. "onair": [True] * 40,
  31. "off": [False] * 40,
  32. }
  33. assert len({len(waveform) for waveform in WAVEFORMS.values()}) == 1
  34. for wf_name in list(WAVEFORMS.keys()):
  35. WAVEFORMS[wf_name] = rotate(WAVEFORMS[wf_name], -2)
  36. class LightManager:
  37. """Handles a single light."""
  38. def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0):
  39. self.args = args
  40. self.waveforms = WAVEFORMS
  41. self.tick_duration_s = tick_duration_s
  42. self.device_name = device_name
  43. self.device_number = device_number
  44. self.log = logging.getLogger(
  45. "%s-%s" % (self.__class__.__name__, self.device_name)
  46. )
  47. self.stop_pipe_r, self.stop_pipe_w = Pipe(False)
  48. def set_light(self, on: bool):
  49. self.log.debug("set %s", on)
  50. env = os.environ.copy()
  51. env["DEVICE_NUMBER"] = str(self.device_number)
  52. p = Popen(
  53. [str(self.args.bin_set_light.resolve()), "on" if on else "off"], env=env
  54. )
  55. p.communicate()
  56. def do_waveform(self, status) -> tuple:
  57. last_set = None
  58. self.log.debug("status=%s", status)
  59. sys.stderr.flush()
  60. waveform = self.waveforms[status]
  61. for i, elem in enumerate(waveform):
  62. # if i % 10 == 0:
  63. # print(" ", i, end="")
  64. # sys.stdout.flush()
  65. if last_set != elem:
  66. self.set_light(elem)
  67. last_set = elem
  68. if self.stop_pipe_r.poll():
  69. reason = self.stop_pipe_r.recv()
  70. return reason
  71. time.sleep(self.tick_duration_s)
  72. return ("ok", None)
  73. def get_status(self) -> str:
  74. try:
  75. status = check_output(
  76. [str(self.args.bin_state.resolve()), self.device_name]
  77. )
  78. except CalledProcessError:
  79. status = "off"
  80. status = status.lower().strip()
  81. if status not in self.waveforms:
  82. return "off"
  83. return status
  84. def run(self):
  85. status = self.get_status()
  86. while True:
  87. # this will do a complete waveform, unless data is written to stop_pipe_w; in which case, it will
  88. # return early
  89. reason, _ = self.do_waveform(status)
  90. if self.args.enable_polling or reason != "ok":
  91. status = self.get_status()
  92. def refresh(self, reason=("refresh", None)):
  93. self.stop_pipe_w.send(reason)
  94. EXE_DIR = Path(sys.argv[0]).parent
  95. MANAGERS = {}
  96. DEVICES = []
  97. def on_sighup(signal, *args):
  98. global MANAGERS
  99. for dev in MANAGERS:
  100. print("refreshing", dev)
  101. MANAGERS[dev]["manager"].refresh(("signal", signal))
  102. def parse_devices(s: str) -> list:
  103. return s.split(",")
  104. def main():
  105. global MANAGERS
  106. global DEVICES
  107. p = ArgumentParser()
  108. p.add_argument("--devices", type=parse_devices, default="1")
  109. p.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status"))
  110. p.add_argument("--bin-set-light", type=Path, default=(EXE_DIR / "set-light"))
  111. p.add_argument(
  112. "--disable-polling", dest="enable_polling", default=True, action="store_false"
  113. )
  114. p.add_argument(
  115. "--log-level",
  116. choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
  117. default="INFO",
  118. )
  119. args = p.parse_args()
  120. logging.basicConfig(level=args.log_level)
  121. DEVICES += args.devices
  122. signal.signal(signal.SIGHUP, on_sighup)
  123. for i, dev in enumerate(DEVICES):
  124. m = LightManager(args, device_name=dev, device_number=i)
  125. MANAGERS[dev] = {"manager": m}
  126. Process(target=m.run).start()
  127. if __name__ == "__main__":
  128. main()