summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYoshiya Hinosawa <stibium121@gmail.com>2023-05-24 03:56:29 +0900
committerGitHub <noreply@github.com>2023-05-23 20:56:29 +0200
commit26f42a248f4764f85c1c3c3c511b82a990e4b651 (patch)
tree04281d6d58fa0f5fba8946481dd13fcd4becf4b2
parent3d865949c2f9f0cb61031bcc2b9e81a4ca623109 (diff)
fix(ext/node): add basic node:worker_threads support (#19192)
This PR restores `node:worker_threads` implementation and test cases from [`std@0.175.0/node`](https://github.com/denoland/deno_std/blob/0.175.0/node/worker_threads.ts). --------- Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
-rw-r--r--cli/tests/node_compat/config.jsonc5
-rw-r--r--cli/tests/node_compat/test/common/tmpdir.js13
-rw-r--r--cli/tests/unit_node/testdata/worker_threads.mjs34
-rw-r--r--cli/tests/unit_node/worker_threads_test.ts185
-rw-r--r--ext/node/polyfills/02_init.js1
-rw-r--r--ext/node/polyfills/worker_threads.ts176
6 files changed, 327 insertions, 87 deletions
diff --git a/cli/tests/node_compat/config.jsonc b/cli/tests/node_compat/config.jsonc
index a3b97b510..ccc83cd3d 100644
--- a/cli/tests/node_compat/config.jsonc
+++ b/cli/tests/node_compat/config.jsonc
@@ -1,7 +1,7 @@
{
"nodeVersion": "18.12.1",
"ignore": {
- "common": ["index.js", "internet.js", "tmpdir.js"],
+ "common": ["index.js", "internet.js"],
"fixtures": [
"child-process-spawn-node.js",
"echo.js",
@@ -121,7 +121,8 @@
"fixtures.js",
"hijackstdio.js",
"index.mjs",
- "internet.js"
+ "internet.js",
+ "tmpdir.js"
],
"fixtures": [
"GH-1899-output.js",
diff --git a/cli/tests/node_compat/test/common/tmpdir.js b/cli/tests/node_compat/test/common/tmpdir.js
index d3ce98e45..dbc945c13 100644
--- a/cli/tests/node_compat/test/common/tmpdir.js
+++ b/cli/tests/node_compat/test/common/tmpdir.js
@@ -2,14 +2,14 @@
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
-// Taken from Node 16.13.0
+// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
'use strict';
const fs = require('fs');
const path = require('path');
-// const { isMainThread } = require('worker_threads');
+const { isMainThread } = require('worker_threads');
function rmSync(pathname) {
fs.rmSync(pathname, { maxRetries: 3, recursive: true, force: true });
@@ -26,8 +26,8 @@ const tmpPath = path.join(testRoot, tmpdirName);
let firstRefresh = true;
function refresh() {
- rmSync(this.path);
- fs.mkdirSync(this.path);
+ rmSync(tmpPath);
+ fs.mkdirSync(tmpPath);
if (firstRefresh) {
firstRefresh = false;
@@ -39,9 +39,8 @@ function refresh() {
function onexit() {
// Change directory to avoid possible EBUSY
- // TODO(f3n67u): uncomment when `worker_thread.isMainThread` implemented
- // if (isMainThread)
- // process.chdir(testRoot);
+ if (isMainThread)
+ process.chdir(testRoot);
try {
rmSync(tmpPath);
diff --git a/cli/tests/unit_node/testdata/worker_threads.mjs b/cli/tests/unit_node/testdata/worker_threads.mjs
new file mode 100644
index 000000000..03dc462f0
--- /dev/null
+++ b/cli/tests/unit_node/testdata/worker_threads.mjs
@@ -0,0 +1,34 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+import {
+ getEnvironmentData,
+ isMainThread,
+ parentPort,
+ threadId,
+ workerData,
+} from "node:worker_threads";
+import { once } from "node:events";
+
+async function message(expectedMessage) {
+ const [message] = await once(parentPort, "message");
+ if (message !== expectedMessage) {
+ console.log(`Expected the message "${expectedMessage}", but got`, message);
+ // fail test
+ parentPort.close();
+ }
+}
+
+await message("Hello, how are you my thread?");
+
+parentPort.postMessage("I'm fine!");
+
+await new Promise((resolve) => setTimeout(resolve, 100));
+
+parentPort.postMessage({
+ isMainThread,
+ threadId,
+ workerData: Array.isArray(workerData) &&
+ workerData[workerData.length - 1] instanceof MessagePort
+ ? workerData.slice(0, -1)
+ : workerData,
+ envData: [getEnvironmentData("test"), getEnvironmentData(1)],
+});
diff --git a/cli/tests/unit_node/worker_threads_test.ts b/cli/tests/unit_node/worker_threads_test.ts
index 17de7cca1..f53b1e692 100644
--- a/cli/tests/unit_node/worker_threads_test.ts
+++ b/cli/tests/unit_node/worker_threads_test.ts
@@ -1,7 +1,13 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-import { assertEquals } from "../../../test_util/std/testing/asserts.ts";
-import workerThreads from "node:worker_threads";
+import {
+ assert,
+ assertEquals,
+ assertObjectMatch,
+} from "../../../test_util/std/testing/asserts.ts";
+import { fromFileUrl, relative } from "../../../test_util/std/path/mod.ts";
+import * as workerThreads from "node:worker_threads";
+import { EventEmitter, once } from "node:events";
Deno.test("[node/worker_threads] BroadcastChannel is exported", () => {
assertEquals<unknown>(workerThreads.BroadcastChannel, BroadcastChannel);
@@ -11,3 +17,178 @@ Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", (
assertEquals<unknown>(workerThreads.MessageChannel, MessageChannel);
assertEquals<unknown>(workerThreads.MessagePort, MessagePort);
});
+
+Deno.test({
+ name: "[worker_threads] isMainThread",
+ fn() {
+ assertEquals(workerThreads.isMainThread, true);
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] threadId",
+ fn() {
+ assertEquals(workerThreads.threadId, 0);
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] resourceLimits",
+ fn() {
+ assertObjectMatch(workerThreads.resourceLimits, {});
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] parentPort",
+ fn() {
+ assertEquals(workerThreads.parentPort, null);
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] workerData",
+ fn() {
+ assertEquals(workerThreads.workerData, null);
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] setEnvironmentData / getEnvironmentData",
+ fn() {
+ workerThreads.setEnvironmentData("test", "test");
+ assertEquals(workerThreads.getEnvironmentData("test"), "test");
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] Worker threadId",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ new URL("./testdata/worker_threads.mjs", import.meta.url),
+ );
+ worker.postMessage("Hello, how are you my thread?");
+ await once(worker, "message");
+ const message = await once(worker, "message");
+ assertEquals(message[0].threadId, 1);
+ worker.terminate();
+
+ const worker1 = new workerThreads.Worker(
+ new URL("./testdata/worker_threads.mjs", import.meta.url),
+ );
+ worker1.postMessage("Hello, how are you my thread?");
+ await once(worker1, "message");
+ assertEquals((await once(worker1, "message"))[0].threadId, 2);
+ worker1.terminate();
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] Worker basics",
+ async fn() {
+ workerThreads.setEnvironmentData("test", "test");
+ workerThreads.setEnvironmentData(1, {
+ test: "random",
+ random: "test",
+ });
+ const { port1 } = new MessageChannel();
+ const worker = new workerThreads.Worker(
+ new URL("./testdata/worker_threads.mjs", import.meta.url),
+ {
+ workerData: ["hey", true, false, 2, port1],
+ // deno-lint-ignore no-explicit-any
+ transferList: [port1 as any],
+ },
+ );
+ worker.postMessage("Hello, how are you my thread?");
+ assertEquals((await once(worker, "message"))[0], "I'm fine!");
+ const data = (await once(worker, "message"))[0];
+ // data.threadId can be 1 when this test is runned individually
+ if (data.threadId === 1) data.threadId = 3;
+ assertObjectMatch(data, {
+ isMainThread: false,
+ threadId: 3,
+ workerData: ["hey", true, false, 2],
+ envData: ["test", { test: "random", random: "test" }],
+ });
+ worker.terminate();
+ },
+ sanitizeResources: false,
+});
+
+Deno.test({
+ name: "[worker_threads] Worker eval",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { parentPort } from "node:worker_threads";
+ parentPort.postMessage("It works!");
+ `,
+ {
+ eval: true,
+ },
+ );
+ assertEquals((await once(worker, "message"))[0], "It works!");
+ worker.terminate();
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] inheritences",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { EventEmitter } from "node:events";
+ import { parentPort } from "node:worker_threads";
+ parentPort.postMessage(parentPort instanceof EventTarget);
+ await new Promise(resolve => setTimeout(resolve, 100));
+ parentPort.postMessage(parentPort instanceof EventEmitter);
+ `,
+ {
+ eval: true,
+ },
+ );
+ assertEquals((await once(worker, "message"))[0], true);
+ assertEquals((await once(worker, "message"))[0], false);
+ assert(worker instanceof EventEmitter);
+ assert(!(worker instanceof EventTarget));
+ worker.terminate();
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] Worker workerData",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ new URL("./testdata/worker_threads.mjs", import.meta.url),
+ {
+ workerData: null,
+ },
+ );
+ worker.postMessage("Hello, how are you my thread?");
+ await once(worker, "message");
+ assertEquals((await once(worker, "message"))[0].workerData, null);
+ worker.terminate();
+
+ const worker1 = new workerThreads.Worker(
+ new URL("./testdata/worker_threads.mjs", import.meta.url),
+ );
+ worker1.postMessage("Hello, how are you my thread?");
+ await once(worker1, "message");
+ assertEquals((await once(worker1, "message"))[0].workerData, undefined);
+ worker1.terminate();
+ },
+});
+
+Deno.test({
+ name: "[worker_threads] Worker with relative path",
+ async fn() {
+ const worker = new workerThreads.Worker(relative(
+ Deno.cwd(),
+ fromFileUrl(new URL("./testdata/worker_threads.mjs", import.meta.url)),
+ ));
+ worker.postMessage("Hello, how are you my thread?");
+ assertEquals((await once(worker, "message"))[0], "I'm fine!");
+ worker.terminate();
+ },
+});
diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js
index dc8955d83..a2fba8c0c 100644
--- a/ext/node/polyfills/02_init.js
+++ b/ext/node/polyfills/02_init.js
@@ -46,6 +46,7 @@ function initialize(
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here
// but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
+ internals.__initWorkerThreads();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl;
}
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 2c13e4bc8..8005506bb 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -3,11 +3,11 @@
import { resolve, toFileUrl } from "ext:deno_node/path.ts";
import { notImplemented } from "ext:deno_node/_utils.ts";
-import { EventEmitter } from "ext:deno_node/events.ts";
+import { EventEmitter, once } from "ext:deno_node/events.ts";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js";
-const environmentData = new Map();
+let environmentData = new Map();
let threads = 0;
export interface WorkerOptions {
@@ -48,13 +48,18 @@ class _Worker extends EventEmitter {
postMessage: Worker["postMessage"];
constructor(specifier: URL | string, options?: WorkerOptions) {
- notImplemented("Worker");
super();
if (options?.eval === true) {
specifier = `data:text/javascript,${specifier}`;
} else if (typeof specifier === "string") {
- // @ts-ignore This API is temporarily disabled
- specifier = toFileUrl(resolve(specifier));
+ specifier = resolve(specifier);
+ if (!specifier.toString().endsWith(".mjs")) {
+ const cwdFileUrl = toFileUrl(Deno.cwd());
+ specifier =
+ `data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`;
+ } else {
+ specifier = toFileUrl(specifier);
+ }
}
const handle = this[kHandle] = new Worker(
specifier,
@@ -95,20 +100,11 @@ class _Worker extends EventEmitter {
readonly performance = globalThis.performance;
}
-export const isMainThread =
- // deno-lint-ignore no-explicit-any
- (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME;
-
-// fake resourceLimits
-export const resourceLimits = isMainThread ? {} : {
- maxYoungGenerationSizeMb: 48,
- maxOldGenerationSizeMb: 2048,
- codeRangeSizeMb: 0,
- stackSizeMb: 4,
-};
+export let isMainThread;
+export let resourceLimits;
-const threadId = 0;
-const workerData: unknown = null;
+let threadId = 0;
+let workerData: unknown = null;
// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
interface NodeEventTarget extends
@@ -131,74 +127,100 @@ interface NodeEventTarget extends
type ParentPort = typeof self & NodeEventTarget;
// deno-lint-ignore no-explicit-any
-const parentPort: ParentPort = null as any;
+let parentPort: ParentPort = null as any;
-/*
-if (!isMainThread) {
- // deno-lint-ignore no-explicit-any
- delete (globalThis as any).name;
- // deno-lint-ignore no-explicit-any
- const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();
-
- parentPort = self as ParentPort;
- parentPort.off = parentPort.removeListener = function (
- this: ParentPort,
- name,
- listener,
- ) {
- this.removeEventListener(name, listeners.get(listener)!);
- listeners.delete(listener);
- return this;
- };
- parentPort.on = parentPort.addListener = function (
- this: ParentPort,
- name,
- listener,
- ) {
+globalThis.__bootstrap.internals.__initWorkerThreads = () => {
+ isMainThread =
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
- listeners.set(listener, _listener);
- this.addEventListener(name, _listener);
- return this;
+ (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME;
+
+ defaultExport.isMainThread = isMainThread;
+ // fake resourceLimits
+ resourceLimits = isMainThread ? {} : {
+ maxYoungGenerationSizeMb: 48,
+ maxOldGenerationSizeMb: 2048,
+ codeRangeSizeMb: 0,
+ stackSizeMb: 4,
};
- parentPort.once = function (this: ParentPort, name, listener) {
+ defaultExport.resourceLimits = resourceLimits;
+
+ if (!isMainThread) {
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
- listeners.set(listener, _listener);
- this.addEventListener(name, _listener);
- return this;
- };
+ delete (globalThis as any).name;
+ // deno-lint-ignore no-explicit-any
+ const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();
- // mocks
- parentPort.setMaxListeners = () => {};
- parentPort.getMaxListeners = () => Infinity;
- parentPort.eventNames = () => [""];
- parentPort.listenerCount = () => 0;
-
- parentPort.emit = () => notImplemented("parentPort.emit");
- parentPort.removeAllListeners = () =>
- notImplemented("parentPort.removeAllListeners");
-
- // Receive startup message
- [{ threadId, workerData, environmentData }] = await once(
- parentPort,
- "message",
- );
-
- // alias
- parentPort.addEventListener("offline", () => {
- parentPort.emit("close");
- });
-}
-*/
+ parentPort = self as ParentPort;
+
+ const initPromise = once(
+ parentPort,
+ "message",
+ ).then((result) => {
+ // TODO(kt3k): The below values are set asynchronously
+ // using the first message from the parent.
+ // This should be done synchronously.
+ threadId = result[0].data.threadId;
+ workerData = result[0].data.workerData;
+ environmentData = result[0].data.environmentData;
+
+ defaultExport.threadId = threadId;
+ defaultExport.workerData = workerData;
+ });
+
+ parentPort.off = parentPort.removeListener = function (
+ this: ParentPort,
+ name,
+ listener,
+ ) {
+ this.removeEventListener(name, listeners.get(listener)!);
+ listeners.delete(listener);
+ return this;
+ };
+ parentPort.on = parentPort.addListener = function (
+ this: ParentPort,
+ name,
+ listener,
+ ) {
+ initPromise.then(() => {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
+ });
+ return this;
+ };
+
+ parentPort.once = function (this: ParentPort, name, listener) {
+ initPromise.then(() => {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
+ });
+ return this;
+ };
+
+ // mocks
+ parentPort.setMaxListeners = () => {};
+ parentPort.getMaxListeners = () => Infinity;
+ parentPort.eventNames = () => [""];
+ parentPort.listenerCount = () => 0;
+
+ parentPort.emit = () => notImplemented("parentPort.emit");
+ parentPort.removeAllListeners = () =>
+ notImplemented("parentPort.removeAllListeners");
+
+ parentPort.addEventListener("offline", () => {
+ parentPort.emit("close");
+ });
+ }
+};
export function getEnvironmentData(key: unknown) {
- notImplemented("getEnvironmentData");
return environmentData.get(key);
}
export function setEnvironmentData(key: unknown, value?: unknown) {
- notImplemented("setEnvironmentData");
if (value === undefined) {
environmentData.delete(key);
} else {
@@ -226,7 +248,7 @@ export {
workerData,
};
-export default {
+const defaultExport = {
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
@@ -243,3 +265,5 @@ export default {
parentPort,
isMainThread,
};
+
+export default defaultExport;