index.js 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337
  1. // @ts-check
  2. const os = require('os');
  3. const throng = require('throng');
  4. const dotenv = require('dotenv');
  5. const express = require('express');
  6. const http = require('http');
  7. const redis = require('redis');
  8. const pg = require('pg');
  9. const log = require('npmlog');
  10. const url = require('url');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const WebSocket = require('ws');
  14. const { JSDOM } = require('jsdom');
  // NODE_ENV must come from the real environment: it decides which env file is loaded.
  const env = process.env.NODE_ENV || 'development';

  // Load the env file before reading any other setting, so that values defined
  // only in `.env` / `.env.production` are visible to the lookups below.
  dotenv.config({
    path: env === 'production' ? '.env.production' : '.env',
  });

  // Evaluated after dotenv.config() so that flags set in the env file are honoured.
  const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';

  log.level = process.env.LOG_LEVEL || 'verbose';
  /**
   * Translate a database connection URL into a partial `pg` configuration.
   *
   * Only the components present in the URL are set on the result. The `ssl`
   * flag is enabled when the query string carries `ssl=true` or `ssl=1`.
   *
   * @param {string} dbUrl connection URL; a falsy value yields an empty config
   * @return {Object.<string, any>}
   */
  const dbUrlToConfig = (dbUrl) => {
    if (!dbUrl) {
      return {};
    }

    const params = url.parse(dbUrl, true);
    const config = {};

    if (params.auth) {
      // Split on the first colon only: the password may itself contain colons.
      const [user, ...passwordParts] = params.auth.split(':');

      config.user = user;
      config.password = passwordParts.length > 0 ? passwordParts.join(':') : undefined;
    }

    if (params.hostname) {
      config.host = params.hostname;
    }

    if (params.port) {
      config.port = params.port;
    }

    if (params.pathname) {
      config.database = params.pathname.split('/')[1];
    }

    const ssl = params.query && params.query.ssl;

    if (ssl === 'true' || ssl === '1') {
      config.ssl = true;
    }

    return config;
  };
  /**
   * Create and connect a Redis client.
   *
   * The connection target depends on `redisUrl`: when it is empty the defaults
   * are used as-is, a `unix://` URL selects a socket path, and anything else is
   * passed through as the client `url`.
   *
   * @param {Object.<string, any>} defaultConfig base options; never modified
   * @param {string} redisUrl
   * @return {Promise.<any>} the connected client
   */
  const redisUrlToClient = async (defaultConfig, redisUrl) => {
    // Build a fresh object: the caller reuses the same defaults for several
    // clients, so the overrides must not leak back into it.
    let overrides = {};

    if (redisUrl && redisUrl.startsWith('unix://')) {
      overrides = {
        socket: {
          path: redisUrl.slice(7),
        },
      };
    } else if (redisUrl) {
      overrides = {
        url: redisUrl,
      };
    }

    const client = redis.createClient(Object.assign({}, defaultConfig, overrides));

    client.on('error', (err) => log.error('Redis Client Error!', err));
    await client.connect();

    return client;
  };
  // Worker count: a non-zero numeric STREAMING_CLUSTER_NUM wins; otherwise one
  // worker in development, or one fewer than the CPU count (at least one) elsewhere.
  const numWorkers = Number(process.env.STREAMING_CLUSTER_NUM)
    || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
  /**
   * Parse a JSON string, logging instead of throwing when it is malformed.
   *
   * Failures from an identified account are logged at `warn` level; failures
   * from anonymous connections are logged at `silly` level.
   *
   * @param {string} json
   * @param {any} req request whose id and account/address are used for logging
   * @return {Object.<string, any>|null} the parsed value, or null on failure
   */
  const parseJSON = (json, req) => {
    let parsed = null;

    try {
      parsed = JSON.parse(json);
    } catch (err) {
      const { requestId, accountId } = req;

      if (accountId) {
        log.warn(requestId, `Error parsing message from user ${accountId}: ${err}`);
      } else {
        log.silly(requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
      }
    }

    return parsed;
  };
  /**
   * Log the master process start-up, warning first when PORT holds a
   * non-numeric value while SOCKET is unset.
   */
  const startMaster = () => {
    const { SOCKET, PORT } = process.env;
    const portIsNotNumeric = Boolean(PORT) && Number.isNaN(Number(PORT));

    if (!SOCKET && portIsNotNumeric) {
      log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
    }

    log.warn(`Starting streaming API server master with ${numWorkers} workers`);
  };
  97. const startWorker = async (workerId) => {
  98. log.warn(`Starting worker ${workerId}`);
  99. const pgConfigs = {
  100. development: {
  101. user: process.env.DB_USER || pg.defaults.user,
  102. password: process.env.DB_PASS || pg.defaults.password,
  103. database: process.env.DB_NAME || 'mastodon_development',
  104. host: process.env.DB_HOST || pg.defaults.host,
  105. port: process.env.DB_PORT || pg.defaults.port,
  106. max: 10,
  107. },
  108. production: {
  109. user: process.env.DB_USER || 'mastodon',
  110. password: process.env.DB_PASS || '',
  111. database: process.env.DB_NAME || 'mastodon_production',
  112. host: process.env.DB_HOST || 'localhost',
  113. port: process.env.DB_PORT || 5432,
  114. max: 10,
  115. },
  116. };
  117. if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
  118. pgConfigs.development.ssl = true;
  119. pgConfigs.production.ssl = true;
  120. }
  121. const app = express();
  122. app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');
  123. const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  124. const server = http.createServer(app);
  125. const redisNamespace = process.env.REDIS_NAMESPACE || null;
  126. const redisParams = {
  127. socket: {
  128. host: process.env.REDIS_HOST || '127.0.0.1',
  129. port: process.env.REDIS_PORT || 6379,
  130. },
  131. database: process.env.REDIS_DB || 0,
  132. password: process.env.REDIS_PASSWORD || undefined,
  133. };
  134. if (redisNamespace) {
  135. redisParams.namespace = redisNamespace;
  136. }
  137. const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
  138. /**
  139. * @type {Object.<string, Array.<function(string): void>>}
  140. */
  141. const subs = {};
  142. const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  143. const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
  /**
   * Mark the given channels as subscribed in Redis and keep refreshing the
   * markers until the returned function is called.
   *
   * Each marker is written under `<prefix>subscribed:<channel>` with a
   * time-to-live of three refresh intervals.
   *
   * @param {string[]} channels
   * @return {function(): void} stops the periodic refresh
   */
  const subscriptionHeartbeat = channels => {
    const interval = 6 * 60;

    const tellSubscribed = () => {
      channels.forEach(channel => {
        // node-redis v4 takes the expiry as an options object; extra positional
        // arguments ('EX', seconds) are not turned into a TTL.
        Promise.resolve(redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', { EX: interval * 3 }))
          .catch(err => log.error(`Error refreshing subscription marker for ${channel}:`, err));
      });
    };

    tellSubscribed();

    const heartbeat = setInterval(tellSubscribed, interval * 1000);

    return () => {
      clearInterval(heartbeat);
    };
  };
  /**
   * Fan a Redis message out to every listener registered for its channel.
   *
   * @param {string} message
   * @param {string} channel
   */
  const onRedisMessage = (message, channel) => {
    log.silly(`New message on channel ${channel}`);

    const listeners = subs[channel];

    if (listeners) {
      // Iterate a snapshot so listeners added during delivery are not invoked for this message
      for (const notify of listeners.slice()) {
        notify(message);
      }
    }
  };
  /**
   * Register a listener for a channel, subscribing to it in Redis when it is
   * the first listener for that channel.
   *
   * @param {string} channel
   * @param {function(string): void} callback
   */
  const subscribe = (channel, callback) => {
    log.silly(`Adding listener for ${channel}`);

    if (!subs[channel]) {
      subs[channel] = [];
    }

    const listeners = subs[channel];
    const isFirstListener = listeners.length === 0;

    if (isFirstListener) {
      log.verbose(`Subscribe ${channel}`);
      redisSubscribeClient.subscribe(channel, onRedisMessage);
    }

    listeners.push(callback);
  };
  /**
   * Remove a listener from a channel, unsubscribing from Redis once no
   * listener is left for that channel.
   *
   * @param {string} channel
   * @param {function(string): void} callback the listener to remove
   */
  const unsubscribe = (channel, callback) => {
    log.silly(`Removing listener for ${channel}`);

    const current = subs[channel];

    if (!current) {
      return;
    }

    const remaining = current.filter(item => item !== callback);

    subs[channel] = remaining;

    if (remaining.length > 0) {
      return;
    }

    log.verbose(`Unsubscribe ${channel}`);
    redisSubscribeClient.unsubscribe(channel);
    delete subs[channel];
  };
  // Values that count as "false" when received as a request parameter
  const FALSE_VALUES = [
    false,
    0,
    '0',
    'f',
    'F',
    'false',
    'FALSE',
    'off',
    'OFF',
  ];

  /**
   * Tell whether a parameter value should be treated as enabled.
   *
   * Always returns a real boolean: falsy inputs (undefined, null, '', 0) and
   * every entry of FALSE_VALUES yield false, anything else yields true.
   *
   * @param {any} value
   * @return {boolean}
   */
  const isTruthy = value =>
    Boolean(value) && !FALSE_VALUES.includes(value);
  /**
   * Express middleware that adds the CORS response headers to every response.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const allowCrossDomain = (req, res, next) => {
    const corsHeaders = [
      ['Access-Control-Allow-Origin', '*'],
      ['Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control'],
      ['Access-Control-Allow-Methods', 'GET, OPTIONS'],
    ];

    corsHeaders.forEach(([name, value]) => {
      res.header(name, value);
    });

    next();
  };
  /**
   * Express middleware that tags the request with a fresh UUID and echoes it
   * back in the `X-Request-Id` response header.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRequestId = (req, res, next) => {
    const requestId = uuid.v4();

    req.requestId = requestId;
    res.header('X-Request-Id', requestId);

    next();
  };
  /**
   * Express middleware that copies the connection's remote address onto the
   * request as `req.remoteAddress`.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const setRemoteAddress = (req, res, next) => {
    const { remoteAddress } = req.connection;

    req.remoteAddress = remoteAddress;

    next();
  };
  /**
   * Tell whether the request's token carries at least one of the given scopes.
   *
   * @param {any} req request with a `scopes` array
   * @param {string[]} necessaryScopes
   * @return {boolean}
   */
  const isInScope = (req, necessaryScopes) => {
    const accepted = new Set(necessaryScopes);

    return req.scopes.some(scope => accepted.has(scope));
  };
  /**
   * Look up a non-revoked OAuth access token and copy its details onto `req`
   * (`accessTokenId`, `scopes`, `accountId`, `chosenLanguages`, `deviceId`).
   *
   * Rejects with the database error when the lookup fails, or with a 401
   * "Invalid access token" error when no matching token exists.
   *
   * @param {string} token
   * @param {any} req
   * @return {Promise.<void>}
   */
  const accountFromToken = (token, req) => {
    const lookupSql = 'SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, devices.device_id FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id LEFT OUTER JOIN devices ON oauth_access_tokens.id = devices.access_token_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1';

    return new Promise((resolve, reject) => {
      pgPool.connect((connectErr, client, release) => {
        if (connectErr) {
          reject(connectErr);
          return;
        }

        client.query(lookupSql, [token], (queryErr, result) => {
          // Hand the client back to the pool before inspecting the outcome
          release();

          if (queryErr) {
            reject(queryErr);
            return;
          }

          if (result.rows.length === 0) {
            const invalid = new Error('Invalid access token');
            invalid.status = 401;
            reject(invalid);
            return;
          }

          const row = result.rows[0];

          req.accessTokenId = row.id;
          req.scopes = row.scopes.split(' ');
          req.accountId = row.account_id;
          req.chosenLanguages = row.chosen_languages;
          req.deviceId = row.device_id;

          resolve();
        });
      });
    });
  };
  /**
   * Authenticate a request from its access token.
   *
   * The token is taken from the `Authorization` header (with a leading
   * "Bearer " removed) or, failing that, from the `access_token` query
   * parameter or the `sec-websocket-protocol` header. When no token is present
   * the promise rejects with a 401 error if `required` is set and resolves
   * without doing anything otherwise.
   *
   * @param {any} req
   * @param {boolean=} required
   * @return {Promise.<void>}
   */
  const accountFromRequest = async (req, required = true) => {
    const { authorization } = req.headers;
    const { query } = url.parse(req.url, true);
    const accessToken = query.access_token || req.headers['sec-websocket-protocol'];

    if (authorization || accessToken) {
      const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;

      return accountFromToken(token, req);
    }

    if (required) {
      const missing = new Error('Missing access token');
      missing.status = 401;
      throw missing;
    }

    return undefined;
  };
  /**
   * Map a streaming endpoint path to its channel name.
   *
   * Timeline endpoints that have a media-only variant switch to it when the
   * `only_media` query parameter is truthy. Unknown paths yield undefined.
   *
   * @param {any} req
   * @return {string}
   */
  const channelNameFromPath = req => {
    const { path, query } = req;
    const onlyMedia = isTruthy(query.only_media);

    // path => [regular channel, media-only channel]
    const routes = new Map([
      ['/api/v1/streaming/user', ['user', 'user']],
      ['/api/v1/streaming/user/notification', ['user:notification', 'user:notification']],
      ['/api/v1/streaming/public', ['public', 'public:media']],
      ['/api/v1/streaming/public/local', ['public:local', 'public:local:media']],
      ['/api/v1/streaming/public/remote', ['public:remote', 'public:remote:media']],
      ['/api/v1/streaming/autogestione', ['autogestione', 'autogestione:media']],
      ['/api/v1/streaming/autogestione/remote', ['autogestione:remote', 'autogestione:remote:media']],
      ['/api/v1/streaming/hashtag', ['hashtag', 'hashtag']],
      ['/api/v1/streaming/hashtag/local', ['hashtag:local', 'hashtag:local']],
      ['/api/v1/streaming/direct', ['direct', 'direct']],
      ['/api/v1/streaming/list', ['list', 'list']],
    ]);

    const names = routes.get(path);

    if (!names) {
      return undefined;
    }

    const [regular, mediaOnly] = names;

    return onlyMedia ? mediaOnly : regular;
  };
  // Channels that can be streamed without any OAuth scope.
  // Every public timeline variant produced by the path mapping is listed,
  // including the media-only form of the remote "autogestione" timeline.
  const PUBLIC_CHANNELS = [
    'public',
    'public:media',
    'public:local',
    'public:local:media',
    'public:remote',
    'public:remote:media',
    'autogestione',
    'autogestione:media',
    'autogestione:remote',
    'autogestione:remote:media',
    'hashtag',
    'hashtag:local',
  ];
  /**
   * Verify that the request's token may subscribe to the given channel.
   *
   * Rejects with a 401 error when the token's scopes are insufficient.
   *
   * @param {any} req
   * @param {string} channelName
   * @return {Promise.<void>}
   */
  const checkScopes = async (req, channelName) => {
    log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);

    // Public channels need no scope at all
    if (PUBLIC_CHANNELS.includes(channelName)) {
      return;
    }

    // `read` grants access to every stream. Besides it, the notifications
    // stream accepts `read:notifications` and every other stream accepts
    // `read:statuses`. Whether the user stream also carries notifications
    // is decided elsewhere.
    const narrowScope = channelName === 'user:notification' ? 'read:notifications' : 'read:statuses';
    const acceptedScopes = ['read', narrowScope];

    if (req.scopes && acceptedScopes.some(accepted => req.scopes.includes(accepted))) {
      return;
    }

    const denied = new Error('Access token does not cover required scopes');
    denied.status = 401;
    throw denied;
  };
  /**
   * WebSocket handshake check: accept the connection when the request
   * authenticates, refuse it with a 401 otherwise.
   *
   * OAuth scopes are deliberately not inspected here; a connection is only
   * refused when the environment forbids token-less access. Scope checks
   * happen when a specific stream is subscribed to.
   *
   * @param {any} info
   * @param {function(boolean, number, string): void} callback
   */
  const wsVerifyClient = (info, callback) => {
    const accept = () => {
      callback(true, undefined, undefined);
    };

    const refuse = err => {
      log.error(info.req.requestId, err.toString());
      callback(false, 401, 'Unauthorized');
    };

    accountFromRequest(info.req, alwaysRequireAuth).then(accept).catch(refuse);
  };
  406. /**
  407. * @typedef SystemMessageHandlers
  408. * @property {function(): void} onKill
  409. */
  /**
   * Build a listener for system-channel messages.
   *
   * A `kill` event invokes `eventHandlers.onKill`; a `filters_changed` event
   * clears the filters cached on the request. Other events and unparsable
   * messages are ignored.
   *
   * @param {any} req
   * @param {SystemMessageHandlers} eventHandlers
   * @return {function(string): void}
   */
  const createSystemMessageListener = (req, eventHandlers) => message => {
    const json = parseJSON(message, req);

    if (!json) {
      return;
    }

    const { event } = json;

    log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);

    switch (event) {
    case 'kill':
      log.verbose(req.requestId, `Closing connection for ${req.accountId} due to expired access token`);
      eventHandlers.onKill();
      break;
    case 'filters_changed':
      log.verbose(req.requestId, `Invalidating filters cache for ${req.accountId}`);
      req.cachedFilters = null;
      break;
    default:
      break;
    }
  };
  /**
   * Attach an HTTP response to the access-token and account system channels,
   * so that a `kill` message ends the response. The subscriptions are removed
   * again when the response closes.
   *
   * @param {any} req
   * @param {any} res
   */
  const subscribeHttpToSystemChannel = (req, res) => {
    const systemChannels = [
      `timeline:access_token:${req.accessTokenId}`,
      `timeline:system:${req.accountId}`,
    ].map(channelId => `${redisPrefix}${channelId}`);

    const listener = createSystemMessageListener(req, {
      onKill() {
        res.end();
      },
    });

    res.on('close', () => {
      systemChannels.forEach(channel => {
        unsubscribe(channel, listener);
      });
    });

    systemChannels.forEach(channel => {
      subscribe(channel, listener);
    });
  };
  /**
   * Express middleware that authenticates the request, checks its scopes for
   * the requested channel and hooks the response up to the system channels.
   *
   * OPTIONS requests pass straight through. Any failure is forwarded to
   * `next` as an error.
   *
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const authenticationMiddleware = (req, res, next) => {
    if (req.method === 'OPTIONS') {
      next();
      return;
    }

    const authenticate = async () => {
      await accountFromRequest(req, alwaysRequireAuth);
      await checkScopes(req, channelNameFromPath(req));
      subscribeHttpToSystemChannel(req, res);
      next();
    };

    authenticate().catch(err => {
      next(err);
    });
  };
  /**
   * Express error handler: log the error and answer with a JSON body.
   *
   * Errors carrying a `status` are reported with that status and their own
   * text; anything else becomes a generic 500. When the headers are already
   * sent the error is passed on to `next` instead.
   *
   * @param {Error} err
   * @param {any} req
   * @param {any} res
   * @param {function(Error=): void} next
   */
  const errorMiddleware = (err, req, res, next) => {
    log.error(req.requestId, err.toString());

    if (res.headersSent) {
      next(err);
      return;
    }

    const statusCode = err.status || 500;
    const message = err.status ? err.toString() : 'An unexpected error occurred';

    res.writeHead(statusCode, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ error: message }));
  };
  /**
   * Build a comma-separated list of numbered SQL placeholders, one per array
   * entry, starting at `$1` plus the given shift.
   *
   * @param {array} arr
   * @param {number=} shift
   * @return {string}
   */
  const placeholders = (arr, shift = 0) => {
    const tokens = arr.map((_value, position) => '$' + (position + 1 + shift));

    return tokens.join(', ');
  };
  /**
   * Check that the list exists and belongs to the requesting account.
   *
   * Every rejection carries an Error, so callers can safely log or inspect
   * the reason: the database error when the lookup fails, or a descriptive
   * error when the list is missing or owned by another account.
   *
   * @param {string} listId
   * @param {any} req
   * @return {Promise.<void>}
   */
  const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
    const { accountId } = req;

    pgPool.connect((err, client, done) => {
      if (err) {
        reject(err);
        return;
      }

      client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [listId], (err, result) => {
        done();

        if (err) {
          reject(err);
          return;
        }

        if (result.rows.length === 0 || result.rows[0].account_id !== accountId) {
          reject(new Error('Not authorized to stream this list'));
          return;
        }

        resolve();
      });
    });
  });
  /**
   * Subscribe to the given channels and forward their messages to `output`.
   *
   * With `needsFiltering`, `update` events are checked before being sent:
   * statuses in a language the requester did not choose are dropped, and for
   * authenticated requesters statuses involving blocked or muted accounts or
   * blocked domains are dropped as well, while matching custom filters are
   * attached to the payload as `filtered`.
   *
   * @param {string[]} ids
   * @param {any} req
   * @param {function(string, string): void} output
   * @param {function(string[], function(string): void): void} attachCloseHandler
   * @param {boolean=} needsFiltering
   * @return {function(string): void}
   */
  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
    const accountId = req.accountId || req.remoteAddress;

    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);

    const listener = message => {
      const json = parseJSON(message, req);

      if (!json) return;

      const { event, payload, queued_at } = json;

      const transmit = () => {
        const now = new Date().getTime();
        const delta = now - queued_at;
        const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;

        log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
        output(event, encodedPayload);
      };

      // Only messages that may require filtering are statuses, since notifications
      // are already personalized and deletes do not matter
      if (!needsFiltering || event !== 'update') {
        transmit();
        return;
      }

      const unpackedPayload = payload;
      const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
      const accountDomain = unpackedPayload.account.acct.split('@')[1];

      if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
        log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
        return;
      }

      // When the account is not logged in, it is not necessary to confirm the block or mute
      if (!req.accountId) {
        transmit();
        return;
      }

      pgPool.connect((err, client, done) => {
        if (err) {
          log.error(err);
          return;
        }

        const queries = [
          client.query(`SELECT 1
            FROM blocks
            WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
               OR (account_id = $2 AND target_account_id = $1)
            UNION
            SELECT 1
            FROM mutes
            WHERE account_id = $1
              AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
        ];

        if (accountDomain) {
          queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
        }

        // Decided once: the cache may be invalidated while the queries are in
        // flight, and the result index below must match the queries issued here.
        const shouldLoadFilters = !unpackedPayload.filtered && !req.cachedFilters;

        if (shouldLoadFilters) {
          // The expiry alternatives are parenthesised so that the account
          // restriction applies to both of them; without the parentheses every
          // unexpired filter of every account would be selected.
          queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
        }

        // Release the client exactly once, whatever the outcome, before the
        // results are processed; later failures must not release it again.
        const settled = Promise.all(queries).then(values => {
          done();
          return values;
        }, queryErr => {
          done();
          throw queryErr;
        });

        settled.then(values => {
          if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
            return;
          }

          if (shouldLoadFilters) {
            const filterRows = values[accountDomain ? 2 : 1].rows;

            // Group the keyword rows by filter id
            req.cachedFilters = filterRows.reduce((cache, row) => {
              if (cache[row.id]) {
                cache[row.id].keywords.push([row.keyword, row.whole_word]);
              } else {
                cache[row.id] = {
                  keywords: [[row.keyword, row.whole_word]],
                  expires_at: row.expires_at,
                  repr: {
                    id: row.id,
                    title: row.title,
                    context: row.context,
                    expires_at: row.expires_at,
                    filter_action: ['warn', 'hide'][row.filter_action],
                  },
                };
              }

              return cache;
            }, {});

            // One case-insensitive regexp per filter, matching any of its keywords
            Object.keys(req.cachedFilters).forEach((key) => {
              req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
                let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');

                if (whole_word) {
                  if (/^[\w]/.test(expr)) {
                    expr = `\\b${expr}`;
                  }

                  if (/[\w]$/.test(expr)) {
                    expr = `${expr}\\b`;
                  }
                }

                return expr;
              }).join('|'), 'i');
            });
          }

          // Check filters
          if (req.cachedFilters && !unpackedPayload.filtered) {
            const status = unpackedPayload;
            const pollTitles = (status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [];
            const searchContent = [status.spoiler_text || '', status.content]
              .concat(pollTitles)
              .concat(status.media_attachments.map(att => att.description))
              .join('\n\n')
              .replace(/<br\s*\/?>/g, '\n')
              .replace(/<\/p><p>/g, '\n\n');
            const searchIndex = JSDOM.fragment(searchContent).textContent;

            const now = new Date();
            payload.filtered = [];

            Object.values(req.cachedFilters).forEach((cachedFilter) => {
              if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
                const keyword_matches = searchIndex.match(cachedFilter.regexp);

                if (keyword_matches) {
                  payload.filtered.push({
                    filter: cachedFilter.repr,
                    keyword_matches,
                  });
                }
              }
            });
          }

          transmit();
        }).catch(err => {
          log.error(err);
        });
      });
    };

    ids.forEach(id => {
      subscribe(`${redisPrefix}${id}`, listener);
    });

    if (attachCloseHandler) {
      attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
    }

    return listener;
  };
  /**
   * Turn an HTTP response into a server-sent-events stream and return the
   * function used to write events to it.
   *
   * A keep-alive comment is written every 15 seconds until the request closes.
   *
   * @param {any} req
   * @param {any} res
   * @return {function(string, string): void}
   */
  const streamToHttp = (req, res) => {
    const accountId = req.accountId || req.remoteAddress;

    const streamHeaders = [
      ['Content-Type', 'text/event-stream'],
      ['Cache-Control', 'no-store'],
      ['Transfer-Encoding', 'chunked'],
    ];

    streamHeaders.forEach(([name, value]) => {
      res.setHeader(name, value);
    });

    res.write(':)\n');

    const heartbeat = setInterval(() => res.write(':thump\n'), 15000);

    req.on('close', () => {
      log.verbose(req.requestId, `Ending stream for ${accountId}`);
      clearInterval(heartbeat);
    });

    const writeEvent = (event, payload) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${payload}\n\n`);
    };

    return writeEvent;
  };
  /**
   * Build the close handler attached to an HTTP stream.
   *
   * When the request closes, the stream's listener is removed from every
   * channel it was subscribed to, then the optional `closeHandler` runs. The
   * listener has to be passed on to `unsubscribe`: without it no subscription
   * is removed and closed connections keep receiving messages.
   *
   * @param {any} req
   * @param {function(): void} [closeHandler]
   * @return {function(string[], function(string): void): void}
   */
  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
    req.on('close', () => {
      ids.forEach(id => {
        unsubscribe(id, listener);
      });

      if (closeHandler) {
        closeHandler();
      }
    });
  };
  /**
   * Build the function that writes stream events to a WebSocket as JSON
   * messages tagged with the stream name. Writes to a socket that is not open
   * are logged and dropped.
   *
   * @param {any} req
   * @param {any} ws
   * @param {string[]} streamName
   * @return {function(string, string): void}
   */
  const streamToWs = (req, ws, streamName) => {
    const sendEvent = (event, payload) => {
      const isOpen = ws.readyState === ws.OPEN;

      if (!isOpen) {
        log.error(req.requestId, 'Tried writing to closed socket');
        return;
      }

      const frame = { stream: streamName, event, payload };

      ws.send(JSON.stringify(frame));
    };

    return sendEvent;
  };
  /**
   * Answer a request with a JSON 404 response.
   *
   * @param {any} res
   */
  const httpNotFound = res => {
    const body = JSON.stringify({ error: 'Not found' });

    res.writeHead(404, { 'Content-Type': 'application/json' });
    res.end(body);
  };
  702. app.use(setRequestId);
  703. app.use(setRemoteAddress);
  704. app.use(allowCrossDomain);
  705. app.get('/api/v1/streaming/health', (req, res) => {
  706. res.writeHead(200, { 'Content-Type': 'text/plain' });
  707. res.end('OK');
  708. });
  709. app.use(authenticationMiddleware);
  710. app.use(errorMiddleware);
  711. app.get('/api/v1/streaming/*', (req, res) => {
  712. channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
  713. const onSend = streamToHttp(req, res);
  714. const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
  715. streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering);
  716. }).catch(err => {
  717. log.verbose(req.requestId, 'Subscription error:', err.toString());
  718. httpNotFound(res);
  719. });
  720. });
  721. const wss = new WebSocket.Server({ server, verifyClient: wsVerifyClient });
  722. /**
  723. * @typedef StreamParams
  724. * @property {string} [tag]
  725. * @property {string} [list]
  726. * @property {string} [only_media]
  727. */
  /**
   * List the channels that make up the user stream for a request.
   *
   * The account timeline is always included. The device channel is added for
   * tokens with the `crypto` scope that have a device id, and the
   * notifications channel for tokens with `read` or `read:notifications`.
   *
   * @param {any} req
   * @return {string[]}
   */
  const channelsForUserStream = req => {
    const accountTimeline = `timeline:${req.accountId}`;
    const channels = [accountTimeline];

    const hasDeviceChannel = isInScope(req, ['crypto']) && req.deviceId;

    if (hasDeviceChannel) {
      channels.push(`${accountTimeline}:${req.deviceId}`);
    }

    const mayReadNotifications = isInScope(req, ['read', 'read:notifications']);

    if (mayReadNotifications) {
      channels.push(`${accountTimeline}:notifications`);
    }

    return channels;
  };
  /**
   * See app/lib/ascii_folder.rb for the canon definitions
   * of these constants.
   *
   * The two strings are parallel: the character at index i of
   * NON_ASCII_CHARS folds to the character at index i of
   * EQUIVALENT_ASCII_CHARS. They must therefore have the same length, and
   * every entry of NON_ASCII_CHARS must be a single character; a mis-encoded
   * or decomposed character shifts every mapping that follows it.
   */
  const NON_ASCII_CHARS = 'ÀÁÂÃÄÅàáâãäåĀāĂăĄąÇçĆćĈĉĊċČčÐðĎďĐđÈÉÊËèéêëĒēĔĕĖėĘęĚěĜĝĞğĠġĢģĤĥĦħÌÍÎÏìíîïĨĩĪīĬĭĮįİıĴĵĶķĸĹĺĻļĽľĿŀŁłÑñŃńŅņŇňŉŊŋÒÓÔÕÖØòóôõöøŌōŎŏŐőŔŕŖŗŘřŚśŜŝŞşŠšſŢţŤťŦŧÙÚÛÜùúûüŨũŪūŬŭŮůŰűŲųŴŵÝýÿŶŷŸŹźŻżŽž';
  const EQUIVALENT_ASCII_CHARS = 'AAAAAAaaaaaaAaAaAaCcCcCcCcCcDdDdDdEEEEeeeeEeEeEeEeEeGgGgGgGgHhHhIIIIiiiiIiIiIiIiIiJjKkkLlLlLlLlLlNnNnNnNnnNnOOOOOOooooooOoOoOoRrRrRrSsSsSsSssTtTtTtUUUUuuuuUuUuUuUuUuUuWwYyyYyYZzZzZz';

  // Built once rather than on every call. All entries are letters, so they
  // can be placed in a character class without escaping.
  const NON_ASCII_CHARS_REGEX = new RegExp(`[${NON_ASCII_CHARS}]`, 'g');

  /**
   * Replace every character listed in NON_ASCII_CHARS with its ASCII
   * equivalent; all other characters are left untouched.
   *
   * @param {string} str
   * @return {string}
   */
  const foldToASCII = str => {
    return str.replace(NON_ASCII_CHARS_REGEX, match => {
      const index = NON_ASCII_CHARS.indexOf(match);
      return EQUIVALENT_ASCII_CHARS[index];
    });
  };
  /**
   * Produces the form of a hashtag used in channel IDs: NFKC-normalized,
   * lower-cased, passed through foldToASCII, then stripped of everything that
   * is not a letter, a number, `_`, U+00B7 or U+200C.
   * @param {string} str
   * @return {string}
   */
  const normalizeHashtag = str => {
    const lowered = str.normalize('NFKC').toLowerCase();
    const folded = foldToASCII(lowered);

    return folded.replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
  };
  /**
   * Stream names that map one-to-one onto the channel `timeline:<name>` and
   * are resolved with `needsFiltering: true`.
   *
   * 'autogestione:remote:media' used to resolve to
   * 'autogestione:public:remote:media', the only ID lacking the `timeline:`
   * prefix; it now follows the same pattern as its sibling channels.
   */
  const FILTERED_TIMELINE_CHANNELS = new Set([
    'public',
    'public:local',
    'public:remote',
    'public:media',
    'public:local:media',
    'public:remote:media',
    'autogestione',
    'autogestione:remote',
    'autogestione:media',
    'autogestione:remote:media',
  ]);

  /**
   * Resolves a client-facing stream name to the channel IDs behind it.
   *
   * The promise rejects with a plain string reason (callers pass it through
   * `toString()`): an unknown name, a hashtag stream without a `tag`, or a
   * list stream for which authorizeListAccess rejects.
   * @param {any} req
   * @param {string} name
   * @param {StreamParams} params
   * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
   */
  const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
    const found = (channelIds, needsFiltering) => {
      resolve({ channelIds, options: { needsFiltering } });
    };

    if (FILTERED_TIMELINE_CHANNELS.has(name)) {
      found([`timeline:${name}`], true);
      return;
    }

    switch (name) {
    case 'user':
      found(channelsForUserStream(req), false);
      break;
    case 'user:notification':
      found([`timeline:${req.accountId}:notifications`], false);
      break;
    case 'direct':
      found([`timeline:direct:${req.accountId}`], false);
      break;
    case 'hashtag':
    case 'hashtag:local':
      if (!params.tag || params.tag.length === 0) {
        reject('No tag for stream provided');
      } else {
        // The local variant differs only by a trailing ':local'.
        const suffix = name === 'hashtag:local' ? ':local' : '';
        found([`timeline:hashtag:${normalizeHashtag(params.tag)}${suffix}`], true);
      }
      break;
    case 'list':
      authorizeListAccess(params.list, req).then(() => {
        found([`timeline:list:${params.list}`], false);
      }).catch(() => {
        reject('Not authorized to stream this list');
      });
      break;
    default:
      reject('Unknown stream type');
    }
  });
  /**
   * Builds the stream descriptor for a channel name: the name itself,
   * followed by the list ID for `list` or the tag for the hashtag streams.
   * @param {string} channelName
   * @param {StreamParams} params
   * @return {string[]}
   */
  const streamNameFromChannelName = (channelName, params) => {
    switch (channelName) {
    case 'list':
      return [channelName, params.list];
    case 'hashtag':
    case 'hashtag:local':
      return [channelName, params.tag];
    default:
      return [channelName];
    }
  };
  900. /**
  901. * @typedef WebSocketSession
  902. * @property {any} socket
  903. * @property {any} request
  904. * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
  905. */
  /**
   * Subscribes a WebSocket session to a named stream.
   *
   * Subscriptions are keyed by the channel IDs joined with ';', and a key
   * that is already present is left untouched. Failures from the scope check
   * or the name resolution are logged and reported to the client as
   * `{ error }`; the returned promise does not reject for them.
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const register = ({ channelIds, options }) => {
      const key = channelIds.join(';');

      if (subscriptions[key]) {
        return;
      }

      const streamName = streamNameFromChannelName(channelName, params);
      const onSend = streamToWs(request, socket, streamName);
      const stopHeartbeat = subscriptionHeartbeat(channelIds);
      const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);

      subscriptions[key] = { listener, stopHeartbeat };
    };

    const report = err => {
      log.verbose(request.requestId, 'Subscription error:', err.toString());
      socket.send(JSON.stringify({ error: err.toString() }));
    };

    return checkScopes(request, channelName)
      .then(() => channelNameToIds(request, channelName, params))
      .then(register)
      .catch(report);
  };
  /**
   * Ends a WebSocket session's subscription to a named stream.
   *
   * The stream name is resolved to channel IDs again; if the session holds a
   * subscription under that key, its listener is removed from every channel,
   * its heartbeat is stopped and the entry is deleted. Resolution failures
   * are logged and reported to the client as `{ error }`.
   * @param {WebSocketSession} session
   * @param {string} channelName
   * @param {StreamParams} params
   */
  const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
    const release = ({ channelIds }) => {
      log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);

      const key = channelIds.join(';');
      const entry = subscriptions[key];

      if (!entry) {
        return;
      }

      const { listener, stopHeartbeat } = entry;

      for (const channelId of channelIds) {
        unsubscribe(`${redisPrefix}${channelId}`, listener);
      }

      stopHeartbeat();
      delete subscriptions[key];
    };

    const report = err => {
      log.verbose(request.requestId, 'Unsubscription error:', err);
      socket.send(JSON.stringify({ error: err.toString() }));
    };

    return channelNameToIds(request, channelName, params).then(release).catch(report);
  };
  /**
   * Attaches one system-message listener to the session's access-token and
   * account system channels, and records both in `subscriptions` with a
   * no-op heartbeat stopper. The listener's `onKill` hook closes the socket.
   * @param {WebSocketSession} session
   */
  const subscribeWebsocketToSystemChannel = ({ socket, request, subscriptions }) => {
    const systemChannelIds = [
      `timeline:access_token:${request.accessTokenId}`,
      `timeline:system:${request.accountId}`,
    ];

    const listener = createSystemMessageListener(request, {
      onKill: () => {
        socket.close();
      },
    });

    systemChannelIds.forEach(channelId => {
      subscribe(`${redisPrefix}${channelId}`, listener);
    });

    // These entries have no heartbeat, hence the empty stopper.
    systemChannelIds.forEach(channelId => {
      subscriptions[channelId] = {
        listener,
        stopHeartbeat: () => {
        },
      };
    });
  };
  /**
   * Returns the first element when given an array, and the value itself
   * otherwise.
   * @param {string|string[]} arrayOrString
   * @return {string}
   */
  const firstParam = arrayOrString => (
    Array.isArray(arrayOrString) ? arrayOrString[0] : arrayOrString
  );
  // Per-connection setup: tags the request, tracks liveness, wires cleanup
  // and message handling, then subscribes to the system channels and, if the
  // URL carries a `stream` query parameter, to that stream.
  wss.on('connection', (ws, req) => {
    const location = url.parse(req.url, true);

    req.requestId = uuid.v4();
    req.remoteAddress = ws._socket.remoteAddress;

    // Cleared by the periodic ping sweep and set again by every pong.
    ws.isAlive = true;

    ws.on('pong', () => {
      ws.isAlive = true;
    });

    /**
     * @type {WebSocketSession}
     */
    const session = {
      socket: ws,
      request: req,
      subscriptions: {},
    };

    // Releases every subscription held by this session. Keys are channel IDs
    // joined with ';', so each key is split back into its channels.
    const onEnd = () => {
      Object.keys(session.subscriptions).forEach(channelIds => {
        const { listener, stopHeartbeat } = session.subscriptions[channelIds];

        channelIds.split(';').forEach(channelId => {
          unsubscribe(`${redisPrefix}${channelId}`, listener);
        });

        stopHeartbeat();

        // This handler is bound to both 'close' and 'error', so it can run
        // more than once for one socket. Removing the entry makes later runs
        // no-ops instead of unsubscribing and stopping heartbeats again.
        delete session.subscriptions[channelIds];
      });
    };

    ws.on('close', onEnd);
    ws.on('error', onEnd);

    ws.on('message', data => {
      const json = parseJSON(data, session.request);

      if (!json) return;

      // Everything besides `type` and `stream` is passed on as stream params.
      const { type, stream, ...params } = json;

      if (type === 'subscribe') {
        subscribeWebsocketToChannel(session, firstParam(stream), params);
      } else if (type === 'unsubscribe') {
        unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
      } else {
        // Unknown action type
      }
    });

    subscribeWebsocketToSystemChannel(session);

    if (location.query.stream) {
      subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query);
    }
  });
  // Ping sweep, every 30000 ms: a client still flagged not-alive from the
  // previous pass is terminated; every other client is flagged not-alive and
  // pinged.
  setInterval(() => {
    wss.clients.forEach(client => {
      if (client.isAlive !== false) {
        client.isAlive = false;
        client.ping('', false);
        return;
      }

      client.terminate();
    });
  }, 30000);
  attachServerWithConfig(server, boundAddress => {
    log.warn(`Worker ${workerId} now listening on ${boundAddress}`);
  });

  // Shared tail of both handlers: close the server, then exit with status 0.
  const shutdown = () => {
    server.close();
    process.exit(0);
  };

  const onExit = () => {
    log.warn(`Worker ${workerId} exiting`);
    shutdown();
  };

  // NOTE(review): an uncaught exception also exits with status 0 — confirm
  // that this is intended.
  const onError = (err) => {
    log.error(err);
    shutdown();
  };

  ['SIGINT', 'SIGTERM', 'exit'].forEach(eventName => {
    process.on(eventName, onExit);
  });
  process.on('uncaughtException', onError);
  1059. };
  /**
   * Starts `server` listening according to the environment.
   *
   * When SOCKET is set, or PORT is set to something that is not a number,
   * that value is passed to `listen` as a path; the path is made
   * world-readable/writable (0o666) only when a callback was supplied.
   * Otherwise the server listens on PORT (default 4000) and BIND
   * (default 127.0.0.1).
   * @param {any} server
   * @param {function(string): void} [onSuccess] receives the socket path, or "address:port"
   */
  const attachServerWithConfig = (server, onSuccess) => {
    const { SOCKET, PORT, BIND } = process.env;
    const portIsSocketPath = Boolean(PORT) && isNaN(+PORT);

    if (SOCKET || portIsSocketPath) {
      server.listen(SOCKET || PORT, () => {
        if (!onSuccess) {
          return;
        }

        const socketPath = server.address();
        fs.chmodSync(socketPath, 0o666);
        onSuccess(socketPath);
      });
      return;
    }

    server.listen(+PORT || 4000, BIND || '127.0.0.1', () => {
      if (!onSuccess) {
        return;
      }

      const { address, port } = server.address();
      onSuccess(`${address}:${port}`);
    });
  };
  /**
   * Probes whether the configured port or socket can be bound, using a
   * throwaway HTTP server attached via attachServerWithConfig.
   * @param {function(Error=): void} onSuccess called with the error if the
   *   probe server emits one, or with no argument once the probe has
   *   listened and closed again
   */
  const onPortAvailable = onSuccess => {
    const probe = http.createServer();

    const reportFailure = err => {
      onSuccess(err);
    };

    // Binding worked: report success only after the probe has fully closed.
    const closeThenReport = () => {
      probe.once('close', () => onSuccess());
      probe.close();
    };

    probe.once('error', reportFailure);
    probe.once('listening', closeThenReport);

    attachServerWithConfig(probe);
  };
  // Entry point: start the worker pool only if the port/socket probe passed.
  onPortAvailable(err => {
    if (!err) {
      throng({
        workers: numWorkers,
        lifetime: Infinity,
        start: startWorker,
        master: startMaster,
      });
      return;
    }

    log.error('Could not start server, the port or socket is in use');
  });