Browse Source

Move mumble client and audio encoding/resampling into a web worker

Jonas Herzig 5 years ago
parent
commit
beafbe162f
6 changed files with 667 additions and 36 deletions
  1. 6 15
      app/index.js
  2. 3 19
      app/voice.js
  3. 371 0
      app/worker-client.js
  4. 284 0
      app/worker.js
  5. 2 1
      package.json
  6. 1 1
      webpack.config.js

+ 6 - 15
app/index.js

@@ -2,11 +2,9 @@ import 'stream-browserify' // see https://github.com/ericgundrum/pouch-websocket
 import url from 'url'
 import ByteBuffer from 'bytebuffer'
 import MumbleClient from 'mumble-client'
-import mumbleConnect from 'mumble-client-websocket'
-import CodecsBrowser from 'mumble-client-codecs-browser'
+import WorkerBasedMumbleConnector from './worker-client'
 import BufferQueueNode from 'web-audio-buffer-queue'
 import audioContext from 'audio-context'
-import Resampler from 'libsamplerate.js'
 import ko from 'knockout'
 import _dompurify from 'dompurify'
 import keyboardjs from 'keyboardjs'
@@ -263,6 +261,7 @@ class Settings {
 class GlobalBindings {
   constructor () {
     this.settings = new Settings()
+    this.connector = new WorkerBasedMumbleConnector(audioContext.sampleRate)
     this.client = null
     this.userContextMenu = new ContextMenu()
     this.channelContextMenu = new ContextMenu()
@@ -335,10 +334,9 @@ class GlobalBindings {
       log('Connecting to server ', host)
 
       // TODO: token
-      mumbleConnect(`wss://${host}:${port}`, {
+      this.connector.connect(`wss://${host}:${port}`, {
         username: username,
-        password: password,
-        codecs: CodecsBrowser
+        password: password
       }).done(client => {
         log('Connected!')
 
@@ -560,13 +558,6 @@ class GlobalBindings {
         })
         userNode.connect(audioContext.destination)
 
-        var resampler = new Resampler({
-          unsafe: true,
-          type: Resampler.Type.ZERO_ORDER_HOLD,
-          ratio: audioContext.sampleRate / 48000
-        })
-        resampler.pipe(userNode)
-
         stream.on('data', data => {
           if (data.target === 'normal') {
             ui.talking('on')
@@ -575,11 +566,11 @@ class GlobalBindings {
           } else if (data.target === 'whisper') {
             ui.talking('whisper')
           }
-          resampler.write(Buffer.from(data.pcm.buffer))
+          userNode.write(data.buffer)
         }).on('end', () => {
           console.log(`User ${user.username} stopped takling`)
           ui.talking('off')
-          resampler.end()
+          userNode.end()
         })
       })
     }

+ 3 - 19
app/voice.js

@@ -1,8 +1,6 @@
-import { Writable, Transform } from 'stream'
+import { Writable } from 'stream'
 import MicrophoneStream from 'microphone-stream'
 import audioContext from 'audio-context'
-import chunker from 'stream-chunker'
-import Resampler from 'libsamplerate.js'
 import getUserMedia from 'getusermedia'
 import keyboardjs from 'keyboardjs'
 import vad from 'voice-activity-detection'
@@ -34,23 +32,9 @@ class VoiceHandler extends Writable {
         this.emit('started_talking')
         return this._outbound
       }
-      this._outbound = new Resampler({
-        unsafe: true,
-        type: Resampler.Type.SINC_FASTEST,
-        ratio: 48000 / audioContext.sampleRate
-      })
-
-      const buffer2Float32Array = new Transform({
-        transform (data, _, callback) {
-          callback(null, new Float32Array(data.buffer, data.byteOffset, data.byteLength / 4))
-        },
-        readableObjectMode: true
-      })
 
-      this._outbound
-        .pipe(chunker(4 * this._settings.samplesPerPacket))
-        .pipe(buffer2Float32Array)
-        .pipe(this._client.createVoiceStream())
+      // Note: the samplesPerPacket argument is handled in worker.js and not passed on
+      this._outbound = this._client.createVoiceStream(this._settings.samplesPerPacket)
 
       this.emit('started_talking')
     }

+ 371 - 0
app/worker-client.js

@@ -0,0 +1,371 @@
+import MumbleClient from 'mumble-client'
+import Promise from 'promise'
+import EventEmitter from 'events'
+import { Writable, PassThrough } from 'stream'
+import toArrayBuffer from 'to-arraybuffer'
+import ByteBuffer from 'bytebuffer'
+import webworkify from 'webworkify'
+import worker from './worker'
+
+/**
+ * Creates proxy MumbleClients to a real ones running on a web worker.
+ * Only stuff which we need in mumble-web is proxied, i.e. this is not a generic solution.
+ */
+class WorkerBasedMumbleConnector {
+  constructor (sampleRate) {
+    this._worker = webworkify(worker)
+    this._worker.addEventListener('message', this._onMessage.bind(this))
+    this._reqId = 1
+    this._requests = {}
+    this._clients = {}
+    this._nextVoiceId = 1
+    this._voiceStreams = {}
+
+    this._postMessage({
+      method: '_init',
+      sampleRate: sampleRate
+    })
+  }
+
+  _postMessage (msg, transfer) {
+    try {
+      this._worker.postMessage(msg, transfer)
+    } catch (err) {
+      console.error('Failed to postMessage', msg)
+      throw err
+    }
+  }
+
+  _call (id, method, payload, transfer) {
+    let reqId = this._reqId++
+    console.debug(method, id, payload)
+    this._postMessage({
+      clientId: id.client,
+      channelId: id.channel,
+      userId: id.user,
+      method: method,
+      reqId: reqId,
+      payload: payload
+    }, transfer)
+    return reqId
+  }
+
+  _query (id, method, payload, transfer) {
+    let reqId = this._call(id, method, payload, transfer)
+    return new Promise((resolve, reject) => {
+      this._requests[reqId] = [resolve, reject]
+    })
+  }
+
+  _addCall (proxy, name, id) {
+    let self = this
+    proxy[name] = function () {
+      self._call(id, name, Array.from(arguments))
+    }
+  }
+
+  connect (host, args) {
+    return this._query({}, '_connect', { host: host, args: args })
+      .then(id => this._client(id))
+  }
+
+  _client (id) {
+    let client = this._clients[id]
+    if (!client) {
+      client = new WorkerBasedMumbleClient(this, id)
+      this._clients[id] = client
+    }
+    return client
+  }
+
+  _onMessage (ev) {
+    let data = ev.data
+    if (data.reqId != null) {
+      console.debug(data)
+      let { reqId, result, error } = data
+      let [ resolve, reject ] = this._requests[reqId]
+      delete this._requests[reqId]
+      if (result) {
+        resolve(result)
+      } else {
+        reject(error)
+      }
+    } else if (data.clientId != null) {
+      console.debug(data)
+      let client = this._client(data.clientId)
+
+      let target
+      if (data.userId != null) {
+        target = client._user(data.userId)
+      } else if (data.channelId != null) {
+        target = client._channel(data.channelId)
+      } else {
+        target = client
+      }
+
+      if (data.event) {
+        target._dispatchEvent(data.event, data.value)
+      } else if (data.prop) {
+        target._setProp(data.prop, data.value)
+      }
+    } else if (data.voiceId != null) {
+      let stream = this._voiceStreams[data.voiceId]
+      let buffer = data.buffer
+      if (buffer) {
+        stream.write({
+          target: data.target,
+          buffer: Buffer.from(buffer)
+        })
+      } else {
+        delete this._voiceStreams[data.voiceId]
+        stream.end()
+      }
+    }
+  }
+}
+
+class WorkerBasedMumbleClient extends EventEmitter {
+  constructor (connector, clientId) {
+    super()
+    this._connector = connector
+    this._id = clientId
+    this._users = {}
+    this._channels = {}
+
+    let id = { client: clientId }
+    connector._addCall(this, 'setSelfDeaf', id)
+    connector._addCall(this, 'setSelfMute', id)
+    connector._addCall(this, 'setSelfTexture', id)
+    connector._addCall(this, 'setAudioQuality', id)
+
+    connector._addCall(this, 'disconnect', id)
+    let _disconnect = this.disconnect
+    this.disconnect = () => {
+      _disconnect.apply(this)
+      delete connector._clients[id]
+    }
+
+    connector._addCall(this, 'createVoiceStream', id)
+    let _createVoiceStream = this.createVoiceStream
+    this.createVoiceStream = function () {
+      let voiceId = connector._nextVoiceId++
+
+      let args = Array.from(arguments)
+      args.unshift(voiceId)
+      _createVoiceStream.apply(this, args)
+
+      return new Writable({
+        write (chunk, encoding, callback) {
+          chunk = toArrayBuffer(chunk)
+          connector._postMessage({
+            voiceId: voiceId,
+            chunk: chunk
+          })
+          callback()
+        },
+        final (callback) {
+          connector._postMessage({
+            voiceId: voiceId
+          })
+          callback()
+        }
+      })
+    }
+
+    // Dummy client used for bandwidth calculations
+    this._dummyClient = new MumbleClient({ username: 'dummy' })
+    let defineDummyMethod = (name) => {
+      this[name] = function () {
+        return this._dummyClient[name].apply(this._dummyClient, arguments)
+      }
+    }
+    defineDummyMethod('getMaxBitrate')
+    defineDummyMethod('getActualBitrate')
+  }
+
+  _user (id) {
+    let user = this._users[id]
+    if (!user) {
+      user = new WorkerBasedMumbleUser(this._connector, this, id)
+      this._users[id] = user
+    }
+    return user
+  }
+
+  _channel (id) {
+    let channel = this._channels[id]
+    if (!channel) {
+      channel = new WorkerBasedMumbleChannel(this._connector, this, id)
+      this._channels[id] = channel
+    }
+    return channel
+  }
+
+  _dispatchEvent (name, args) {
+    if (name === 'newChannel') {
+      args[0] = this._channel(args[0])
+    } else if (name === 'newUser') {
+      args[0] = this._user(args[0])
+    } else if (name === 'message') {
+      args[0] = this._user(args[0])
+      args[2] = args[2].map((id) => this._user(id))
+      args[3] = args[3].map((id) => this._channel(id))
+      args[4] = args[4].map((id) => this._channel(id))
+    }
+    args.unshift(name)
+    this.emit.apply(this, args)
+  }
+
+  _setProp (name, value) {
+    if (name === 'root') {
+      name = '_rootId'
+    }
+    if (name === 'self') {
+      name = '_selfId'
+    }
+    if (name === 'maxBandwidth') {
+      this._dummyClient.maxBandwidth = value
+    }
+    this[name] = value
+  }
+
+  get root () {
+    return this._channel(this._rootId)
+  }
+
+  get channels () {
+    return Object.values(this._channels)
+  }
+
+  get users () {
+    return Object.values(this._users)
+  }
+
+  get self () {
+    return this._user(this._selfId)
+  }
+}
+
+class WorkerBasedMumbleChannel extends EventEmitter {
+  constructor (connector, client, channelId) {
+    super()
+    this._connector = connector
+    this._client = client
+    this._id = channelId
+
+    let id = { client: client._id, channel: channelId }
+    connector._addCall(this, 'sendMessage', id)
+  }
+
+  _dispatchEvent (name, args) {
+    if (name === 'update') {
+      let [actor, props] = args
+      Object.entries(props).forEach((entry) => {
+        this._setProp(entry[0], entry[1])
+      })
+      if (props.parent != null) {
+        props.parent = this.parent
+      }
+      if (props.links != null) {
+        props.links = this.links
+      }
+      args = [
+        this._client._user(actor),
+        props
+      ]
+    } else if (name === 'remove') {
+      delete this._client._channels[this._id]
+    }
+    args.unshift(name)
+    this.emit.apply(this, args)
+  }
+
+  _setProp (name, value) {
+    if (name === 'parent') {
+      name = '_parentId'
+    }
+    if (name === 'links') {
+      value = value.map((id) => this._client._channel(id))
+    }
+    this[name] = value
+  }
+
+  get parent () {
+    if (this._parentId != null) {
+      return this._client._channel(this._parentId)
+    }
+  }
+
+  get children () {
+    return Object.values(this._client._channels).filter((it) => it.parent === this)
+  }
+}
+
+class WorkerBasedMumbleUser extends EventEmitter {
+  constructor (connector, client, userId) {
+    super()
+    this._connector = connector
+    this._client = client
+    this._id = userId
+
+    let id = { client: client._id, user: userId }
+    connector._addCall(this, 'requestTexture', id)
+    connector._addCall(this, 'clearTexture', id)
+    connector._addCall(this, 'setMute', id)
+    connector._addCall(this, 'setDeaf', id)
+    connector._addCall(this, 'sendMessage', id)
+    this.setChannel = (channel) => {
+      connector._call(id, 'setChannel', channel._id)
+    }
+  }
+
+  _dispatchEvent (name, args) {
+    if (name === 'update') {
+      let [actor, props] = args
+      Object.entries(props).forEach((entry) => {
+        this._setProp(entry[0], entry[1])
+      })
+      if (props.channel != null) {
+        props.channel = this.channel
+      }
+      if (props.texture != null) {
+        props.texture = this.texture
+      }
+      args = [
+        this._client._user(actor),
+        props
+      ]
+    } else if (name === 'voice') {
+      let [id] = args
+      let stream = new PassThrough({
+        objectMode: true
+      })
+      this._connector._voiceStreams[id] = stream
+      args = [stream]
+    } else if (name === 'remove') {
+      delete this._client._users[this._id]
+    }
+    args.unshift(name)
+    this.emit.apply(this, args)
+  }
+
+  _setProp (name, value) {
+    if (name === 'channel') {
+      name = '_channelId'
+    }
+    if (name === 'texture') {
+      if (value) {
+        let buf = ByteBuffer.wrap(value.buffer)
+        buf.offset = value.offset
+        buf.limit = value.limit
+        value = buf
+      }
+    }
+    this[name] = value
+  }
+
+  get channel () {
+    return this._client.channels[this._channelId]
+  }
+}
+export default WorkerBasedMumbleConnector

+ 284 - 0
app/worker.js

@@ -0,0 +1,284 @@
+import { Transform } from 'stream'
+import mumbleConnect from 'mumble-client-websocket'
+import toArrayBuffer from 'to-arraybuffer'
+import chunker from 'stream-chunker'
+import Resampler from 'libsamplerate.js'
+
+// Monkey-patch to allow webworkify-webpack and codecs to work inside of web worker
+/* global URL */
+window.URL = URL
+
+// Using require to ensure ordering relative to monkey-patch above
+let CodecsBrowser = require('mumble-client-codecs-browser')
+
+export default function (self) {
+  let sampleRate
+  let nextClientId = 1
+  let nextVoiceId = 1
+  let voiceStreams = []
+  let clients = []
+
+  function postMessage (msg, transfer) {
+    try {
+      self.postMessage(msg, transfer)
+    } catch (err) {
+      console.error('Failed to postMessage', msg)
+      throw err
+    }
+  }
+
+  function resolve (reqId, value, transfer) {
+    postMessage({
+      reqId: reqId,
+      result: value
+    }, transfer)
+  }
+
+  function reject (reqId, value, transfer) {
+    console.error(value)
+    let jsonValue = JSON.parse(JSON.stringify(value))
+    if (value.$type) {
+      jsonValue.$type = { name: value.$type.name }
+    }
+    postMessage({
+      reqId: reqId,
+      error: jsonValue
+    }, transfer)
+  }
+
+  function registerEventProxy (id, obj, event, transform) {
+    obj.on(event, function (_) {
+      postMessage({
+        clientId: id.client,
+        channelId: id.channel,
+        userId: id.user,
+        event: event,
+        value: transform ? transform.apply(null, arguments) : Array.from(arguments)
+      })
+    })
+  }
+
+  function pushProp (id, obj, prop, transform) {
+    let value = obj[prop]
+    postMessage({
+      clientId: id.client,
+      channelId: id.channel,
+      userId: id.user,
+      prop: prop,
+      value: transform ? transform(value) : value
+    })
+  }
+
+  function setupOutboundVoice (voiceId, samplesPerPacket, stream) {
+    let resampler = new Resampler({
+      unsafe: true,
+      type: Resampler.Type.SINC_FASTEST,
+      ratio: 48000 / sampleRate
+    })
+
+    let buffer2Float32Array = new Transform({
+      transform (data, _, callback) {
+        callback(null, new Float32Array(data.buffer, data.byteOffset, data.byteLength / 4))
+      },
+      readableObjectMode: true
+    })
+
+    resampler
+      .pipe(chunker(4 * samplesPerPacket))
+      .pipe(buffer2Float32Array)
+      .pipe(stream)
+
+    voiceStreams[voiceId] = resampler
+  }
+
+  function setupChannel (id, channel) {
+    id = Object.assign({}, id, { channel: channel.id })
+
+    registerEventProxy(id, channel, 'update', (actor, props) => {
+      if (actor) {
+        actor = actor.id
+      }
+      if (props.parent) {
+        props.parent = props.parent.id
+      }
+      if (props.links) {
+        props.links = props.links.map((it) => it.id)
+      }
+      return [actor, props]
+    })
+    registerEventProxy(id, channel, 'remove')
+
+    pushProp(id, channel, 'parent', (it) => it ? it.id : it)
+    pushProp(id, channel, 'links', (it) => it.map((it) => it.id))
+    let props = [
+      'position', 'name', 'description'
+    ]
+    for (let prop of props) {
+      pushProp(id, channel, prop)
+    }
+
+    for (let child of channel.children) {
+      setupChannel(id, child)
+    }
+
+    return channel.id
+  }
+
+  function setupUser (id, user) {
+    id = Object.assign({}, id, { user: user.id })
+
+    registerEventProxy(id, user, 'update', (actor, props) => {
+      if (actor) {
+        actor = actor.id
+      }
+      if (props.channel != null) {
+        props.channel = props.channel.id
+      }
+      return [actor, props]
+    })
+    registerEventProxy(id, user, 'voice', (stream) => {
+      let voiceId = nextVoiceId++
+
+      let target
+
+      // We want to do as little on the UI thread as possible, so do resampling here as well
+      var resampler = new Resampler({
+        unsafe: true,
+        type: Resampler.Type.ZERO_ORDER_HOLD,
+        ratio: sampleRate / 48000
+      })
+
+      // Pipe stream into resampler
+      stream.on('data', (data) => {
+        // store target so we can pass it on after resampling
+        target = data.target
+        resampler.write(Buffer.from(data.pcm.buffer))
+      }).on('end', () => {
+        resampler.end()
+      })
+
+      // Pipe resampler into output stream on UI thread
+      resampler.on('data', (data) => {
+        data = toArrayBuffer(data) // postMessage can't transfer node's Buffer
+        postMessage({
+          voiceId: voiceId,
+          target: target,
+          buffer: data
+        }, [data])
+      }).on('end', () => {
+        postMessage({
+          voiceId: voiceId
+        })
+      })
+
+      return [voiceId]
+    })
+    registerEventProxy(id, user, 'remove')
+
+    pushProp(id, user, 'channel', (it) => it ? it.id : it)
+    let props = [
+      'uniqueId', 'username', 'mute', 'deaf', 'suppress', 'selfMute', 'selfDeaf',
+      'texture', 'textureHash', 'comment'
+    ]
+    for (let prop of props) {
+      pushProp(id, user, prop)
+    }
+
+    return user.id
+  }
+
+  function setupClient (id, client) {
+    id = { client: id }
+
+    registerEventProxy(id, client, 'error')
+    registerEventProxy(id, client, 'newChannel', (it) => [setupChannel(id, it)])
+    registerEventProxy(id, client, 'newUser', (it) => [setupUser(id, it)])
+    registerEventProxy(id, client, 'message', (sender, message, users, channels, trees) => {
+      return [
+        sender.id,
+        message,
+        users.map((it) => it.id),
+        channels.map((it) => it.id),
+        trees.map((it) => it.id)
+      ]
+    })
+    client.on('dataPing', () => {
+      pushProp(id, client, 'dataStats')
+    })
+
+    setupChannel(id, client.root)
+    for (let user of client.users) {
+      setupUser(id, user)
+    }
+
+    pushProp(id, client, 'root', (it) => it.id)
+    pushProp(id, client, 'self', (it) => it.id)
+    pushProp(id, client, 'welcomeMessage')
+    pushProp(id, client, 'serverVersion')
+    pushProp(id, client, 'maxBandwidth')
+  }
+
+  function onMessage (data) {
+    let { reqId, method, payload } = data
+    if (method === '_init') {
+      sampleRate = data.sampleRate
+    } else if (method === '_connect') {
+      payload.args.codecs = CodecsBrowser
+      mumbleConnect(payload.host, payload.args).then((client) => {
+        let id = nextClientId++
+        clients[id] = client
+        setupClient(id, client)
+        return id
+      }).done((id) => {
+        resolve(reqId, id)
+      }, (err) => {
+        reject(reqId, err)
+      })
+    } else if (data.clientId != null) {
+      let client = clients[data.clientId]
+
+      let target
+      if (data.userId != null) {
+        target = client.getUserById(data.userId)
+        if (method === 'setChannel') {
+          payload = [client.getChannelById(payload)]
+        }
+      } else if (data.channelId != null) {
+        target = client.getChannelById(data.channelId)
+      } else {
+        target = client
+        if (method === 'createVoiceStream') {
+          let voiceId = payload.shift()
+          let samplesPerPacket = payload.shift()
+
+          let stream = target.createVoiceStream.apply(target, payload)
+
+          setupOutboundVoice(voiceId, samplesPerPacket, stream)
+          return
+        }
+        if (method === 'disconnect') {
+          delete clients[data.clientId]
+        }
+      }
+
+      target[method].apply(target, payload)
+    } else if (data.voiceId != null) {
+      let stream = voiceStreams[data.voiceId]
+      let buffer = data.chunk
+      if (buffer) {
+        stream.write(Buffer.from(buffer))
+      } else {
+        delete voiceStreams[data.voiceId]
+        stream.end()
+      }
+    }
+  }
+
+  self.addEventListener('message', (ev) => {
+    try {
+      onMessage(ev.data)
+    } catch (ex) {
+      console.error('exception during message event', ev.data, ex)
+    }
+  })
+}

+ 2 - 1
package.json

@@ -42,10 +42,11 @@
     "regexp-replace-loader": "0.0.1",
     "sass-loader": "^4.1.1",
     "stream-chunker": "^1.2.8",
+    "to-arraybuffer": "^1.0.1",
     "transform-loader": "^0.2.3",
     "voice-activity-detection": "johni0702/voice-activity-detection#9f8bd90",
     "webpack": "^1.13.3",
-    "webworkify-webpack-dropin": "^1.1.9",
+    "webworkify-webpack": "^1.1.8",
     "libsamplerate.js": "^1.0.0",
     "mumble-client-codecs-browser": "^1.2.0",
     "mumble-client-websocket": "^1.0.0",

+ 1 - 1
webpack.config.js

@@ -89,7 +89,7 @@ module.exports = {
   },
   resolve: {
     alias: {
-      webworkify: 'webworkify-webpack-dropin'
+      webworkify: 'webworkify-webpack'
     },
     root: [
       path.resolve('./themes/')