+ robustness

- check if _we_ have been invoked indeed
 - supports PJSIP (different details in channels and bridges)
This commit is contained in:
boyska 2022-02-19 16:58:38 +01:00
parent 94a1962043
commit 88ab4af511

View file

@ -3,19 +3,22 @@
import asyncio import asyncio
import logging import logging
import urllib.parse
from panoramisk import Manager from panoramisk import Manager
from panoramisk.actions import Action from panoramisk.actions import Action
LOOP = asyncio.get_event_loop() LOOP = asyncio.get_event_loop()
log = logging.getLogger('transferback') log = logging.getLogger("transferback")
async def on_tutto(manager, msg): async def on_tutto(manager, msg):
for prefix in ['Var', 'RTC']: for prefix in ["Var", "RTC"]:
if msg.event.startswith(prefix): if msg.event.startswith(prefix):
return return
log.debug('... Event %s', msg.event) log.debug("... Event %s", msg.event)
class BaseApp: class BaseApp:
def __init__(self, manager, msg): def __init__(self, manager, msg):
@ -29,8 +32,7 @@ class BaseApp:
return self.manager.send_agi_command(self.local_channel, *args, **kwargs) return self.manager.send_agi_command(self.local_channel, *args, **kwargs)
def register_event(self, event, callback): def register_event(self, event, callback):
self.events.append( self.events.append((event, self.manager.register_event(event, callback)))
(event, self.manager.register_event(event, callback)))
def unregister_event(self, event_name, callback=None): def unregister_event(self, event_name, callback=None):
if callback is None: if callback is None:
@ -46,21 +48,34 @@ class BaseApp:
for ev, callback in self.events: for ev, callback in self.events:
self.manager.callbacks[ev].remove(callback) self.manager.callbacks[ev].remove(callback)
def parse_env(self, env: str) -> dict:
lines = urllib.parse.unquote(env).split("\n")
ret = {}
for line in lines:
if ": " in line:
key, value = line.split(": ", 1)
ret[key] = value
return ret
def filter_channel(channel): def filter_channel(channel):
def create_func(f): def create_func(f):
def new_func(manager, event): def new_func(manager, event):
if event.channel != channel: if event.channel != channel:
return return
return f(manager, event) return f(manager, event)
return new_func return new_func
return create_func return create_func
class RefusedError(Exception): class RefusedError(Exception):
pass pass
class TransferAndTakeBack(BaseApp): class TransferAndTakeBack(BaseApp):
''' """
The idea is to provide a method to implement a transfer, and then be able to "revoke" the transfer whenever we want. The idea is to provide a method to implement a transfer, and then be able to "revoke" the transfer whenever we want.
In our scenario, there is a local user and a remote user. In our scenario, there is a local user and a remote user.
@ -74,86 +89,102 @@ class TransferAndTakeBack(BaseApp):
- the local user gets music - the local user gets music
- when the local user presses "#", we - when the local user presses "#", we
''' """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
if not self.is_context_auth(self.initial_msg.context): context = self.initial_msg.context
raise RefusedError("Invalid context") if not self.is_context_auth(context):
self.log.info('%s init: %s', self.__class__.__name__, self.initial_msg) raise RefusedError("Invalid context: %s" % context)
env = self.parse_env(self.initial_msg.env)
command_name = env.get("agi_arg_1", "")
if not command_name.startswith("mandainonda"):
raise RefusedError("Invalid command name: %s" % command_name)
self.log.info("init: %s", self.initial_msg)
self.log.info("init: %s", command_name)
# meant to be overridden by children {{{ # meant to be overridden by children {{{
def get_destination(self): def get_destination(self):
return 'private,9438,1' return "private,9438,1"
def get_backroom(self): def get_backroom(self):
return 'private,9401,1' return "private,9401,1"
def get_waiting_room(self): def get_waiting_room(self):
return 'private,9401,1' return "private,9401,1"
def is_context_auth(self, context): def is_context_auth(self, context):
return context == 'from-regia' return context == "from-regia"
# }}} # }}}
async def run(self): async def run(self):
self.channels = [] self.channels = []
self.register_event('CoreShowChannel', self.on_channel) self.register_event("CoreShowChannel", self.on_channel)
self.register_event('CoreShowChannelsComplete', self.on_channels) self.register_event("CoreShowChannelsComplete", self.on_channels)
self.manager.send_action(Action({'Action': 'CoreShowChannels'}), as_list=False) self.manager.send_action(Action({"Action": "CoreShowChannels"}), as_list=False)
# the response will actually be a series of CoreShowChannel, which will trigger self.on_channel # the response will actually be a series of CoreShowChannel, which will
# trigger self.on_channel, followed by a CoreShowChannelsComplete, which
# will trigger on_channels
def on_channel(self, _, msg): def on_channel(self, _, msg):
self.channels.append(msg) self.channels.append(msg)
async def on_channels(self, _, msg): async def on_channels(self, _, msg):
self.unregister_event('CoreShowChannel') self.unregister_event("CoreShowChannel")
self.unregister_event('CoreShowChannelsComplete') self.unregister_event("CoreShowChannelsComplete")
# we've got everything in many "on_channel" invocation, now let's check better # we've got everything in many "on_channel" invocation, now let's check better
our_channel = [m for m in self.channels if m.channel == self.local_channel] our_bridge = {
assert len(our_channel) == 1 m.bridgeid for m in self.channels if m.channel == self.local_channel
our_channel = our_channel[0] }
bridgeid = our_channel.bridgeid self.log.debug("valid channels: %s", str(our_bridge))
assert len(our_bridge) == 1
bridgeid = next(iter(our_bridge))
channels_in_bridge = [m for m in self.channels if m.bridgeid == bridgeid and m.channel != self.local_channel] channels_in_bridge = [
self.remote_channel = channels_in_bridge[0].channel.split(';')[0] m
for m in self.channels
if m.bridgeid == bridgeid and m.channel != self.local_channel
]
self.remote_channel = channels_in_bridge[0].channel.split(";")[0]
self.log.info('FOUND: %s', self.remote_channel) self.log.info("FOUND: %s", self.remote_channel)
await self.on_remote_channel_found() await self.on_remote_channel_found()
async def on_remote_channel_found(self): async def on_remote_channel_found(self):
# now we know what is the other channel; this means we can move them wherever we want # now we know what is the other channel; this means we can move them wherever we want
cmd = 'channel redirect {other} {dest}'.format( cmd = "channel redirect {other} {dest}".format(
other=self.remote_channel, other=self.remote_channel, dest=self.get_destination()
dest=self.get_destination()
) )
self.log.debug('CMD = %s', cmd) self.log.debug("CMD = %s", cmd)
self.manager.send_command(cmd) self.manager.send_command(cmd)
# let's wait them somewhere # let's wait them somewhere
cmd = 'channel redirect {channel} {waitingroom}'.format( cmd = "channel redirect {channel} {waitingroom}".format(
channel=self.local_channel, channel=self.local_channel,
waitingroom=self.get_waiting_room(), waitingroom=self.get_waiting_room(),
) )
self.log.debug('CMD = %s', cmd) self.log.debug("CMD = %s", cmd)
self.manager.send_command(cmd) self.manager.send_command(cmd)
# eventi possibili: # eventi possibili:
# - l'utente locale preme dei tasti # - l'utente locale preme dei tasti
# - l'utente locale riaggancia # - l'utente locale riaggancia
self.register_event('DTMFEnd', filter_channel(self.local_channel)(self.on_dtmf)) self.register_event("DTMFEnd", filter_channel(self.local_channel)(self.on_dtmf))
self.register_event('Hangup', filter_channel(self.local_channel)(self.on_hangup)) self.register_event(
self.register_event('Hangup', filter_channel(self.remote_channel)(self.on_hangup)) "Hangup", filter_channel(self.local_channel)(self.on_hangup)
)
self.register_event(
"Hangup", filter_channel(self.remote_channel)(self.on_hangup)
)
def on_dtmf(self, _, msg): def on_dtmf(self, _, msg):
''' """
when the local user uses dtmf, they can talk back to the outside user when the local user uses dtmf, they can talk back to the outside user
''' """
if msg.digit != '9': if msg.digit != "9":
return return
self.unregister_event('DTMFEnd') self.unregister_event("DTMFEnd")
# a questo punto possiamo procedere a spostare il canale iniziale # a questo punto possiamo procedere a spostare il canale iniziale
self.log.info("Local user requested to talk back with the remote user") self.log.info("Local user requested to talk back with the remote user")
@ -162,66 +193,75 @@ class TransferAndTakeBack(BaseApp):
to_move.append(self.local_channel) to_move.append(self.local_channel)
for channel in to_move: for channel in to_move:
cmd = 'channel redirect {chan} {newroom}'.format( cmd = "channel redirect {chan} {newroom}".format(
chan=channel, chan=channel,
newroom=self.get_backroom(), newroom=self.get_backroom(),
) )
self.log.debug('CMD = %s', cmd) self.log.debug("CMD = %s", cmd)
self.manager.send_command(cmd) self.manager.send_command(cmd)
# we must not self_destroy, or we'll never notice the Hangup # we must not self_destroy, or we'll never notice the Hangup
assert set(e[0] for e in self.events) == {'Hangup'} assert set(e[0] for e in self.events) == {"Hangup"}
def on_hangup(self, _, msg): def on_hangup(self, _, msg):
if msg.channel == self.remote_channel: if msg.channel == self.remote_channel:
self.log.info('Remote user hanged up') self.log.info("Remote user hanged up")
channel = self.local_channel channel = self.local_channel
elif msg.channel == self.local_channel: elif msg.channel == self.local_channel:
self.log.info('Local user hanged up') self.log.info("Local user hanged up")
channel = self.remote_channel channel = self.remote_channel
else: else:
self.log.error('This should never happen! someone else hanged??? %s', msg.channel) self.log.error(
"This should never happen! someone else hanged??? %s", msg.channel
)
return return
cmd = 'channel request hangup {channel}'.format(channel=channel) cmd = "channel request hangup {channel}".format(channel=channel)
self.log.debug('CMD = %s', cmd) self.log.debug("CMD = %s", cmd)
self.manager.send_command(cmd) self.manager.send_command(cmd)
self.self_destroy() self.self_destroy()
def run_app(Cls): def run_app(Cls):
async def real_run(manager, msg): async def real_run(manager, msg):
try: try:
instance = Cls(manager, msg) instance = Cls(manager, msg)
except RefusedError: except RefusedError as exc:
log.info("refused: %s", exc)
return return
await instance.run() await instance.run()
return real_run return real_run
async def init(manager): async def init(manager):
# manager.send_command('sip show peers') # manager.send_command('sip show peers')
manager.register_event('*', on_tutto) manager.register_event("*", on_tutto)
manager.register_event('AsyncAGIStart', run_app(TransferAndTakeBack)) manager.register_event("AsyncAGIStart", run_app(TransferAndTakeBack))
async def on_shutdown(m): async def on_shutdown(m):
pass pass
def main(): def main():
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.DEBUG)
logging.getLogger("panoramisk.manager").setLevel(logging.WARNING)
manager = Manager( manager = Manager(
loop=LOOP, loop=LOOP,
host='127.0.0.1', host="127.0.0.1",
port=5038, port=5038,
ssl=False, ssl=False,
encoding='utf8', encoding="utf8",
username='admin', username="admin",
secret='secret123password', secret="secret123password",
ping_delay=10, # Delay after start ping_delay=10, # Delay after start
ping_interval=10, # Periodically ping AMI (dead or alive) ping_interval=10, # Periodically ping AMI (dead or alive)
reconnect_timeout=2, # Timeout reconnect if connection lost reconnect_timeout=2, # Timeout reconnect if connection lost
) )
manager.connect(run_forever=True, on_startup=init, on_shutdown=on_shutdown) manager.connect(run_forever=True, on_startup=init, on_shutdown=on_shutdown)
if __name__ == '__main__':
if __name__ == "__main__":
main() main()