index.js 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522
  1. // @ts-check
  2. const fs = require('fs');
  3. const http = require('http');
  4. const url = require('url');
  5. const dotenv = require('dotenv');
  6. const express = require('express');
  7. const Redis = require('ioredis');
  8. const { JSDOM } = require('jsdom');
  9. const log = require('npmlog');
  10. const pg = require('pg');
  11. const dbUrlToConfig = require('pg-connection-string').parse;
  12. const metrics = require('prom-client');
  13. const uuid = require('uuid');
  14. const WebSocket = require('ws');
  15. const environment = process.env.NODE_ENV || 'development';
  16. dotenv.config({
  17. path: environment === 'production' ? '.env.production' : '.env',
  18. });
  19. log.level = process.env.LOG_LEVEL || 'verbose';
  /**
   * Constructs a Redis client from the supplied configuration and attaches an
   * error handler that logs any client error.
   * @param {Object.<string, any>} config object carrying `redisUrl` and `redisParams`
   * @returns {Promise.<any>} resolves with the newly constructed client
   */
  const createRedisClient = async (config) => {
    const redisClient = new Redis(config.redisUrl, config.redisParams);
    const reportError = (err) => log.error('Redis Client Error!', err);

    redisClient.on('error', reportError);

    return redisClient;
  };
  /**
   * Parses a string as JSON without throwing. It is used both for messages
   * arriving from redis and for messages sent by a client over a websocket
   * connection; the optional `req` argument identifies the latter case so the
   * failure can be logged against the request.
   * @param {string} json
   * @param {any?} req
   * @returns {Object.<string, any>|null} the parsed value, or `null` when parsing fails
   */
  const parseJSON = (json, req) => {
    let parsed;

    try {
      parsed = JSON.parse(json);
    } catch (err) {
      /* FIXME: Logging here is not ideal; it belongs at the call-site of
       * parseJSON. Moving it would mean changing the return value to something
       * like a Result type, [Error|null, null|Object<string, any>], and then
       * handling the error scenarios in every caller.
       */
      if (!req) {
        log.warn(`Error parsing message from redis: ${err}`);
      } else if (req.accountId) {
        log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
      } else {
        log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
      }

      return null;
    }

    return parsed;
  };
  /**
   * Builds the PostgreSQL pool configuration from environment variables.
   *
   * When `DATABASE_URL` is set it is parsed and used as the base configuration
   * (with `DB_PASS` filling in a password missing from the URL). Otherwise the
   * defaults for the current `environment` are used, optionally adjusted by
   * `DB_SSLMODE`. An `environment` with no entry in the defaults table yields
   * an empty base configuration instead of throwing.
   * @param {Object.<string, any>} env the `process.env` value to read configuration from
   * @returns {Object.<string, any>} the configuration for the PostgreSQL connection
   */
  const pgConfigFromEnv = (env) => {
    const pgConfigs = {
      development: {
        user: env.DB_USER || pg.defaults.user,
        password: env.DB_PASS || pg.defaults.password,
        database: env.DB_NAME || 'mastodon_development',
        host: env.DB_HOST || pg.defaults.host,
        port: env.DB_PORT || pg.defaults.port,
      },
      production: {
        user: env.DB_USER || 'mastodon',
        password: env.DB_PASS || '',
        database: env.DB_NAME || 'mastodon_production',
        host: env.DB_HOST || 'localhost',
        port: env.DB_PORT || 5432,
      },
    };

    let baseConfig;

    if (env.DATABASE_URL) {
      baseConfig = dbUrlToConfig(env.DATABASE_URL);

      // Support overriding the database password in the connection URL
      if (!baseConfig.password && env.DB_PASS) {
        baseConfig.password = env.DB_PASS;
      }
    } else {
      // Spreading tolerates an `environment` that has no entry in the table
      // (e.g. "test"); previously that left baseConfig undefined and the ssl
      // assignment below threw a TypeError.
      baseConfig = { ...pgConfigs[environment] };

      // An empty DB_SSLMODE is falsy and therefore leaves `ssl` unset.
      if (env.DB_SSLMODE) {
        switch (env.DB_SSLMODE) {
        case 'disable':
          baseConfig.ssl = false;
          break;
        case 'no-verify':
          baseConfig.ssl = { rejectUnauthorized: false };
          break;
        default:
          baseConfig.ssl = {};
          break;
        }
      }
    }

    return {
      ...baseConfig,
      max: env.DB_POOL || 10,
      connectionTimeoutMillis: 15000,
      application_name: '',
    };
  };
  /**
   * Derives the Redis connection settings from environment variables.
   * @param {Object.<string, any>} env the `process.env` value to read configuration from
   * @returns {Object.<string, any>} `redisParams` (connection options),
   *   `redisPrefix` (key/channel namespace prefix, possibly empty) and
   *   `redisUrl` (the raw `REDIS_URL` value)
   */
  const redisConfigFromEnv = (env) => {
    const { REDIS_NAMESPACE: namespace, REDIS_URL: redisUrl } = env;
    const unixScheme = 'unix://';

    const redisParams = {
      host: env.REDIS_HOST || '127.0.0.1',
      port: env.REDIS_PORT || 6379,
      db: env.REDIS_DB || 0,
      password: env.REDIS_PASSWORD || undefined,
    };

    // redisParams.path takes precedence over host and port.
    if (redisUrl && redisUrl.startsWith(unixScheme)) {
      redisParams.path = redisUrl.slice(unixScheme.length);
    }

    // ioredis *can* transparently add prefixes for us, but it doesn't *in some
    // cases*, which means we can't use it; the prefix is applied by hand
    // instead. This is something that should be looked into.
    const redisPrefix = namespace ? `${namespace}:` : '';

    return {
      redisParams,
      redisPrefix,
      redisUrl,
    };
  };
  // Names of the public timeline and hashtag channels.
  const PUBLIC_CHANNELS = [
    'public', 'public:media',
    'public:local', 'public:local:media',
    'public:remote', 'public:remote:media',
    'hashtag', 'hashtag:local',
  ];

  // Every channel name, used for priming the per-channel counters/gauges of
  // the various metrics.
  const CHANNEL_NAMES = ['system', 'user', 'user:notification', 'list', 'direct'].concat(PUBLIC_CHANNELS);
  155. const startServer = async () => {
  156. const app = express();
  157. app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');
  158. const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
  159. const server = http.createServer(app);
  160. /**
  161. * @type {Object.<string, Array.<function(Object<string, any>): void>>}
  162. */
  163. const subs = {};
  164. const redisConfig = redisConfigFromEnv(process.env);
  165. const redisSubscribeClient = await createRedisClient(redisConfig);
  166. const redisClient = await createRedisClient(redisConfig);
  167. const { redisPrefix } = redisConfig;
  168. // Collect metrics from Node.js
  169. metrics.collectDefaultMetrics();
  170. new metrics.Gauge({
  171. name: 'pg_pool_total_connections',
  172. help: 'The total number of clients existing within the pool',
  173. collect() {
  174. this.set(pgPool.totalCount);
  175. },
  176. });
  177. new metrics.Gauge({
  178. name: 'pg_pool_idle_connections',
  179. help: 'The number of clients which are not checked out but are currently idle in the pool',
  180. collect() {
  181. this.set(pgPool.idleCount);
  182. },
  183. });
  184. new metrics.Gauge({
  185. name: 'pg_pool_waiting_queries',
  186. help: 'The number of queued requests waiting on a client when all clients are checked out',
  187. collect() {
  188. this.set(pgPool.waitingCount);
  189. },
  190. });
  191. const connectedClients = new metrics.Gauge({
  192. name: 'connected_clients',
  193. help: 'The number of clients connected to the streaming server',
  194. labelNames: ['type'],
  195. });
  196. const connectedChannels = new metrics.Gauge({
  197. name: 'connected_channels',
  198. help: 'The number of channels the streaming server is streaming to',
  199. labelNames: [ 'type', 'channel' ]
  200. });
  201. const redisSubscriptions = new metrics.Gauge({
  202. name: 'redis_subscriptions',
  203. help: 'The number of Redis channels the streaming server is subscribed to',
  204. });
  205. const redisMessagesReceived = new metrics.Counter({
  206. name: 'redis_messages_received_total',
  207. help: 'The total number of messages the streaming server has received from redis subscriptions'
  208. });
  209. const messagesSent = new metrics.Counter({
  210. name: 'messages_sent_total',
  211. help: 'The total number of messages the streaming server sent to clients per connection type',
  212. labelNames: [ 'type' ]
  213. });
  // Prime the gauges so we don't lose metrics between restarts:
  215. redisSubscriptions.set(0);
  216. connectedClients.set({ type: 'websocket' }, 0);
  217. connectedClients.set({ type: 'eventsource' }, 0);
  218. // For each channel, initialize the gauges at zero; There's only a finite set of channels available
  219. CHANNEL_NAMES.forEach(( channel ) => {
  220. connectedChannels.set({ type: 'websocket', channel }, 0);
  221. connectedChannels.set({ type: 'eventsource', channel }, 0);
  222. })
  // Prime the counters so that we don't lose metrics between restarts.
  // Unfortunately counters don't support the set() API, so instead I'm using
  // inc(0) to achieve the same result.
  226. redisMessagesReceived.inc(0);
  227. messagesSent.inc({ type: 'websocket' }, 0);
  228. messagesSent.inc({ type: 'eventsource' }, 0);
  // When checking metrics in the browser, the favicon is requested. This
  // handler prevents that request from falling through to the API Router,
  // which would error for this endpoint:
  232. app.get('/favicon.ico', (req, res) => res.status(404).end());
  233. app.get('/api/v1/streaming/health', (req, res) => {
  234. res.writeHead(200, { 'Content-Type': 'text/plain' });
  235. res.end('OK');
  236. });
  237. app.get('/metrics', async (req, res) => {
  238. try {
  239. res.set('Content-Type', metrics.register.contentType);
  240. res.end(await metrics.register.metrics());
  241. } catch (ex) {
  242. log.error(ex);
  243. res.status(500).end();
  244. }
  245. });
  /**
   * Marks the given channels as subscribed in Redis and keeps refreshing that
   * marker until the returned function is called.
   *
   * Each `subscribed:<channel>` key is written immediately and then once per
   * interval (six minutes), with an expiry of three intervals.
   * @param {string[]} channels
   * @returns {function(): void} stops the periodic refresh
   */
  const subscriptionHeartbeat = channels => {
    const interval = 6 * 60;

    const tellSubscribed = () => {
      channels.forEach(channel => {
        // The command returns a promise; a failed write must not surface as
        // an unhandled rejection, so it is logged and the next beat retries.
        redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval * 3).catch(err => {
          log.error(`Error refreshing subscription marker for ${channel}: ${err}`);
        });
      });
    };

    tellSubscribed();

    const heartbeat = setInterval(tellSubscribed, interval * 1000);

    return () => {
      clearInterval(heartbeat);
    };
  };
  /**
   * Handles a message from the Redis subscription client: counts it, parses
   * it, and passes the parsed object to every listener registered for the
   * channel. Messages for channels without listeners, or with a payload that
   * fails to parse, are dropped.
   * @param {string} channel name of the Redis channel, as listeners are keyed in `subs`
   * @param {string} message raw JSON payload
   */
  const onRedisMessage = (channel, message) => {
    redisMessagesReceived.inc();

    const callbacks = subs[channel];

    // `channel` is the same key listeners are registered under in `subs`, and
    // those keys already include redisPrefix, so the prefix is not added
    // again here.
    log.silly(`New message on channel ${channel}`);

    if (!callbacks) {
      return;
    }

    const json = parseJSON(message, null);
    if (!json) return;

    callbacks.forEach(callback => callback(json));
  };
  276. redisSubscribeClient.on("message", onRedisMessage);
  277. /**
  278. * @callback SubscriptionListener
  279. * @param {ReturnType<parseJSON>} json of the message
  280. * @returns void
  281. */
  /**
   * Registers a listener for a Redis channel. The Redis subscription itself
   * is only issued when the channel has no listeners yet.
   * @param {string} channel
   * @param {SubscriptionListener} callback
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    if (!subs[channel]) {
      subs[channel] = [];
    }

    const listeners = subs[channel];
    const isFirstListener = listeners.length === 0;

    if (isFirstListener) {
      log.verbose(`Subscribe ${channel}`);

      redisSubscribeClient.subscribe(channel, (err, count) => {
        if (err) {
          log.error(`Error subscribing to ${channel}`);
          return;
        }

        redisSubscriptions.set(count);
      });
    }

    listeners.push(callback);
  };
  /**
   * Removes a listener from a Redis channel. When the last listener is
   * removed, the Redis subscription is dropped and the channel entry is
   * deleted from `subs`.
   * @param {string} channel
   * @param {SubscriptionListener} callback
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    const registered = subs[channel];

    if (!registered) {
      return;
    }

    const remaining = registered.filter(item => item !== callback);
    subs[channel] = remaining;

    if (remaining.length !== 0) {
      return;
    }

    log.verbose(`Unsubscribe ${channel}`);

    redisSubscribeClient.unsubscribe(channel, (err, count) => {
      if (err) {
        log.error(`Error unsubscribing to ${channel}`);
        return;
      }

      redisSubscriptions.set(count);
    });

    delete subs[channel];
  };
  // Values that are treated as "false" when read from request parameters.
  const FALSE_VALUES = [
    false,
    0,
    '0',
    'f',
    'F',
    'false',
    'FALSE',
    'off',
    'OFF',
  ];

  /**
   * Tells whether a value should be treated as enabled: it must be truthy in
   * the JavaScript sense and must not be one of FALSE_VALUES. Always returns
   * a real boolean, never the falsy input itself.
   * @param {any} value
   * @returns {boolean}
   */
  const isTruthy = value =>
    Boolean(value) && !FALSE_VALUES.includes(value);
  /**
   * Middleware that sets the CORS response headers: any origin is allowed,
   * with GET/OPTIONS methods and the Authorization, Accept and Cache-Control
   * request headers.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const allowCrossDomain = (req, res, next) => {
    const corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'],
      ['Access-Control-Allow-Methods', 'GET, OPTIONS'],
    ];

    corsHeaders.forEach(([name, value]) => {
      res.header(name, value);
    });

    next();
  };
  /**
   * Middleware that assigns a fresh UUID (v4) to the request as
   * `req.requestId` and returns it in the `X-Request-Id` response header.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRequestId = (req, res, next) => {
    const requestId = uuid.v4();

    req.requestId = requestId;
    res.header('X-Request-Id', requestId);

    next();
  };
  /**
   * Middleware that copies the address of the underlying socket onto the
   * request as `req.remoteAddress`.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRemoteAddress = (req, res, next) => {
    // `req.connection` is a deprecated alias of `req.socket` in Node.js; prefer
    // the socket and only fall back to the alias for objects that lack it.
    const socket = req.socket || req.connection;

    req.remoteAddress = socket.remoteAddress;

    next();
  };
  /**
   * Checks whether the request holds at least one of the given scopes.
   * @param {any} req request whose `scopes` array lists the granted scopes
   * @param {string[]} necessaryScopes scopes of which one is sufficient
   * @returns {boolean}
   */
  const isInScope = (req, necessaryScopes) => {
    const isNecessary = (scope) => necessaryScopes.includes(scope);

    return req.scopes.some(isNecessary);
  };
  /**
   * Looks up a non-revoked OAuth access token in the database and copies the
   * matching row onto the request as `accessTokenId`, `scopes`, `accountId`,
   * `chosenLanguages` and `deviceId`.
   * @param {string} token
   * @param {any} req
   * @returns {Promise.<void>} rejects with the database error, or with a
   *   401-status error when no matching token exists
   */
  const accountFromToken = (token, req) => new Promise((resolve, reject) => {
    const tokenQuery = 'SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1';

    pgPool.connect((connectError, client, done) => {
      if (connectError) {
        reject(connectError);
        return;
      }

      client.query(tokenQuery, [token], (queryError, result) => {
        // Hand the connection back to the pool before looking at the outcome.
        done();

        if (queryError) {
          reject(queryError);
          return;
        }

        if (result.rows.length === 0) {
          const invalidToken = new Error('Invalid access token');
          invalidToken.status = 401;
          reject(invalidToken);
          return;
        }

        const row = result.rows[0];

        req.accessTokenId = row.id;
        req.scopes = row.scopes.split(' ');
        req.accountId = row.account_id;
        req.chosenLanguages = row.chosen_languages;
        req.deviceId = row.device_id;

        resolve();
      });
    });
  });
  /**
   * Extracts the access token from a request and resolves the account it
   * belongs to. The `Authorization` header (with a leading "Bearer " removed)
   * wins over the `access_token` query parameter, which in turn wins over the
   * `Sec-WebSocket-Protocol` header.
   * @param {any} req
   * @returns {Promise.<void>} rejects with a 401-status error when no token is present
   */
  const accountFromRequest = (req) => new Promise((resolve, reject) => {
    const { authorization } = req.headers;
    const { query } = url.parse(req.url, true);
    const accessToken = query.access_token || req.headers['sec-websocket-protocol'];

    if (authorization) {
      resolve(accountFromToken(authorization.replace(/^Bearer /, ''), req));
      return;
    }

    if (accessToken) {
      resolve(accountFromToken(accessToken, req));
      return;
    }

    const missingToken = new Error('Missing access token');
    missingToken.status = 401;
    reject(missingToken);
  });
  /**
   * Maps the path of an HTTP streaming request to its channel name. For the
   * public timelines the `only_media` query parameter selects the `:media`
   * variant of the channel.
   * @param {any} req
   * @returns {string|undefined} the channel name, or undefined for an unknown path
   */
  const channelNameFromPath = req => {
    const onlyMedia = isTruthy(req.query.only_media);

    // path => [name, name when only_media is set]
    const channelsByPath = new Map([
      ['/api/v1/streaming/user', ['user', 'user']],
      ['/api/v1/streaming/user/notification', ['user:notification', 'user:notification']],
      ['/api/v1/streaming/public', ['public', 'public:media']],
      ['/api/v1/streaming/public/local', ['public:local', 'public:local:media']],
      ['/api/v1/streaming/public/remote', ['public:remote', 'public:remote:media']],
      ['/api/v1/streaming/hashtag', ['hashtag', 'hashtag']],
      ['/api/v1/streaming/hashtag/local', ['hashtag:local', 'hashtag:local']],
      ['/api/v1/streaming/direct', ['direct', 'direct']],
      ['/api/v1/streaming/list', ['list', 'list']],
    ]);

    const names = channelsByPath.get(req.path);

    if (!names) {
      return undefined;
    }

    return onlyMedia ? names[1] : names[0];
  };
  /**
   * Verifies that the token attached to the request may stream the channel.
   * @param {any} req
   * @param {string|undefined} channelName
   * @returns {Promise.<void>} rejects with a 401-status error when the token's
   *   scopes do not cover the channel
   */
  const checkScopes = (req, channelName) => new Promise((resolve, reject) => {
    log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);

    // When accessing public channels, no scopes are needed
    if (PUBLIC_CHANNELS.includes(channelName)) {
      resolve();
      return;
    }

    // Besides `read`, which grants access to every stream, the notifications
    // stream accepts read:notifications and every other stream accepts
    // read:statuses. Mind that the user stream will not contain notifications
    // unless the token also has read or read:notifications; that is handled
    // separately.
    const channelScope = channelName === 'user:notification' ? 'read:notifications' : 'read:statuses';
    const acceptedScopes = ['read', channelScope];

    const grantedScopes = req.scopes;
    const isCovered = grantedScopes && acceptedScopes.some(scope => grantedScopes.includes(scope));

    if (!isCovered) {
      const uncovered = new Error('Access token does not cover required scopes');
      uncovered.status = 401;
      reject(uncovered);
      return;
    }

    resolve();
  });
  /**
   * Verifies an incoming websocket connection by resolving the account from
   * the upgrade request.
   *
   * OAuth scopes are deliberately not checked here: the connection is only
   * refused when no account can be resolved from the request. Scope checks
   * happen later, at the point of subscription to a specific stream.
   * @param {any} info
   * @param {function(boolean, number, string): void} callback
   */
  const wsVerifyClient = (info, callback) => {
    const accept = () => {
      callback(true, undefined, undefined);
    };

    const refuse = (err) => {
      log.error(info.req.requestId, err.toString());
      callback(false, 401, 'Unauthorized');
    };

    accountFromRequest(info.req).then(accept).catch(refuse);
  };
  509. /**
  510. * @typedef SystemMessageHandlers
  511. * @property {function(): void} onKill
  512. */
  /**
   * Builds the listener for system messages addressed to a connection. A
   * `kill` event invokes `eventHandlers.onKill`, a `filters_changed` event
   * clears the filters cached on the request, and any other event is only
   * logged.
   * @param {any} req
   * @param {SystemMessageHandlers} eventHandlers
   * @returns {function(object): void}
   */
  const createSystemMessageListener = (req, eventHandlers) => {
    return ({ event }) => {
      log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);

      switch (event) {
      case 'kill':
        log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`);
        eventHandlers.onKill();
        break;
      case 'filters_changed':
        log.verbose(req.requestId, `Invalidating filters cache for ${req.accountId}`);
        req.cachedFilters = null;
        break;
      default:
        break;
      }
    };
  };
  /**
   * Subscribes an HTTP streaming response to the two system channels of its
   * requester (one keyed by access token, one keyed by account), ends the
   * response on a `kill` message, and removes both subscriptions again once
   * the response closes.
   * @param {any} req
   * @param {any} res
   */
  const subscribeHttpToSystemChannel = (req, res) => {
    const channelIds = [
      `timeline:access_token:${req.accessTokenId}`,
      `timeline:system:${req.accountId}`,
    ];

    const systemGauge = () => connectedChannels.labels({ type: 'eventsource', channel: 'system' });

    const listener = createSystemMessageListener(req, {
      onKill() {
        res.end();
      },
    });

    res.on('close', () => {
      channelIds.forEach(channelId => unsubscribe(`${redisPrefix}${channelId}`, listener));
      systemGauge().dec(2);
    });

    channelIds.forEach(channelId => subscribe(`${redisPrefix}${channelId}`, listener));
    systemGauge().inc(2);
  };
  /**
   * Middleware for the HTTP streaming endpoints: resolves the channel from
   * the path, authenticates the request, checks its OAuth scopes and hooks
   * the response up to the system channels. `OPTIONS` requests pass through
   * untouched; any failure is forwarded to `next`.
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    const channelName = channelNameFromPath(req);

    // Without a channel name there is nothing to stream back, so the request
    // is rejected straight away
    if (!channelName) {
      const unknownChannel = new Error('Unknown channel requested');
      unknownChannel.status = 400;
      next(unknownChannel);
      return;
    }

    const authenticate = async () => {
      await accountFromRequest(req);
      await checkScopes(req, channelName);
      subscribeHttpToSystemChannel(req, res);
      next();
    };

    authenticate().catch(err => {
      next(err);
    });
  };
  /**
   * Error-handling middleware: logs the error and, unless the response
   * headers were already sent, answers with a JSON error body. Errors that
   * carry a `status` expose their message; all others produce a generic 500.
   * @param {Error} err
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    // Once headers are out the response cannot be rewritten; delegate instead.
    if (res.headersSent) {
      next(err);
      return;
    }

    const statusCode = err.status || 500;
    const message = err.status ? err.toString() : 'An unexpected error occurred';

    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: message }));
  };
  /**
   * Produces a comma-separated list of positional SQL placeholders, one per
   * array element: `$1, $2, ...`, with numbering offset by `shift`.
   * @param {array} arr
   * @param {number=} shift how many placeholder numbers to skip before the first
   * @returns {string}
   */
  const placeholders = (arr, shift = 0) => {
    const numbered = arr.map((_, position) => `$${position + 1 + shift}`);

    return numbered.join(', ');
  };
  /**
   * Checks that the list with the given id exists and belongs to the
   * requesting account.
   * @param {string} listId
   * @param {any} req
   * @returns {Promise.<void>} resolves when access is allowed; rejects (with no
   *   reason) on a database error, a missing list, or a list owned by
   *   another account
   */
  const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
    const { accountId } = req;

    pgPool.connect((connectError, client, done) => {
      if (connectError) {
        reject();
        return;
      }

      client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (queryError, result) => {
        done();

        const isOwner = !queryError && result.rows.length !== 0 && result.rows[0].account_id === accountId;

        if (isOwner) {
          resolve();
        } else {
          reject();
        }
      });
    });
  });
  622. /**
  623. * @param {string[]} ids
  624. * @param {any} req
  625. * @param {function(string, string): void} output
  626. * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
  627. * @param {'websocket' | 'eventsource'} destinationType
  628. * @param {boolean=} needsFiltering
  629. * @returns {SubscriptionListener}
  630. */
  631. const streamFrom = (ids, req, output, attachCloseHandler, destinationType, needsFiltering = false) => {
  632. const accountId = req.accountId || req.remoteAddress;
  633. log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
  634. const transmit = (event, payload) => {
  635. // TODO: Replace "string"-based delete payloads with object payloads:
  636. const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
  637. messagesSent.labels({ type: destinationType }).inc(1);
  638. log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
  639. output(event, encodedPayload);
  640. };
  641. // The listener used to process each message off the redis subscription,
  642. // message here is an object with an `event` and `payload` property. Some
  643. // events also include a queued_at value, but this is being removed shortly.
  644. /** @type {SubscriptionListener} */
  645. const listener = message => {
  646. const { event, payload } = message;
  647. // Streaming only needs to apply filtering to some channels and only to
  648. // some events. This is because majority of the filtering happens on the
  649. // Ruby on Rails side when producing the event for streaming.
  650. //
  651. // The only events that require filtering from the streaming server are
  652. // `update` and `status.update`, all other events are transmitted to the
  653. // client as soon as they're received (pass-through).
  654. //
  655. // The channels that need filtering are determined in the function
  656. // `channelNameToIds` defined below:
  657. if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
  658. transmit(event, payload);
  659. return;
  660. }
  661. // The rest of the logic from here on in this function is to handle
  662. // filtering of statuses:
  663. // Filter based on language:
  664. if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
  665. log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
  666. return;
  667. }
  668. // When the account is not logged in, it is not necessary to confirm the block or mute
  669. if (!req.accountId) {
  670. transmit(event, payload);
  671. return;
  672. }
  673. // Filter based on domain blocks, blocks, mutes, or custom filters:
  674. const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
  675. const accountDomain = payload.account.acct.split('@')[1];
  676. // TODO: Move this logic out of the message handling loop
  677. pgPool.connect((err, client, releasePgConnection) => {
  678. if (err) {
  679. log.error(err);
  680. return;
  681. }
  682. const queries = [
  683. client.query(`SELECT 1
  684. FROM blocks
  685. WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
  686. OR (account_id = $2 AND target_account_id = $1)
  687. UNION
  688. SELECT 1
  689. FROM mutes
  690. WHERE account_id = $1
  691. AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
  692. ];
  693. if (accountDomain) {
  694. queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
  695. }
  696. if (!payload.filtered && !req.cachedFilters) {
  697. queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
  698. }
  699. Promise.all(queries).then(values => {
  700. releasePgConnection();
  701. // Handling blocks & mutes and domain blocks: If one of those applies,
  702. // then we don't transmit the payload of the event to the client
  703. if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
  704. return;
  705. }
  706. // If the payload already contains the `filtered` property, it means
  707. // that filtering has been applied on the ruby on rails side, as
  708. // such, we don't need to construct or apply the filters in streaming:
  709. if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
  710. transmit(event, payload);
  711. return;
  712. }
  713. // Handling for constructing the custom filters and caching them on the request
  714. // TODO: Move this logic out of the message handling lifecycle
  715. if (!req.cachedFilters) {
  716. const filterRows = values[accountDomain ? 2 : 1].rows;
  717. req.cachedFilters = filterRows.reduce((cache, filter) => {
  718. if (cache[filter.id]) {
  719. cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
  720. } else {
  721. cache[filter.id] = {
  722. keywords: [[filter.keyword, filter.whole_word]],
  723. expires_at: filter.expires_at,
  724. filter: {
  725. id: filter.id,
  726. title: filter.title,
  727. context: filter.context,
  728. expires_at: filter.expires_at,
  729. // filter.filter_action is the value from the
  730. // custom_filters.action database column, it is an integer
  731. // representing a value in an enum defined by Ruby on Rails:
  732. //
  733. // enum { warn: 0, hide: 1 }
  734. filter_action: ['warn', 'hide'][filter.filter_action],
  735. },
  736. };
  737. }
  738. return cache;
  739. }, {});
  740. // Construct the regular expressions for the custom filters: This
  741. // needs to be done in a separate loop as the database returns one
  742. // filterRow per keyword, so we need all the keywords before
  743. // constructing the regular expression
  744. Object.keys(req.cachedFilters).forEach((key) => {
  745. req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
  746. let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  747. if (whole_word) {
  748. if (/^[\w]/.test(expr)) {
  749. expr = `\\b${expr}`;
  750. }
  751. if (/[\w]$/.test(expr)) {
  752. expr = `${expr}\\b`;
  753. }
  754. }
  755. return expr;
  756. }).join('|'), 'i');
  757. });
  758. }
  759. // Apply cachedFilters against the payload, constructing a
  760. // `filter_results` array of FilterResult entities
  761. if (req.cachedFilters) {
  762. const status = payload;
  763. // TODO: Calculate searchableContent in Ruby on Rails:
  764. const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
  765. const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
  766. const now = new Date();
  767. const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
  768. // Check the filter hasn't expired before applying:
  769. if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
  770. return results;
  771. }
  772. // Just in-case JSDOM fails to find textContent in searchableContent
  773. if (!searchableTextContent) {
  774. return results;
  775. }
  776. const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
  777. if (keyword_matches) {
  778. // results is an Array of FilterResult; status_matches is always
  779. // null as we only are only applying the keyword-based custom
  780. // filters, not the status-based custom filters.
  781. // https://docs.joinmastodon.org/entities/FilterResult/
  782. results.push({
  783. filter: cachedFilter.filter,
  784. keyword_matches,
  785. status_matches: null
  786. });
  787. }
  788. return results;
  789. }, []);
  790. // Send the payload + the FilterResults as the `filtered` property
  791. // to the streaming connection. To reach this code, the `event` must
  792. // have been either `update` or `status.update`, meaning the
  793. // `payload` is a Status entity, which has a `filtered` property:
  794. //
  795. // filtered: https://docs.joinmastodon.org/entities/Status/#filtered
  796. transmit(event, {
  797. ...payload,
  798. filtered: filter_results
  799. });
  800. } else {
  801. transmit(event, payload);
  802. }
  803. }).catch(err => {
  804. log.error(err);
  805. releasePgConnection();
  806. });
  807. });
  808. };
  809. ids.forEach(id => {
  810. subscribe(`${redisPrefix}${id}`, listener);
  811. });
  812. if (typeof attachCloseHandler === 'function') {
  813. attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
  814. }
  815. return listener;
  816. };
  /**
   * Prepares an HTTP response for Server-Sent Events delivery and returns the
   * function used to push events down it.
   *
   * Sets the event-stream response headers, writes an initial `:)` comment
   * line, writes a `:thump` comment line every 15 seconds, and increments the
   * `eventsource` gauges. When the request emits `close`, the gauges are
   * decremented again and the 15 second timer is cleared.
   *
   * @param {any} req
   * @param {any} res
   * @returns {function(string, string): void} writes one `event:` / `data:`
   *   frame to the response; the payload is interpolated as-is
   */
  const streamToHttp = (req, res) => {
    const streamOwner = req.accountId || req.remoteAddress;
    const channelName = channelNameFromPath(req);

    // In theory we'll always have a channel name, but channelNameFromPath can
    // return undefined; in that case the per-channel gauge is left untouched.
    const hasChannelName = typeof channelName === 'string';

    const clientGauge = () => connectedClients.labels({ type: 'eventsource' });
    const channelGauge = () => connectedChannels.labels({ type: 'eventsource', channel: channelName });

    clientGauge().inc();
    if (hasChannelName) {
      channelGauge().inc();
    }

    const responseHeaders = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-store'],
      ['Transfer-Encoding', 'chunked'],
    ];
    for (const [headerName, headerValue] of responseHeaders) {
      res.setHeader(headerName, headerValue);
    }

    res.write(':)\n');

    const heartbeat = setInterval(() => res.write(':thump\n'), 15000);

    function onRequestClosed() {
      log.verbose(req.requestId, `Ending stream for ${streamOwner}`);

      // These counters are decremented here rather than in streamHttpEnd, as
      // that function has no knowledge of the channel name.
      clientGauge().dec();
      if (hasChannelName) {
        channelGauge().dec();
      }

      clearInterval(heartbeat);
    }

    req.on('close', onRequestClosed);

    return function sendEvent(event, payload) {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };
  };
  /**
   * Builds the close-handler attacher for an HTTP stream. The returned
   * function registers a `close` listener on the request which unsubscribes
   * the listener from every given id and then runs the optional closeHandler.
   *
   * The ids are handed to `unsubscribe` unchanged.
   *
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @returns {function(string[], SubscriptionListener): void}
   */
  const streamHttpEnd = (req, closeHandler = undefined) => {
    return function attachCloseListener(ids, listener) {
      req.on('close', function onRequestClosed() {
        for (const id of ids) {
          unsubscribe(id, listener);
        }

        if (!closeHandler) {
          return;
        }

        closeHandler();
      });
    };
  };
  /**
   * Builds the sender used to deliver events over a WebSocket. Each event is
   * serialised as a JSON object with `stream`, `event` and `payload` keys.
   * Nothing is sent when the socket is not open; that case, and any error
   * reported by the send callback, is logged against the request id.
   *
   * @param {any} req
   * @param {any} ws
   * @param {string[]} streamName
   * @returns {function(string, string): void}
   */
  const streamToWs = (req, ws, streamName) => {
    const reportSendResult = (err) => {
      if (err) {
        log.error(req.requestId, `Failed to send to websocket: ${err}`);
      }
    };

    return (event, payload) => {
      if (ws.readyState === ws.OPEN) {
        const frame = JSON.stringify({ stream: streamName, event, payload });
        ws.send(frame, reportSendResult);
        return;
      }

      log.error(req.requestId, 'Tried writing to closed socket');
    };
  };
  /**
   * Ends the response with HTTP status 404 and a JSON body of
   * `{ "error": "Not found" }`.
   *
   * @param {any} res
   */
  const httpNotFound = res => {
    const headers = { 'Content-Type': 'application/json' };
    res.writeHead(404, headers);

    const body = JSON.stringify({ error: 'Not found' });
    res.end(body);
  };
  // EventSource (plain HTTP) side of the streaming API: a router carrying the
  // request middleware, plus a single wildcard route that resolves the
  // requested channel and starts streaming it to the response.
  const api = express.Router();

  app.use(api);

  // Registered one by one, in this order:
  const apiMiddleware = [
    setRequestId,
    setRemoteAddress,
    allowCrossDomain,
    authenticationMiddleware,
    errorMiddleware,
  ];
  for (const middleware of apiMiddleware) {
    api.use(middleware);
  }

  /**
   * Resolves the channel named by the request path and streams it to the
   * response. Any failure along the way is logged and answered with a 404.
   *
   * @param {any} req
   * @param {any} res
   */
  const handleEventSourceRequest = (req, res) => {
    const startStreaming = ({ channelIds, options }) => {
      const onSend = streamToHttp(req, res);
      const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));

      streamFrom(channelIds, req, onSend, onEnd, 'eventsource', options.needsFiltering);
    };

    const rejectRequest = (err) => {
      log.verbose(req.requestId, 'Subscription error:', err.toString());
      httpNotFound(res);
    };

    channelNameToIds(req, channelNameFromPath(req), req.query)
      .then(startStreaming)
      .catch(rejectRequest);
  };

  api.get('/api/v1/streaming/*', handleEventSourceRequest);

  // WebSocket side of the streaming API, attached to the same HTTP server.
  const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
  908. /**
  909. * @typedef StreamParams
  910. * @property {string} [tag]
  911. * @property {string} [list]
  912. * @property {string} [only_media]
  913. */
  /**
   * Lists the channels that make up the `user` stream for the request's
   * account: always the account timeline, plus the per-device channel when
   * the request has the `crypto` scope and a device id, plus the
   * notifications channel when it has the `read` or `read:notifications`
   * scope.
   *
   * @param {any} req
   * @returns {string[]}
   */
  const channelsForUserStream = req => {
    const accountChannel = `timeline:${req.accountId}`;

    const includeDeviceChannel = isInScope(req, ['crypto']) && req.deviceId;
    const includeNotifications = isInScope(req, ['read', 'read:notifications']);

    return [
      accountChannel,
      ...(includeDeviceChannel ? [`${accountChannel}:${req.deviceId}`] : []),
      ...(includeNotifications ? [`${accountChannel}:notifications`] : []),
    ];
  };
  /**
   * See app/lib/ascii_folder.rb for the canon definitions
   * of these constants.
   *
   * The two strings are index-aligned: the character at position i of
   * NON_ASCII_CHARS folds to the character at position i of
   * EQUIVALENT_ASCII_CHARS, so they must always have the same length.
   *
   * U+0149 is written as an escape because it is easily confused with the
   * two-character sequence "ʼ" + "n", which would shift every later entry.
   */
  const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂăĄąÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇň\u0149ŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž';
  const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz';

  // Lookup table and matcher derived once from the constants above, instead
  // of rebuilding the pattern and scanning the string on every call.
  const ASCII_FOLDING_TABLE = new Map(
    [...NON_ASCII_CHARS].map((char, index) => [char, EQUIVALENT_ASCII_CHARS[index]]),
  );
  // None of the listed characters is special inside a character class.
  const NON_ASCII_PATTERN = new RegExp(`[${NON_ASCII_CHARS}]`, 'g');

  /**
   * Replaces every character listed in NON_ASCII_CHARS with its ASCII
   * equivalent; all other characters are returned unchanged.
   *
   * @param {string} str
   * @returns {string}
   */
  const foldToASCII = str => {
    // String.prototype.replace restarts a global pattern from index 0 on each
    // call, so sharing one RegExp instance between calls is safe.
    return str.replace(NON_ASCII_PATTERN, match => ASCII_FOLDING_TABLE.get(match));
  };
  /**
   * Normalises a hashtag: NFKC normalisation, lower-casing, ASCII folding via
   * foldToASCII, and finally removal of every character that is not a
   * letter, a number, an underscore, U+00B7 or U+200C.
   *
   * @param {string} str
   * @returns {string}
   */
  const normalizeHashtag = str => {
    const lowered = str.normalize('NFKC').toLowerCase();
    const folded = foldToASCII(lowered);

    return folded.replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
  };
  /**
   * Maps a stream name (plus its parameters) to the channel ids backing it
   * and to whether events on those channels need per-account filtering.
   *
   * The promise rejects with a plain string reason when the stream name is
   * unknown, when a hashtag stream has no tag, or when access to a list is
   * not authorised.
   *
   * @param {any} req
   * @param {string} name
   * @param {StreamParams} params
   * @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
   */
  const channelNameToIds = async (req, name, params) => {
    const found = (channelIds, needsFiltering) => ({
      channelIds,
      options: { needsFiltering },
    });
    const refuse = (reason) => Promise.reject(reason);

    // Public timelines map one-to-one onto a channel of the same name and
    // always need filtering.
    const publicTimelines = [
      'public',
      'public:local',
      'public:remote',
      'public:media',
      'public:local:media',
      'public:remote:media',
    ];
    if (publicTimelines.includes(name)) {
      return found([`timeline:${name}`], true);
    }

    switch (name) {
      case 'user':
        return found(channelsForUserStream(req), false);

      case 'user:notification':
        return found([`timeline:${req.accountId}:notifications`], false);

      case 'direct':
        return found([`timeline:direct:${req.accountId}`], false);

      case 'hashtag':
      case 'hashtag:local': {
        if (!params.tag || params.tag.length === 0) {
          return refuse('No tag for stream provided');
        }

        const suffix = name === 'hashtag:local' ? ':local' : '';
        return found([`timeline:hashtag:${normalizeHashtag(params.tag)}${suffix}`], true);
      }

      case 'list':
        return authorizeListAccess(params.list, req)
          .then(() => found([`timeline:list:${params.list}`], false))
          .catch(() => refuse('Not authorized to stream this list'));

      default:
        return refuse('Unknown stream type');
    }
  };
  /**
   * Builds the stream name reported to WebSocket clients: the channel name,
   * followed by the list id for `list` streams or the tag for hashtag
   * streams.
   *
   * @param {string} channelName
   * @param {StreamParams} params
   * @returns {string[]}
   */
  const streamNameFromChannelName = (channelName, params) => {
    switch (channelName) {
      case 'list':
        return [channelName, params.list];
      case 'hashtag':
      case 'hashtag:local':
        return [channelName, params.tag];
      default:
        return [channelName];
    }
  };
  1062. /**
  1063. * @typedef WebSocketSession
  1064. * @property {any} socket
  1065. * @property {any} request
  1066. * @property {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
  1067. */
  /**
   * Subscribes a WebSocket session to a channel. After the scope check and
   * the channel lookup succeed, a listener is registered through streamFrom
   * and recorded in the session's subscriptions under the channel ids joined
   * with `;`. Subscribing to channels the session already has is a no-op.
   *
   * Failures of the scope check or channel lookup are logged and, if the
   * socket is still open, reported to the client as `{ error }`.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   * @returns {void}
   */
  const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const socketIsOpen = () => socket.readyState === socket.OPEN;

    checkScopes(request, channelName)
      .then(() => channelNameToIds(request, channelName, params))
      .then(({ channelIds, options }) => {
        const subscriptionKey = channelIds.join(';');

        if (subscriptions[subscriptionKey]) {
          return;
        }

        // The checks above are asynchronous, so the socket may have closed in
        // the meantime. Its close handler has then already removed the
        // session's subscriptions; registering a listener and heartbeat now
        // would leave them with nothing to ever remove them.
        if (!socketIsOpen()) {
          return;
        }

        const onSend = streamToWs(request, socket, streamNameFromChannelName(channelName, params));
        const stopHeartbeat = subscriptionHeartbeat(channelIds);
        const listener = streamFrom(channelIds, request, onSend, undefined, 'websocket', options.needsFiltering);

        connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();

        subscriptions[subscriptionKey] = {
          channelName,
          listener,
          stopHeartbeat,
        };
      })
      .catch(err => {
        const reason = String(err);

        log.verbose(request.requestId, 'Subscription error:', reason);

        // Only report back over a socket that can still carry the message,
        // as the unsubscribe path does.
        if (socketIsOpen()) {
          socket.send(JSON.stringify({ error: reason }));
        }
      });
  };
  /**
   * Removes one subscription from a session's subscription map. The entry is
   * looked up under the channel ids joined with `;`; when there is no such
   * entry only the log line is written. Otherwise the listener is
   * unsubscribed from every (prefixed) channel, the websocket channel gauge
   * is decremented, the heartbeat is stopped and the entry is deleted.
   *
   * @param {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
   * @param {string[]} channelIds
   * @param {any} request
   * @returns {void}
   */
  const removeSubscription = (subscriptions, channelIds, request) => {
    log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

    const subscriptionKey = channelIds.join(';');
    const subscription = subscriptions[subscriptionKey];

    if (subscription) {
      for (const channelId of channelIds) {
        unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
      }

      connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
      subscription.stopHeartbeat();

      delete subscriptions[subscriptionKey];
    }
  };
  /**
   * Unsubscribes a WebSocket session from a channel: resolves the channel
   * ids for the given name and parameters and removes the matching
   * subscription. Failures are logged and, when the socket is alive and
   * open, reported to the client with a generic error message.
   *
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   * @returns {void}
   */
  const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const dropSubscription = ({ channelIds }) => {
      removeSubscription(subscriptions, channelIds, request);
    };

    const reportFailure = (err) => {
      log.verbose(request.requestId, 'Unsubscribe error:', err);

      // Only a socket that is still alive and open can be told about it.
      // FIXME: In other parts of the code ws === socket
      const canNotifyClient = socket.isAlive && socket.readyState === socket.OPEN;
      if (!canNotifyClient) {
        return;
      }

      socket.send(JSON.stringify({ error: "Error unsubscribing from channel" }));
    };

    channelNameToIds(request, channelName, params)
      .then(dropSubscription)
      .catch(reportFailure);
  };
  /**
   * Subscribes a WebSocket session to its two system channels: one keyed by
   * the request's access token id and one keyed by its account id. Both
   * share a single listener from createSystemMessageListener, whose onKill
   * callback closes the socket. Each channel is recorded in the session's
   * subscriptions under the `system` channel name with a no-op heartbeat
   * stopper, and the websocket `system` gauge is raised by two.
   *
   * @param {WebSocketSession} session
   */
  const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => {
    const systemChannelIds = [
      `timeline:access_token:${request.accessTokenId}`,
      `timeline:system:${request.accountId}`,
    ];

    const listener = createSystemMessageListener(request, {
      onKill() {
        socket.close();
      },
    });

    for (const channelId of systemChannelIds) {
      subscribe(`${redisPrefix}${channelId}`, listener);
    }

    for (const channelId of systemChannelIds) {
      subscriptions[channelId] = {
        channelName: 'system',
        listener,
        stopHeartbeat: () => {
        },
      };
    }

    connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(systemChannelIds.length);
  };
  /**
   * Returns the first element when given an array, and the value itself
   * otherwise.
   *
   * @param {string|string[]} arrayOrString
   * @returns {string}
   */
  const firstParam = arrayOrString => (
    Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString
  );
  // Per-connection setup for WebSocket clients: metrics, request metadata,
  // keep-alive state, event handlers, the system channel subscription and,
  // when the connection URL names a stream, the initial subscription.
  wss.on('connection', (ws, req) => {
    // Note: url.parse could throw, which would terminate the connection, so
    // the connected clients metric is incremented straight away when the
    // connection is established, without waiting:
    connectedClients.labels({ type: 'websocket' }).inc();

    // Setup request properties:
    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    // Setup connection keep-alive state:
    ws.isAlive = true;

    const markAlive = () => {
      ws.isAlive = true;
    };

    /**
     * @type {WebSocketSession}
     */
    const session = {
      socket: ws,
      request: req,
      subscriptions: {},
    };

    const onWebsocketClose = () => {
      // Subscriptions are keyed by their channel ids joined with ';':
      for (const subscriptionKey of Object.keys(session.subscriptions)) {
        removeSubscription(session.subscriptions, subscriptionKey.split(';'), req);
      }

      // Decrement the metrics for connected clients:
      connectedClients.labels({ type: 'websocket' }).dec();

      // ensure garbage collection:
      session.socket = null;
      session.request = null;
      session.subscriptions = {};
    };

    // Note: immediately after the `error` event is emitted, the `close` event
    // is emitted. As such, all that is needed here is to log the error.
    const onWebsocketError = (err) => {
      log.error('websocket', err.toString());
    };

    const onWebsocketMessage = (data, isBinary) => {
      if (isBinary) {
        log.warn('websocket', 'Received binary data, closing connection');
        ws.close(1003, 'The mastodon streaming server does not support binary messages');
        return;
      }

      const json = parseJSON(data.toString('utf8'), session.request);

      if (!json) {
        return;
      }

      // Everything besides `type` and `stream` is passed on as stream params:
      const { type, stream, ...params } = json;

      switch (type) {
        case 'subscribe':
          subscribeWebsocketToChannel(session, firstParam(stream), params);
          break;
        case 'unsubscribe':
          unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
          break;
        default:
          // Unknown action type
          break;
      }
    };

    ws.on('pong', markAlive);
    ws.on('close', onWebsocketClose);
    ws.on('error', onWebsocketError);
    ws.on('message', onWebsocketMessage);

    subscribeWebsocketToSystemChannel(session);

    // Parse the URL for the connection arguments (if supplied), url.parse can throw:
    const location = req.url && url.parse(req.url, true);
    const requestedStream = location && location.query.stream;

    if (requestedStream) {
      subscribeWebsocketToChannel(session, firstParam(requestedStream), location.query);
    }
  });
  // Keep-alive sweep, every 30 seconds: a client whose isAlive flag is still
  // false from the previous sweep is terminated; every other client has the
  // flag cleared and is pinged. The connection's `pong` handler sets the
  // flag back to true.
  const sweepWebsocketClients = () => {
    for (const client of wss.clients) {
      if (client.isAlive === false) {
        client.terminate();
        continue;
      }

      client.isAlive = false;
      client.ping('', false);
    }
  };

  setInterval(sweepWebsocketClients, 30000);

  // Start listening and log the address once the server is bound.
  const announceListening = (address) => {
    log.warn(`Streaming API now listening on ${address}`);
  };

  attachServerWithConfig(server, announceListening);
  /**
   * Shutdown on a termination signal: stop accepting connections and exit
   * with a success status.
   */
  const onExit = () => {
    server.close();
    process.exit(0);
  };

  /**
   * Last-resort handler for uncaught exceptions: log the error, stop
   * accepting connections and exit with a failure status.
   *
   * @param {Error} err
   */
  const onError = (err) => {
    log.error(err);
    server.close();
    // A crash must not be reported as a clean shutdown.
    process.exit(1);
  };

  process.on('SIGINT', onExit);
  process.on('SIGTERM', onExit);
  // Calling process.exit() from inside an 'exit' listener would replace the
  // exit code the process is already terminating with, so this listener only
  // closes the server.
  process.on('exit', () => {
    server.close();
  });
  process.on('uncaughtException', onError);
  1254. };
  /**
   * Starts the server listening according to the environment:
   *
   * - on a socket path when `SOCKET` is set, or when `PORT` is set to
   *   something that is not a number; the path is then made readable and
   *   writable by everyone (mode 0o666);
   * - otherwise on TCP, using `PORT` (default 4000) and `BIND` (default
   *   127.0.0.1).
   *
   * @param {any} server
   * @param {function(string): void} [onSuccess] called with the socket path,
   *   or with `address:port`, once the server is listening
   */
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT, BIND } = process.env;
    const socketPath = SOCKET || (PORT && Number.isNaN(Number(PORT)) ? PORT : undefined);

    if (socketPath) {
      server.listen(socketPath, () => {
        // The permissions are part of setting up the socket, so they are
        // applied whether or not the caller asked to be notified.
        fs.chmodSync(server.address(), 0o666);

        if (onSuccess) {
          onSuccess(server.address());
        }
      });
      return;
    }

    server.listen(+PORT || 4000, BIND || '127.0.0.1', () => {
      if (onSuccess) {
        const { address, port } = server.address();
        onSuccess(`${address}:${port}`);
      }
    });
  };
  1275. startServer();