summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSatya Rohith <me@satyarohith.com>2024-04-02 17:06:09 +0530
committerGitHub <noreply@github.com>2024-04-02 17:06:09 +0530
commit4d66ec91c1ca23134dc25f58f41da52a99615a38 (patch)
tree96b9de0cf5ac664f14ac8fc35f93d2d3da65a87f
parent7ad76fd453972e9262985c61840c77b8b8a6dbb7 (diff)
fix(ext/node): MessagePort works (#22999)
Closes https://github.com/denoland/deno/issues/22951 Closes https://github.com/denoland/deno/issues/23001 Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
-rw-r--r--ext/node/polyfills/worker_threads.ts63
-rw-r--r--ext/web/13_message_port.js29
-rw-r--r--tests/integration/worker_tests.rs13
-rw-r--r--tests/testdata/workers/node_worker_message_port.mjs41
-rw-r--r--tests/testdata/workers/node_worker_message_port.mjs.out4
-rw-r--r--tests/testdata/workers/node_worker_message_port_1.cjs9
-rw-r--r--tests/testdata/workers/node_worker_transfer_port.mjs14
-rw-r--r--tests/testdata/workers/node_worker_transfer_port.mjs.out3
-rw-r--r--tests/testdata/workers/node_worker_transfer_port_1.mjs10
-rw-r--r--tests/unit_node/worker_threads_test.ts2
10 files changed, 183 insertions, 5 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index ab3834132..49562d892 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -17,6 +17,7 @@ import {
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
+ nodeWorkerThreadCloseCb,
serializeJsMessageData,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
@@ -342,6 +343,15 @@ internals.__initWorkerThreads = (
defaultExport.parentPort = parentPort;
defaultExport.threadId = threadId;
+ for (const obj in workerData as Record<string, unknown>) {
+ if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, workerData[obj])) {
+ workerData[obj] = webMessagePortToNodeMessagePort(
+ workerData[obj] as MessagePort,
+ );
+ break;
+ }
+ }
+
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
name,
@@ -357,7 +367,22 @@ internals.__initWorkerThreads = (
listener,
) {
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
+ const _listener = (ev: any) => {
+ let message = ev.data;
+ if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message)) {
+ message = webMessagePortToNodeMessagePort(message);
+ } else {
+ for (const obj in message) {
+ if (
+ ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message[obj])
+ ) {
+ message[obj] = webMessagePortToNodeMessagePort(message[obj]);
+ break;
+ }
+ }
+ }
+ return listener(message);
+ };
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
@@ -424,10 +449,42 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
return { message: deserializeJsMessageData(data)[0] };
}
+class NodeMessageChannel {
+ port1: MessagePort;
+ port2: MessagePort;
+
+ constructor() {
+ const { port1, port2 } = new MessageChannel();
+ this.port1 = webMessagePortToNodeMessagePort(port1);
+ this.port2 = webMessagePortToNodeMessagePort(port2);
+ }
+}
+
+function webMessagePortToNodeMessagePort(port: MessagePort) {
+ port.on = port.addListener = function (this: MessagePort, name, listener) {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ if (name == "message") {
+ port.onmessage = _listener;
+ } else if (name == "messageerror") {
+ port.onmessageerror = _listener;
+ } else if (name == "close") {
+ port.addEventListener("close", _listener);
+ } else {
+ throw new Error(`Unknown event: "${name}"`);
+ }
+ return this;
+ };
+ port[nodeWorkerThreadCloseCb] = () => {
+ port.dispatchEvent(new Event("close"));
+ };
+ return port;
+}
+
export {
BroadcastChannel,
- MessageChannel,
MessagePort,
+ NodeMessageChannel as MessageChannel,
NodeWorker as Worker,
parentPort,
threadId,
@@ -439,7 +496,7 @@ const defaultExport = {
moveMessagePortToContext,
receiveMessageOnPort,
MessagePort,
- MessageChannel,
+ MessageChannel: NodeMessageChannel,
BroadcastChannel,
Worker: NodeWorker,
getEnvironmentData,
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index d953c52ed..83470c895 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -18,6 +18,7 @@ const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
ObjectPrototypeIsPrototypeOf,
+ ObjectDefineProperty,
Symbol,
SymbolFor,
SymbolIterator,
@@ -85,6 +86,8 @@ const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id");
const MessagePortIdSymbol = _id;
const _enabled = Symbol("enabled");
+const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
+const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
/**
* @param {number} id
@@ -98,6 +101,16 @@ function createMessagePort(id) {
return port;
}
+function nodeWorkerThreadMaybeInvokeCloseCb(port) {
+ if (
+ typeof port[nodeWorkerThreadCloseCb] == "function" &&
+ !port[nodeWorkerThreadCloseCbInvoked]
+ ) {
+ port[nodeWorkerThreadCloseCb]();
+ port[nodeWorkerThreadCloseCbInvoked] = true;
+ }
+}
+
class MessagePort extends EventTarget {
/** @type {number | null} */
[_id] = null;
@@ -106,6 +119,14 @@ class MessagePort extends EventTarget {
constructor() {
super();
+ ObjectDefineProperty(this, nodeWorkerThreadCloseCb, {
+ value: null,
+ enumerable: false,
+ });
+ ObjectDefineProperty(this, nodeWorkerThreadCloseCbInvoked, {
+ value: false,
+ enumerable: false,
+ });
webidl.illegalConstructor();
}
@@ -160,9 +181,13 @@ class MessagePort extends EventTarget {
);
} catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break;
+ nodeWorkerThreadMaybeInvokeCloseCb(this);
throw err;
}
- if (data === null) break;
+ if (data === null) {
+ nodeWorkerThreadMaybeInvokeCloseCb(this);
+ break;
+ }
let message, transferables;
try {
const v = deserializeJsMessageData(data);
@@ -193,6 +218,7 @@ class MessagePort extends EventTarget {
if (this[_id] !== null) {
core.close(this[_id]);
this[_id] = null;
+ nodeWorkerThreadMaybeInvokeCloseCb(this);
}
}
@@ -383,6 +409,7 @@ export {
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
+ nodeWorkerThreadCloseCb,
serializeJsMessageData,
structuredClone,
};
diff --git a/tests/integration/worker_tests.rs b/tests/integration/worker_tests.rs
index dd0c2d409..abd07da16 100644
--- a/tests/integration/worker_tests.rs
+++ b/tests/integration/worker_tests.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use test_util::itest;
+use test_util::itest_flaky;
itest!(worker_error {
args: "run -A workers/worker_error.ts",
@@ -125,3 +126,15 @@ itest!(node_worker_auto_exits {
output: "workers/node_worker_auto_exits.mjs.out",
exit_code: 0,
});
+
+itest_flaky!(node_worker_message_port {
+ args: "run --quiet --allow-read workers/node_worker_message_port.mjs",
+ output: "workers/node_worker_message_port.mjs.out",
+ exit_code: 0,
+});
+
+itest!(node_worker_transfer_port {
+ args: "run --quiet --allow-read workers/node_worker_transfer_port.mjs",
+ output: "workers/node_worker_transfer_port.mjs.out",
+ exit_code: 0,
+});
diff --git a/tests/testdata/workers/node_worker_message_port.mjs b/tests/testdata/workers/node_worker_message_port.mjs
new file mode 100644
index 000000000..71640fb40
--- /dev/null
+++ b/tests/testdata/workers/node_worker_message_port.mjs
@@ -0,0 +1,41 @@
+import workerThreads from "node:worker_threads";
+
+const { port1: mainPort, port2: workerPort } = new workerThreads
+ .MessageChannel();
+
+// Note: not using Promise.withResolver() because it's not available in Node.js
+const deferred = createDeferred();
+
+const worker = new workerThreads.Worker(
+ import.meta.resolve("./node_worker_message_port_1.cjs"),
+ {
+ workerData: { workerPort },
+ transferList: [workerPort],
+ },
+);
+
+worker.on("message", (data) => {
+ console.log("worker:", data);
+ mainPort.on("message", (msg) => {
+ console.log("mainPort:", msg);
+ deferred.resolve();
+ });
+ mainPort.on("close", (_msg) => {
+ console.log("mainPort closed");
+ });
+});
+
+worker.postMessage("Hello from parent");
+await deferred.promise;
+await worker.terminate();
+mainPort.close();
+
+function createDeferred() {
+ let resolveCallback;
+ let rejectCallback;
+ const promise = new Promise((resolve, reject) => {
+ resolveCallback = resolve;
+ rejectCallback = reject;
+ });
+ return { promise, resolve: resolveCallback, reject: rejectCallback };
+}
diff --git a/tests/testdata/workers/node_worker_message_port.mjs.out b/tests/testdata/workers/node_worker_message_port.mjs.out
new file mode 100644
index 000000000..5317b65a0
--- /dev/null
+++ b/tests/testdata/workers/node_worker_message_port.mjs.out
@@ -0,0 +1,4 @@
+worker port closed
+worker: Hello from worker on parentPort!
+mainPort: Hello from worker on workerPort!
+mainPort closed
diff --git a/tests/testdata/workers/node_worker_message_port_1.cjs b/tests/testdata/workers/node_worker_message_port_1.cjs
new file mode 100644
index 000000000..01739c51e
--- /dev/null
+++ b/tests/testdata/workers/node_worker_message_port_1.cjs
@@ -0,0 +1,9 @@
+const { parentPort, workerData } = require("worker_threads");
+
+parentPort.on("message", (msg) => {
+ const workerPort = workerData.workerPort;
+ parentPort.postMessage("Hello from worker on parentPort!");
+ workerPort.postMessage("Hello from worker on workerPort!");
+ workerPort.on("close", () => console.log("worker port closed"));
+ workerPort.close();
+});
diff --git a/tests/testdata/workers/node_worker_transfer_port.mjs b/tests/testdata/workers/node_worker_transfer_port.mjs
new file mode 100644
index 000000000..1b17ed1ab
--- /dev/null
+++ b/tests/testdata/workers/node_worker_transfer_port.mjs
@@ -0,0 +1,14 @@
+import { MessageChannel, Worker } from "node:worker_threads";
+
+const { port1, port2 } = new MessageChannel();
+const worker = new Worker(
+ import.meta.resolve("./node_worker_transfer_port_1.mjs"),
+);
+// Send the port directly after the worker is created
+worker.postMessage(port2, [port2]);
+// Send a message to the worker using the transferred port
+port1.postMessage("Hello from main thread!");
+worker.on("message", (message) => {
+ console.log("Received message from worker:", message);
+ worker.terminate();
+});
diff --git a/tests/testdata/workers/node_worker_transfer_port.mjs.out b/tests/testdata/workers/node_worker_transfer_port.mjs.out
new file mode 100644
index 000000000..8e8f11940
--- /dev/null
+++ b/tests/testdata/workers/node_worker_transfer_port.mjs.out
@@ -0,0 +1,3 @@
+Worker thread started!
+Received message from main thread: Hello from main thread!
+Received message from worker: Reply from worker
diff --git a/tests/testdata/workers/node_worker_transfer_port_1.mjs b/tests/testdata/workers/node_worker_transfer_port_1.mjs
new file mode 100644
index 000000000..4d0a38bd5
--- /dev/null
+++ b/tests/testdata/workers/node_worker_transfer_port_1.mjs
@@ -0,0 +1,10 @@
+import { parentPort } from "node:worker_threads";
+
+parentPort.on("message", (message) => {
+ const transferredPort = message;
+ transferredPort.on("message", (message) => {
+ console.log("Received message from main thread:", message);
+ parentPort.postMessage("Reply from worker");
+ });
+ console.log("Worker thread started!");
+});
diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts
index 52c9cfffd..2351e1052 100644
--- a/tests/unit_node/worker_threads_test.ts
+++ b/tests/unit_node/worker_threads_test.ts
@@ -16,7 +16,7 @@ Deno.test("[node/worker_threads] BroadcastChannel is exported", () => {
});
Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", () => {
- assertEquals<unknown>(workerThreads.MessageChannel, MessageChannel);
+ assert(workerThreads.MessageChannel);
assertEquals<unknown>(workerThreads.MessagePort, MessagePort);
});