summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/diagnostics_channel.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/diagnostics_channel.js')
-rw-r--r--ext/node/polyfills/diagnostics_channel.js430
1 files changed, 430 insertions, 0 deletions
diff --git a/ext/node/polyfills/diagnostics_channel.js b/ext/node/polyfills/diagnostics_channel.js
new file mode 100644
index 000000000..807c33e47
--- /dev/null
+++ b/ext/node/polyfills/diagnostics_channel.js
@@ -0,0 +1,430 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+
+// TODO(petamoriken): enable prefer-primordials for node polyfills
+// deno-lint-ignore-file prefer-primordials ban-untagged-todo
+
+import { ERR_INVALID_ARG_TYPE } from "ext:deno_node/internal/errors.ts";
+import { validateFunction } from "ext:deno_node/internal/validators.mjs";
+import { nextTick } from "node:process";
+import { primordials } from "ext:core/mod.js";
+
+const {
+ ArrayPrototypeAt,
+ ArrayPrototypeIndexOf,
+ ArrayPrototypePush,
+ ArrayPrototypeSplice,
+ ObjectDefineProperty,
+ ObjectGetPrototypeOf,
+ ObjectSetPrototypeOf,
+ Promise,
+ PromisePrototypeThen,
+ PromiseReject,
+ PromiseResolve,
+ ReflectApply,
+ SafeFinalizationRegistry,
+ SafeMap,
+ SymbolHasInstance,
+} = primordials;
+import { WeakReference } from "ext:deno_node/internal/util.mjs";
+
+// Can't delete when weakref count reaches 0 as it could increment again.
+// Only GC can be used as a valid time to clean up the channels map.
+class WeakRefMap extends SafeMap {
+ #finalizers = new SafeFinalizationRegistry((key) => {
+ this.delete(key);
+ });
+
+ set(key, value) {
+ this.#finalizers.register(value, key);
+ return super.set(key, new WeakReference(value));
+ }
+
+ get(key) {
+ return super.get(key)?.get();
+ }
+
+ incRef(key) {
+ return super.get(key)?.incRef();
+ }
+
+ decRef(key) {
+ return super.get(key)?.decRef();
+ }
+}
+
+function markActive(channel) {
+ ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
+ channel._subscribers = [];
+ channel._stores = new SafeMap();
+}
+
+function maybeMarkInactive(channel) {
+ // When there are no more active subscribers or bound, restore to fast prototype.
+ if (!channel._subscribers.length && !channel._stores.size) {
+ ObjectSetPrototypeOf(channel, Channel.prototype);
+ channel._subscribers = undefined;
+ channel._stores = undefined;
+ }
+}
+
+function defaultTransform(data) {
+ return data;
+}
+
+function wrapStoreRun(store, data, next, transform = defaultTransform) {
+ return () => {
+ let context;
+ try {
+ context = transform(data);
+ } catch (err) {
+ nextTick(() => {
+ // TODO(bartlomieju): in Node.js this is using `triggerUncaughtException` API, need
+ // to clarify if we need that or if just throwing the error is enough here.
+ throw err;
+ // triggerUncaughtException(err, false);
+ });
+ return next();
+ }
+
+ return store.run(context, next);
+ };
+}
+
+class ActiveChannel {
+ subscribe(subscription) {
+ validateFunction(subscription, "subscription");
+ ArrayPrototypePush(this._subscribers, subscription);
+ channels.incRef(this.name);
+ }
+
+ unsubscribe(subscription) {
+ const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
+ if (index === -1) return false;
+
+ ArrayPrototypeSplice(this._subscribers, index, 1);
+
+ channels.decRef(this.name);
+ maybeMarkInactive(this);
+
+ return true;
+ }
+
+ bindStore(store, transform) {
+ const replacing = this._stores.has(store);
+ if (!replacing) channels.incRef(this.name);
+ this._stores.set(store, transform);
+ }
+
+ unbindStore(store) {
+ if (!this._stores.has(store)) {
+ return false;
+ }
+
+ this._stores.delete(store);
+
+ channels.decRef(this.name);
+ maybeMarkInactive(this);
+
+ return true;
+ }
+
+ get hasSubscribers() {
+ return true;
+ }
+
+ publish(data) {
+ for (let i = 0; i < (this._subscribers?.length || 0); i++) {
+ try {
+ const onMessage = this._subscribers[i];
+ onMessage(data, this.name);
+ } catch (err) {
+ nextTick(() => {
+ // TODO(bartlomieju): in Node.js this is using `triggerUncaughtException` API, need
+ // to clarify if we need that or if just throwing the error is enough here.
+ throw err;
+ // triggerUncaughtException(err, false);
+ });
+ }
+ }
+ }
+
+ runStores(data, fn, thisArg, ...args) {
+ let run = () => {
+ this.publish(data);
+ return ReflectApply(fn, thisArg, args);
+ };
+
+ for (const entry of this._stores.entries()) {
+ const store = entry[0];
+ const transform = entry[1];
+ run = wrapStoreRun(store, data, run, transform);
+ }
+
+ return run();
+ }
+}
+
+class Channel {
+ constructor(name) {
+ this._subscribers = undefined;
+ this._stores = undefined;
+ this.name = name;
+
+ channels.set(name, this);
+ }
+
+ static [SymbolHasInstance](instance) {
+ const prototype = ObjectGetPrototypeOf(instance);
+ return prototype === Channel.prototype ||
+ prototype === ActiveChannel.prototype;
+ }
+
+ subscribe(subscription) {
+ markActive(this);
+ this.subscribe(subscription);
+ }
+
+ unsubscribe() {
+ return false;
+ }
+
+ bindStore(store, transform) {
+ markActive(this);
+ this.bindStore(store, transform);
+ }
+
+ unbindStore() {
+ return false;
+ }
+
+ get hasSubscribers() {
+ return false;
+ }
+
+ publish() {}
+
+ runStores(_data, fn, thisArg, ...args) {
+ return ReflectApply(fn, thisArg, args);
+ }
+}
+
+const channels = new WeakRefMap();
+
+export function channel(name) {
+ const channel = channels.get(name);
+ if (channel) return channel;
+
+ if (typeof name !== "string" && typeof name !== "symbol") {
+ throw new ERR_INVALID_ARG_TYPE("channel", ["string", "symbol"], name);
+ }
+
+ return new Channel(name);
+}
+
+export function subscribe(name, subscription) {
+ return channel(name).subscribe(subscription);
+}
+
+export function unsubscribe(name, subscription) {
+ return channel(name).unsubscribe(subscription);
+}
+
+export function hasSubscribers(name) {
+ const channel = channels.get(name);
+ if (!channel) return false;
+
+ return channel.hasSubscribers;
+}
+
+const traceEvents = [
+ "start",
+ "end",
+ "asyncStart",
+ "asyncEnd",
+ "error",
+];
+
+function assertChannel(value, name) {
+ if (!(value instanceof Channel)) {
+ throw new ERR_INVALID_ARG_TYPE(name, ["Channel"], value);
+ }
+}
+
+function tracingChannelFrom(nameOrChannels, name) {
+ if (typeof nameOrChannels === "string") {
+ return channel(`tracing:${nameOrChannels}:${name}`);
+ }
+
+ if (typeof nameOrChannels === "object" && nameOrChannels !== null) {
+ const channel = nameOrChannels[name];
+ assertChannel(channel, `nameOrChannels.${name}`);
+ return channel;
+ }
+
+ throw new ERR_INVALID_ARG_TYPE("nameOrChannels", [
+ "string",
+ "object",
+ "Channel",
+ ], nameOrChannels);
+}
+
+class TracingChannel {
+ constructor(nameOrChannels) {
+ for (const eventName of traceEvents) {
+ ObjectDefineProperty(this, eventName, {
+ __proto__: null,
+ value: tracingChannelFrom(nameOrChannels, eventName),
+ });
+ }
+ }
+
+ get hasSubscribers() {
+ return this.start.hasSubscribers ||
+ this.end.hasSubscribers ||
+ this.asyncStart.hasSubscribers ||
+ this.asyncEnd.hasSubscribers ||
+ this.error.hasSubscribers;
+ }
+
+ subscribe(handlers) {
+ for (const name of traceEvents) {
+ if (!handlers[name]) continue;
+
+ this[name]?.subscribe(handlers[name]);
+ }
+ }
+
+ unsubscribe(handlers) {
+ let done = true;
+
+ for (const name of traceEvents) {
+ if (!handlers[name]) continue;
+
+ if (!this[name]?.unsubscribe(handlers[name])) {
+ done = false;
+ }
+ }
+
+ return done;
+ }
+
+ traceSync(fn, context = {}, thisArg, ...args) {
+ if (!this.hasSubscribers) {
+ return ReflectApply(fn, thisArg, args);
+ }
+
+ const { start, end, error } = this;
+
+ return start.runStores(context, () => {
+ try {
+ const result = ReflectApply(fn, thisArg, args);
+ context.result = result;
+ return result;
+ } catch (err) {
+ context.error = err;
+ error.publish(context);
+ throw err;
+ } finally {
+ end.publish(context);
+ }
+ });
+ }
+
+ tracePromise(fn, context = {}, thisArg, ...args) {
+ if (!this.hasSubscribers) {
+ return ReflectApply(fn, thisArg, args);
+ }
+
+ const { start, end, asyncStart, asyncEnd, error } = this;
+
+ function reject(err) {
+ context.error = err;
+ error.publish(context);
+ asyncStart.publish(context);
+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
+ asyncEnd.publish(context);
+ return PromiseReject(err);
+ }
+
+ function resolve(result) {
+ context.result = result;
+ asyncStart.publish(context);
+ // TODO: Is there a way to have asyncEnd _after_ the continuation?
+ asyncEnd.publish(context);
+ return result;
+ }
+
+ return start.runStores(context, () => {
+ try {
+ let promise = ReflectApply(fn, thisArg, args);
+ // Convert thenables to native promises
+ if (!(promise instanceof Promise)) {
+ promise = PromiseResolve(promise);
+ }
+ return PromisePrototypeThen(promise, resolve, reject);
+ } catch (err) {
+ context.error = err;
+ error.publish(context);
+ throw err;
+ } finally {
+ end.publish(context);
+ }
+ });
+ }
+
+ traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
+ if (!this.hasSubscribers) {
+ return ReflectApply(fn, thisArg, args);
+ }
+
+ const { start, end, asyncStart, asyncEnd, error } = this;
+
+ function wrappedCallback(err, res) {
+ if (err) {
+ context.error = err;
+ error.publish(context);
+ } else {
+ context.result = res;
+ }
+
+ // Using runStores here enables manual context failure recovery
+ asyncStart.runStores(context, () => {
+ try {
+ return ReflectApply(callback, this, arguments);
+ } finally {
+ asyncEnd.publish(context);
+ }
+ });
+ }
+
+ const callback = ArrayPrototypeAt(args, position);
+ validateFunction(callback, "callback");
+ ArrayPrototypeSplice(args, position, 1, wrappedCallback);
+
+ return start.runStores(context, () => {
+ try {
+ return ReflectApply(fn, thisArg, args);
+ } catch (err) {
+ context.error = err;
+ error.publish(context);
+ throw err;
+ } finally {
+ end.publish(context);
+ }
+ });
+ }
+}
+
+export function tracingChannel(nameOrChannels) {
+ return new TracingChannel(nameOrChannels);
+}
+
+export default {
+ channel,
+ hasSubscribers,
+ subscribe,
+ tracingChannel,
+ unsubscribe,
+ Channel,
+};