From 88ab4af511c675f6d9ecf9fb594edae0c6bc9561 Mon Sep 17 00:00:00 2001 From: boyska Date: Sat, 19 Feb 2022 16:58:38 +0100 Subject: [PATCH] + robustness - check if _we_ have been invoked indeed - supports PJSIP (different details in channels and bridges) --- transfer_back.py | 190 ++++++++++++++++++++++++++++------------------- 1 file changed, 115 insertions(+), 75 deletions(-) diff --git a/transfer_back.py b/transfer_back.py index 76a2a65..ad3764e 100755 --- a/transfer_back.py +++ b/transfer_back.py @@ -3,19 +3,22 @@ import asyncio import logging +import urllib.parse from panoramisk import Manager from panoramisk.actions import Action -LOOP=asyncio.get_event_loop() +LOOP = asyncio.get_event_loop() + +log = logging.getLogger("transferback") -log = logging.getLogger('transferback') async def on_tutto(manager, msg): - for prefix in ['Var', 'RTC']: + for prefix in ["Var", "RTC"]: if msg.event.startswith(prefix): return - log.debug('... Event %s', msg.event) + log.debug("... Event %s", msg.event) + class BaseApp: def __init__(self, manager, msg): @@ -29,8 +32,7 @@ class BaseApp: return self.manager.send_agi_command(self.local_channel, *args, **kwargs) def register_event(self, event, callback): - self.events.append( - (event, self.manager.register_event(event, callback))) + self.events.append((event, self.manager.register_event(event, callback))) def unregister_event(self, event_name, callback=None): if callback is None: @@ -46,21 +48,34 @@ class BaseApp: for ev, callback in self.events: self.manager.callbacks[ev].remove(callback) + def parse_env(self, env: str) -> dict: + lines = urllib.parse.unquote(env).split("\n") + ret = {} + for line in lines: + if ": " in line: + key, value = line.split(": ", 1) + ret[key] = value + return ret + + def filter_channel(channel): def create_func(f): def new_func(manager, event): if event.channel != channel: return return f(manager, event) + return new_func + return create_func class RefusedError(Exception): pass + class TransferAndTakeBack(BaseApp): - ''' + """ The idea is to provide a method to implement a transfer, and then be able to "revoke" the transfer whenever we want. In our scenario, there is a local user and a remote user. @@ -72,88 +87,104 @@ class TransferAndTakeBack(BaseApp): This is the behavior we want: - the remote user is trasferred to a well-known place - the local user gets music - - when the local user presses "#", we + - when the local user presses "#", we + + """ - ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - if not self.is_context_auth(self.initial_msg.context): - raise RefusedError("Invalid context") - self.log.info('%s init: %s', self.__class__.__name__, self.initial_msg) - + context = self.initial_msg.context + if not self.is_context_auth(context): + raise RefusedError("Invalid context: %s" % context) + env = self.parse_env(self.initial_msg.env) + command_name = env.get("agi_arg_1", "") + if not command_name.startswith("mandainonda"): + raise RefusedError("Invalid command name: %s" % command_name) + self.log.info("init: %s", self.initial_msg) + self.log.info("init: %s", command_name) # meant to be overridden by children {{{ def get_destination(self): - return 'private,9438,1' + return "private,9438,1" def get_backroom(self): - return 'private,9401,1' + return "private,9401,1" def get_waiting_room(self): - return 'private,9401,1' + return "private,9401,1" def is_context_auth(self, context): - return context == 'from-regia' + return context == "from-regia" # }}} async def run(self): self.channels = [] - self.register_event('CoreShowChannel', self.on_channel) - self.register_event('CoreShowChannelsComplete', self.on_channels) - self.manager.send_action(Action({'Action': 'CoreShowChannels'}), as_list=False) - # the response will actually be a series of CoreShowChannel, which will trigger self.on_channel + self.register_event("CoreShowChannel", self.on_channel) + self.register_event("CoreShowChannelsComplete", self.on_channels) + self.manager.send_action(Action({"Action": "CoreShowChannels"}), as_list=False) + # the response will actually be a series of CoreShowChannel, which will + # trigger self.on_channel, followed by a CoreShowChannelsComplete, which + # will trigger on_channels def on_channel(self, _, msg): self.channels.append(msg) async def on_channels(self, _, msg): - self.unregister_event('CoreShowChannel') - self.unregister_event('CoreShowChannelsComplete') + self.unregister_event("CoreShowChannel") + self.unregister_event("CoreShowChannelsComplete") # we've got everything in many "on_channel" invocation, now let's check better - our_channel = [m for m in self.channels if m.channel == self.local_channel] - assert len(our_channel) == 1 - our_channel = our_channel[0] - bridgeid = our_channel.bridgeid + our_bridge = { + m.bridgeid for m in self.channels if m.channel == self.local_channel + } + self.log.debug("valid channels: %s", str(our_bridge)) + assert len(our_bridge) == 1 + bridgeid = next(iter(our_bridge)) - channels_in_bridge = [m for m in self.channels if m.bridgeid == bridgeid and m.channel != self.local_channel] - self.remote_channel = channels_in_bridge[0].channel.split(';')[0] + channels_in_bridge = [ + m + for m in self.channels + if m.bridgeid == bridgeid and m.channel != self.local_channel + ] + self.remote_channel = channels_in_bridge[0].channel.split(";")[0] - self.log.info('FOUND: %s', self.remote_channel) + self.log.info("FOUND: %s", self.remote_channel) await self.on_remote_channel_found() async def on_remote_channel_found(self): # now we know what is the other channel; this means we can move them wherever we want - cmd = 'channel redirect {other} {dest}'.format( - other=self.remote_channel, - dest=self.get_destination() - ) - self.log.debug('CMD = %s', cmd) + cmd = "channel redirect {other} {dest}".format( + other=self.remote_channel, dest=self.get_destination() + ) + self.log.debug("CMD = %s", cmd) self.manager.send_command(cmd) # let's wait them somewhere - cmd = 'channel redirect {channel} {waitingroom}'.format( - channel=self.local_channel, - waitingroom=self.get_waiting_room(), - ) - self.log.debug('CMD = %s', cmd) + cmd = "channel redirect {channel} {waitingroom}".format( + channel=self.local_channel, + waitingroom=self.get_waiting_room(), + ) + self.log.debug("CMD = %s", cmd) self.manager.send_command(cmd) # eventi possibili: # - l'utente locale preme dei tasti # - l'utente locale riaggancia - self.register_event('DTMFEnd', filter_channel(self.local_channel)(self.on_dtmf)) - self.register_event('Hangup', filter_channel(self.local_channel)(self.on_hangup)) - self.register_event('Hangup', filter_channel(self.remote_channel)(self.on_hangup)) - + self.register_event("DTMFEnd", filter_channel(self.local_channel)(self.on_dtmf)) + self.register_event( + "Hangup", filter_channel(self.local_channel)(self.on_hangup) + ) + self.register_event( + "Hangup", filter_channel(self.remote_channel)(self.on_hangup) + ) def on_dtmf(self, _, msg): - ''' + """ when the local user uses dtmf, they can talk back to the outside user - ''' - if msg.digit != '9': + """ + if msg.digit != "9": return - self.unregister_event('DTMFEnd') + self.unregister_event("DTMFEnd") # a questo punto possiamo procedere a spostare il canale iniziale self.log.info("Local user requested to talk back with the remote user") @@ -162,66 +193,75 @@ class TransferAndTakeBack(BaseApp): to_move.append(self.local_channel) for channel in to_move: - cmd = 'channel redirect {chan} {newroom}'.format( - chan=channel, + cmd = "channel redirect {chan} {newroom}".format( + chan=channel, newroom=self.get_backroom(), - ) - self.log.debug('CMD = %s', cmd) + ) + self.log.debug("CMD = %s", cmd) self.manager.send_command(cmd) # we must not self_destroy, or we'll never notice the Hangup - assert set(e[0] for e in self.events) == {'Hangup'} + assert set(e[0] for e in self.events) == {"Hangup"} def on_hangup(self, _, msg): if msg.channel == self.remote_channel: - self.log.info('Remote user hanged up') + self.log.info("Remote user hanged up") channel = self.local_channel elif msg.channel == self.local_channel: - self.log.info('Local user hanged up') + self.log.info("Local user hanged up") channel = self.remote_channel else: - self.log.error('This should never happen! someone else hanged??? %s', msg.channel) + self.log.error( + "This should never happen! someone else hanged??? %s", msg.channel + ) return - cmd = 'channel request hangup {channel}'.format(channel=channel) - self.log.debug('CMD = %s', cmd) + cmd = "channel request hangup {channel}".format(channel=channel) + self.log.debug("CMD = %s", cmd) self.manager.send_command(cmd) self.self_destroy() + def run_app(Cls): async def real_run(manager, msg): try: instance = Cls(manager, msg) - except RefusedError: + except RefusedError as exc: + log.info("refused: %s", exc) return await instance.run() + return real_run async def init(manager): # manager.send_command('sip show peers') - manager.register_event('*', on_tutto) - manager.register_event('AsyncAGIStart', run_app(TransferAndTakeBack)) - + manager.register_event("*", on_tutto) + manager.register_event("AsyncAGIStart", run_app(TransferAndTakeBack)) + + async def on_shutdown(m): pass + def main(): - logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) + logging.getLogger("panoramisk.manager").setLevel(logging.WARNING) manager = Manager( - loop=LOOP, - host='127.0.0.1', - port=5038, - ssl=False, - encoding='utf8', - username='admin', - secret='secret123password', - ping_delay=10, # Delay after start - ping_interval=10, # Periodically ping AMI (dead or alive) - reconnect_timeout=2, # Timeout reconnect if connection lost - ) + loop=LOOP, + host="127.0.0.1", + port=5038, + ssl=False, + encoding="utf8", + username="admin", + secret="secret123password", + ping_delay=10, # Delay after start + ping_interval=10, # Periodically ping AMI (dead or alive) + reconnect_timeout=2, # Timeout reconnect if connection lost + ) manager.connect(run_forever=True, on_startup=init, on_shutdown=on_shutdown) -if __name__ == '__main__': + +if __name__ == "__main__": main()