stream.js 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. // @ts-check
  2. import WebSocketClient from '@gamestdio/websocket';
/**
 * WebSocket connection shared by all subscriptions. `connectStream` creates it
 * the first time a stream is requested against a base URL starting with `ws`;
 * until then it is `undefined`.
 * @type {WebSocketClient | undefined}
 */
let sharedConnection;

/**
 * One consumer's registration for a channel on the shared connection.
 * @typedef Subscription
 * @property {string} channelName Channel name, sent as `stream` in the (un)subscribe message
 * @property {Object.<string, string>} params Extra parameters spread into the (un)subscribe message
 * @property {function(): void} onConnect Called by `subscribe` once the subscription is counted
 * @property {function(StreamEvent): void} onReceive Called by `sharedCallbacks.received` for matching messages
 * @property {function(): void} onDisconnect Called by `unsubscribe` once the subscription is uncounted
 */

/**
 * Event handed to a subscription's `onReceive` callback.
 * @typedef StreamEvent
 * @property {string} event
 * @property {object} payload
 */

/**
 * Every subscription currently registered through `connectStream` on the
 * shared connection, in registration order.
 * @type {Array.<Subscription>}
 */
const subscriptions = [];

/**
 * Number of active subscriptions per channel, keyed by the string returned by
 * `channelNameWithInlineParams`. `subscribe` sends the subscribe message only
 * when the count is 0, and `unsubscribe` sends the unsubscribe message only
 * when the count is 1.
 * @type {Object.<string, number>}
 */
const subscriptionCounters = {};
  28. /**
  29. * @param {Subscription} subscription
  30. */
  31. const addSubscription = subscription => {
  32. subscriptions.push(subscription);
  33. };
  34. /**
  35. * @param {Subscription} subscription
  36. */
  37. const removeSubscription = subscription => {
  38. const index = subscriptions.indexOf(subscription);
  39. if (index !== -1) {
  40. subscriptions.splice(index, 1);
  41. }
  42. };
  43. /**
  44. * @param {Subscription} subscription
  45. */
  46. const subscribe = ({ channelName, params, onConnect }) => {
  47. const key = channelNameWithInlineParams(channelName, params);
  48. subscriptionCounters[key] = subscriptionCounters[key] || 0;
  49. if (subscriptionCounters[key] === 0) {
  50. // @ts-expect-error
  51. sharedConnection.send(JSON.stringify({ type: 'subscribe', stream: channelName, ...params }));
  52. }
  53. subscriptionCounters[key] += 1;
  54. onConnect();
  55. };
  56. /**
  57. * @param {Subscription} subscription
  58. */
  59. const unsubscribe = ({ channelName, params, onDisconnect }) => {
  60. const key = channelNameWithInlineParams(channelName, params);
  61. subscriptionCounters[key] = subscriptionCounters[key] || 1;
  62. // @ts-expect-error
  63. if (subscriptionCounters[key] === 1 && sharedConnection.readyState === WebSocketClient.OPEN) {
  64. // @ts-expect-error
  65. sharedConnection.send(JSON.stringify({ type: 'unsubscribe', stream: channelName, ...params }));
  66. }
  67. subscriptionCounters[key] -= 1;
  68. onDisconnect();
  69. };
  70. const sharedCallbacks = {
  71. connected() {
  72. subscriptions.forEach(subscription => subscribe(subscription));
  73. },
  74. // @ts-expect-error
  75. received(data) {
  76. const { stream } = data;
  77. subscriptions.filter(({ channelName, params }) => {
  78. const streamChannelName = stream[0];
  79. if (stream.length === 1) {
  80. return channelName === streamChannelName;
  81. }
  82. const streamIdentifier = stream[1];
  83. if (['hashtag', 'hashtag:local'].includes(channelName)) {
  84. return channelName === streamChannelName && params.tag === streamIdentifier;
  85. } else if (channelName === 'list') {
  86. return channelName === streamChannelName && params.list === streamIdentifier;
  87. }
  88. return false;
  89. }).forEach(subscription => {
  90. subscription.onReceive(data);
  91. });
  92. },
  93. disconnected() {
  94. subscriptions.forEach(subscription => unsubscribe(subscription));
  95. },
  96. reconnected() {
  97. },
  98. };
  99. /**
  100. * @param {string} channelName
  101. * @param {Object.<string, string>} params
  102. * @returns {string}
  103. */
  104. const channelNameWithInlineParams = (channelName, params) => {
  105. if (Object.keys(params).length === 0) {
  106. return channelName;
  107. }
  108. return `${channelName}&${Object.keys(params).map(key => `${key}=${params[key]}`).join('&')}`;
  109. };
  110. /**
  111. * @param {string} channelName
  112. * @param {Object.<string, string>} params
  113. * @param {function(Function, Function): { onConnect: (function(): void), onReceive: (function(StreamEvent): void), onDisconnect: (function(): void) }} callbacks
  114. * @returns {function(): void}
  115. */
  116. // @ts-expect-error
  117. export const connectStream = (channelName, params, callbacks) => (dispatch, getState) => {
  118. const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
  119. const accessToken = getState().getIn(['meta', 'access_token']);
  120. const { onConnect, onReceive, onDisconnect } = callbacks(dispatch, getState);
  121. // If we cannot use a websockets connection, we must fall back
  122. // to using individual connections for each channel
  123. if (!streamingAPIBaseURL.startsWith('ws')) {
  124. const connection = createConnection(streamingAPIBaseURL, accessToken, channelNameWithInlineParams(channelName, params), {
  125. connected() {
  126. onConnect();
  127. },
  128. received(data) {
  129. onReceive(data);
  130. },
  131. disconnected() {
  132. onDisconnect();
  133. },
  134. reconnected() {
  135. onConnect();
  136. },
  137. });
  138. return () => {
  139. connection.close();
  140. };
  141. }
  142. const subscription = {
  143. channelName,
  144. params,
  145. onConnect,
  146. onReceive,
  147. onDisconnect,
  148. };
  149. addSubscription(subscription);
  150. // If a connection is open, we can execute the subscription right now. Otherwise,
  151. // because we have already registered it, it will be executed on connect
  152. if (!sharedConnection) {
  153. sharedConnection = /** @type {WebSocketClient} */ (createConnection(streamingAPIBaseURL, accessToken, '', sharedCallbacks));
  154. } else if (sharedConnection.readyState === WebSocketClient.OPEN) {
  155. subscribe(subscription);
  156. }
  157. return () => {
  158. removeSubscription(subscription);
  159. unsubscribe(subscription);
  160. };
  161. };
/**
 * Event names for which `createConnection` registers a listener on the
 * `EventSource` fallback connection. Events with any other name get no
 * listener there.
 */
const KNOWN_EVENT_TYPES = [
  'update',
  'delete',
  'notification',
  'conversation',
  'filters_changed',
  'encrypted_message',
  'announcement',
  'announcement.delete',
  'announcement.reaction',
];
  173. /**
  174. * @param {MessageEvent} e
  175. * @param {function(StreamEvent): void} received
  176. */
  177. const handleEventSourceMessage = (e, received) => {
  178. received({
  179. event: e.type,
  180. payload: e.data,
  181. });
  182. };
  183. /**
  184. * @param {string} streamingAPIBaseURL
  185. * @param {string} accessToken
  186. * @param {string} channelName
  187. * @param {{ connected: Function, received: function(StreamEvent): void, disconnected: Function, reconnected: Function }} callbacks
  188. * @returns {WebSocketClient | EventSource}
  189. */
  190. const createConnection = (streamingAPIBaseURL, accessToken, channelName, { connected, received, disconnected, reconnected }) => {
  191. const params = channelName.split('&');
  192. // @ts-expect-error
  193. channelName = params.shift();
  194. if (streamingAPIBaseURL.startsWith('ws')) {
  195. // @ts-expect-error
  196. const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
  197. // @ts-expect-error
  198. ws.onopen = connected;
  199. ws.onmessage = e => received(JSON.parse(e.data));
  200. // @ts-expect-error
  201. ws.onclose = disconnected;
  202. // @ts-expect-error
  203. ws.onreconnect = reconnected;
  204. return ws;
  205. }
  206. channelName = channelName.replace(/:/g, '/');
  207. if (channelName.endsWith(':media')) {
  208. channelName = channelName.replace('/media', '');
  209. params.push('only_media=true');
  210. }
  211. params.push(`access_token=${accessToken}`);
  212. const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${channelName}?${params.join('&')}`);
  213. es.onopen = () => {
  214. connected();
  215. };
  216. KNOWN_EVENT_TYPES.forEach(type => {
  217. es.addEventListener(type, e => handleEventSourceMessage(/** @type {MessageEvent} */(e), received));
  218. });
  219. es.onerror = /** @type {function(): void} */ (disconnected);
  220. return es;
  221. };