123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- #!/usr/bin/env python3
- import asyncio
- import logging
- import urllib.parse
- from panoramisk import Manager
- from panoramisk.actions import Action
- LOOP = asyncio.get_event_loop()
- log = logging.getLogger("transferback")
- async def on_tutto(manager, msg):
- for prefix in ["Var", "RTC"]:
- if msg.event.startswith(prefix):
- return
- log.debug("... Event %s", msg.event)
- class BaseApp:
- def __init__(self, manager, msg):
- self.manager = manager
- self.initial_msg = msg
- self.local_channel = msg.channel
- self.events = []
- self.log = logging.getLogger(self.__class__.__name__)
- def send_agi_command(self, *args, **kwargs):
- return self.manager.send_agi_command(self.local_channel, *args, **kwargs)
- def register_event(self, event, callback):
- self.events.append((event, self.manager.register_event(event, callback)))
- def unregister_event(self, event_name, callback=None):
- if callback is None:
- for ev, callback in self.events:
- if ev == event_name:
- self.unregister_event(event_name, callback)
- return
- else:
- self.events.remove((event_name, callback))
- self.manager.callbacks[event_name].remove(callback)
- def self_destroy(self):
- for ev, callback in self.events:
- self.manager.callbacks[ev].remove(callback)
- def parse_env(self, env: str) -> dict:
- lines = urllib.parse.unquote(env).split("\n")
- ret = {}
- for line in lines:
- if ": " in line:
- key, value = line.split(": ", 1)
- ret[key] = value
- return ret
- def filter_channel(channel):
- def create_func(f):
- def new_func(manager, event):
- if event.channel != channel:
- return
- return f(manager, event)
- return new_func
- return create_func
- class RefusedError(Exception):
- pass
- class TransferAndTakeBack(BaseApp):
- """
- The idea is to provide a method to implement a transfer, and then be able to "revoke" the transfer whenever we want.
- In our scenario, there is a local user and a remote user.
- The remote user must have no power.
- The local user triggers it all.
- When the local user activates a feature, which calls AGI(agi:sync), this is called.
- This is the behavior we want:
- - the remote user is trasferred to a well-known place
- - the local user gets music
- - when the local user presses "#", we
- """
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- context = self.initial_msg.context
- if not self.is_context_auth(context):
- raise RefusedError("Invalid context: %s" % context)
- env = self.parse_env(self.initial_msg.env)
- command_name = env.get("agi_arg_1", "")
- if not command_name.startswith("mandainonda"):
- raise RefusedError("Invalid command name: %s" % command_name)
- self.log.info("init: %s", self.initial_msg)
- self.log.info("init: %s", command_name)
- # meant to be overridden by children {{{
- def get_destination(self):
- return "private,9438,1"
- def get_backroom(self):
- return "private,9401,1"
- def get_waiting_room(self):
- return "private,9401,1"
- def is_context_auth(self, context):
- return context == "from-regia"
- # }}}
- async def run(self):
- self.channels = []
- self.register_event("CoreShowChannel", self.on_channel)
- self.register_event("CoreShowChannelsComplete", self.on_channels)
- self.manager.send_action(Action({"Action": "CoreShowChannels"}), as_list=False)
- # the response will actually be a series of CoreShowChannel, which will
- # trigger self.on_channel, followed by a CoreShowChannelsComplete, which
- # will trigger on_channels
- def on_channel(self, _, msg):
- self.channels.append(msg)
- async def on_channels(self, _, msg):
- self.unregister_event("CoreShowChannel")
- self.unregister_event("CoreShowChannelsComplete")
- # we've got everything in many "on_channel" invocation, now let's check better
- our_bridge = {
- m.bridgeid for m in self.channels if m.channel == self.local_channel
- }
- self.log.debug("valid channels: %s", str(our_bridge))
- assert len(our_bridge) == 1
- bridgeid = next(iter(our_bridge))
- channels_in_bridge = [
- m
- for m in self.channels
- if m.bridgeid == bridgeid and m.channel != self.local_channel
- ]
- self.remote_channel = channels_in_bridge[0].channel.split(";")[0]
- self.log.info("FOUND: %s", self.remote_channel)
- await self.on_remote_channel_found()
- async def on_remote_channel_found(self):
- # now we know what is the other channel; this means we can move them wherever we want
- cmd = "channel redirect {other} {dest}".format(
- other=self.remote_channel, dest=self.get_destination()
- )
- self.log.debug("CMD = %s", cmd)
- self.manager.send_command(cmd)
- # let's wait them somewhere
- cmd = "channel redirect {channel} {waitingroom}".format(
- channel=self.local_channel,
- waitingroom=self.get_waiting_room(),
- )
- self.log.debug("CMD = %s", cmd)
- self.manager.send_command(cmd)
- # eventi possibili:
- # - l'utente locale preme dei tasti
- # - l'utente locale riaggancia
- self.register_event("DTMFEnd", filter_channel(self.local_channel)(self.on_dtmf))
- self.register_event(
- "Hangup", filter_channel(self.local_channel)(self.on_hangup)
- )
- self.register_event(
- "Hangup", filter_channel(self.remote_channel)(self.on_hangup)
- )
- def on_dtmf(self, _, msg):
- """
- when the local user uses dtmf, they can talk back to the outside user
- """
- if msg.digit != "9":
- return
- self.unregister_event("DTMFEnd")
- # a questo punto possiamo procedere a spostare il canale iniziale
- self.log.info("Local user requested to talk back with the remote user")
- to_move = [self.remote_channel]
- if self.get_backroom() != self.get_waiting_room():
- to_move.append(self.local_channel)
- for channel in to_move:
- cmd = "channel redirect {chan} {newroom}".format(
- chan=channel,
- newroom=self.get_backroom(),
- )
- self.log.debug("CMD = %s", cmd)
- self.manager.send_command(cmd)
- # we must not self_destroy, or we'll never notice the Hangup
- assert set(e[0] for e in self.events) == {"Hangup"}
- def on_hangup(self, _, msg):
- if msg.channel == self.remote_channel:
- self.log.info("Remote user hanged up")
- channel = self.local_channel
- elif msg.channel == self.local_channel:
- self.log.info("Local user hanged up")
- channel = self.remote_channel
- else:
- self.log.error(
- "This should never happen! someone else hanged??? %s", msg.channel
- )
- return
- cmd = "channel request hangup {channel}".format(channel=channel)
- self.log.debug("CMD = %s", cmd)
- self.manager.send_command(cmd)
- self.self_destroy()
- def run_app(Cls):
- async def real_run(manager, msg):
- try:
- instance = Cls(manager, msg)
- except RefusedError as exc:
- log.info("refused: %s", exc)
- return
- await instance.run()
- return real_run
- async def init(manager):
- # manager.send_command('sip show peers')
- manager.register_event("*", on_tutto)
- manager.register_event("AsyncAGIStart", run_app(TransferAndTakeBack))
- async def on_shutdown(m):
- pass
- def main():
- logging.basicConfig(level=logging.DEBUG)
- logging.getLogger("panoramisk.manager").setLevel(logging.WARNING)
- manager = Manager(
- loop=LOOP,
- host="127.0.0.1",
- port=5038,
- ssl=False,
- encoding="utf8",
- username="admin",
- secret="secret123password",
- ping_delay=10, # Delay after start
- ping_interval=10, # Periodically ping AMI (dead or alive)
- reconnect_timeout=2, # Timeout reconnect if connection lost
- )
- manager.connect(run_forever=True, on_startup=init, on_shutdown=on_shutdown)
- if __name__ == "__main__":
- main()
|