transfer_back.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. #!/usr/bin/env python3
  2. import asyncio
  3. import logging
  4. import urllib.parse
  5. from panoramisk import Manager
  6. from panoramisk.actions import Action
# Event loop handed to the panoramisk Manager built in main().
# NOTE(review): calling asyncio.get_event_loop() at import time with no running
# loop is reported as deprecated on recent Python versions — confirm against the
# interpreter this script is deployed on.
LOOP = asyncio.get_event_loop()
# Module-level logger, used by the free functions in this file (on_tutto, run_app).
log = logging.getLogger("transferback")
  9. async def on_tutto(manager, msg):
  10. for prefix in ["Var", "RTC"]:
  11. if msg.event.startswith(prefix):
  12. return
  13. log.debug("... Event %s", msg.event)
  14. class BaseApp:
  15. def __init__(self, manager, msg):
  16. self.manager = manager
  17. self.initial_msg = msg
  18. self.local_channel = msg.channel
  19. self.events = []
  20. self.log = logging.getLogger(self.__class__.__name__)
  21. def send_agi_command(self, *args, **kwargs):
  22. return self.manager.send_agi_command(self.local_channel, *args, **kwargs)
  23. def register_event(self, event, callback):
  24. self.events.append((event, self.manager.register_event(event, callback)))
  25. def unregister_event(self, event_name, callback=None):
  26. if callback is None:
  27. for ev, callback in self.events:
  28. if ev == event_name:
  29. self.unregister_event(event_name, callback)
  30. return
  31. else:
  32. self.events.remove((event_name, callback))
  33. self.manager.callbacks[event_name].remove(callback)
  34. def self_destroy(self):
  35. for ev, callback in self.events:
  36. self.manager.callbacks[ev].remove(callback)
  37. def parse_env(self, env: str) -> dict:
  38. lines = urllib.parse.unquote(env).split("\n")
  39. ret = {}
  40. for line in lines:
  41. if ": " in line:
  42. key, value = line.split(": ", 1)
  43. ret[key] = value
  44. return ret
  45. def filter_channel(channel):
  46. def create_func(f):
  47. def new_func(manager, event):
  48. if event.channel != channel:
  49. return
  50. return f(manager, event)
  51. return new_func
  52. return create_func
  53. class RefusedError(Exception):
  54. pass
  55. class TransferAndTakeBack(BaseApp):
  56. """
  57. The idea is to provide a method to implement a transfer, and then be able to "revoke" the transfer whenever we want.
  58. In our scenario, there is a local user and a remote user.
  59. The remote user must have no power.
  60. The local user triggers it all.
  61. When the local user activates a feature, which calls AGI(agi:sync), this is called.
  62. This is the behavior we want:
  63. - the remote user is trasferred to a well-known place
  64. - the local user gets music
  65. - when the local user presses "#", we
  66. """
  67. def __init__(self, *args, **kwargs):
  68. super().__init__(*args, **kwargs)
  69. context = self.initial_msg.context
  70. if not self.is_context_auth(context):
  71. raise RefusedError("Invalid context: %s" % context)
  72. env = self.parse_env(self.initial_msg.env)
  73. command_name = env.get("agi_arg_1", "")
  74. if not command_name.startswith("mandainonda"):
  75. raise RefusedError("Invalid command name: %s" % command_name)
  76. self.log.info("init: %s", self.initial_msg)
  77. self.log.info("init: %s", command_name)
  78. # meant to be overridden by children {{{
  79. def get_destination(self):
  80. return "private,9438,1"
  81. def get_backroom(self):
  82. return "private,9401,1"
  83. def get_waiting_room(self):
  84. return "private,9401,1"
  85. def is_context_auth(self, context):
  86. return context == "from-regia"
  87. # }}}
  88. async def run(self):
  89. self.channels = []
  90. self.register_event("CoreShowChannel", self.on_channel)
  91. self.register_event("CoreShowChannelsComplete", self.on_channels)
  92. self.manager.send_action(Action({"Action": "CoreShowChannels"}), as_list=False)
  93. # the response will actually be a series of CoreShowChannel, which will
  94. # trigger self.on_channel, followed by a CoreShowChannelsComplete, which
  95. # will trigger on_channels
  96. def on_channel(self, _, msg):
  97. self.channels.append(msg)
  98. async def on_channels(self, _, msg):
  99. self.unregister_event("CoreShowChannel")
  100. self.unregister_event("CoreShowChannelsComplete")
  101. # we've got everything in many "on_channel" invocation, now let's check better
  102. our_bridge = {
  103. m.bridgeid for m in self.channels if m.channel == self.local_channel
  104. }
  105. self.log.debug("valid channels: %s", str(our_bridge))
  106. assert len(our_bridge) == 1
  107. bridgeid = next(iter(our_bridge))
  108. channels_in_bridge = [
  109. m
  110. for m in self.channels
  111. if m.bridgeid == bridgeid and m.channel != self.local_channel
  112. ]
  113. self.remote_channel = channels_in_bridge[0].channel.split(";")[0]
  114. self.log.info("FOUND: %s", self.remote_channel)
  115. await self.on_remote_channel_found()
  116. async def on_remote_channel_found(self):
  117. # now we know what is the other channel; this means we can move them wherever we want
  118. cmd = "channel redirect {other} {dest}".format(
  119. other=self.remote_channel, dest=self.get_destination()
  120. )
  121. self.log.debug("CMD = %s", cmd)
  122. self.manager.send_command(cmd)
  123. # let's wait them somewhere
  124. cmd = "channel redirect {channel} {waitingroom}".format(
  125. channel=self.local_channel,
  126. waitingroom=self.get_waiting_room(),
  127. )
  128. self.log.debug("CMD = %s", cmd)
  129. self.manager.send_command(cmd)
  130. # eventi possibili:
  131. # - l'utente locale preme dei tasti
  132. # - l'utente locale riaggancia
  133. self.register_event("DTMFEnd", filter_channel(self.local_channel)(self.on_dtmf))
  134. self.register_event(
  135. "Hangup", filter_channel(self.local_channel)(self.on_hangup)
  136. )
  137. self.register_event(
  138. "Hangup", filter_channel(self.remote_channel)(self.on_hangup)
  139. )
  140. def on_dtmf(self, _, msg):
  141. """
  142. when the local user uses dtmf, they can talk back to the outside user
  143. """
  144. if msg.digit != "9":
  145. return
  146. self.unregister_event("DTMFEnd")
  147. # a questo punto possiamo procedere a spostare il canale iniziale
  148. self.log.info("Local user requested to talk back with the remote user")
  149. to_move = [self.remote_channel]
  150. if self.get_backroom() != self.get_waiting_room():
  151. to_move.append(self.local_channel)
  152. for channel in to_move:
  153. cmd = "channel redirect {chan} {newroom}".format(
  154. chan=channel,
  155. newroom=self.get_backroom(),
  156. )
  157. self.log.debug("CMD = %s", cmd)
  158. self.manager.send_command(cmd)
  159. # we must not self_destroy, or we'll never notice the Hangup
  160. assert set(e[0] for e in self.events) == {"Hangup"}
  161. def on_hangup(self, _, msg):
  162. if msg.channel == self.remote_channel:
  163. self.log.info("Remote user hanged up")
  164. channel = self.local_channel
  165. elif msg.channel == self.local_channel:
  166. self.log.info("Local user hanged up")
  167. channel = self.remote_channel
  168. else:
  169. self.log.error(
  170. "This should never happen! someone else hanged??? %s", msg.channel
  171. )
  172. return
  173. cmd = "channel request hangup {channel}".format(channel=channel)
  174. self.log.debug("CMD = %s", cmd)
  175. self.manager.send_command(cmd)
  176. self.self_destroy()
  177. def run_app(Cls):
  178. async def real_run(manager, msg):
  179. try:
  180. instance = Cls(manager, msg)
  181. except RefusedError as exc:
  182. log.info("refused: %s", exc)
  183. return
  184. await instance.run()
  185. return real_run
  186. async def init(manager):
  187. # manager.send_command('sip show peers')
  188. manager.register_event("*", on_tutto)
  189. manager.register_event("AsyncAGIStart", run_app(TransferAndTakeBack))
  190. async def on_shutdown(m):
  191. pass
  192. def main():
  193. logging.basicConfig(level=logging.DEBUG)
  194. logging.getLogger("panoramisk.manager").setLevel(logging.WARNING)
  195. manager = Manager(
  196. loop=LOOP,
  197. host="127.0.0.1",
  198. port=5038,
  199. ssl=False,
  200. encoding="utf8",
  201. username="admin",
  202. secret="secret123password",
  203. ping_delay=10, # Delay after start
  204. ping_interval=10, # Periodically ping AMI (dead or alive)
  205. reconnect_timeout=2, # Timeout reconnect if connection lost
  206. )
  207. manager.connect(run_forever=True, on_startup=init, on_shutdown=on_shutdown)
# Script entry point: only start the manager when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()