diff options
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r-- | ext/node/polyfills/worker_threads.ts | 176 |
1 files changed, 100 insertions, 76 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 2c13e4bc8..8005506bb 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -3,11 +3,11 @@ import { resolve, toFileUrl } from "ext:deno_node/path.ts"; import { notImplemented } from "ext:deno_node/_utils.ts"; -import { EventEmitter } from "ext:deno_node/events.ts"; +import { EventEmitter, once } from "ext:deno_node/events.ts"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js"; -const environmentData = new Map(); +let environmentData = new Map(); let threads = 0; export interface WorkerOptions { @@ -48,13 +48,18 @@ class _Worker extends EventEmitter { postMessage: Worker["postMessage"]; constructor(specifier: URL | string, options?: WorkerOptions) { - notImplemented("Worker"); super(); if (options?.eval === true) { specifier = `data:text/javascript,${specifier}`; } else if (typeof specifier === "string") { - // @ts-ignore This API is temporarily disabled - specifier = toFileUrl(resolve(specifier)); + specifier = resolve(specifier); + if (!specifier.toString().endsWith(".mjs")) { + const cwdFileUrl = toFileUrl(Deno.cwd()); + specifier = + `data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`; + } else { + specifier = toFileUrl(specifier); + } } const handle = this[kHandle] = new Worker( specifier, @@ -95,20 +100,11 @@ class _Worker extends EventEmitter { readonly performance = globalThis.performance; } -export const isMainThread = - // deno-lint-ignore no-explicit-any - (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME; - -// fake resourceLimits -export const resourceLimits = isMainThread ? {} : { - maxYoungGenerationSizeMb: 48, - maxOldGenerationSizeMb: 2048, - codeRangeSizeMb: 0, - stackSizeMb: 4, -}; +export let isMainThread; +export let resourceLimits; -const threadId = 0; -const workerData: unknown = null; +let threadId = 0; +let workerData: unknown = null; // Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611 interface NodeEventTarget extends @@ -131,74 +127,100 @@ interface NodeEventTarget extends type ParentPort = typeof self & NodeEventTarget; // deno-lint-ignore no-explicit-any -const parentPort: ParentPort = null as any; +let parentPort: ParentPort = null as any; -/* -if (!isMainThread) { - // deno-lint-ignore no-explicit-any - delete (globalThis as any).name; - // deno-lint-ignore no-explicit-any - const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>(); - - parentPort = self as ParentPort; - parentPort.off = parentPort.removeListener = function ( - this: ParentPort, - name, - listener, - ) { - this.removeEventListener(name, listeners.get(listener)!); - listeners.delete(listener); - return this; - }; - parentPort.on = parentPort.addListener = function ( - this: ParentPort, - name, - listener, - ) { +globalThis.__bootstrap.internals.__initWorkerThreads = () => { + isMainThread = // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - return this; + (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME; + + defaultExport.isMainThread = isMainThread; + // fake resourceLimits + resourceLimits = isMainThread ? {} : { + maxYoungGenerationSizeMb: 48, + maxOldGenerationSizeMb: 2048, + codeRangeSizeMb: 0, + stackSizeMb: 4, }; - parentPort.once = function (this: ParentPort, name, listener) { + defaultExport.resourceLimits = resourceLimits; + + if (!isMainThread) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - return this; - }; + delete (globalThis as any).name; + // deno-lint-ignore no-explicit-any + const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>(); - // mocks - parentPort.setMaxListeners = () => {}; - parentPort.getMaxListeners = () => Infinity; - parentPort.eventNames = () => [""]; - parentPort.listenerCount = () => 0; - - parentPort.emit = () => notImplemented("parentPort.emit"); - parentPort.removeAllListeners = () => - notImplemented("parentPort.removeAllListeners"); - - // Receive startup message - [{ threadId, workerData, environmentData }] = await once( - parentPort, - "message", - ); - - // alias - parentPort.addEventListener("offline", () => { - parentPort.emit("close"); - }); -} -*/ + parentPort = self as ParentPort; + + const initPromise = once( + parentPort, + "message", + ).then((result) => { + // TODO(kt3k): The below values are set asynchronously + // using the first message from the parent. + // This should be done synchronously. + threadId = result[0].data.threadId; + workerData = result[0].data.workerData; + environmentData = result[0].data.environmentData; + + defaultExport.threadId = threadId; + defaultExport.workerData = workerData; + }); + + parentPort.off = parentPort.removeListener = function ( + this: ParentPort, + name, + listener, + ) { + this.removeEventListener(name, listeners.get(listener)!); + listeners.delete(listener); + return this; + }; + parentPort.on = parentPort.addListener = function ( + this: ParentPort, + name, + listener, + ) { + initPromise.then(() => { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + }); + return this; + }; + + parentPort.once = function (this: ParentPort, name, listener) { + initPromise.then(() => { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + }); + return this; + }; + + // mocks + parentPort.setMaxListeners = () => {}; + parentPort.getMaxListeners = () => Infinity; + parentPort.eventNames = () => [""]; + parentPort.listenerCount = () => 0; + + parentPort.emit = () => notImplemented("parentPort.emit"); + parentPort.removeAllListeners = () => + notImplemented("parentPort.removeAllListeners"); + + parentPort.addEventListener("offline", () => { + parentPort.emit("close"); + }); + } +}; export function getEnvironmentData(key: unknown) { - notImplemented("getEnvironmentData"); return environmentData.get(key); } export function setEnvironmentData(key: unknown, value?: unknown) { - notImplemented("setEnvironmentData"); if (value === undefined) { environmentData.delete(key); } else { @@ -226,7 +248,7 @@ export { workerData, }; -export default { +const defaultExport = { markAsUntransferable, moveMessagePortToContext, receiveMessageOnPort, @@ -243,3 +265,5 @@ export default { parentPort, isMainThread, }; + +export default defaultExport; |