Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6a9412f
errors: handle V8 warnings in DisallowJavascriptExecutionScope
DivyanshuX9 May 24, 2026
7b20b8a
Merge branch 'nodejs:main' into fix/issue-63473-repl-asm-warning
DivyanshuX9 May 29, 2026
a786501
lib: diagnostics_channel use AsyncLocalStorage for suppression context
DivyanshuX9 May 29, 2026
e4aea85
test: add diagnostics_channel suppression coverage
DivyanshuX9 May 29, 2026
a5c4940
lib: rename diagnostics_channel suppression option to subscriberId
DivyanshuX9 May 30, 2026
7e415c0
test: clean up diagnostics_channel suppression coverage
DivyanshuX9 May 30, 2026
8e6d7cb
test: simplify diagnostics_channel suppression handler
DivyanshuX9 May 30, 2026
07e7e97
test: finalize diagnostics_channel suppression test
DivyanshuX9 May 30, 2026
9468795
diag(suppression-als): use ALS.run bound fn for suppression context; …
DivyanshuX9 May 30, 2026
b8194af
test: wrap suppressed callbacks with common.mustCall
DivyanshuX9 May 30, 2026
66e0cc9
src: emit V8 warnings synchronously in PerIsolateMessageListener (rem…
DivyanshuX9 May 30, 2026
01d9cc0
src: reintroduce synchronous V8 warning emission (USE wrapper)
DivyanshuX9 May 30, 2026
f9f428a
Merge branch 'main' into diag/suppression-als
DivyanshuX9 May 30, 2026
ddf8179
lib: initialize suppression storage eagerly
DivyanshuX9 May 31, 2026
909f0fc
Revert "lib: initialize suppression storage eagerly"
DivyanshuX9 May 31, 2026
e630f72
src: defer V8 warning emission to next tick
DivyanshuX9 May 31, 2026
cd5c009
lib: unify bypass key validation in diagnostics_channel
DivyanshuX9 Jun 3, 2026
3fdcd8e
test: update diagnostics_channel suppression tests for lazy storage
DivyanshuX9 Jun 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 199 additions & 44 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const {
ArrayPrototypeAt,
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypePushApply,
ArrayPrototypeSlice,
Expand All @@ -15,6 +14,7 @@ const {
ReflectApply,
SafeFinalizationRegistry,
SafeMap,
SafeSet,
SymbolDispose,
SymbolHasInstance,
} = primordials;
Expand All @@ -36,6 +36,33 @@ const { subscribers: subscriberCounts } = dc_binding;
const { WeakReference } = require('internal/util');
const { isPromise } = require('internal/util/types');

let suppressionStorage;

function getSuppressionsStorage() {
if (suppressionStorage === undefined) {
const { AsyncLocalStorage } = require('async_hooks');
suppressionStorage = new AsyncLocalStorage();
}
return suppressionStorage;
}

function withSuppressionsContext(set, fn, thisArg, args) {
return getSuppressionsStorage().run(
set,
() => ReflectApply(fn, thisArg, args),
);
}

function validateBypassKey(value, name) {
if (value == null) {
throw new ERR_INVALID_ARG_TYPE(name, ['object', 'symbol'], value);
}
const type = typeof value;
if (type !== 'object' && type !== 'symbol') {
throw new ERR_INVALID_ARG_TYPE(name, ['object', 'symbol'], value);
}
}

// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
class WeakRefMap extends SafeMap {
Expand Down Expand Up @@ -71,16 +98,23 @@ function markActive(channel) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
channel._subscribers = [];
channel._bypassSubscribers = null;
channel._stores = new SafeMap();
channel._bypassStores = null;
}

function maybeMarkInactive(channel) {
// When there are no more active subscribers or bound, restore to fast prototype.
if (!channel._subscribers.length && !channel._stores.size) {
if (!channel._subscribers.length &&
!channel._stores.size &&
!channel._bypassSubscribers?.length &&
!channel._bypassStores?.size) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, Channel.prototype);
channel._subscribers = undefined;
channel._stores = undefined;
channel._bypassSubscribers = undefined;
channel._bypassStores = undefined;
}
}

Expand All @@ -91,12 +125,11 @@ class RunStoresScope {
// eslint-disable-next-line no-restricted-globals
using stack = new DisposableStack();

// Enter stores using withScope
// Normal stores - exactly as before, zero extra cost
if (activeChannel._stores) {
for (const entry of activeChannel._stores.entries()) {
const store = entry[0];
const transform = entry[1];

let newContext = data;
if (transform) {
try {
Expand All @@ -108,7 +141,28 @@ class RunStoresScope {
continue;
}
}
stack.use(store.withScope(newContext));
}
}

// Bypass stores - only entered if bypass stores exist
if (activeChannel._bypassStores) {
const activeKeys = getSuppressionsStorage().getStore();
for (const entry of activeChannel._bypassStores.entries()) {
const store = entry[0];
const { transform, subscriberId } = entry[1];
if (activeKeys?.has(subscriberId)) continue;
let newContext = data;
if (transform) {
try {
newContext = transform(data);
} catch (err) {
process.nextTick(() => {
triggerUncaughtException(err, false);
});
continue;
}
}
stack.use(store.withScope(newContext));
}
}
Expand All @@ -127,69 +181,158 @@ class RunStoresScope {

// TODO(qard): should there be a C++ channel interface?
class ActiveChannel {
subscribe(subscription) {
subscribe(subscription, options = {}) {
validateFunction(subscription, 'subscription');
this._subscribers = ArrayPrototypeSlice(this._subscribers);
ArrayPrototypePush(this._subscribers, subscription);
const subscriberId = options?.subscriberId;

if (subscriberId !== undefined) {
validateBypassKey(subscriberId, 'subscriberId');
// Bypass path - lazy separate array
if (this._bypassSubscribers === null) {
this._bypassSubscribers = [];
}
this._bypassSubscribers = ArrayPrototypeSlice(this._bypassSubscribers);
ArrayPrototypePush(this._bypassSubscribers, { handler: subscription, subscriberId });
} else {
// Normal path - plain function, zero extra cost
this._subscribers = ArrayPrototypeSlice(this._subscribers);
ArrayPrototypePush(this._subscribers, subscription);
}

channels.incRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]++;
}

unsubscribe(subscription) {
const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
if (index === -1) return false;
// Check normal subscribers first
let index = -1;
for (let i = 0; i < this._subscribers.length; i++) {
if (this._subscribers[i] === subscription) {
index = i;
break;
}
}

const before = ArrayPrototypeSlice(this._subscribers, 0, index);
const after = ArrayPrototypeSlice(this._subscribers, index + 1);
this._subscribers = before;
ArrayPrototypePushApply(this._subscribers, after);
if (index !== -1) {
const before = ArrayPrototypeSlice(this._subscribers, 0, index);
const after = ArrayPrototypeSlice(this._subscribers, index + 1);
this._subscribers = before;
ArrayPrototypePushApply(this._subscribers, after);
channels.decRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]--;
maybeMarkInactive(this);
return true;
}

channels.decRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]--;
maybeMarkInactive(this);
// Check bypass subscribers
if (this._bypassSubscribers !== null) {
let bypassIndex = -1;
for (let i = 0; i < this._bypassSubscribers.length; i++) {
if (this._bypassSubscribers[i].handler === subscription) {
bypassIndex = i;
break;
}
}
if (bypassIndex !== -1) {
const before = ArrayPrototypeSlice(this._bypassSubscribers, 0, bypassIndex);
const after = ArrayPrototypeSlice(this._bypassSubscribers, bypassIndex + 1);
this._bypassSubscribers = before;
ArrayPrototypePushApply(this._bypassSubscribers, after);
if (this._bypassSubscribers.length === 0) {
this._bypassSubscribers = null;
}
channels.decRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]--;
maybeMarkInactive(this);
return true;
}
}

return true;
return false;
}

bindStore(store, transform) {
const replacing = this._stores.has(store);
if (!replacing) {
channels.incRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]++;
bindStore(store, transform, options = {}) {
const subscriberId = options?.subscriberId;

if (subscriberId !== undefined) {
validateBypassKey(subscriberId, 'subscriberId');
// Bypass path - lazy separate SafeMap
if (this._bypassStores === null) {
this._bypassStores = new SafeMap();
}
const replacing = this._bypassStores.has(store);
if (!replacing) {
channels.incRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]++;
}
this._bypassStores.set(store, { transform, subscriberId });
} else {
// Normal path - plain transform, zero extra cost
const replacing = this._stores.has(store);
if (!replacing) {
channels.incRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]++;
}
this._stores.set(store, transform);
}
this._stores.set(store, transform);
}

unbindStore(store) {
if (!this._stores.has(store)) {
return false;
if (this._stores.has(store)) {
this._stores.delete(store);
channels.decRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]--;
maybeMarkInactive(this);
return true;
}

this._stores.delete(store);

channels.decRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]--;
maybeMarkInactive(this);
if (this._bypassStores?.has(store)) {
this._bypassStores.delete(store);
if (this._bypassStores.size === 0) {
this._bypassStores = null;
}
channels.decRef(this.name);
if (this._index !== undefined) subscriberCounts[this._index]--;
maybeMarkInactive(this);
return true;
}

return true;
return false;
}

get hasSubscribers() {
return true;
}

publish(data) {
// Normal path - no ALS lookup, plain function call, zero overhead
const subscribers = this._subscribers;
for (let i = 0; i < (subscribers?.length || 0); i++) {
for (let i = 0; i < subscribers.length; i++) {
try {
const onMessage = subscribers[i];
onMessage(data, this.name);
subscribers[i](data, this.name);
} catch (err) {
process.nextTick(() => {
triggerUncaughtException(err, false);
});
}
}

// Bypass path - only entered if bypass subscribers exist
if (this._bypassSubscribers !== null) {
const activeKeys = getSuppressionsStorage().getStore();
const bypassSubscribers = this._bypassSubscribers;
for (let i = 0; i < bypassSubscribers.length; i++) {
try {
const { handler, subscriberId } = bypassSubscribers[i];
if (activeKeys?.has(subscriberId)) continue;
handler(data, this.name);
} catch (err) {
process.nextTick(() => {
triggerUncaughtException(err, false);
});
}
}
}
}

withStoreScope(data) {
Expand Down Expand Up @@ -221,18 +364,18 @@ class Channel {
prototype === ActiveChannel.prototype;
}

subscribe(subscription) {
subscribe(subscription, options) {
markActive(this);
this.subscribe(subscription);
this.subscribe(subscription, options);
}

unsubscribe() {
return false;
}

bindStore(store, transform) {
bindStore(store, transform, options) {
markActive(this);
this.bindStore(store, transform);
this.bindStore(store, transform, options);
}

unbindStore() {
Expand Down Expand Up @@ -366,12 +509,12 @@ class BoundedChannel {
this.end?.hasSubscribers;
}

subscribe(handlers) {
subscribe(handlers, options) {
for (let i = 0; i < boundedEvents.length; ++i) {
const name = boundedEvents[i];
if (!handlers[name]) continue;

this[name]?.subscribe(handlers[name]);
this[name]?.subscribe(handlers[name], options);
}
}

Expand Down Expand Up @@ -458,26 +601,26 @@ class TracingChannel {
this.error?.hasSubscribers;
}

subscribe(handlers) {
subscribe(handlers, options) {
// Subscribe to call window (start/end)
if (handlers.start || handlers.end) {
this.#callWindow.subscribe({
start: handlers.start,
end: handlers.end,
});
}, options);
}

// Subscribe to continuation window (asyncStart/asyncEnd)
if (handlers.asyncStart || handlers.asyncEnd) {
this.#continuationWindow.subscribe({
start: handlers.asyncStart,
end: handlers.asyncEnd,
});
}, options);
}

// Subscribe to error channel
if (handlers.error) {
this.error.subscribe(handlers.error);
this.error.subscribe(handlers.error, options);
}
}

Expand Down Expand Up @@ -633,10 +776,22 @@ function tracingChannel(nameOrChannels) {

dc_binding.linkNativeChannel((name) => channel(name));

function suppressed(key, fn, thisArg, ...args) {
validateFunction(fn, 'fn');

validateBypassKey(key, 'key');

const currentSet = getSuppressionsStorage().getStore();
const next = currentSet ? new SafeSet(currentSet) : new SafeSet();
next.add(key);
return withSuppressionsContext(next, fn, thisArg, args);
}

module.exports = {
channel,
hasSubscribers,
subscribe,
suppressed,
tracingChannel,
unsubscribe,
boundedChannel,
Expand Down
Loading
Loading