diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index 8d2d374dc8e6ae..d0911148bcabf9 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -2,7 +2,6 @@ const { ArrayPrototypeAt, - ArrayPrototypeIndexOf, ArrayPrototypePush, ArrayPrototypePushApply, ArrayPrototypeSlice, @@ -15,6 +14,7 @@ const { ReflectApply, SafeFinalizationRegistry, SafeMap, + SafeSet, SymbolDispose, SymbolHasInstance, } = primordials; @@ -36,6 +36,33 @@ const { subscribers: subscriberCounts } = dc_binding; const { WeakReference } = require('internal/util'); const { isPromise } = require('internal/util/types'); +let suppressionStorage; + +function getSuppressionsStorage() { + if (suppressionStorage === undefined) { + const { AsyncLocalStorage } = require('async_hooks'); + suppressionStorage = new AsyncLocalStorage(); + } + return suppressionStorage; +} + +function withSuppressionsContext(set, fn, thisArg, args) { + return getSuppressionsStorage().run( + set, + () => ReflectApply(fn, thisArg, args), + ); +} + +function validateBypassKey(value, name) { + if (value == null) { + throw new ERR_INVALID_ARG_TYPE(name, ['object', 'symbol'], value); + } + const type = typeof value; + if (type !== 'object' && type !== 'symbol') { + throw new ERR_INVALID_ARG_TYPE(name, ['object', 'symbol'], value); + } +} + // Can't delete when weakref count reaches 0 as it could increment again. // Only GC can be used as a valid time to clean up the channels map. class WeakRefMap extends SafeMap { @@ -71,16 +98,23 @@ function markActive(channel) { // eslint-disable-next-line no-use-before-define ObjectSetPrototypeOf(channel, ActiveChannel.prototype); channel._subscribers = []; + channel._bypassSubscribers = null; channel._stores = new SafeMap(); + channel._bypassStores = null; } function maybeMarkInactive(channel) { // When there are no more active subscribers or bound, restore to fast prototype. - if (!channel._subscribers.length && !channel._stores.size) { + if (!channel._subscribers.length && + !channel._stores.size && + !channel._bypassSubscribers?.length && + !channel._bypassStores?.size) { // eslint-disable-next-line no-use-before-define ObjectSetPrototypeOf(channel, Channel.prototype); channel._subscribers = undefined; channel._stores = undefined; + channel._bypassSubscribers = undefined; + channel._bypassStores = undefined; } } @@ -91,12 +125,11 @@ class RunStoresScope { // eslint-disable-next-line no-restricted-globals using stack = new DisposableStack(); - // Enter stores using withScope + // Normal stores - exactly as before, zero extra cost if (activeChannel._stores) { for (const entry of activeChannel._stores.entries()) { const store = entry[0]; const transform = entry[1]; - let newContext = data; if (transform) { try { @@ -108,7 +141,28 @@ class RunStoresScope { continue; } } + stack.use(store.withScope(newContext)); + } + } + // Bypass stores - only entered if bypass stores exist + if (activeChannel._bypassStores) { + const activeKeys = getSuppressionsStorage().getStore(); + for (const entry of activeChannel._bypassStores.entries()) { + const store = entry[0]; + const { transform, subscriberId } = entry[1]; + if (activeKeys?.has(subscriberId)) continue; + let newContext = data; + if (transform) { + try { + newContext = transform(data); + } catch (err) { + process.nextTick(() => { + triggerUncaughtException(err, false); + }); + continue; + } + } stack.use(store.withScope(newContext)); } } @@ -127,51 +181,123 @@ class RunStoresScope { // TODO(qard): should there be a C++ channel interface? class ActiveChannel { - subscribe(subscription) { + subscribe(subscription, options = {}) { validateFunction(subscription, 'subscription'); - this._subscribers = ArrayPrototypeSlice(this._subscribers); - ArrayPrototypePush(this._subscribers, subscription); + const subscriberId = options?.subscriberId; + + if (subscriberId !== undefined) { + validateBypassKey(subscriberId, 'subscriberId'); + // Bypass path - lazy separate array + if (this._bypassSubscribers === null) { + this._bypassSubscribers = []; + } + this._bypassSubscribers = ArrayPrototypeSlice(this._bypassSubscribers); + ArrayPrototypePush(this._bypassSubscribers, { handler: subscription, subscriberId }); + } else { + // Normal path - plain function, zero extra cost + this._subscribers = ArrayPrototypeSlice(this._subscribers); + ArrayPrototypePush(this._subscribers, subscription); + } + channels.incRef(this.name); if (this._index !== undefined) subscriberCounts[this._index]++; } unsubscribe(subscription) { - const index = ArrayPrototypeIndexOf(this._subscribers, subscription); - if (index === -1) return false; + // Check normal subscribers first + let index = -1; + for (let i = 0; i < this._subscribers.length; i++) { + if (this._subscribers[i] === subscription) { + index = i; + break; + } + } - const before = ArrayPrototypeSlice(this._subscribers, 0, index); - const after = ArrayPrototypeSlice(this._subscribers, index + 1); - this._subscribers = before; - ArrayPrototypePushApply(this._subscribers, after); + if (index !== -1) { + const before = ArrayPrototypeSlice(this._subscribers, 0, index); + const after = ArrayPrototypeSlice(this._subscribers, index + 1); + this._subscribers = before; + ArrayPrototypePushApply(this._subscribers, after); + channels.decRef(this.name); + if (this._index !== undefined) subscriberCounts[this._index]--; + maybeMarkInactive(this); + return true; + } - channels.decRef(this.name); - if (this._index !== undefined) subscriberCounts[this._index]--; - maybeMarkInactive(this); + // Check bypass subscribers + if (this._bypassSubscribers !== null) { + let bypassIndex = -1; + for (let i = 0; i < this._bypassSubscribers.length; i++) { + if (this._bypassSubscribers[i].handler === subscription) { + bypassIndex = i; + break; + } + } + if (bypassIndex !== -1) { + const before = ArrayPrototypeSlice(this._bypassSubscribers, 0, bypassIndex); + const after = ArrayPrototypeSlice(this._bypassSubscribers, bypassIndex + 1); + this._bypassSubscribers = before; + ArrayPrototypePushApply(this._bypassSubscribers, after); + if (this._bypassSubscribers.length === 0) { + this._bypassSubscribers = null; + } + channels.decRef(this.name); + if (this._index !== undefined) subscriberCounts[this._index]--; + maybeMarkInactive(this); + return true; + } + } - return true; + return false; } - bindStore(store, transform) { - const replacing = this._stores.has(store); - if (!replacing) { - channels.incRef(this.name); - if (this._index !== undefined) subscriberCounts[this._index]++; + bindStore(store, transform, options = {}) { + const subscriberId = options?.subscriberId; + + if (subscriberId !== undefined) { + validateBypassKey(subscriberId, 'subscriberId'); + // Bypass path - lazy separate SafeMap + if (this._bypassStores === null) { + this._bypassStores = new SafeMap(); + } + const replacing = this._bypassStores.has(store); + if (!replacing) { + channels.incRef(this.name); + if (this._index !== undefined) subscriberCounts[this._index]++; + } + this._bypassStores.set(store, { transform, subscriberId }); + } else { + // Normal path - plain transform, zero extra cost + const replacing = this._stores.has(store); + if (!replacing) { + channels.incRef(this.name); + if (this._index !== undefined) subscriberCounts[this._index]++; + } + this._stores.set(store, transform); } - this._stores.set(store, transform); } unbindStore(store) { - if (!this._stores.has(store)) { - return false; + if (this._stores.has(store)) { + this._stores.delete(store); + channels.decRef(this.name); + if (this._index !== undefined) subscriberCounts[this._index]--; + maybeMarkInactive(this); + return true; } - this._stores.delete(store); - - channels.decRef(this.name); - if (this._index !== undefined) subscriberCounts[this._index]--; - maybeMarkInactive(this); + if (this._bypassStores?.has(store)) { + this._bypassStores.delete(store); + if (this._bypassStores.size === 0) { + this._bypassStores = null; + } + channels.decRef(this.name); + if (this._index !== undefined) subscriberCounts[this._index]--; + maybeMarkInactive(this); + return true; + } - return true; + return false; } get hasSubscribers() { @@ -179,17 +305,34 @@ class ActiveChannel { } publish(data) { + // Normal path - no ALS lookup, plain function call, zero overhead const subscribers = this._subscribers; - for (let i = 0; i < (subscribers?.length || 0); i++) { + for (let i = 0; i < subscribers.length; i++) { try { - const onMessage = subscribers[i]; - onMessage(data, this.name); + subscribers[i](data, this.name); } catch (err) { process.nextTick(() => { triggerUncaughtException(err, false); }); } } + + // Bypass path - only entered if bypass subscribers exist + if (this._bypassSubscribers !== null) { + const activeKeys = getSuppressionsStorage().getStore(); + const bypassSubscribers = this._bypassSubscribers; + for (let i = 0; i < bypassSubscribers.length; i++) { + try { + const { handler, subscriberId } = bypassSubscribers[i]; + if (activeKeys?.has(subscriberId)) continue; + handler(data, this.name); + } catch (err) { + process.nextTick(() => { + triggerUncaughtException(err, false); + }); + } + } + } } withStoreScope(data) { @@ -221,18 +364,18 @@ class Channel { prototype === ActiveChannel.prototype; } - subscribe(subscription) { + subscribe(subscription, options) { markActive(this); - this.subscribe(subscription); + this.subscribe(subscription, options); } unsubscribe() { return false; } - bindStore(store, transform) { + bindStore(store, transform, options) { markActive(this); - this.bindStore(store, transform); + this.bindStore(store, transform, options); } unbindStore() { @@ -366,12 +509,12 @@ class BoundedChannel { this.end?.hasSubscribers; } - subscribe(handlers) { + subscribe(handlers, options) { for (let i = 0; i < boundedEvents.length; ++i) { const name = boundedEvents[i]; if (!handlers[name]) continue; - this[name]?.subscribe(handlers[name]); + this[name]?.subscribe(handlers[name], options); } } @@ -458,13 +601,13 @@ class TracingChannel { this.error?.hasSubscribers; } - subscribe(handlers) { + subscribe(handlers, options) { // Subscribe to call window (start/end) if (handlers.start || handlers.end) { this.#callWindow.subscribe({ start: handlers.start, end: handlers.end, - }); + }, options); } // Subscribe to continuation window (asyncStart/asyncEnd) @@ -472,12 +615,12 @@ class TracingChannel { this.#continuationWindow.subscribe({ start: handlers.asyncStart, end: handlers.asyncEnd, - }); + }, options); } // Subscribe to error channel if (handlers.error) { - this.error.subscribe(handlers.error); + this.error.subscribe(handlers.error, options); } } @@ -633,10 +776,22 @@ function tracingChannel(nameOrChannels) { dc_binding.linkNativeChannel((name) => channel(name)); +function suppressed(key, fn, thisArg, ...args) { + validateFunction(fn, 'fn'); + + validateBypassKey(key, 'key'); + + const currentSet = getSuppressionsStorage().getStore(); + const next = currentSet ? new SafeSet(currentSet) : new SafeSet(); + next.add(key); + return withSuppressionsContext(next, fn, thisArg, args); +} + module.exports = { channel, hasSubscribers, subscribe, + suppressed, tracingChannel, unsubscribe, boundedChannel, diff --git a/test/parallel/test-diagnostics-channel-suppression.js b/test/parallel/test-diagnostics-channel-suppression.js new file mode 100644 index 00000000000000..04cfc42e3e43e9 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-suppression.js @@ -0,0 +1,210 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { channel, suppressed } = require('node:diagnostics_channel'); +const { AsyncLocalStorage } = require('async_hooks'); + +// Test 1: Basic suppression - subscriber with subscriberId is skipped inside suppressed() +{ + const key = Symbol('tracer'); + const ch = channel('test-suppression-basic'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + + suppressed(key, common.mustCall(() => { + ch.publish({}); + })); + + ch.unsubscribe(handler); +} + +// Test 2: Non-opted subscriber fires even inside suppressed() scope +{ + const key = Symbol('tracer2'); + const ch = channel('test-suppression-nonopted'); + const optedHandler = common.mustNotCall(); + const regularHandler = common.mustCall(); + ch.subscribe(optedHandler, { subscriberId: key }); + ch.subscribe(regularHandler); // no suppression + + suppressed(key, common.mustCall(() => { + ch.publish({}); + })); + + ch.unsubscribe(optedHandler); + ch.unsubscribe(regularHandler); +} + +// Test 3: Two APMs with different keys don't suppress each other +{ + const k1 = Symbol('k1'); + const k2 = Symbol('k2'); + const ch = channel('test-suppression-two-keys'); + let h1Calls = 0; + let h2Calls = 0; + const h1 = common.mustCall(() => { h1Calls++; }, 1); + const h2 = common.mustCall(() => { h2Calls++; }, 1); + ch.subscribe(h1, { subscriberId: k1 }); + ch.subscribe(h2, { subscriberId: k2 }); + + suppressed(k1, common.mustCall(() => { + ch.publish({}); + })); + + assert.strictEqual(h1Calls, 0); + assert.strictEqual(h2Calls, 1); + + suppressed(k2, common.mustCall(() => { + ch.publish({}); + })); + + assert.strictEqual(h1Calls, 1); + assert.strictEqual(h2Calls, 1); + + ch.unsubscribe(h1); + ch.unsubscribe(h2); +} + +// Test 4: Nested suppressed() calls (same key, different keys) +{ + const k1 = Symbol('nested1'); + const k2 = Symbol('nested2'); + const ch = channel('test-suppression-nested'); + const h1 = common.mustNotCall(); + let h2Calls = 0; + const h2 = common.mustCall(() => { h2Calls++; }, 2); + ch.subscribe(h1, { subscriberId: k1 }); + ch.subscribe(h2, { subscriberId: k2 }); + + suppressed(k1, common.mustCall(() => { + // Inside k1, h1 skipped, h2 runs + ch.publish({}); + assert.strictEqual(h2Calls, 1); + + suppressed(k2, common.mustCall(() => { + // Inside both, both skipped + ch.publish({}); + assert.strictEqual(h2Calls, 1); + })); + + // Back to only k1 + ch.publish({}); + assert.strictEqual(h2Calls, 2); + })); + + ch.unsubscribe(h1); + ch.unsubscribe(h2); +} + +// Test 5: suppressed() across a Promise boundary +{ + const key = Symbol('promise'); + const ch = channel('test-suppression-promise'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + const done = common.mustCall(); + + suppressed(key, common.mustCall(async () => { + await Promise.resolve(); + ch.publish({}); + })).then(common.mustCall(() => { + ch.unsubscribe(handler); + done(); + })); +} + +// Test 6: suppressed() across setImmediate and queueMicrotask +{ + const key = Symbol('timers'); + const ch = channel('test-suppression-timers'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + const done = common.mustCall(); + + suppressed(key, common.mustCall(async () => { + await new Promise((resolve) => { + setImmediate(common.mustCall(() => { + ch.publish({}); + + queueMicrotask(common.mustCall(() => { + ch.publish({}); + resolve(); + })); + })); + }); + })).then(common.mustCall(() => { + ch.unsubscribe(handler); + done(); + })); +} + +// Test 7: unsubscribe() works correctly after using subscriberId +{ + const key = Symbol('unsub'); + const ch = channel('test-suppression-unsubscribe'); + const handler = common.mustNotCall(); + ch.subscribe(handler, { subscriberId: key }); + ch.unsubscribe(handler); + + // Should not throw and should not be called + suppressed(key, common.mustCall(() => { + ch.publish({}); + })); + +} + +// Test 8: bindStore with subscriberId is skipped inside suppressed() +{ + const key = Symbol('store'); + const ch = channel('test-suppression-store'); + const als = new AsyncLocalStorage(); + + // Bypass store transform - must NOT be called inside suppressed() + ch.bindStore(als, common.mustNotCall(), { subscriberId: key }); + + // Handler counts how many times it's called + let handlerCalls = 0; + const handler = common.mustCall(() => { + handlerCalls++; + assert.strictEqual(als.getStore(), undefined); + }); + ch.subscribe(handler); + + // Use runStores so stores are actually entered + suppressed(key, common.mustCall(() => { + ch.runStores({}, () => { + // runStores already calls ch.publish() internally + }, null); + })); + + assert.strictEqual(handlerCalls, 1); + ch.unsubscribe(handler); + ch.unbindStore(als); +} + +// Test 9: Wrong type for subscriberId throws ERR_INVALID_ARG_TYPE +{ + const ch = channel('test-suppression-wrong-type'); + const bad = 'not-allowed'; + assert.throws(() => ch.subscribe(() => {}, { subscriberId: bad }), { + name: 'TypeError' + }); + const als = new AsyncLocalStorage(); + assert.throws(() => ch.bindStore(als, (d) => d, { subscriberId: bad }), { + name: 'TypeError' + }); +} + +// Test 10: suppressed() return value passes through fn's return value +{ + const key = Symbol('return'); + const receiver = { value: 41 }; + const result = suppressed(key, common.mustCall(function(a, b) { + assert.strictEqual(this, receiver); + assert.strictEqual(a, 'a'); + assert.strictEqual(b, 'b'); + return this.value + 1; + }), receiver, 'a', 'b'); + assert.strictEqual(result, 42); +}