summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSatya Rohith <me@satyarohith.com>2024-04-09 23:45:55 +0530
committerGitHub <noreply@github.com>2024-04-09 20:15:55 +0200
commit5a3ee6d9af875af032909489c0bed7db11b608dd (patch)
tree894744a0811b21179a65ee63858f50fbe4fc64d9
parentfad12b7c2ebd87a2a11f63998f4c2549fd405eff (diff)
fix(ext/node): implement MessagePort.unref() (#23278)
Closes https://github.com/denoland/deno/issues/23252 Closes https://github.com/denoland/deno/issues/23264
-rw-r--r--ext/node/polyfills/worker_threads.ts14
-rw-r--r--ext/web/13_message_port.js17
-rw-r--r--runtime/js/99_main.js7
-rw-r--r--tests/integration/worker_tests.rs13
-rw-r--r--tests/testdata/workers/node_worker_message_port_unref.mjs40
-rw-r--r--tests/testdata/workers/node_worker_message_port_unref.mjs.out2
6 files changed, 92 insertions, 1 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 3c8c9d443..323095206 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -18,7 +18,9 @@ import {
MessagePortIdSymbol,
MessagePortPrototype,
nodeWorkerThreadCloseCb,
+ refMessagePort,
serializeJsMessageData,
+ unrefPollForMessages,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
@@ -398,6 +400,12 @@ internals.__initWorkerThreads = (
parentPort.addEventListener("offline", () => {
parentPort.emit("close");
});
+ parentPort.unref = () => {
+ parentPort[unrefPollForMessages] = true;
+ };
+ parentPort.ref = () => {
+ parentPort[unrefPollForMessages] = false;
+ };
}
};
@@ -467,6 +475,12 @@ function webMessagePortToNodeMessagePort(port: MessagePort) {
port[nodeWorkerThreadCloseCb] = () => {
port.dispatchEvent(new Event("close"));
};
+ port.unref = () => {
+ port[refMessagePort](false);
+ };
+ port.ref = () => {
+ port[refMessagePort](true);
+ };
return port;
}
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index 24982a982..62c0328c3 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -89,8 +89,13 @@ const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id");
const MessagePortIdSymbol = _id;
const _enabled = Symbol("enabled");
+const _refed = Symbol("refed");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
+export const refMessagePort = Symbol("refMessagePort");
+/** It is used by 99_main.js and worker_threads to
+ * unref/ref on the global pollForMessages promise. */
+export const unrefPollForMessages = Symbol("unrefPollForMessages");
/**
* @param {number} id
@@ -119,6 +124,7 @@ class MessagePort extends EventTarget {
[_id] = null;
/** @type {boolean} */
[_enabled] = false;
+ [_refed] = false;
constructor() {
super();
@@ -216,6 +222,16 @@ class MessagePort extends EventTarget {
})();
}
+ [refMessagePort](ref) {
+ if (ref && !this[_refed]) {
+ this[_refed] = true;
+ messageEventListenerCount++;
+ } else if (!ref && this[_refed]) {
+ this[_refed] = false;
+ messageEventListenerCount = 0;
+ }
+ }
+
close() {
webidl.assertBranded(this, MessagePortPrototype);
if (this[_id] !== null) {
@@ -235,6 +251,7 @@ class MessagePort extends EventTarget {
addEventListener(...args) {
if (args[0] == "message") {
messageEventListenerCount++;
+ if (!this[_refed]) this[_refed] = true;
}
super.addEventListener(...new SafeArrayIterator(args));
}
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index a66a1660e..e5b9b9778 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -294,7 +294,12 @@ async function pollForMessages() {
);
}
while (!isClosing) {
- const data = await op_worker_recv_message();
+ const recvMessage = op_worker_recv_message();
+ if (globalThis[messagePort.unrefPollForMessages] === true) {
+ core.unrefOpPromise(recvMessage);
+ }
+ const data = await recvMessage;
+ // const data = await op_worker_recv_message();
if (data === null) break;
const v = messagePort.deserializeJsMessageData(data);
const message = v[0];
diff --git a/tests/integration/worker_tests.rs b/tests/integration/worker_tests.rs
index 65c34aa15..aa1c9656e 100644
--- a/tests/integration/worker_tests.rs
+++ b/tests/integration/worker_tests.rs
@@ -137,3 +137,16 @@ itest!(node_worker_transfer_port {
output: "workers/node_worker_transfer_port.mjs.out",
exit_code: 0,
});
+
+itest!(node_worker_message_port_unref {
+ args: "run --quiet --allow-env --allow-read workers/node_worker_message_port_unref.mjs",
+ output: "workers/node_worker_message_port_unref.mjs.out",
+ exit_code: 0,
+});
+
+itest!(node_worker_parent_port_unref {
+ envs: vec![("PARENT_PORT".into(), "1".into())],
+ args: "run --quiet --allow-env --allow-read workers/node_worker_message_port_unref.mjs",
+ output: "workers/node_worker_message_port_unref.mjs.out",
+ exit_code: 0,
+});
diff --git a/tests/testdata/workers/node_worker_message_port_unref.mjs b/tests/testdata/workers/node_worker_message_port_unref.mjs
new file mode 100644
index 000000000..a12be1ec8
--- /dev/null
+++ b/tests/testdata/workers/node_worker_message_port_unref.mjs
@@ -0,0 +1,40 @@
+import {
+ isMainThread,
+ MessageChannel,
+ parentPort,
+ Worker,
+ workerData,
+} from "node:worker_threads";
+
+const useParentPort = Deno.env.get("PARENT_PORT") === "1";
+
+if (useParentPort) {
+ if (isMainThread) {
+ const worker = new Worker(import.meta.filename);
+ worker.postMessage("main says hi!");
+ worker.on("message", (msg) => console.log(msg));
+ } else {
+ parentPort.on("message", (msg) => {
+ console.log(msg);
+ parentPort.postMessage("worker says hi!");
+ parentPort.unref();
+ });
+ }
+} else {
+ if (isMainThread) {
+ const { port1, port2 } = new MessageChannel();
+ const worker = new Worker(import.meta.filename, {
+ workerData: port2,
+ transferList: [port2],
+ });
+ port1.postMessage("main says hi!");
+ port1.on("message", (msg) => console.log(msg));
+ } else {
+ const port = workerData;
+ port.on("message", (msg) => {
+ console.log(msg);
+ port.postMessage("worker says hi!");
+ port.unref();
+ });
+ }
+}
diff --git a/tests/testdata/workers/node_worker_message_port_unref.mjs.out b/tests/testdata/workers/node_worker_message_port_unref.mjs.out
new file mode 100644
index 000000000..c9d5325bc
--- /dev/null
+++ b/tests/testdata/workers/node_worker_message_port_unref.mjs.out
@@ -0,0 +1,2 @@
+main says hi!
+worker says hi!