123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- // @ts-check
- import WebSocketClient from '@gamestdio/websocket';
/**
 * The single WebSocket connection shared by every subscription. It stays
 * `undefined` until `connectStream` creates the connection on first use.
 * @type {WebSocketClient | undefined}
 */
let sharedConnection;

/**
 * One consumer of a stream on the shared connection.
 * @typedef Subscription
 * @property {string} channelName Stream name, sent to the server as `stream`
 * @property {Object.<string, string>} params Extra fields spread into the subscribe/unsubscribe message
 * @property {function(): void} onConnect Invoked at the end of `subscribe`
 * @property {function(StreamEvent): void} onReceive Invoked with each incoming message that matches this subscription
 * @property {function(): void} onDisconnect Invoked at the end of `unsubscribe`
 */

/**
 * @typedef StreamEvent
 * @property {string} event
 * @property {object} payload
 */

/**
 * Every subscription currently registered against the shared connection.
 * @type {Array.<Subscription>}
 */
const subscriptions = [];

/**
 * Number of subscriptions per channel key (see `channelNameWithInlineParams`).
 *
 * Created without a prototype so that a key which happens to be named like an
 * `Object.prototype` member (`constructor`, `toString`, ...) reads as
 * `undefined` instead of an inherited function, which would defeat the
 * `|| 0` / `|| 1` defaults used when counting.
 * @type {Object.<string, number>}
 */
const subscriptionCounters = Object.create(null);
/**
 * Registers a subscription by appending it to the module-level list.
 * @param {Subscription} subscription
 */
const addSubscription = (subscription) => void subscriptions.push(subscription);
/**
 * Drops a subscription from the module-level list. Does nothing when the
 * given object is not in the list.
 * @param {Subscription} subscription
 */
const removeSubscription = (subscription) => {
  const position = subscriptions.indexOf(subscription);

  if (position === -1) {
    return;
  }

  subscriptions.splice(position, 1);
};
/**
 * Counts one more subscriber for the subscription's channel key and, when it
 * is the first one, sends a `subscribe` message over the shared connection.
 * Finishes by invoking the subscription's `onConnect` callback.
 * @param {Subscription} subscription
 */
const subscribe = ({ channelName, params, onConnect }) => {
  const counterKey = channelNameWithInlineParams(channelName, params);
  const currentCount = subscriptionCounters[counterKey] || 0;

  // Store the normalised count before sending anything.
  subscriptionCounters[counterKey] = currentCount;

  if (currentCount === 0) {
    // Only the first subscriber for a key needs to tell the server.
    const message = { type: 'subscribe', stream: channelName, ...params };
    sharedConnection.send(JSON.stringify(message));
  }

  subscriptionCounters[counterKey] += 1;

  onConnect();
};
/**
 * Counts one subscriber less for the subscription's channel key. When it was
 * the last one and the shared connection is open, sends an `unsubscribe`
 * message. Finishes by invoking the subscription's `onDisconnect` callback.
 * @param {Subscription} subscription
 */
const unsubscribe = ({ channelName, params, onDisconnect }) => {
  const counterKey = channelNameWithInlineParams(channelName, params);
  const currentCount = subscriptionCounters[counterKey] || 1;

  // A missing or zero count is treated as one, so the decrement below never goes negative.
  subscriptionCounters[counterKey] = currentCount;

  if (currentCount === 1) {
    // The connection state is only inspected for the last subscriber.
    if (sharedConnection.readyState === WebSocketClient.OPEN) {
      const message = { type: 'unsubscribe', stream: channelName, ...params };
      sharedConnection.send(JSON.stringify(message));
    }
  }

  subscriptionCounters[counterKey] -= 1;

  onDisconnect();
};
/**
 * Callbacks attached to the shared WebSocket connection. They fan connection
 * events out to every registered subscription.
 */
const sharedCallbacks = {
  // Connection opened: (re)subscribe everything that is registered.
  connected () {
    subscriptions.forEach((entry) => {
      subscribe(entry);
    });
  },

  // Incoming message: deliver it to the subscriptions whose channel (and,
  // for hashtag and list channels, identifier) matches `data.stream`.
  received (data) {
    const { stream } = data;

    const isRecipient = ({ channelName, params }) => {
      const sameChannel = channelName === stream[0];

      // A one-element stream carries no identifier, so the name decides.
      if (stream.length === 1) {
        return sameChannel;
      }

      switch (channelName) {
      case 'hashtag':
      case 'hashtag:local':
        return sameChannel && params.tag === stream[1];
      case 'list':
        return sameChannel && params.list === stream[1];
      default:
        return false;
      }
    };

    subscriptions
      .filter((entry) => isRecipient(entry))
      .forEach((entry) => {
        entry.onReceive(data);
      });
  },

  // Connection closed: run the unsubscribe bookkeeping for everything.
  disconnected () {
    subscriptions.forEach((entry) => {
      unsubscribe(entry);
    });
  },

  // Nothing to do here.
  reconnected () {
  },
};
/**
 * Builds the key that identifies a channel together with its parameters:
 * the channel name followed by `&key=value` for every parameter. With no
 * parameters the channel name is returned as is.
 * @param {string} channelName
 * @param {Object.<string, string>} params
 * @return {string}
 */
const channelNameWithInlineParams = (channelName, params) => {
  const paramNames = Object.keys(params);

  if (paramNames.length === 0) {
    return channelName;
  }

  const pairs = paramNames.map((name) => `${name}=${params[name]}`);

  return `${channelName}&${pairs.join('&')}`;
};
/**
 * Creates a thunk that connects to a stream and returns a function that
 * disconnects from it again.
 *
 * When the streaming base URL is a WebSocket URL, all streams share one
 * connection and are multiplexed as subscriptions. Otherwise a dedicated
 * connection is opened for this channel alone.
 *
 * @param {string} channelName
 * @param {Object.<string, string>} params
 * @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks
 * @return {function(Function, Function): function(): void} Thunk taking `(dispatch, getState)` and returning the disconnect function
 */
export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => {
  const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
  const accessToken = getState().getIn(['meta', 'access_token']);
  const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState);

  // If we cannot use a websockets connection, we must fall back
  // to using individual connections for each channel
  if (!streamingAPIBaseURL.startsWith('ws')) {
    const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), {
      connected () {
        onConnect();
      },

      received (data) {
        onReceive(data);
      },

      disconnected () {
        onDisconnect();
      },

      reconnected () {
        onConnect();
      },
    });

    return () => {
      connection.close();
    };
  }

  const subscription = {
    channelName,
    params,
    onConnect,
    onReceive,
    onDisconnect,
  };

  addSubscription(subscription);

  // If a connection is open, we can execute the subscription right now. Otherwise,
  // because we have already registered it, it will be executed on connect
  if (!sharedConnection) {
    sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks));
  } else if (sharedConnection.readyState === WebSocketClient.OPEN) {
    subscribe(subscription);
  }

  // The per-channel counter is shared between subscriptions, so running the
  // teardown twice would decrement it on behalf of another subscriber and
  // could unsubscribe a stream that is still in use. Only tear down once.
  let disposed = false;

  return () => {
    if (disposed) {
      return;
    }

    disposed = true;
    removeSubscription(subscription);
    unsubscribe(subscription);
  };
};
/**
 * Event names for which a listener is registered on an EventSource
 * connection; each one is forwarded to the `received` callback.
 */
const KNOWN_EVENT_TYPES = [
  // Statuses, notifications and conversations
  'update', 'delete',
  'notification', 'conversation',
  // Account-level changes
  'filters_changed', 'encrypted_message',
  // Announcements
  'announcement', 'announcement.delete', 'announcement.reaction',
];
/**
 * Repackages an EventSource message as a stream event and passes it to the
 * given callback: the message's `type` becomes `event` and its `data`
 * becomes `payload`.
 * @param {MessageEvent} e
 * @param {function(StreamEvent): void} received
 */
const handleEventSourceMessage = (e, received) => {
  const { type: event, data: payload } = e;

  received({ event, payload });
};
/**
 * Opens a streaming connection and wires the given callbacks to it.
 *
 * `channelName` may carry inline parameters in the form produced by
 * `channelNameWithInlineParams` (`name&key=value&...`). For a WebSocket base
 * URL only those parameters are put in the query string and the access token
 * is passed as the second constructor argument. Otherwise an EventSource is
 * opened on a per-channel path, with the access token in the query string.
 *
 * @param {string} streamingAPIBaseURL
 * @param {string} accessToken
 * @param {string} channelName
 * @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks
 * @return {WebSocketClient | EventSource}
 */
const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => {
  // The first segment is the channel itself, the rest are `key=value` pairs.
  const [baseChannelName, ...params] = channelName.split('&');

  if (streamingAPIBaseURL.startsWith('ws')) {
    const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);

    ws.onopen = connected;
    ws.onmessage = e => received(JSON.parse(e.data));
    ws.onclose = disconnected;
    ws.onreconnect = reconnected;

    return ws;
  }

  // Channel names use `:` as separator, URL paths use `/`.
  let channelPath = baseChannelName.replace(/:/g, '/');

  // Media-only channels are not a path of their own: strip the suffix and
  // request them through the `only_media` query parameter. The check must be
  // made against `/media`, because every `:` has already been replaced above.
  const mediaSuffix = '/media';

  if (channelPath.endsWith(mediaSuffix)) {
    channelPath = channelPath.slice(0, -mediaSuffix.length);
    params.push('only_media=true');
  }

  params.push(`access_token=${accessToken}`);

  const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelPath}?${params.join('&')}`);

  es.onopen = () => {
    connected();
  };

  KNOWN_EVENT_TYPES.forEach(type => {
    es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */ (e), received));
  });

  es.onerror = /** @type {function(): void} */ (disconnected);

  return es;
};
|