summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/worker_threads.ts
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r--ext/node/polyfills/worker_threads.ts176
1 files changed, 100 insertions, 76 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 2c13e4bc8..8005506bb 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -3,11 +3,11 @@
import { resolve, toFileUrl } from "ext:deno_node/path.ts";
import { notImplemented } from "ext:deno_node/_utils.ts";
-import { EventEmitter } from "ext:deno_node/events.ts";
+import { EventEmitter, once } from "ext:deno_node/events.ts";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js";
-const environmentData = new Map();
+let environmentData = new Map();
let threads = 0;
export interface WorkerOptions {
@@ -48,13 +48,18 @@ class _Worker extends EventEmitter {
postMessage: Worker["postMessage"];
constructor(specifier: URL | string, options?: WorkerOptions) {
- notImplemented("Worker");
super();
if (options?.eval === true) {
specifier = `data:text/javascript,${specifier}`;
} else if (typeof specifier === "string") {
- // @ts-ignore This API is temporarily disabled
- specifier = toFileUrl(resolve(specifier));
+ specifier = resolve(specifier);
+ if (!specifier.toString().endsWith(".mjs")) {
+ const cwdFileUrl = toFileUrl(Deno.cwd());
+ specifier =
+ `data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`;
+ } else {
+ specifier = toFileUrl(specifier);
+ }
}
const handle = this[kHandle] = new Worker(
specifier,
@@ -95,20 +100,11 @@ class _Worker extends EventEmitter {
readonly performance = globalThis.performance;
}
-export const isMainThread =
- // deno-lint-ignore no-explicit-any
- (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME;
-
-// fake resourceLimits
-export const resourceLimits = isMainThread ? {} : {
- maxYoungGenerationSizeMb: 48,
- maxOldGenerationSizeMb: 2048,
- codeRangeSizeMb: 0,
- stackSizeMb: 4,
-};
+export let isMainThread;
+export let resourceLimits;
-const threadId = 0;
-const workerData: unknown = null;
+let threadId = 0;
+let workerData: unknown = null;
// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
interface NodeEventTarget extends
@@ -131,74 +127,100 @@ interface NodeEventTarget extends
type ParentPort = typeof self & NodeEventTarget;
// deno-lint-ignore no-explicit-any
-const parentPort: ParentPort = null as any;
+let parentPort: ParentPort = null as any;
-/*
-if (!isMainThread) {
- // deno-lint-ignore no-explicit-any
- delete (globalThis as any).name;
- // deno-lint-ignore no-explicit-any
- const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();
-
- parentPort = self as ParentPort;
- parentPort.off = parentPort.removeListener = function (
- this: ParentPort,
- name,
- listener,
- ) {
- this.removeEventListener(name, listeners.get(listener)!);
- listeners.delete(listener);
- return this;
- };
- parentPort.on = parentPort.addListener = function (
- this: ParentPort,
- name,
- listener,
- ) {
+globalThis.__bootstrap.internals.__initWorkerThreads = () => {
+ isMainThread =
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
- listeners.set(listener, _listener);
- this.addEventListener(name, _listener);
- return this;
+ (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME;
+
+ defaultExport.isMainThread = isMainThread;
+ // fake resourceLimits
+ resourceLimits = isMainThread ? {} : {
+ maxYoungGenerationSizeMb: 48,
+ maxOldGenerationSizeMb: 2048,
+ codeRangeSizeMb: 0,
+ stackSizeMb: 4,
};
- parentPort.once = function (this: ParentPort, name, listener) {
+ defaultExport.resourceLimits = resourceLimits;
+
+ if (!isMainThread) {
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
- listeners.set(listener, _listener);
- this.addEventListener(name, _listener);
- return this;
- };
+ delete (globalThis as any).name;
+ // deno-lint-ignore no-explicit-any
+ const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();
- // mocks
- parentPort.setMaxListeners = () => {};
- parentPort.getMaxListeners = () => Infinity;
- parentPort.eventNames = () => [""];
- parentPort.listenerCount = () => 0;
-
- parentPort.emit = () => notImplemented("parentPort.emit");
- parentPort.removeAllListeners = () =>
- notImplemented("parentPort.removeAllListeners");
-
- // Receive startup message
- [{ threadId, workerData, environmentData }] = await once(
- parentPort,
- "message",
- );
-
- // alias
- parentPort.addEventListener("offline", () => {
- parentPort.emit("close");
- });
-}
-*/
+ parentPort = self as ParentPort;
+
+ const initPromise = once(
+ parentPort,
+ "message",
+ ).then((result) => {
+ // TODO(kt3k): The below values are set asynchronously
+ // using the first message from the parent.
+ // This should be done synchronously.
+ threadId = result[0].data.threadId;
+ workerData = result[0].data.workerData;
+ environmentData = result[0].data.environmentData;
+
+ defaultExport.threadId = threadId;
+ defaultExport.workerData = workerData;
+ });
+
+ parentPort.off = parentPort.removeListener = function (
+ this: ParentPort,
+ name,
+ listener,
+ ) {
+ this.removeEventListener(name, listeners.get(listener)!);
+ listeners.delete(listener);
+ return this;
+ };
+ parentPort.on = parentPort.addListener = function (
+ this: ParentPort,
+ name,
+ listener,
+ ) {
+ initPromise.then(() => {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
+ });
+ return this;
+ };
+
+ parentPort.once = function (this: ParentPort, name, listener) {
+ initPromise.then(() => {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
+ });
+ return this;
+ };
+
+ // mocks
+ parentPort.setMaxListeners = () => {};
+ parentPort.getMaxListeners = () => Infinity;
+ parentPort.eventNames = () => [""];
+ parentPort.listenerCount = () => 0;
+
+ parentPort.emit = () => notImplemented("parentPort.emit");
+ parentPort.removeAllListeners = () =>
+ notImplemented("parentPort.removeAllListeners");
+
+ parentPort.addEventListener("offline", () => {
+ parentPort.emit("close");
+ });
+ }
+};
export function getEnvironmentData(key: unknown) {
- notImplemented("getEnvironmentData");
return environmentData.get(key);
}
export function setEnvironmentData(key: unknown, value?: unknown) {
- notImplemented("setEnvironmentData");
if (value === undefined) {
environmentData.delete(key);
} else {
@@ -226,7 +248,7 @@ export {
workerData,
};
-export default {
+const defaultExport = {
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
@@ -243,3 +265,5 @@ export default {
parentPort,
isMainThread,
};
+
+export default defaultExport;