random/mumble-bot/bot.py
2020-04-19 20:12:29 +02:00

205 lines
6.4 KiB
Python

import argparse
import audioop
import logging
import math
import sys
import struct
import time
import threading
from functools import partial
import pymumble_py3 as pymumble
from pymumble_py3.constants import PYMUMBLE_CLBK_TEXTMESSAGERECEIVED
# Runtime state shared between the chat callback and the main loop.
is_streaming = False  # set to True by the "/start" chat command
silence_time = 0  # silence accumulated while streaming; main() counts it in ms

# Scale factor mapping a signed 16-bit sample onto the range [-1.0, 1.0).
SHORT_NORMALIZE = 1.0 / 32768.0
## peak detection
def get_rms(block):
    """Return the RMS amplitude of a buffer of signed 16-bit samples.

    The RMS amplitude is the square root of the mean of the squared
    samples. Every sample is scaled by ``SHORT_NORMALIZE`` before squaring.

    Args:
        block: Bytes-like buffer of signed 16-bit samples in native byte
            order, two bytes per sample. A dangling odd byte at the end
            is ignored.

    Returns:
        The RMS amplitude as a float, or 0.0 when ``block`` does not hold
        a single complete sample.
    """
    # Integer division: each sample occupies exactly two bytes.
    count = len(block) // 2
    if count == 0:
        # Nothing to average (e.g. an empty read at end of input): report
        # silence instead of dividing by zero.
        return 0.0

    # unpack_from only consumes the first ``count`` shorts, so a trailing
    # odd byte does not make struct reject the buffer.
    shorts = struct.unpack_from("%dh" % count, block)

    sum_squares = 0.0
    for sample in shorts:
        # Normalise the sample before squaring it.
        n = sample * SHORT_NORMALIZE
        sum_squares += n * n
    return math.sqrt(sum_squares / count)
def message_received(mumble, message):
    """React to the "/start" and "/stop" chat commands.

    "/start" switches streaming on and resets the silence counter;
    "/stop" switches streaming off. Both announce the change in the bot's
    current channel, log it, and toggle the recording state of the bot's
    own user. Any other text is ignored.

    Args:
        mumble: Client object used to reach the current channel and the
            bot's own user.
        message: Received message; its ``message`` attribute holds the text.
    """
    global is_streaming, silence_time

    text = message.message

    if text == "/start":
        is_streaming, silence_time = True, 0
        notice = "Diretta iniziata"
        mumble.my_channel().send_text_message(notice)
        logging.info(notice)
        mumble.users.myself.recording()
        return

    if text == "/stop":
        is_streaming = False
        notice = "Diretta terminata"
        mumble.my_channel().send_text_message(notice)
        logging.info(notice)
        mumble.users.myself.unrecording()
def get_parser():
    """Build the command-line parser for the bot.

    Returns:
        An ``argparse.ArgumentParser`` exposing the connection options
        (``--server``, ``--port``, ``--name``, ``--password``,
        ``--channel``, ``--tokens``), the streaming switches
        (``--stream``, ``--transmit``, ``--auto-suspend-stream``,
        ``--max-silence-time``) and the ``--ignore`` user list.
    """
    parser = argparse.ArgumentParser(description="Regia pienamente automatizzata")

    # Connection settings.
    parser.add_argument("--channel", help="Set channel", default="")
    parser.add_argument("--name", help="Set bot nickname", default="RadioRobbot")
    parser.add_argument("--server", help="Set server", default="mumble.esiliati.org")
    parser.add_argument("--password", help="Set password", default="")
    parser.add_argument("--port", help="Set port", type=int, default=64738)

    # Streaming behaviour.
    parser.add_argument(
        "--stream",
        action="store_true",
        help="Ignore commands in chat and stream everything",
    )
    parser.add_argument(
        "--transmit",
        action="store_true",
        help="Transmit from your stdin",
    )
    parser.add_argument(
        "--auto-suspend-stream",
        action="store_true",
        # This help text used to be a verbatim copy of the --stream help.
        help=(
            "Automatically end a stream started with /start after "
            "--max-silence-time seconds of silence"
        ),
    )
    parser.add_argument(
        "--max-silence-time", type=int, help="max silence time in seconds", default=30
    )

    # List-valued options; both default to an empty list so callers always
    # receive a list, never None.
    parser.add_argument(
        "--tokens",
        help="Set tokens list",
        nargs="*",
        default=[],
    )
    parser.add_argument(
        "--ignore",
        help="List of ignored users",
        nargs="*",
        default=[],
    )
    return parser
def transmit(name, mumble, chunk_size):
    """Forward loud-enough audio from standard input to Mumble.

    Reads fixed-size chunks from stdin and hands each one to
    ``mumble.sound_output.add_sound`` when its RMS level (see ``get_rms``)
    exceeds a threshold. After a loud chunk, quieter chunks keep being
    forwarded for about one more second so the transmission is not cut
    off abruptly. The loop ends when stdin reaches end of file.

    Args:
        name: Unused; kept so existing callers keep working.
        mumble: Client object whose ``sound_output.add_sound`` receives
            the chunks.
        chunk_size: Number of bytes to read per iteration (passed through
            ``int()``).
    """
    level_threshold = 0.09  # RMS level above which a chunk counts as sound
    hold_ms = 1000  # keep the audio open for one second after the last loud chunk
    chunk_ms = 10  # NOTE(review): assumes each chunk lasts 10 ms, as main() sizes it
    chunk_bytes = int(chunk_size)

    hold_left = 0
    while True:
        try:
            data = sys.stdin.buffer.read(chunk_bytes)
            if not data:
                # End of input: an empty chunk has no level to measure and
                # every further read would be empty as well.
                break
            if get_rms(data) > level_threshold:
                mumble.sound_output.add_sound(data)
                hold_left = hold_ms
            elif hold_left > 0:
                # Still inside the hold window after the last loud chunk.
                mumble.sound_output.add_sound(data)
                hold_left -= chunk_ms
        except IOError as e:
            # Report on stderr: main() writes the PCM stream to stdout, so
            # text printed there would end up inside the audio data.
            print(" Error recording: %s" % e, file=sys.stderr)
def _mix_pcm(base, extra):
    """Mix two 16-bit PCM fragments, zero-padding the shorter one."""
    if base is None:
        return extra
    # audioop.add rejects fragments of different length, so pad with silence.
    size = max(len(base), len(extra))
    return audioop.add(base.ljust(size, b"\x00"), extra.ljust(size, b"\x00"), 2)


def main():
    """Connect to the Mumble server and write the mixed audio to stdout.

    Parses the command line, connects, optionally joins ``--channel`` and
    then loops while the client is alive. On every tick the pending sound
    of all non-ignored users is converted to stereo and mixed. While
    streaming is active (``/start`` in chat or ``--stream``) the mix, or
    an equally long block of silence, is written to standard output. With
    ``--auto-suspend-stream`` a chat-started stream is ended after
    ``--max-silence-time`` seconds of silence. With ``--transmit`` a
    background thread feeds stdin to the server (see ``transmit``).
    """
    global is_streaming
    global silence_time

    args = get_parser().parse_args()
    logging.basicConfig(level=logging.DEBUG)

    is_streaming = False
    stream_always = args.stream
    ignored_users = args.ignore

    # Spin up a client and connect to the mumble server.
    mumble = pymumble.Mumble(
        args.server,
        args.name,
        password=args.password,
        port=args.port,
        tokens=args.tokens,
    )
    mumble.callbacks.set_callback(
        PYMUMBLE_CLBK_TEXTMESSAGERECEIVED, partial(message_received, mumble)
    )
    mumble.set_receive_sound(1)  # enable receiving sound from the server
    mumble.start()
    mumble.is_ready()  # wait until the client is ready

    # With the default empty --channel, "".split("/") yields [""], which is
    # not a usable path, so the bot stays in the channel it joined.
    # NOTE(review): confirm pymumble leaves a fresh client in the root channel.
    if args.channel:
        mumble.channels.find_by_tree(args.channel.split("/")).move_in()

    if not args.transmit:
        mumble.users.myself.mute()
    # is_streaming was reset to False just above, so testing it here could
    # never flag the bot; --stream is the mode that streams from the start.
    if stream_always:
        mumble.users.myself.recording()

    buffer_seconds = 0.1
    sample_rate = 48000
    tick_ms = 10  # resolution of the mixing loop, in ms
    tick_seconds = tick_ms / 1000
    # 16-bit samples: two bytes per sample and channel, kept as integers.
    mono_chunk_bytes = sample_rate * 2 * tick_ms // 1000
    stereo_chunk_bytes = mono_chunk_bytes * 2
    silent = b"\x00" * stereo_chunk_bytes
    cursor_time = time.time() - buffer_seconds

    if args.transmit:
        # Daemon thread: a blocked read on stdin must not keep the process
        # alive once the connection is gone and the loop below has ended.
        transmit_thread = threading.Thread(
            target=transmit, args=(1, mumble, mono_chunk_bytes), daemon=True
        )
        transmit_thread.start()

    while mumble.is_alive():
        if cursor_time >= time.time() - buffer_seconds:
            # The cursor has caught up with the buffering delay; wait a tick.
            time.sleep(tick_seconds)
            continue

        base_sound = None
        try:
            # Iterate over a snapshot: the user table may change while we
            # are mixing. The except below still covers a change that
            # happens during the copy itself.
            for user in list(mumble.users.values()):
                if user.sound.is_sound() and user["name"] not in ignored_users:
                    # Available sound is to be treated now and not later.
                    sound = user.sound.get_sound(tick_seconds)
                    stereo_pcm = audioop.tostereo(sound.pcm, 2, 1, 1)
                    base_sound = _mix_pcm(base_sound, stereo_pcm)
        except RuntimeError:
            print("ignored exception in stderr...", file=sys.stderr)

        if is_streaming or stream_always:
            if base_sound:
                silence_time = 0
                sys.stdout.buffer.write(base_sound)
            else:
                silence_time += tick_ms
                sys.stdout.buffer.write(silent)

        silence_limit_ms = args.max_silence_time * 1000
        if (
            args.auto_suspend_stream
            and is_streaming
            and silence_time >= silence_limit_ms
        ):
            is_streaming = False
            logging.info("max-silence-time reached")
            mumble.my_channel().send_text_message(
                "Diretta terminata in automatico dopo %d secondi circa di silenzio"
                % args.max_silence_time
            )
            mumble.users.myself.unrecording()

        cursor_time += tick_seconds
# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    main()