diff options
author | Satya Rohith <me@satyarohith.com> | 2024-03-08 20:15:55 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-08 14:45:55 +0000 |
commit | d5b01e41586c05d2af0bcf361ed4eadd03d60765 (patch) | |
tree | eacd35ce984644d7635fcb5bbd76631c25635ee7 /ext/node/polyfills/worker_threads.ts | |
parent | 44da066359916632ba529ca7b05849b269d48648 (diff) |
refactor(ext/node): use worker ops directly in worker_threads (#22794)
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r-- | ext/node/polyfills/worker_threads.ts | 313 |
1 files changed, 242 insertions, 71 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 1efada056..5da5ec07d 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -1,21 +1,44 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // Copyright Joyent and Node contributors. All rights reserved. MIT license. -// TODO(petamoriken): enable prefer-primordials for node polyfills -// deno-lint-ignore-file prefer-primordials - -import { core, internals } from "ext:core/mod.js"; -import { op_require_read_closest_package_json } from "ext:core/ops"; - -import { isAbsolute, resolve } from "node:path"; +import { core, internals, primordials } from "ext:core/mod.js"; +import { + op_create_worker, + op_host_post_message, + op_host_recv_ctrl, + op_host_recv_message, + op_host_terminate_worker, + op_require_read_closest_package_json, +} from "ext:core/ops"; +import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; +import { + deserializeJsMessageData, + MessageChannel, + MessagePort, + serializeJsMessageData, +} from "ext:deno_web/13_message_port.js"; +import * as webidl from "ext:deno_webidl/00_webidl.js"; +import { log } from "ext:runtime/06_util.js"; import { notImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter, once } from "node:events"; -import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; -import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js"; -import { refWorker, unrefWorker } from "ext:runtime/11_workers.js"; +import { isAbsolute, resolve } from "node:path"; -let environmentData = new Map(); -let threads = 0; +const { + Error, + Symbol, + SymbolFor, + SymbolIterator, + StringPrototypeEndsWith, + StringPrototypeReplace, + StringPrototypeMatch, + StringPrototypeReplaceAll, + StringPrototypeToString, + SafeWeakMap, + SafeRegExp, + SafeMap, + TypeError, + PromisePrototypeThen, +} = primordials; export interface WorkerOptions { // only for typings @@ -33,9 +56,11 @@ export interface WorkerOptions { stackSizeMb?: number; }; + // deno-lint-ignore prefer-primordials eval?: boolean; transferList?: Transferable[]; workerData?: unknown; + name?: string; } const WHITESPACE_ENCODINGS: Record<string, string> = { @@ -48,7 +73,7 @@ const WHITESPACE_ENCODINGS: Record<string, string> = { }; function encodeWhitespace(string: string): string { - return string.replaceAll(/[\s]/g, (c) => { + return StringPrototypeReplaceAll(string, new SafeRegExp(/[\s]/g), (c) => { return WHITESPACE_ENCODINGS[c] ?? c; }); } @@ -59,7 +84,11 @@ function toFileUrlPosix(path: string): URL { } const url = new URL("file:///"); url.pathname = encodeWhitespace( - path.replace(/%/g, "%25").replace(/\\/g, "%5C"), + StringPrototypeReplace( + StringPrototypeReplace(path, new SafeRegExp(/%/g), "%25"), + new SafeRegExp(/\\/g), + "%5C", + ), ); return url; } @@ -68,11 +97,14 @@ function toFileUrlWin32(path: string): URL { if (!isAbsolute(path)) { throw new TypeError("Must be an absolute path."); } - const [, hostname, pathname] = path.match( - /^(?:[/\\]{2}([^/\\]+)(?=[/\\](?:[^/\\]|$)))?(.*)/, - )!; + const { 0: _, 1: hostname, 2: pathname } = StringPrototypeMatch( + path, + new SafeRegExp(/^(?:[/\\]{2}([^/\\]+)(?=[/\\](?:[^/\\]|$)))?(.*)/), + ); const url = new URL("file:///"); - url.pathname = encodeWhitespace(pathname.replace(/%/g, "%25")); + url.pathname = encodeWhitespace( + StringPrototypeReplace(pathname, new SafeRegExp(/%/g), "%25"), + ); if (hostname != null && hostname != "localhost") { url.hostname = hostname; if (!url.hostname) { @@ -99,11 +131,27 @@ function toFileUrl(path: string): URL { : toFileUrlPosix(path); } -const kHandle = Symbol("kHandle"); +let threads = 0; +const privateWorkerRef = Symbol("privateWorkerRef"); const PRIVATE_WORKER_THREAD_NAME = "$DENO_STD_NODE_WORKER_THREAD"; -class _Worker extends EventEmitter { - readonly threadId: number; - readonly resourceLimits: Required< +class NodeWorker extends EventEmitter { + #id = 0; + // TODO(satyarohith): remove after https://github.com/denoland/deno/pull/22785 lands + #name = PRIVATE_WORKER_THREAD_NAME; + #refCount = 1; + #messagePromise = undefined; + #controlPromise = undefined; + // "RUNNING" | "CLOSED" | "TERMINATED" + // "TERMINATED" means that any controls or messages received will be + // discarded. "CLOSED" means that we have received a control + // indicating that the worker is no longer running, but there might + // still be messages left to receive. + #status = "RUNNING"; + + // https://nodejs.org/api/worker_threads.html#workerthreadid + threadId = this.#id; + // https://nodejs.org/api/worker_threads.html#workerresourcelimits + resourceLimits: Required< NonNullable<WorkerOptions["resourceLimits"]> > = { maxYoungGenerationSizeMb: -1, @@ -111,9 +159,6 @@ class _Worker extends EventEmitter { codeRangeSizeMb: -1, stackSizeMb: 4, }; - private readonly [kHandle]: Worker; - - postMessage: Worker["postMessage"]; constructor(specifier: URL | string, options?: WorkerOptions) { super(); @@ -128,8 +173,11 @@ class _Worker extends EventEmitter { // empty catch block when package json might not be present } if ( - !(specifier.toString().endsWith(".mjs") || - (pkg && pkg.exists && pkg.typ == "module")) + !(StringPrototypeEndsWith( + StringPrototypeToString(specifier), + ".mjs", + )) || + (pkg && pkg.exists && pkg.typ == "module") ) { const cwdFileUrl = toFileUrl(Deno.cwd()); specifier = @@ -138,45 +186,160 @@ class _Worker extends EventEmitter { specifier = toFileUrl(specifier as string); } } - const handle = this[kHandle] = new Worker( - specifier, + + const id = op_create_worker( { - name: PRIVATE_WORKER_THREAD_NAME, - type: "module", - } as globalThis.WorkerOptions, // bypass unstable type error - ); - handle.addEventListener( - "error", - (event) => this.emit("error", event.error || event.message), - ); - handle.addEventListener( - "messageerror", - (event) => this.emit("messageerror", event.data), - ); - handle.addEventListener( - "message", - (event) => this.emit("message", event.data), + // deno-lint-ignore prefer-primordials + specifier: specifier.toString(), + hasSourceCode: false, + sourceCode: "", + permissions: null, + name: this.#name, + workerType: "module", + }, ); - handle.postMessage({ + this.#id = id; + this.#pollControl(); + this.#pollMessages(); + + this.postMessage({ environmentData, threadId: (this.threadId = ++threads), workerData: options?.workerData, }, options?.transferList || []); - this.postMessage = handle.postMessage.bind(handle); + // https://nodejs.org/api/worker_threads.html#event-online this.emit("online"); } + [privateWorkerRef](ref) { + if (ref) { + this.#refCount++; + } else { + this.#refCount--; + } + + if (!ref && this.#refCount == 0) { + if (this.#controlPromise) { + core.unrefOpPromise(this.#controlPromise); + } + if (this.#messagePromise) { + core.unrefOpPromise(this.#messagePromise); + } + } else if (ref && this.#refCount == 1) { + if (this.#controlPromise) { + core.refOpPromise(this.#controlPromise); + } + if (this.#messagePromise) { + core.refOpPromise(this.#messagePromise); + } + } + } + + #handleError(err) { + this.emit("error", err); + } + + #pollControl = async () => { + while (this.#status === "RUNNING") { + this.#controlPromise = op_host_recv_ctrl(this.#id); + if (this.#refCount < 1) { + core.unrefOpPromise(this.#controlPromise); + } + const { 0: type, 1: data } = await this.#controlPromise; + + // If terminate was called then we ignore all messages + if (this.#status === "TERMINATED") { + return; + } + + switch (type) { + case 1: { // TerminalError + this.#status = "CLOSED"; + } /* falls through */ + case 2: { // Error + this.#handleError(data); + break; + } + case 3: { // Close + log(`Host got "close" message from worker: ${this.#name}`); + this.#status = "CLOSED"; + return; + } + default: { + throw new Error(`Unknown worker event: "${type}"`); + } + } + } + }; + + #pollMessages = async () => { + while (this.#status !== "TERMINATED") { + this.#messagePromise = op_host_recv_message(this.#id); + if (this.#refCount < 1) { + core.unrefOpPromise(this.#messagePromise); + } + const data = await this.#messagePromise; + if (this.#status === "TERMINATED" || data === null) { + return; + } + let message, _transferables; + try { + const v = deserializeJsMessageData(data); + message = v[0]; + _transferables = v[1]; + } catch (err) { + this.emit("messageerror", err); + return; + } + this.emit("message", message); + } + }; + + postMessage(message, transferOrOptions = {}) { + const prefix = "Failed to execute 'postMessage' on 'MessagePort'"; + webidl.requiredArguments(arguments.length, 1, prefix); + message = webidl.converters.any(message); + let options; + if ( + webidl.type(transferOrOptions) === "Object" && + transferOrOptions !== undefined && + transferOrOptions[SymbolIterator] !== undefined + ) { + const transfer = webidl.converters["sequence<object>"]( + transferOrOptions, + prefix, + "Argument 2", + ); + options = { transfer }; + } else { + options = webidl.converters.StructuredSerializeOptions( + transferOrOptions, + prefix, + "Argument 2", + ); + } + const { transfer } = options; + const data = serializeJsMessageData(message, transfer); + if (this.#status === "RUNNING") { + op_host_post_message(this.#id, data); + } + } + + // https://nodejs.org/api/worker_threads.html#workerterminate terminate() { - this[kHandle].terminate(); - this.emit("exit", 0); + if (this.#status !== "TERMINATED") { + this.#status = "TERMINATED"; + op_host_terminate_worker(this.#id); + } + this.emit("exit", 1); } ref() { - refWorker(this[kHandle]); + this[privateWorkerRef](true); } unref() { - unrefWorker(this[kHandle]); + this[privateWorkerRef](false); } readonly getHeapSnapshot = () => @@ -190,6 +353,7 @@ export let resourceLimits; let threadId = 0; let workerData: unknown = null; +let environmentData = new SafeMap(); // Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611 interface NodeEventTarget extends @@ -232,25 +396,32 @@ internals.__initWorkerThreads = () => { if (!isMainThread) { // deno-lint-ignore no-explicit-any delete (globalThis as any).name; - // deno-lint-ignore no-explicit-any - const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>(); + const listeners = new SafeWeakMap< + // deno-lint-ignore no-explicit-any + (...args: any[]) => void, + // deno-lint-ignore no-explicit-any + (ev: any) => any + >(); parentPort = self as ParentPort; - const initPromise = once( - parentPort, - "message", - ).then((result) => { - // TODO(kt3k): The below values are set asynchronously - // using the first message from the parent. - // This should be done synchronously. - threadId = result[0].data.threadId; - workerData = result[0].data.workerData; - environmentData = result[0].data.environmentData; - - defaultExport.threadId = threadId; - defaultExport.workerData = workerData; - }); + const initPromise = PromisePrototypeThen( + once( + parentPort, + "message", + ), + (result) => { + // TODO(kt3k): The below values are set asynchronously + // using the first message from the parent. + // This should be done synchronously. + threadId = result[0].data.threadId; + workerData = result[0].data.workerData; + environmentData = result[0].data.environmentData; + + defaultExport.threadId = threadId; + defaultExport.workerData = workerData; + }, + ); parentPort.off = parentPort.removeListener = function ( this: ParentPort, @@ -266,7 +437,7 @@ internals.__initWorkerThreads = () => { name, listener, ) { - initPromise.then(() => { + PromisePrototypeThen(initPromise, () => { // deno-lint-ignore no-explicit-any const _listener = (ev: any) => listener(ev.data); listeners.set(listener, _listener); @@ -276,7 +447,7 @@ internals.__initWorkerThreads = () => { }; parentPort.once = function (this: ParentPort, name, listener) { - initPromise.then(() => { + PromisePrototypeThen(initPromise, () => { // deno-lint-ignore no-explicit-any const _listener = (ev: any) => listener(ev.data); listeners.set(listener, _listener); @@ -313,7 +484,7 @@ export function setEnvironmentData(key: unknown, value?: unknown) { } } -export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV"); +export const SHARE_ENV = SymbolFor("nodejs.worker_threads.SHARE_ENV"); export function markAsUntransferable() { notImplemented("markAsUntransferable"); } @@ -324,10 +495,10 @@ export function receiveMessageOnPort() { notImplemented("receiveMessageOnPort"); } export { - _Worker as Worker, BroadcastChannel, MessageChannel, MessagePort, + NodeWorker as Worker, parentPort, threadId, workerData, @@ -340,7 +511,7 @@ const defaultExport = { MessagePort, MessageChannel, BroadcastChannel, - Worker: _Worker, + Worker: NodeWorker, getEnvironmentData, setEnvironmentData, SHARE_ENV, |