summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/worker_threads.ts
diff options
context:
space:
mode:
authorSatya Rohith <me@satyarohith.com>2024-03-08 20:15:55 +0530
committerGitHub <noreply@github.com>2024-03-08 14:45:55 +0000
commitd5b01e41586c05d2af0bcf361ed4eadd03d60765 (patch)
treeeacd35ce984644d7635fcb5bbd76631c25635ee7 /ext/node/polyfills/worker_threads.ts
parent44da066359916632ba529ca7b05849b269d48648 (diff)
refactor(ext/node): use worker ops directly in worker_threads (#22794)
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r--ext/node/polyfills/worker_threads.ts313
1 files changed, 242 insertions, 71 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 1efada056..5da5ec07d 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -1,21 +1,44 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
-// TODO(petamoriken): enable prefer-primordials for node polyfills
-// deno-lint-ignore-file prefer-primordials
-
-import { core, internals } from "ext:core/mod.js";
-import { op_require_read_closest_package_json } from "ext:core/ops";
-
-import { isAbsolute, resolve } from "node:path";
+import { core, internals, primordials } from "ext:core/mod.js";
+import {
+ op_create_worker,
+ op_host_post_message,
+ op_host_recv_ctrl,
+ op_host_recv_message,
+ op_host_terminate_worker,
+ op_require_read_closest_package_json,
+} from "ext:core/ops";
+import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
+import {
+ deserializeJsMessageData,
+ MessageChannel,
+ MessagePort,
+ serializeJsMessageData,
+} from "ext:deno_web/13_message_port.js";
+import * as webidl from "ext:deno_webidl/00_webidl.js";
+import { log } from "ext:runtime/06_util.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter, once } from "node:events";
-import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
-import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js";
-import { refWorker, unrefWorker } from "ext:runtime/11_workers.js";
+import { isAbsolute, resolve } from "node:path";
-let environmentData = new Map();
-let threads = 0;
+const {
+ Error,
+ Symbol,
+ SymbolFor,
+ SymbolIterator,
+ StringPrototypeEndsWith,
+ StringPrototypeReplace,
+ StringPrototypeMatch,
+ StringPrototypeReplaceAll,
+ StringPrototypeToString,
+ SafeWeakMap,
+ SafeRegExp,
+ SafeMap,
+ TypeError,
+ PromisePrototypeThen,
+} = primordials;
export interface WorkerOptions {
// only for typings
@@ -33,9 +56,11 @@ export interface WorkerOptions {
stackSizeMb?: number;
};
+ // deno-lint-ignore prefer-primordials
eval?: boolean;
transferList?: Transferable[];
workerData?: unknown;
+ name?: string;
}
const WHITESPACE_ENCODINGS: Record<string, string> = {
@@ -48,7 +73,7 @@ const WHITESPACE_ENCODINGS: Record<string, string> = {
};
function encodeWhitespace(string: string): string {
- return string.replaceAll(/[\s]/g, (c) => {
+ return StringPrototypeReplaceAll(string, new SafeRegExp(/[\s]/g), (c) => {
return WHITESPACE_ENCODINGS[c] ?? c;
});
}
@@ -59,7 +84,11 @@ function toFileUrlPosix(path: string): URL {
}
const url = new URL("file:///");
url.pathname = encodeWhitespace(
- path.replace(/%/g, "%25").replace(/\\/g, "%5C"),
+ StringPrototypeReplace(
+ StringPrototypeReplace(path, new SafeRegExp(/%/g), "%25"),
+ new SafeRegExp(/\\/g),
+ "%5C",
+ ),
);
return url;
}
@@ -68,11 +97,14 @@ function toFileUrlWin32(path: string): URL {
if (!isAbsolute(path)) {
throw new TypeError("Must be an absolute path.");
}
- const [, hostname, pathname] = path.match(
- /^(?:[/\\]{2}([^/\\]+)(?=[/\\](?:[^/\\]|$)))?(.*)/,
- )!;
+ const { 0: _, 1: hostname, 2: pathname } = StringPrototypeMatch(
+ path,
+ new SafeRegExp(/^(?:[/\\]{2}([^/\\]+)(?=[/\\](?:[^/\\]|$)))?(.*)/),
+ );
const url = new URL("file:///");
- url.pathname = encodeWhitespace(pathname.replace(/%/g, "%25"));
+ url.pathname = encodeWhitespace(
+ StringPrototypeReplace(pathname, new SafeRegExp(/%/g), "%25"),
+ );
if (hostname != null && hostname != "localhost") {
url.hostname = hostname;
if (!url.hostname) {
@@ -99,11 +131,27 @@ function toFileUrl(path: string): URL {
: toFileUrlPosix(path);
}
-const kHandle = Symbol("kHandle");
+let threads = 0;
+const privateWorkerRef = Symbol("privateWorkerRef");
const PRIVATE_WORKER_THREAD_NAME = "$DENO_STD_NODE_WORKER_THREAD";
-class _Worker extends EventEmitter {
- readonly threadId: number;
- readonly resourceLimits: Required<
+class NodeWorker extends EventEmitter {
+ #id = 0;
+ // TODO(satyarohith): remove after https://github.com/denoland/deno/pull/22785 lands
+ #name = PRIVATE_WORKER_THREAD_NAME;
+ #refCount = 1;
+ #messagePromise = undefined;
+ #controlPromise = undefined;
+ // "RUNNING" | "CLOSED" | "TERMINATED"
+ // "TERMINATED" means that any controls or messages received will be
+ // discarded. "CLOSED" means that we have received a control
+ // indicating that the worker is no longer running, but there might
+ // still be messages left to receive.
+ #status = "RUNNING";
+
+ // https://nodejs.org/api/worker_threads.html#workerthreadid
+ threadId = this.#id;
+ // https://nodejs.org/api/worker_threads.html#workerresourcelimits
+ resourceLimits: Required<
NonNullable<WorkerOptions["resourceLimits"]>
> = {
maxYoungGenerationSizeMb: -1,
@@ -111,9 +159,6 @@ class _Worker extends EventEmitter {
codeRangeSizeMb: -1,
stackSizeMb: 4,
};
- private readonly [kHandle]: Worker;
-
- postMessage: Worker["postMessage"];
constructor(specifier: URL | string, options?: WorkerOptions) {
super();
@@ -128,8 +173,11 @@ class _Worker extends EventEmitter {
// empty catch block when package json might not be present
}
if (
- !(specifier.toString().endsWith(".mjs") ||
- (pkg && pkg.exists && pkg.typ == "module"))
+ !(StringPrototypeEndsWith(
+ StringPrototypeToString(specifier),
+ ".mjs",
+ )) ||
+ (pkg && pkg.exists && pkg.typ == "module")
) {
const cwdFileUrl = toFileUrl(Deno.cwd());
specifier =
@@ -138,45 +186,160 @@ class _Worker extends EventEmitter {
specifier = toFileUrl(specifier as string);
}
}
- const handle = this[kHandle] = new Worker(
- specifier,
+
+ const id = op_create_worker(
{
- name: PRIVATE_WORKER_THREAD_NAME,
- type: "module",
- } as globalThis.WorkerOptions, // bypass unstable type error
- );
- handle.addEventListener(
- "error",
- (event) => this.emit("error", event.error || event.message),
- );
- handle.addEventListener(
- "messageerror",
- (event) => this.emit("messageerror", event.data),
- );
- handle.addEventListener(
- "message",
- (event) => this.emit("message", event.data),
+ // deno-lint-ignore prefer-primordials
+ specifier: specifier.toString(),
+ hasSourceCode: false,
+ sourceCode: "",
+ permissions: null,
+ name: this.#name,
+ workerType: "module",
+ },
);
- handle.postMessage({
+ this.#id = id;
+ this.#pollControl();
+ this.#pollMessages();
+
+ this.postMessage({
environmentData,
threadId: (this.threadId = ++threads),
workerData: options?.workerData,
}, options?.transferList || []);
- this.postMessage = handle.postMessage.bind(handle);
+ // https://nodejs.org/api/worker_threads.html#event-online
this.emit("online");
}
+ [privateWorkerRef](ref) {
+ if (ref) {
+ this.#refCount++;
+ } else {
+ this.#refCount--;
+ }
+
+ if (!ref && this.#refCount == 0) {
+ if (this.#controlPromise) {
+ core.unrefOpPromise(this.#controlPromise);
+ }
+ if (this.#messagePromise) {
+ core.unrefOpPromise(this.#messagePromise);
+ }
+ } else if (ref && this.#refCount == 1) {
+ if (this.#controlPromise) {
+ core.refOpPromise(this.#controlPromise);
+ }
+ if (this.#messagePromise) {
+ core.refOpPromise(this.#messagePromise);
+ }
+ }
+ }
+
+ #handleError(err) {
+ this.emit("error", err);
+ }
+
+ #pollControl = async () => {
+ while (this.#status === "RUNNING") {
+ this.#controlPromise = op_host_recv_ctrl(this.#id);
+ if (this.#refCount < 1) {
+ core.unrefOpPromise(this.#controlPromise);
+ }
+ const { 0: type, 1: data } = await this.#controlPromise;
+
+ // If terminate was called then we ignore all messages
+ if (this.#status === "TERMINATED") {
+ return;
+ }
+
+ switch (type) {
+ case 1: { // TerminalError
+ this.#status = "CLOSED";
+ } /* falls through */
+ case 2: { // Error
+ this.#handleError(data);
+ break;
+ }
+ case 3: { // Close
+ log(`Host got "close" message from worker: ${this.#name}`);
+ this.#status = "CLOSED";
+ return;
+ }
+ default: {
+ throw new Error(`Unknown worker event: "${type}"`);
+ }
+ }
+ }
+ };
+
+ #pollMessages = async () => {
+ while (this.#status !== "TERMINATED") {
+ this.#messagePromise = op_host_recv_message(this.#id);
+ if (this.#refCount < 1) {
+ core.unrefOpPromise(this.#messagePromise);
+ }
+ const data = await this.#messagePromise;
+ if (this.#status === "TERMINATED" || data === null) {
+ return;
+ }
+ let message, _transferables;
+ try {
+ const v = deserializeJsMessageData(data);
+ message = v[0];
+ _transferables = v[1];
+ } catch (err) {
+ this.emit("messageerror", err);
+ return;
+ }
+ this.emit("message", message);
+ }
+ };
+
+ postMessage(message, transferOrOptions = {}) {
+ const prefix = "Failed to execute 'postMessage' on 'MessagePort'";
+ webidl.requiredArguments(arguments.length, 1, prefix);
+ message = webidl.converters.any(message);
+ let options;
+ if (
+ webidl.type(transferOrOptions) === "Object" &&
+ transferOrOptions !== undefined &&
+ transferOrOptions[SymbolIterator] !== undefined
+ ) {
+ const transfer = webidl.converters["sequence<object>"](
+ transferOrOptions,
+ prefix,
+ "Argument 2",
+ );
+ options = { transfer };
+ } else {
+ options = webidl.converters.StructuredSerializeOptions(
+ transferOrOptions,
+ prefix,
+ "Argument 2",
+ );
+ }
+ const { transfer } = options;
+ const data = serializeJsMessageData(message, transfer);
+ if (this.#status === "RUNNING") {
+ op_host_post_message(this.#id, data);
+ }
+ }
+
+ // https://nodejs.org/api/worker_threads.html#workerterminate
terminate() {
- this[kHandle].terminate();
- this.emit("exit", 0);
+ if (this.#status !== "TERMINATED") {
+ this.#status = "TERMINATED";
+ op_host_terminate_worker(this.#id);
+ }
+ this.emit("exit", 1);
}
ref() {
- refWorker(this[kHandle]);
+ this[privateWorkerRef](true);
}
unref() {
- unrefWorker(this[kHandle]);
+ this[privateWorkerRef](false);
}
readonly getHeapSnapshot = () =>
@@ -190,6 +353,7 @@ export let resourceLimits;
let threadId = 0;
let workerData: unknown = null;
+let environmentData = new SafeMap();
// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
interface NodeEventTarget extends
@@ -232,25 +396,32 @@ internals.__initWorkerThreads = () => {
if (!isMainThread) {
// deno-lint-ignore no-explicit-any
delete (globalThis as any).name;
- // deno-lint-ignore no-explicit-any
- const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();
+ const listeners = new SafeWeakMap<
+ // deno-lint-ignore no-explicit-any
+ (...args: any[]) => void,
+ // deno-lint-ignore no-explicit-any
+ (ev: any) => any
+ >();
parentPort = self as ParentPort;
- const initPromise = once(
- parentPort,
- "message",
- ).then((result) => {
- // TODO(kt3k): The below values are set asynchronously
- // using the first message from the parent.
- // This should be done synchronously.
- threadId = result[0].data.threadId;
- workerData = result[0].data.workerData;
- environmentData = result[0].data.environmentData;
-
- defaultExport.threadId = threadId;
- defaultExport.workerData = workerData;
- });
+ const initPromise = PromisePrototypeThen(
+ once(
+ parentPort,
+ "message",
+ ),
+ (result) => {
+ // TODO(kt3k): The below values are set asynchronously
+ // using the first message from the parent.
+ // This should be done synchronously.
+ threadId = result[0].data.threadId;
+ workerData = result[0].data.workerData;
+ environmentData = result[0].data.environmentData;
+
+ defaultExport.threadId = threadId;
+ defaultExport.workerData = workerData;
+ },
+ );
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
@@ -266,7 +437,7 @@ internals.__initWorkerThreads = () => {
name,
listener,
) {
- initPromise.then(() => {
+ PromisePrototypeThen(initPromise, () => {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
@@ -276,7 +447,7 @@ internals.__initWorkerThreads = () => {
};
parentPort.once = function (this: ParentPort, name, listener) {
- initPromise.then(() => {
+ PromisePrototypeThen(initPromise, () => {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
@@ -313,7 +484,7 @@ export function setEnvironmentData(key: unknown, value?: unknown) {
}
}
-export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV");
+export const SHARE_ENV = SymbolFor("nodejs.worker_threads.SHARE_ENV");
export function markAsUntransferable() {
notImplemented("markAsUntransferable");
}
@@ -324,10 +495,10 @@ export function receiveMessageOnPort() {
notImplemented("receiveMessageOnPort");
}
export {
- _Worker as Worker,
BroadcastChannel,
MessageChannel,
MessagePort,
+ NodeWorker as Worker,
parentPort,
threadId,
workerData,
@@ -340,7 +511,7 @@ const defaultExport = {
MessagePort,
MessageChannel,
BroadcastChannel,
- Worker: _Worker,
+ Worker: NodeWorker,
getEnvironmentData,
setEnvironmentData,
SHARE_ENV,