#!/usr/bin/env python3
"""AMI helper implementing a revocable call transfer on top of panoramisk.

The script connects to the Asterisk Manager Interface, listens for
``AsyncAGIStart`` events and, for each accepted request, runs a
:class:`TransferAndTakeBack` application instance.
"""
import asyncio
import logging
import urllib.parse

from panoramisk import Manager
from panoramisk.actions import Action

LOOP = asyncio.get_event_loop()
log = logging.getLogger("transferback")


async def on_tutto(manager, msg):
    """Debug-log every received event, skipping the "Var*" and "RTC*" ones."""
    if msg.event.startswith(("Var", "RTC")):
        return
    log.debug("... Event %s", msg.event)


class BaseApp:
    """Base class for an application bound to one channel.

    It remembers the message that started the application, the channel that
    message came from, and every event callback registered through it, so
    that they can all be removed from the manager later.
    """

    def __init__(self, manager, msg):
        """Store the manager and the initial message.

        :param manager: manager object used to send commands and to
            register event callbacks.
        :param msg: the message that triggered the application; its
            ``channel`` attribute becomes ``self.local_channel``.
        """
        self.manager = manager
        self.initial_msg = msg
        self.local_channel = msg.channel
        # list of (event name, callback) pairs registered through this app
        self.events = []
        self.log = logging.getLogger(self.__class__.__name__)

    def send_agi_command(self, *args, **kwargs):
        """Forward an AGI command to the manager, targeting the local channel."""
        return self.manager.send_agi_command(self.local_channel, *args, **kwargs)

    def register_event(self, event, callback):
        """Register ``callback`` for ``event`` on the manager and remember it.

        What is remembered is the value returned by
        ``manager.register_event``; the unregistering code relies on that
        value being the object stored in ``manager.callbacks``.
        """
        self.events.append((event, self.manager.register_event(event, callback)))

    def unregister_event(self, event_name, callback=None):
        """Remove one registration of ``event_name``.

        :param event_name: name used when the callback was registered.
        :param callback: the registered callback to remove. When ``None``,
            the first callback this app registered under ``event_name`` is
            removed; if there is none, nothing happens.
        :raises ValueError: if an explicit ``callback`` is given that is not
            registered for ``event_name``.
        """
        if callback is None:
            for ev, registered in self.events:
                if ev == event_name:
                    callback = registered
                    break
            else:
                # nothing registered under that name
                return
        self.events.remove((event_name, callback))
        self.manager.callbacks[event_name].remove(callback)

    def self_destroy(self):
        """Remove every callback this app registered on the manager.

        Safe to call more than once: the bookkeeping list is emptied, and a
        callback that is already gone from the manager is skipped.
        """
        for ev, callback in self.events:
            try:
                self.manager.callbacks[ev].remove(callback)
            except ValueError:
                # already removed from the manager: nothing left to undo
                pass
        self.events.clear()

    def parse_env(self, env: str) -> dict:
        """Decode a URL-quoted, newline-separated list of "key: value" lines.

        Lines that do not contain ``": "`` are ignored; only the first
        ``": "`` of a line separates the key from the value.
        """
        ret = {}
        for line in urllib.parse.unquote(env).split("\n"):
            key, sep, value = line.partition(": ")
            if sep:
                ret[key] = value
        return ret


def filter_channel(channel):
    """Build a decorator restricting an event callback to one channel.

    The decorated callback is invoked only when ``event.channel`` equals
    ``channel``; for any other event the wrapper returns ``None``.
    """

    def create_func(f):
        def new_func(manager, event):
            if event.channel != channel:
                return
            return f(manager, event)

        return new_func

    return create_func


class RefusedError(Exception):
    """Raised by an application that refuses to handle a request."""


class TransferAndTakeBack(BaseApp):
    """
    The idea is to provide a method to implement a transfer, and then be able
    to "revoke" the transfer whenever we want.

    In our scenario, there is a local user and a remote user. The remote user
    must have no power. The local user triggers it all.

    When the local user activates a feature, which calls AGI(agi:async), this
    is called.

    This is the behavior we want:
     - the remote user is transferred to a well-known place
       (see get_destination)
     - the local user is redirected to a waiting room (see get_waiting_room);
       presumably the dialplan plays music there
     - when the local user presses "9", the remote user is redirected to the
       backroom (see get_backroom); the local user is redirected there too,
       unless the backroom is the same as the waiting room
     - when either user hangs up, a hangup is requested for the other one
    """

    def __init__(self, *args, **kwargs):
        """Validate the request and log it.

        :raises RefusedError: if the context of the initial message is not
            authorized, or if the first AGI argument does not start with
            "mandainonda".
        """
        super().__init__(*args, **kwargs)
        context = self.initial_msg.context
        if not self.is_context_auth(context):
            raise RefusedError("Invalid context: %s" % context)
        env = self.parse_env(self.initial_msg.env)
        command_name = env.get("agi_arg_1", "")
        if not command_name.startswith("mandainonda"):
            raise RefusedError("Invalid command name: %s" % command_name)
        self.log.info("init: %s", self.initial_msg)
        self.log.info("init: %s", command_name)

    # meant to be overridden by children {{{
    def get_destination(self):
        """Return "context,extension,priority" where the remote user is sent."""
        return "private,9438,1"

    def get_backroom(self):
        """Return "context,extension,priority" used when talking back."""
        return "private,9401,1"

    def get_waiting_room(self):
        """Return "context,extension,priority" where the local user waits."""
        return "private,9401,1"

    def is_context_auth(self, context):
        """Tell whether requests coming from ``context`` are accepted."""
        return context == "from-regia"

    # }}}

    async def run(self):
        """Start the application by asking for the list of channels."""
        self.channels = []
        self.register_event("CoreShowChannel", self.on_channel)
        self.register_event("CoreShowChannelsComplete", self.on_channels)
        self.manager.send_action(Action({"Action": "CoreShowChannels"}), as_list=False)
        # the response will actually be a series of CoreShowChannel, which will
        # trigger self.on_channel, followed by a CoreShowChannelsComplete, which
        # will trigger on_channels

    def on_channel(self, _, msg):
        """Collect one CoreShowChannel message."""
        self.channels.append(msg)

    async def on_channels(self, _, msg):
        """Find the channel bridged with the local one, then move on.

        Called on CoreShowChannelsComplete. If the local channel is not found
        in exactly one bridge, or if no other channel is in that bridge, the
        problem is logged and the application stops here.
        """
        self.unregister_event("CoreShowChannel")
        self.unregister_event("CoreShowChannelsComplete")
        # we've got everything in many "on_channel" invocations, now let's
        # have a better look
        our_bridge = {
            m.bridgeid for m in self.channels if m.channel == self.local_channel
        }
        self.log.debug("valid channels: %s", str(our_bridge))
        # explicit check instead of an assert: this validates data received
        # at runtime and must still be enforced under "python -O"
        if len(our_bridge) != 1:
            self.log.error(
                "Expected exactly one bridge for %s, found: %s",
                self.local_channel,
                our_bridge,
            )
            return
        bridgeid = next(iter(our_bridge))
        # NOTE(review): if bridgeid can be empty for a channel that is not
        # bridged, any other unbridged channel would match here -- confirm
        channels_in_bridge = [
            m
            for m in self.channels
            if m.bridgeid == bridgeid and m.channel != self.local_channel
        ]
        if not channels_in_bridge:
            self.log.error("No other channel found in bridge %s", bridgeid)
            return
        # keep only what comes before the first ";" of the channel name
        self.remote_channel = channels_in_bridge[0].channel.split(";")[0]
        self.log.info("FOUND: %s", self.remote_channel)
        await self.on_remote_channel_found()

    async def on_remote_channel_found(self):
        """Redirect both channels and start watching DTMF and hangups."""
        # now we know what is the other channel; this means we can move them
        # wherever we want
        cmd = "channel redirect {other} {dest}".format(
            other=self.remote_channel, dest=self.get_destination()
        )
        self.log.debug("CMD = %s", cmd)
        self.manager.send_command(cmd)

        # let's wait for them somewhere
        cmd = "channel redirect {channel} {waitingroom}".format(
            channel=self.local_channel,
            waitingroom=self.get_waiting_room(),
        )
        self.log.debug("CMD = %s", cmd)
        self.manager.send_command(cmd)

        # possible events:
        # - the local user presses some keys
        # - the local user hangs up
        # (the hangup of the remote user is watched as well)
        self.register_event("DTMFEnd", filter_channel(self.local_channel)(self.on_dtmf))
        self.register_event(
            "Hangup", filter_channel(self.local_channel)(self.on_hangup)
        )
        self.register_event(
            "Hangup", filter_channel(self.remote_channel)(self.on_hangup)
        )

    def on_dtmf(self, _, msg):
        """
        when the local user uses dtmf, they can talk back to the outside user

        Only the digit "9" triggers the action; any other digit is ignored.
        """
        if msg.digit != "9":
            return
        self.unregister_event("DTMFEnd")
        # at this point we can proceed to move the initial channel
        self.log.info("Local user requested to talk back with the remote user")

        to_move = [self.remote_channel]
        # the local channel already sits in the waiting room: move it only
        # if the backroom is a different place
        if self.get_backroom() != self.get_waiting_room():
            to_move.append(self.local_channel)
        for channel in to_move:
            cmd = "channel redirect {chan} {newroom}".format(
                chan=channel,
                newroom=self.get_backroom(),
            )
            self.log.debug("CMD = %s", cmd)
            self.manager.send_command(cmd)

        # we must not self_destroy, or we'll never notice the Hangup
        assert {e[0] for e in self.events} == {"Hangup"}

    def on_hangup(self, _, msg):
        """When one side hangs up, request the hangup of the other side."""
        if msg.channel == self.remote_channel:
            self.log.info("Remote user hanged up")
            channel = self.local_channel
        elif msg.channel == self.local_channel:
            self.log.info("Local user hanged up")
            channel = self.remote_channel
        else:
            self.log.error(
                "This should never happen! someone else hanged??? %s", msg.channel
            )
            return

        cmd = "channel request hangup {channel}".format(channel=channel)
        self.log.debug("CMD = %s", cmd)
        self.manager.send_command(cmd)
        self.self_destroy()


def run_app(Cls):
    """Wrap application class ``Cls`` into an event callback.

    The returned coroutine function instantiates ``Cls`` with the manager
    and the message, then awaits ``run()``. A :class:`RefusedError` raised by
    the constructor is logged and the event is otherwise ignored.
    """

    async def real_run(manager, msg):
        try:
            instance = Cls(manager, msg)
        except RefusedError as exc:
            log.info("refused: %s", exc)
            return
        await instance.run()

    return real_run


async def init(manager):
    """Startup hook: register the event callbacks on ``manager``."""
    # manager.send_command('sip show peers')
    manager.register_event("*", on_tutto)
    manager.register_event("AsyncAGIStart", run_app(TransferAndTakeBack))


async def on_shutdown(m):
    """Shutdown hook; nothing to clean up."""
    pass


def main():
    """Configure logging, build the manager and run it forever."""
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("panoramisk.manager").setLevel(logging.WARNING)
    # NOTE(review): host and credentials are hard-coded below; consider
    # loading them from the environment or a configuration file
    manager = Manager(
        loop=LOOP,
        host="127.0.0.1",
        port=5038,
        ssl=False,
        encoding="utf8",
        username="admin",
        secret="secret123password",
        ping_delay=10,  # Delay after start
        ping_interval=10,  # Periodically ping AMI (dead or alive)
        reconnect_timeout=2,  # Timeout reconnect if connection lost
    )
    manager.connect(run_forever=True, on_startup=init, on_shutdown=on_shutdown)


if __name__ == "__main__":
    main()