apply-state 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. #!/usr/bin/env python3
  2. """
  3. This script handles phone ringing and provide led notifications, blinking with different patterns.
  4. It will handle multiple devices.
  5. For maximum configurability without writing crazy configuration file, most things can be "plugged" as
  6. commands.
  7. To get the status of a device, it will just invoke a program. Write your code there (a simple SQL query?
  8. reading a file? rasterisk?) and return one of ring/up/onair/off
  9. The state is then applied running another command. Write your script, and make it do HTTP requests, mqtt set,
  10. directly control a GPIO... you know it.
  11. It does not provide any integration with inotify. However, it will behave nicely if you send it a SIGHUP, so
  12. you can just run your favourite inotify-like tool (entr, watchman, you name it) and send it a SIGHUP every
  13. time you need it.
  14. """
  15. import sys
  16. import os
  17. import logging
  18. import time
  19. import signal
  20. from argparse import ArgumentParser
  21. from pathlib import Path
  22. from subprocess import Popen, check_output, CalledProcessError
  23. from multiprocessing import Process, Pipe
  24. def rotate(lst: list, n: int) -> list:
  25. return lst[n:] + lst[:n]
  26. # all waveforms must have the same length
  27. WAVEFORMS = {
  28. "ring": ([True] * 4 + [False] * 4) * 5,
  29. "up": ([True] * 1 + [False] * 9) * 4,
  30. "onair": [True] * 40,
  31. "off": [False] * 40,
  32. }
  33. assert len({len(waveform) for waveform in WAVEFORMS.values()}) == 1
  34. for wf_name in list(WAVEFORMS.keys()):
  35. WAVEFORMS[wf_name] = rotate(WAVEFORMS[wf_name], -2)
  36. class LightManager:
  37. """Handles a single light."""
  38. def __init__(self, args, tick_duration_s=0.1, device_name="1", device_number=0):
  39. self.args = args
  40. self.waveforms = WAVEFORMS
  41. self.tick_duration_s = tick_duration_s
  42. self.device_name = device_name
  43. self.device_number = device_number
  44. self.log = logging.getLogger(
  45. "%s-%s" % (self.__class__.__name__, self.device_name)
  46. )
  47. self.stop_pipe_r, self.stop_pipe_w = Pipe(False)
  48. def set_light(self, on: bool):
  49. self.log.debug('set %s', on)
  50. env = os.environ.copy()
  51. env["DEVICE_NUMBER"] = str(self.device_number)
  52. Popen([str(self.args.bin_set_light.resolve()), "on" if on else "off"], env=env)
  53. def do_waveform(self, status) -> tuple:
  54. last_set = None
  55. self.log.debug("status=%s", status)
  56. sys.stderr.flush()
  57. waveform = self.waveforms[status]
  58. for i, elem in enumerate(waveform):
  59. # if i % 10 == 0:
  60. # print(" ", i, end="")
  61. # sys.stdout.flush()
  62. if last_set != elem:
  63. self.set_light(elem)
  64. last_set = elem
  65. if self.stop_pipe_r.poll():
  66. reason = self.stop_pipe_r.recv()
  67. return reason
  68. time.sleep(self.tick_duration_s)
  69. return ("ok", None)
  70. def get_status(self) -> str:
  71. try:
  72. status = check_output([str(self.args.bin_state.resolve()), self.device_name])
  73. except CalledProcessError:
  74. status = 'off'
  75. status = status.lower().strip().decode('ascii', errors='ignore')
  76. if status not in self.waveforms:
  77. return "off"
  78. return status
  79. def run(self):
  80. status = self.get_status()
  81. while True:
  82. # this will do a complete waveform, unless data is written to stop_pipe_w; in which case, it will
  83. # return early
  84. reason, _ = self.do_waveform(status)
  85. if self.args.enable_polling or reason != "ok":
  86. status = self.get_status()
  87. def refresh(self, reason=("refresh", None)):
  88. self.stop_pipe_w.send(reason)
  89. EXE_DIR = Path(sys.argv[0]).parent
  90. MANAGERS = {}
  91. DEVICES = []
  92. def on_sighup(signal, *args):
  93. global MANAGERS
  94. for dev in MANAGERS:
  95. print("refreshing", dev)
  96. MANAGERS[dev]["manager"].refresh(("signal", signal))
  97. def parse_devices(s: str) -> list:
  98. return s.split(",")
  99. def main():
  100. global MANAGERS
  101. global DEVICES
  102. p = ArgumentParser()
  103. p.add_argument("--devices", type=parse_devices, default="1")
  104. p.add_argument("--bin-state", type=Path, default=(EXE_DIR / "get-status"))
  105. p.add_argument("--bin-set-light", type=Path, default=(EXE_DIR / "set-light"))
  106. p.add_argument("--disable-polling", dest='enable_polling', default=True, action='store_false')
  107. p.add_argument(
  108. "--log-level",
  109. choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
  110. default="INFO",
  111. )
  112. args = p.parse_args()
  113. logging.basicConfig(level=args.log_level)
  114. DEVICES += args.devices
  115. signal.signal(signal.SIGHUP, on_sighup)
  116. for i, dev in enumerate(DEVICES):
  117. m = LightManager(args, device_name=dev, device_number=i)
  118. MANAGERS[dev] = {"manager": m}
  119. Process(target=m.run).start()
  120. if __name__ == "__main__":
  121. main()