asterisk-misc/transfer_back/transfer_back.py
2022-02-19 17:01:13 +01:00

267 行
8.4 KiB
Python
可执行文件

#!/usr/bin/env python3
import asyncio
import logging
import urllib.parse
from panoramisk import Manager
from panoramisk.actions import Action
# Create the event loop explicitly: asyncio.get_event_loop() called with no
# running loop is deprecated and raises RuntimeError on recent Python versions.
LOOP = asyncio.new_event_loop()
# Install it as the current loop so that any code looking up "the current
# event loop" (rather than receiving LOOP explicitly) ends up on the same one.
asyncio.set_event_loop(LOOP)

# Module-level logger, used by the free functions of this script.
log = logging.getLogger("transferback")
async def on_tutto(manager, msg):
    """Debug-log the name of every event, except the "Var*" and "RTC*" ones.

    Registered as a catch-all handler; ``manager`` is unused.
    """
    event_name = msg.event
    # str.startswith accepts a tuple: one call covers both ignored prefixes.
    if event_name.startswith(("Var", "RTC")):
        return
    log.debug("... Event %s", event_name)
class BaseApp:
    """Base class for an application bound to one channel and one manager.

    It keeps track of the event callbacks it registers on the manager so that
    they can be removed again, either one event at a time
    (``unregister_event``) or all at once (``self_destroy``).
    """

    def __init__(self, manager, msg):
        """Store the manager and the message that started this application.

        ``msg.channel`` is remembered as the local channel.
        """
        self.manager = manager
        self.initial_msg = msg
        self.local_channel = msg.channel
        # (event name, registered callback) pairs owned by this instance
        self.events = []
        self.log = logging.getLogger(self.__class__.__name__)

    def send_agi_command(self, *args, **kwargs):
        """Forward to ``manager.send_agi_command`` for the local channel."""
        return self.manager.send_agi_command(self.local_channel, *args, **kwargs)

    def register_event(self, event, callback):
        """Register ``callback`` for ``event`` and remember the registration.

        What is remembered is the value returned by ``manager.register_event``
        (presumably the callback itself -- confirm against the manager API).
        """
        self.events.append((event, self.manager.register_event(event, callback)))

    def unregister_event(self, event_name, callback=None):
        """Remove callbacks previously added through ``register_event``.

        With ``callback=None`` every callback this instance registered for
        ``event_name`` is removed (none registered is a no-op). With an
        explicit ``callback`` only that registration is removed, and
        ``ValueError`` is raised if it is unknown.
        """
        if callback is None:
            # Snapshot the matches first: removing from self.events while
            # iterating over it would skip entries.
            matching = [cb for ev, cb in self.events if ev == event_name]
            for cb in matching:
                self.unregister_event(event_name, cb)
            return
        self.events.remove((event_name, callback))
        self.manager.callbacks[event_name].remove(callback)

    def self_destroy(self):
        """Remove every callback registered by this instance.

        The bookkeeping list is emptied as well, so calling this more than
        once is harmless.
        """
        while self.events:
            ev, callback = self.events.pop()
            self.manager.callbacks[ev].remove(callback)

    def parse_env(self, env: str) -> dict:
        """Parse a URL-quoted block of ``key: value`` lines into a dict.

        Lines without a ``": "`` separator are ignored; only the first
        separator of a line splits key from value.
        """
        ret = {}
        for line in urllib.parse.unquote(env).split("\n"):
            key, sep, value = line.partition(": ")
            if sep:
                ret[key] = value
        return ret
def filter_channel(channel):
    """Build a decorator restricting an event callback to a single channel.

    The decorated callback is invoked (and its result returned) only when
    ``event.channel`` equals ``channel``; for any other event the wrapper
    returns ``None`` without calling it.
    """

    def decorator(callback):
        def wrapper(manager, event):
            if event.channel == channel:
                return callback(manager, event)
            return None

        return wrapper

    return decorator
class RefusedError(Exception):
    """Raised by an application constructor that declines to handle a request."""
class TransferAndTakeBack(BaseApp):
    """Transfer the remote user away, and let the local user take them back.

    The idea is to provide a method to implement a transfer, and then be able
    to "revoke" the transfer whenever we want.

    In our scenario, there is a local user and a remote user.
    The remote user must have no power.
    The local user triggers it all.
    When the local user activates a feature, which calls AGI(agi:sync), this
    is called.

    This is the behavior we want:
     - the remote user is transferred to a well-known place
       (``get_destination``)
     - the local user is redirected to the waiting room
       (``get_waiting_room``), where they get music
     - when the local user presses "9", the remote user is moved to the back
       room (``get_backroom``); the local user is moved there too, unless the
       waiting room already is the back room
     - when either user hangs up, the other channel is hung up as well
    """

    def __init__(self, *args, **kwargs):
        """Validate the request.

        Raises:
            RefusedError: if the message context is not authorized, or the
                first AGI argument does not start with "mandainonda".
        """
        super().__init__(*args, **kwargs)
        context = self.initial_msg.context
        if not self.is_context_auth(context):
            raise RefusedError("Invalid context: %s" % context)
        env = self.parse_env(self.initial_msg.env)
        command_name = env.get("agi_arg_1", "")
        if not command_name.startswith("mandainonda"):
            raise RefusedError("Invalid command name: %s" % command_name)
        self.log.info("init: %s", self.initial_msg)
        self.log.info("init: %s", command_name)

    # meant to be overridden by children {{{
    def get_destination(self):
        """Redirect target for the remote user when the transfer starts."""
        return "private,9438,1"

    def get_backroom(self):
        """Redirect target used when the local user takes the call back."""
        return "private,9401,1"

    def get_waiting_room(self):
        """Redirect target where the local user waits during the transfer."""
        return "private,9401,1"

    def is_context_auth(self, context):
        """Return True if requests coming from ``context`` are accepted."""
        return context == "from-regia"

    # }}}

    def _redirect(self, channel, destination):
        """Send a ``channel redirect`` command moving ``channel`` to ``destination``."""
        cmd = "channel redirect {chan} {dest}".format(chan=channel, dest=destination)
        self.log.debug("CMD = %s", cmd)
        self.manager.send_command(cmd)

    async def run(self):
        """Start the application by asking for the list of active channels."""
        self.channels = []
        self.register_event("CoreShowChannel", self.on_channel)
        self.register_event("CoreShowChannelsComplete", self.on_channels)
        self.manager.send_action(Action({"Action": "CoreShowChannels"}), as_list=False)
        # the response will actually be a series of CoreShowChannel, which will
        # trigger self.on_channel, followed by a CoreShowChannelsComplete, which
        # will trigger on_channels

    def on_channel(self, _, msg):
        """Collect one CoreShowChannel event."""
        self.channels.append(msg)

    async def on_channels(self, _, msg):
        """Find the channel bridged with the local one, then start the transfer.

        If the bridge or the peer channel cannot be determined, an error is
        logged and nothing else happens.
        """
        self.unregister_event("CoreShowChannel")
        self.unregister_event("CoreShowChannelsComplete")

        # we've got everything in many "on_channel" invocations, now let's
        # check better
        our_bridge = {
            m.bridgeid for m in self.channels if m.channel == self.local_channel
        }
        self.log.debug("valid channels: %s", str(our_bridge))
        # Explicit check instead of `assert`: assertions are stripped under -O
        # and this depends on runtime data, not on a programming invariant.
        if len(our_bridge) != 1:
            self.log.error(
                "Expected exactly one bridge for %s, found %s",
                self.local_channel,
                our_bridge,
            )
            return
        (bridgeid,) = our_bridge

        channels_in_bridge = [
            m
            for m in self.channels
            if m.bridgeid == bridgeid and m.channel != self.local_channel
        ]
        if not channels_in_bridge:
            self.log.error("No other channel found in bridge %s", bridgeid)
            return
        # keep only what precedes the first ";" of the channel name
        self.remote_channel = channels_in_bridge[0].channel.split(";")[0]
        self.log.info("FOUND: %s", self.remote_channel)
        await self.on_remote_channel_found()

    async def on_remote_channel_found(self):
        """Move both users apart and start watching for DTMF and hangups."""
        # now we know what is the other channel; this means we can move them
        # wherever we want
        self._redirect(self.remote_channel, self.get_destination())
        # let's wait them somewhere
        self._redirect(self.local_channel, self.get_waiting_room())

        # possible events:
        #  - the local user presses some keys
        #  - the local user hangs up
        self.register_event("DTMFEnd", filter_channel(self.local_channel)(self.on_dtmf))
        self.register_event(
            "Hangup", filter_channel(self.local_channel)(self.on_hangup)
        )
        self.register_event(
            "Hangup", filter_channel(self.remote_channel)(self.on_hangup)
        )

    def on_dtmf(self, _, msg):
        """
        when the local user uses dtmf, they can talk back to the outside user

        Only the digit "9" triggers the take-back; other digits are ignored.
        """
        if msg.digit != "9":
            return
        self.unregister_event("DTMFEnd")
        # at this point we can proceed to move the initial channel
        self.log.info("Local user requested to talk back with the remote user")

        to_move = [self.remote_channel]
        # the local user only needs to move if they are not already waiting
        # in the back room
        if self.get_backroom() != self.get_waiting_room():
            to_move.append(self.local_channel)
        for channel in to_move:
            self._redirect(channel, self.get_backroom())

        # we must not self_destroy, or we'll never notice the Hangup
        assert set(e[0] for e in self.events) == {"Hangup"}

    def on_hangup(self, _, msg):
        """When one of the two users hangs up, hang up the other and clean up."""
        if msg.channel == self.remote_channel:
            self.log.info("Remote user hanged up")
            channel = self.local_channel
        elif msg.channel == self.local_channel:
            self.log.info("Local user hanged up")
            channel = self.remote_channel
        else:
            self.log.error(
                "This should never happen! someone else hanged??? %s", msg.channel
            )
            return

        cmd = "channel request hangup {channel}".format(channel=channel)
        self.log.debug("CMD = %s", cmd)
        self.manager.send_command(cmd)
        self.self_destroy()
def run_app(Cls):
    """Wrap an application class into an async ``(manager, msg)`` event callback.

    The callback instantiates ``Cls(manager, msg)`` and awaits the instance's
    ``run()``. If the constructor raises ``RefusedError`` the refusal is
    logged at info level and nothing is run.
    """

    async def real_run(manager, msg):
        try:
            app = Cls(manager, msg)
        except RefusedError as refusal:
            log.info("refused: %s", refusal)
        else:
            # outside the try body: errors raised by run() are not swallowed
            await app.run()

    return real_run
async def init(manager):
    """Startup hook: subscribe this script's handlers to manager events."""
    # Debugging aid, left disabled: manager.send_command('sip show peers')
    subscriptions = (
        ("*", on_tutto),
        ("AsyncAGIStart", run_app(TransferAndTakeBack)),
    )
    for pattern, handler in subscriptions:
        manager.register_event(pattern, handler)
async def on_shutdown(m):
    """Shutdown hook; intentionally does nothing."""
    return None
def main():
    """Configure logging, connect to the manager interface and run forever.

    Connection settings can be overridden through the environment variables
    AMI_HOST, AMI_PORT, AMI_USERNAME and AMI_SECRET, so that credentials do
    not have to be edited into the source. When a variable is unset the
    previous hard-coded value is used.

    Raises:
        ValueError: if AMI_PORT is set but is not an integer.
    """
    import os  # local import: only needed for the connection overrides

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("panoramisk.manager").setLevel(logging.WARNING)
    manager = Manager(
        loop=LOOP,
        host=os.environ.get("AMI_HOST", "127.0.0.1"),
        port=int(os.environ.get("AMI_PORT", "5038")),
        ssl=False,
        encoding="utf8",
        username=os.environ.get("AMI_USERNAME", "admin"),
        secret=os.environ.get("AMI_SECRET", "secret123password"),
        ping_delay=10,  # Delay after start
        ping_interval=10,  # Periodically ping AMI (dead or alive)
        reconnect_timeout=2,  # Timeout reconnect if connection lost
    )
    manager.connect(run_forever=True, on_startup=init, on_shutdown=on_shutdown)
# Script entry point: start the application only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()