diff --git a/benchmark/diagnostics_channel/http.js b/benchmark/diagnostics_channel/http.js new file mode 100644 index 00000000000000..55fac8a706df15 --- /dev/null +++ b/benchmark/diagnostics_channel/http.js @@ -0,0 +1,96 @@ +'use strict'; +const common = require('../common.js'); +const dc = require('diagnostics_channel'); +const { AsyncLocalStorage } = require('async_hooks'); +const http = require('http'); + +const bench = common.createBenchmark(main, { + apm: ['none', 'diagnostics_channel', 'patch'], + type: 'buffer', + len: 1024, + chunks: 4, + connections: [50, 500], + chunkedEnc: 1, + duration: 5 +}); + +function main({ apm, connections, duration, type, len, chunks, chunkedEnc }) { + const done = { none, patch, diagnostics_channel }[apm](); + + const server = require('../fixtures/simple-http-server.js') + .listen(common.PORT) + .on('listening', () => { + const path = `/${type}/${len}/${chunks}/normal/${chunkedEnc}`; + bench.http({ + path, + connections, + duration + }, () => { + server.close(); + if (done) done(); + }); + }); +} + +function none() {} + +function patch() { + const als = new AsyncLocalStorage(); + const times = []; + + const { emit } = http.Server.prototype; + function wrappedEmit(...args) { + const [name, req, res] = args; + if (name === 'request') { + als.enterWith({ + url: req.url, + start: process.hrtime.bigint() + }); + + res.on('finish', () => { + times.push({ + ...als.getStore(), + statusCode: res.statusCode, + end: process.hrtime.bigint() + }); + }); + } + return emit.apply(this, args); + } + http.Server.prototype.emit = wrappedEmit; + + return () => { + http.Server.prototype.emit = emit; + }; +} + +function diagnostics_channel() { + const als = new AsyncLocalStorage(); + const times = []; + + const start = dc.channel('http.server.request.start'); + const finish = dc.channel('http.server.response.finish'); + + function onStart(req) { + als.enterWith({ + url: req.url, + start: process.hrtime.bigint() + }); + } + + function onFinish(res) { + times.push({ + ...als.getStore(), + statusCode: res.statusCode, + end: process.hrtime.bigint() + }); + } + + start.subscribe(onStart); + finish.subscribe(onFinish); + + return () => { + start.unsubscribe(onStart); + finish.unsubscribe(onFinish); + }; +} diff --git a/benchmark/diagnostics_channel/publish.js b/benchmark/diagnostics_channel/publish.js new file mode 100644 index 00000000000000..31a770c8627919 --- /dev/null +++ b/benchmark/diagnostics_channel/publish.js @@ -0,0 +1,29 @@ +'use strict'; +const common = require('../common.js'); +const dc = require('diagnostics_channel'); + +const bench = common.createBenchmark(main, { + n: [1e8], + subscribers: [0, 1, 10], +}); + +function noop() {} + +function main({ n, subscribers }) { + const channel = dc.channel('test'); + for (let i = 0; i < subscribers; i++) { + channel.subscribe(noop); + } + + const data = { + foo: 'bar' + }; + + bench.start(); + for (let i = 0; i < n; i++) { + if (channel.hasSubscribers) { + channel.publish(data); + } + } + bench.end(n); +} diff --git a/benchmark/diagnostics_channel/subscribe.js b/benchmark/diagnostics_channel/subscribe.js new file mode 100644 index 00000000000000..1415054588c4b1 --- /dev/null +++ b/benchmark/diagnostics_channel/subscribe.js @@ -0,0 +1,19 @@ +'use strict'; +const common = require('../common.js'); +const dc = require('diagnostics_channel'); + +const bench = common.createBenchmark(main, { + n: [1e8], +}); + +function noop() {} + +function main({ n }) { + const channel = dc.channel('channel.0'); + + bench.start(); + for (let i = 0; i < n; i++) { + channel.subscribe(noop); + } + bench.end(n); +} diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md new file mode 100644 index 00000000000000..2f78ea80f6775e --- /dev/null +++ b/doc/api/diagnostics_channel.md @@ -0,0 +1,180 @@ +# Diagnostics Channel + + + +> Stability: 1 - Experimental + + + +The `diagnostics_channel` module provides an API to create named channels +to report arbitrary message data for diagnostics purposes. + +It can be accessed using: + +```js +const diagnostics_channel = require('diagnostics_channel'); +``` + +It is intended that a module writer wanting to report diagnostics messages +will create one or many top-level channels to report messages through. +Channels may also be acquired at runtime but it is not encouraged +due to the additional overhead of doing so. Channels may be exported for +convenience, but as long as the name is known it can be acquired anywhere. + +If you intend for your module to produce diagnostics data for others to +consume it is recommended that you include documentation of what named +channels are used along with the shape of the message data. Channel names +should generally include the module name to avoid collisions with data from +other modules. + +## Public API + +### Overview + +Following is a simple overview of the public API. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +// Get a reusable channel object +const channel = diagnostics_channel.channel('my-channel'); + +// Subscribe to the channel +channel.subscribe((message, name) => { + // Received data +}); + +// Check if the channel has an active subscriber +if (channel.hasSubscribers) { + // Publish data to the channel + channel.publish({ + some: 'data' + }); +} +``` + +#### `diagnostics_channel.hasSubscribers(name)` + +* `name` {string|symbol} The channel name +* Returns: {boolean} If there are active subscribers + +Check if there are active subscribers to the named channel. This is helpful if +the message you want to send might be expensive to prepare. + +This API is optional but helpful when trying to publish messages from very +performance-senstive code. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +if (diagnostics_channel.hasSubscribers('my-channel')) { + // There are subscribers, prepare and publish message +} +``` + +#### `diagnostics_channel.channel(name)` + +* `name` {string|symbol} The channel name +* Returns: {Channel} The named channel object + +This is the primary entry-point for anyone wanting to interact with a named +channel. It produces a channel object which is optimized to reduce overhead at +publish time as much as possible. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); +``` + +### Class: `Channel` + +The class `Channel` represents an individual named channel within the data +pipeline. It is use to track subscribers and to publish messages when there +are subscribers present. It exists as a separate object to avoid channel +lookups at publish time, enabling very fast publish speeds and allowing +for heavy use while incurring very minimal cost. Channels are created with +[`diagnostics_channel.channel(name)`][], constructing a channel directly +with `new Channel(name)` is not supported. + +#### `channel.hasSubscribers` + +* Returns: {boolean} If there are active subscribers + +Check if there are active subscribers to this channel. This is helpful if +the message you want to send might be expensive to prepare. + +This API is optional but helpful when trying to publish messages from very +performance-senstive code. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +if (channel.hasSubscribers) { + // There are subscribers, prepare and publish message +} +``` + +#### `channel.publish(message)` + +* `message` {any} The message to send to the channel subscribers + +Publish a message to any subscribers to the channel. This will trigger +message handlers synchronously so they will execute within the same context. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.publish({ + some: 'message' +}); +``` + +#### `channel.subscribe(onMessage)` + +* `onMessage` {Function} The handler to receive channel messages + * `message` {any} The message data + * `name` {string|symbol} The name of the channel + +Register a message handler to subscribe to this channel. This message handler +will be run synchronously whenever a message is published to the channel. Any +errors thrown in the message handler will trigger an [`'uncaughtException'`][]. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +channel.subscribe((message, name) => { + // Received data +}); +``` + +#### `channel.unsubscribe(onMessage)` + +* `onMessage` {Function} The previous subscribed handler to remove + +Remove a message handler previously registered to this channel with +[`channel.subscribe(onMessage)`][]. + +```js +const diagnostics_channel = require('diagnostics_channel'); + +const channel = diagnostics_channel.channel('my-channel'); + +function onMessage(message, name) { + // Received data +} + +channel.subscribe(onMessage); + +channel.unsubscribe(onMessage); +``` + +[`diagnostics_channel.channel(name)`]: #diagnostics_channel_diagnostics_channel_channel_name +[`channel.subscribe(onMessage)`]: #diagnostics_channel_channel_subscribe_onmessage +[`'uncaughtException'`]: process.md#process_event_uncaughtexception diff --git a/doc/api/index.md b/doc/api/index.md index d10bd5988a9056..391873c16122c6 100644 --- a/doc/api/index.md +++ b/doc/api/index.md @@ -23,6 +23,7 @@ * [Crypto](crypto.md) * [Debugger](debugger.md) * [Deprecated APIs](deprecations.md) +* [Diagnostics Channel](diagnostics_channel.md) * [DNS](dns.md) * [Domain](domain.md) * [Errors](errors.md) diff --git a/lib/_http_server.js b/lib/_http_server.js index ac6dd1c50f94fe..7343d5f52791b9 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -84,6 +84,10 @@ const { observerCounts, constants } = internalBinding('performance'); const { setTimeout, clearTimeout } = require('timers'); const { NODE_PERFORMANCE_ENTRY_TYPE_HTTP } = constants; +const dc = require('diagnostics_channel'); +const onRequestStartChannel = dc.channel('http.server.request.start'); +const onResponseFinishChannel = dc.channel('http.server.response.finish'); + const kServerResponse = Symbol('ServerResponse'); const kServerResponseStatistics = Symbol('ServerResponseStatistics'); @@ -775,6 +779,15 @@ function clearRequestTimeout(req) { } function resOnFinish(req, res, socket, state, server) { + if (onResponseFinishChannel.hasSubscribers) { + onResponseFinishChannel.publish({ + request: req, + response: res, + socket, + server + }); + } + // Usually the first incoming element should be our request. it may // be that in the case abortIncoming() was called that the incoming // array will be empty. @@ -862,6 +875,15 @@ function parserOnIncoming(server, socket, state, req, keepAlive) { res.shouldKeepAlive = keepAlive; DTRACE_HTTP_SERVER_REQUEST(req, socket); + if (onRequestStartChannel.hasSubscribers) { + onRequestStartChannel.publish({ + request: req, + response: res, + socket, + server + }); + } + if (socket._httpMessage) { // There are already pending outgoing res, append. state.outgoing.push(res); diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js new file mode 100644 index 00000000000000..0a3552dc975040 --- /dev/null +++ b/lib/diagnostics_channel.js @@ -0,0 +1,122 @@ +'use strict'; + +const { + ArrayPrototypeIndexOf, + ArrayPrototypePush, + ArrayPrototypeSplice, + ObjectCreate, + ObjectGetPrototypeOf, + ObjectSetPrototypeOf, + SymbolHasInstance, + WeakRefPrototypeGet +} = primordials; + +const { + codes: { + ERR_INVALID_ARG_TYPE, + } +} = require('internal/errors'); + +const { triggerUncaughtException } = internalBinding('errors'); + +const { WeakReference } = internalBinding('util'); + +// TODO(qard): should there be a C++ channel interface? +class ActiveChannel { + subscribe(subscription) { + if (typeof subscription !== 'function') { + throw new ERR_INVALID_ARG_TYPE('subscription', ['function'], + subscription); + } + ArrayPrototypePush(this._subscribers, subscription); + } + + unsubscribe(subscription) { + const index = ArrayPrototypeIndexOf(this._subscribers, subscription); + if (index >= 0) { + ArrayPrototypeSplice(this._subscribers, index, 1); + + // When there are no more active subscribers, restore to fast prototype. + if (!this._subscribers.length) { + // eslint-disable-next-line no-use-before-define + ObjectSetPrototypeOf(this, Channel.prototype); + } + } + } + + get hasSubscribers() { + return true; + } + + publish(data) { + for (let i = 0; i < this._subscribers.length; i++) { + try { + const onMessage = this._subscribers[i]; + onMessage(data, this.name); + } catch (err) { + process.nextTick(() => { + triggerUncaughtException(err, false); + }); + } + } + } +} + +class Channel { + constructor(name) { + this._subscribers = undefined; + this.name = name; + } + + static [SymbolHasInstance](instance) { + const prototype = ObjectGetPrototypeOf(instance); + return prototype === Channel.prototype || + prototype === ActiveChannel.prototype; + } + + subscribe(subscription) { + ObjectSetPrototypeOf(this, ActiveChannel.prototype); + this._subscribers = []; + this.subscribe(subscription); + } + + get hasSubscribers() { + return false; + } + + publish() {} +} + +const channels = ObjectCreate(null); + +function channel(name) { + let channel; + const ref = channels[name]; + if (ref) channel = ref.get(); + if (channel) return channel; + + if (typeof name !== 'string' && typeof name !== 'symbol') { + throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name); + } + + channel = new Channel(name); + channels[name] = new WeakReference(channel); + return channel; +} + +function hasSubscribers(name) { + let channel; + const ref = channels[name]; + if (ref) channel = WeakRefPrototypeGet(ref); + if (!channel) { + return false; + } + + return channel.hasSubscribers; +} + +module.exports = { + channel, + hasSubscribers, + Channel +}; diff --git a/node.gyp b/node.gyp index 29b59d04d31f98..d2f0be54e933e3 100644 --- a/node.gyp +++ b/node.gyp @@ -51,6 +51,7 @@ 'lib/constants.js', 'lib/crypto.js', 'lib/cluster.js', + 'lib/diagnostics_channel.js', 'lib/dgram.js', 'lib/dns.js', 'lib/dns/promises.js', diff --git a/test/parallel/test-diagnostics-channel-http-server-start.js b/test/parallel/test-diagnostics-channel-http-server-start.js new file mode 100644 index 00000000000000..9a8136d4cc5839 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-http-server-start.js @@ -0,0 +1,65 @@ +'use strict'; + +const common = require('../common'); +const { AsyncLocalStorage } = require('async_hooks'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); +const http = require('http'); + +const incomingStartChannel = dc.channel('http.server.request.start'); +const outgoingFinishChannel = dc.channel('http.server.response.finish'); + +const als = new AsyncLocalStorage(); +let context; + +// Bind requests to an AsyncLocalStorage context +incomingStartChannel.subscribe(common.mustCall((message) => { + als.enterWith(message); + context = message; +})); + +// When the request ends, verify the context has been maintained +// and that the messages contain the expected data +outgoingFinishChannel.subscribe(common.mustCall((message) => { + const data = { + request, + response, + server, + socket: request.socket + }; + + // Context is maintained + compare(als.getStore(), context); + + compare(context, data); + compare(message, data); +})); + +let request; +let response; + +const server = http.createServer(common.mustCall((req, res) => { + request = req; + response = res; + + setTimeout(() => { + res.end('done'); + }, 1); +})); + +server.listen(() => { + const { port } = server.address(); + http.get(`http://localhost:${port}`, (res) => { + res.resume(); + res.on('end', () => { + server.close(); + }); + }); +}); + +function compare(a, b) { + assert.strictEqual(a.request, b.request); + assert.strictEqual(a.response, b.response); + assert.strictEqual(a.socket, b.socket); + assert.strictEqual(a.server, b.server); +} diff --git a/test/parallel/test-diagnostics-channel-object-channel-pub-sub.js b/test/parallel/test-diagnostics-channel-object-channel-pub-sub.js new file mode 100644 index 00000000000000..bae69b02415785 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-object-channel-pub-sub.js @@ -0,0 +1,39 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); +const { Channel } = dc; + +const input = { + foo: 'bar' +}; + +// Should not have named channel +assert.ok(!dc.hasSubscribers('test')); + +// Individual channel objects can be created to avoid future lookups +const channel = dc.channel('test'); +assert.ok(channel instanceof Channel); + +// No subscribers yet, should not publish +assert.ok(!channel.hasSubscribers); + +const subscriber = common.mustCall((message, name) => { + assert.strictEqual(name, channel.name); + assert.deepStrictEqual(message, input); +}); + +// Now there's a subscriber, should publish +channel.subscribe(subscriber); +assert.ok(channel.hasSubscribers); + +// The ActiveChannel prototype swap should not fail instanceof +assert.ok(channel instanceof Channel); + +// Should trigger the subscriber once +channel.publish(input); + +// Should not publish after subscriber is unsubscribed +channel.unsubscribe(subscriber); +assert.ok(!channel.hasSubscribers); diff --git a/test/parallel/test-diagnostics-channel-safe-subscriber-errors.js b/test/parallel/test-diagnostics-channel-safe-subscriber-errors.js new file mode 100644 index 00000000000000..b0c5ab2480e374 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-safe-subscriber-errors.js @@ -0,0 +1,29 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const input = { + foo: 'bar' +}; + +const channel = dc.channel('fail'); + +const error = new Error('nope'); + +process.on('uncaughtException', common.mustCall((err) => { + assert.strictEqual(err, error); +})); + +channel.subscribe(common.mustCall((message, name) => { + throw error; +})); + +// The failing subscriber should not stop subsequent subscribers from running +channel.subscribe(common.mustCall()); + +// Publish should continue without throwing +const fn = common.mustCall(); +channel.publish(input); +fn(); diff --git a/test/parallel/test-diagnostics-channel-symbol-named.js b/test/parallel/test-diagnostics-channel-symbol-named.js new file mode 100644 index 00000000000000..b98c2a1ef3ec6c --- /dev/null +++ b/test/parallel/test-diagnostics-channel-symbol-named.js @@ -0,0 +1,22 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); + +const input = { + foo: 'bar' +}; + +const symbol = Symbol('test'); + +// Individual channel objects can be created to avoid future lookups +const channel = dc.channel(symbol); + +// Expect two successful publishes later +channel.subscribe(common.mustCall((message, name) => { + assert.strictEqual(name, symbol); + assert.deepStrictEqual(message, input); +})); + +channel.publish(input); diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index 9a96d857a25177..e3d032319de135 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -111,6 +111,8 @@ const customTypesMap = { 'dgram.Socket': 'dgram.html#dgram_class_dgram_socket', + 'Channel': 'diagnostics_channel.html#diagnostics_channel_class_channel', + 'Domain': 'domain.html#domain_class_domain', 'errors.Error': 'errors.html#errors_class_error',