Refactor streaming to simplify for logging change (#28056)
This commit is contained in:
parent
4949b6da58
commit
a4de0e364b
3 changed files with 139 additions and 92 deletions
|
@ -12,10 +12,12 @@ const { JSDOM } = require('jsdom');
|
||||||
const log = require('npmlog');
|
const log = require('npmlog');
|
||||||
const pg = require('pg');
|
const pg = require('pg');
|
||||||
const dbUrlToConfig = require('pg-connection-string').parse;
|
const dbUrlToConfig = require('pg-connection-string').parse;
|
||||||
const metrics = require('prom-client');
|
|
||||||
const uuid = require('uuid');
|
const uuid = require('uuid');
|
||||||
const WebSocket = require('ws');
|
const WebSocket = require('ws');
|
||||||
|
|
||||||
|
const { setupMetrics } = require('./metrics');
|
||||||
|
const { isTruthy } = require("./utils");
|
||||||
|
|
||||||
const environment = process.env.NODE_ENV || 'development';
|
const environment = process.env.NODE_ENV || 'development';
|
||||||
|
|
||||||
// Correctly detect and load .env or .env.production file based on environment:
|
// Correctly detect and load .env or .env.production file based on environment:
|
||||||
|
@ -196,78 +198,15 @@ const startServer = async () => {
|
||||||
const redisClient = await createRedisClient(redisConfig);
|
const redisClient = await createRedisClient(redisConfig);
|
||||||
const { redisPrefix } = redisConfig;
|
const { redisPrefix } = redisConfig;
|
||||||
|
|
||||||
// Collect metrics from Node.js
|
const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
|
||||||
metrics.collectDefaultMetrics();
|
// TODO: migrate all metrics to metrics.X.method() instead of just X.method()
|
||||||
|
const {
|
||||||
new metrics.Gauge({
|
connectedClients,
|
||||||
name: 'pg_pool_total_connections',
|
connectedChannels,
|
||||||
help: 'The total number of clients existing within the pool',
|
redisSubscriptions,
|
||||||
collect() {
|
redisMessagesReceived,
|
||||||
this.set(pgPool.totalCount);
|
messagesSent,
|
||||||
},
|
} = metrics;
|
||||||
});
|
|
||||||
|
|
||||||
new metrics.Gauge({
|
|
||||||
name: 'pg_pool_idle_connections',
|
|
||||||
help: 'The number of clients which are not checked out but are currently idle in the pool',
|
|
||||||
collect() {
|
|
||||||
this.set(pgPool.idleCount);
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
new metrics.Gauge({
|
|
||||||
name: 'pg_pool_waiting_queries',
|
|
||||||
help: 'The number of queued requests waiting on a client when all clients are checked out',
|
|
||||||
collect() {
|
|
||||||
this.set(pgPool.waitingCount);
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
const connectedClients = new metrics.Gauge({
|
|
||||||
name: 'connected_clients',
|
|
||||||
help: 'The number of clients connected to the streaming server',
|
|
||||||
labelNames: ['type'],
|
|
||||||
});
|
|
||||||
|
|
||||||
const connectedChannels = new metrics.Gauge({
|
|
||||||
name: 'connected_channels',
|
|
||||||
help: 'The number of channels the streaming server is streaming to',
|
|
||||||
labelNames: [ 'type', 'channel' ]
|
|
||||||
});
|
|
||||||
|
|
||||||
const redisSubscriptions = new metrics.Gauge({
|
|
||||||
name: 'redis_subscriptions',
|
|
||||||
help: 'The number of Redis channels the streaming server is subscribed to',
|
|
||||||
});
|
|
||||||
|
|
||||||
const redisMessagesReceived = new metrics.Counter({
|
|
||||||
name: 'redis_messages_received_total',
|
|
||||||
help: 'The total number of messages the streaming server has received from redis subscriptions'
|
|
||||||
});
|
|
||||||
|
|
||||||
const messagesSent = new metrics.Counter({
|
|
||||||
name: 'messages_sent_total',
|
|
||||||
help: 'The total number of messages the streaming server sent to clients per connection type',
|
|
||||||
labelNames: [ 'type' ]
|
|
||||||
});
|
|
||||||
|
|
||||||
// Prime the gauges so we don't loose metrics between restarts:
|
|
||||||
redisSubscriptions.set(0);
|
|
||||||
connectedClients.set({ type: 'websocket' }, 0);
|
|
||||||
connectedClients.set({ type: 'eventsource' }, 0);
|
|
||||||
|
|
||||||
// For each channel, initialize the gauges at zero; There's only a finite set of channels available
|
|
||||||
CHANNEL_NAMES.forEach(( channel ) => {
|
|
||||||
connectedChannels.set({ type: 'websocket', channel }, 0);
|
|
||||||
connectedChannels.set({ type: 'eventsource', channel }, 0);
|
|
||||||
});
|
|
||||||
|
|
||||||
// Prime the counters so that we don't loose metrics between restarts.
|
|
||||||
// Unfortunately counters don't support the set() API, so instead I'm using
|
|
||||||
// inc(0) to achieve the same result.
|
|
||||||
redisMessagesReceived.inc(0);
|
|
||||||
messagesSent.inc({ type: 'websocket' }, 0);
|
|
||||||
messagesSent.inc({ type: 'eventsource' }, 0);
|
|
||||||
|
|
||||||
// When checking metrics in the browser, the favicon is requested this
|
// When checking metrics in the browser, the favicon is requested this
|
||||||
// prevents the request from falling through to the API Router, which would
|
// prevents the request from falling through to the API Router, which would
|
||||||
|
@ -388,25 +327,6 @@ const startServer = async () => {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
const FALSE_VALUES = [
|
|
||||||
false,
|
|
||||||
0,
|
|
||||||
'0',
|
|
||||||
'f',
|
|
||||||
'F',
|
|
||||||
'false',
|
|
||||||
'FALSE',
|
|
||||||
'off',
|
|
||||||
'OFF',
|
|
||||||
];
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {any} value
|
|
||||||
* @returns {boolean}
|
|
||||||
*/
|
|
||||||
const isTruthy = value =>
|
|
||||||
value && !FALSE_VALUES.includes(value);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {any} req
|
* @param {any} req
|
||||||
* @param {any} res
|
* @param {any} res
|
||||||
|
|
105
streaming/metrics.js
Normal file
105
streaming/metrics.js
Normal file
|
@ -0,0 +1,105 @@
|
||||||
|
// @ts-check
|
||||||
|
|
||||||
|
const metrics = require('prom-client');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef StreamingMetrics
|
||||||
|
* @property {metrics.Registry} register
|
||||||
|
* @property {metrics.Gauge<"type">} connectedClients
|
||||||
|
* @property {metrics.Gauge<"type" | "channel">} connectedChannels
|
||||||
|
* @property {metrics.Gauge} redisSubscriptions
|
||||||
|
* @property {metrics.Counter} redisMessagesReceived
|
||||||
|
* @property {metrics.Counter<"type">} messagesSent
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param {string[]} channels
|
||||||
|
* @param {import('pg').Pool} pgPool
|
||||||
|
* @returns {StreamingMetrics}
|
||||||
|
*/
|
||||||
|
function setupMetrics(channels, pgPool) {
|
||||||
|
// Collect metrics from Node.js
|
||||||
|
metrics.collectDefaultMetrics();
|
||||||
|
|
||||||
|
new metrics.Gauge({
|
||||||
|
name: 'pg_pool_total_connections',
|
||||||
|
help: 'The total number of clients existing within the pool',
|
||||||
|
collect() {
|
||||||
|
this.set(pgPool.totalCount);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
new metrics.Gauge({
|
||||||
|
name: 'pg_pool_idle_connections',
|
||||||
|
help: 'The number of clients which are not checked out but are currently idle in the pool',
|
||||||
|
collect() {
|
||||||
|
this.set(pgPool.idleCount);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
new metrics.Gauge({
|
||||||
|
name: 'pg_pool_waiting_queries',
|
||||||
|
help: 'The number of queued requests waiting on a client when all clients are checked out',
|
||||||
|
collect() {
|
||||||
|
this.set(pgPool.waitingCount);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const connectedClients = new metrics.Gauge({
|
||||||
|
name: 'connected_clients',
|
||||||
|
help: 'The number of clients connected to the streaming server',
|
||||||
|
labelNames: ['type'],
|
||||||
|
});
|
||||||
|
|
||||||
|
const connectedChannels = new metrics.Gauge({
|
||||||
|
name: 'connected_channels',
|
||||||
|
help: 'The number of channels the streaming server is streaming to',
|
||||||
|
labelNames: [ 'type', 'channel' ]
|
||||||
|
});
|
||||||
|
|
||||||
|
const redisSubscriptions = new metrics.Gauge({
|
||||||
|
name: 'redis_subscriptions',
|
||||||
|
help: 'The number of Redis channels the streaming server is subscribed to',
|
||||||
|
});
|
||||||
|
|
||||||
|
const redisMessagesReceived = new metrics.Counter({
|
||||||
|
name: 'redis_messages_received_total',
|
||||||
|
help: 'The total number of messages the streaming server has received from redis subscriptions'
|
||||||
|
});
|
||||||
|
|
||||||
|
const messagesSent = new metrics.Counter({
|
||||||
|
name: 'messages_sent_total',
|
||||||
|
help: 'The total number of messages the streaming server sent to clients per connection type',
|
||||||
|
labelNames: [ 'type' ]
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prime the gauges so we don't loose metrics between restarts:
|
||||||
|
redisSubscriptions.set(0);
|
||||||
|
connectedClients.set({ type: 'websocket' }, 0);
|
||||||
|
connectedClients.set({ type: 'eventsource' }, 0);
|
||||||
|
|
||||||
|
// For each channel, initialize the gauges at zero; There's only a finite set of channels available
|
||||||
|
channels.forEach(( channel ) => {
|
||||||
|
connectedChannels.set({ type: 'websocket', channel }, 0);
|
||||||
|
connectedChannels.set({ type: 'eventsource', channel }, 0);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prime the counters so that we don't loose metrics between restarts.
|
||||||
|
// Unfortunately counters don't support the set() API, so instead I'm using
|
||||||
|
// inc(0) to achieve the same result.
|
||||||
|
redisMessagesReceived.inc(0);
|
||||||
|
messagesSent.inc({ type: 'websocket' }, 0);
|
||||||
|
messagesSent.inc({ type: 'eventsource' }, 0);
|
||||||
|
|
||||||
|
return {
|
||||||
|
register: metrics.register,
|
||||||
|
connectedClients,
|
||||||
|
connectedChannels,
|
||||||
|
redisSubscriptions,
|
||||||
|
redisMessagesReceived,
|
||||||
|
messagesSent,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.setupMetrics = setupMetrics;
|
22
streaming/utils.js
Normal file
22
streaming/utils.js
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
// @ts-check
|
||||||
|
|
||||||
|
const FALSE_VALUES = [
|
||||||
|
false,
|
||||||
|
0,
|
||||||
|
'0',
|
||||||
|
'f',
|
||||||
|
'F',
|
||||||
|
'false',
|
||||||
|
'FALSE',
|
||||||
|
'off',
|
||||||
|
'OFF',
|
||||||
|
];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {any} value
|
||||||
|
* @returns {boolean}
|
||||||
|
*/
|
||||||
|
const isTruthy = value =>
|
||||||
|
value && !FALSE_VALUES.includes(value);
|
||||||
|
|
||||||
|
exports.isTruthy = isTruthy;
|
Loading…
Reference in a new issue