diff options
author | Satya Rohith <me@satyarohith.com> | 2024-04-09 23:45:55 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-09 20:15:55 +0200 |
commit | 5a3ee6d9af875af032909489c0bed7db11b608dd (patch) | |
tree | 894744a0811b21179a65ee63858f50fbe4fc64d9 | |
parent | fad12b7c2ebd87a2a11f63998f4c2549fd405eff (diff) |
fix(ext/node): implement MessagePort.unref() (#23278)
Closes https://github.com/denoland/deno/issues/23252
Closes https://github.com/denoland/deno/issues/23264
-rw-r--r-- | ext/node/polyfills/worker_threads.ts | 14 | ||||
-rw-r--r-- | ext/web/13_message_port.js | 17 | ||||
-rw-r--r-- | runtime/js/99_main.js | 7 | ||||
-rw-r--r-- | tests/integration/worker_tests.rs | 13 | ||||
-rw-r--r-- | tests/testdata/workers/node_worker_message_port_unref.mjs | 40 | ||||
-rw-r--r-- | tests/testdata/workers/node_worker_message_port_unref.mjs.out | 2 |
6 files changed, 92 insertions, 1 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 3c8c9d443..323095206 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -18,7 +18,9 @@ import { MessagePortIdSymbol, MessagePortPrototype, nodeWorkerThreadCloseCb, + refMessagePort, serializeJsMessageData, + unrefPollForMessages, } from "ext:deno_web/13_message_port.js"; import * as webidl from "ext:deno_webidl/00_webidl.js"; import { notImplemented } from "ext:deno_node/_utils.ts"; @@ -398,6 +400,12 @@ internals.__initWorkerThreads = ( parentPort.addEventListener("offline", () => { parentPort.emit("close"); }); + parentPort.unref = () => { + parentPort[unrefPollForMessages] = true; + }; + parentPort.ref = () => { + parentPort[unrefPollForMessages] = false; + }; } }; @@ -467,6 +475,12 @@ function webMessagePortToNodeMessagePort(port: MessagePort) { port[nodeWorkerThreadCloseCb] = () => { port.dispatchEvent(new Event("close")); }; + port.unref = () => { + port[refMessagePort](false); + }; + port.ref = () => { + port[refMessagePort](true); + }; return port; } diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index 24982a982..62c0328c3 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -89,8 +89,13 @@ const MessageChannelPrototype = MessageChannel.prototype; const _id = Symbol("id"); const MessagePortIdSymbol = _id; const _enabled = Symbol("enabled"); +const _refed = Symbol("refed"); const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked"); +export const refMessagePort = Symbol("refMessagePort"); +/** It is used by 99_main.js and worker_threads to + * unref/ref on the global pollForMessages promise. */ +export const unrefPollForMessages = Symbol("unrefPollForMessages"); /** * @param {number} id @@ -119,6 +124,7 @@ class MessagePort extends EventTarget { [_id] = null; /** @type {boolean} */ [_enabled] = false; + [_refed] = false; constructor() { super(); @@ -216,6 +222,16 @@ class MessagePort extends EventTarget { })(); } + [refMessagePort](ref) { + if (ref && !this[_refed]) { + this[_refed] = true; + messageEventListenerCount++; + } else if (!ref && this[_refed]) { + this[_refed] = false; + messageEventListenerCount = 0; + } + } + close() { webidl.assertBranded(this, MessagePortPrototype); if (this[_id] !== null) { @@ -235,6 +251,7 @@ class MessagePort extends EventTarget { addEventListener(...args) { if (args[0] == "message") { messageEventListenerCount++; + if (!this[_refed]) this[_refed] = true; } super.addEventListener(...new SafeArrayIterator(args)); } diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index a66a1660e..e5b9b9778 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -294,7 +294,12 @@ async function pollForMessages() { ); } while (!isClosing) { - const data = await op_worker_recv_message(); + const recvMessage = op_worker_recv_message(); + if (globalThis[messagePort.unrefPollForMessages] === true) { + core.unrefOpPromise(recvMessage); + } + const data = await recvMessage; + // const data = await op_worker_recv_message(); if (data === null) break; const v = messagePort.deserializeJsMessageData(data); const message = v[0]; diff --git a/tests/integration/worker_tests.rs b/tests/integration/worker_tests.rs index 65c34aa15..aa1c9656e 100644 --- a/tests/integration/worker_tests.rs +++ b/tests/integration/worker_tests.rs @@ -137,3 +137,16 @@ itest!(node_worker_transfer_port { output: "workers/node_worker_transfer_port.mjs.out", exit_code: 0, }); + +itest!(node_worker_message_port_unref { + args: "run --quiet --allow-env --allow-read workers/node_worker_message_port_unref.mjs", + output: "workers/node_worker_message_port_unref.mjs.out", + exit_code: 0, +}); + +itest!(node_worker_parent_port_unref { + envs: vec![("PARENT_PORT".into(), "1".into())], + args: "run --quiet --allow-env --allow-read workers/node_worker_message_port_unref.mjs", + output: "workers/node_worker_message_port_unref.mjs.out", + exit_code: 0, +}); diff --git a/tests/testdata/workers/node_worker_message_port_unref.mjs b/tests/testdata/workers/node_worker_message_port_unref.mjs new file mode 100644 index 000000000..a12be1ec8 --- /dev/null +++ b/tests/testdata/workers/node_worker_message_port_unref.mjs @@ -0,0 +1,40 @@ +import { + isMainThread, + MessageChannel, + parentPort, + Worker, + workerData, +} from "node:worker_threads"; + +const useParentPort = Deno.env.get("PARENT_PORT") === "1"; + +if (useParentPort) { + if (isMainThread) { + const worker = new Worker(import.meta.filename); + worker.postMessage("main says hi!"); + worker.on("message", (msg) => console.log(msg)); + } else { + parentPort.on("message", (msg) => { + console.log(msg); + parentPort.postMessage("worker says hi!"); + parentPort.unref(); + }); + } +} else { + if (isMainThread) { + const { port1, port2 } = new MessageChannel(); + const worker = new Worker(import.meta.filename, { + workerData: port2, + transferList: [port2], + }); + port1.postMessage("main says hi!"); + port1.on("message", (msg) => console.log(msg)); + } else { + const port = workerData; + port.on("message", (msg) => { + console.log(msg); + port.postMessage("worker says hi!"); + port.unref(); + }); + } +} diff --git a/tests/testdata/workers/node_worker_message_port_unref.mjs.out b/tests/testdata/workers/node_worker_message_port_unref.mjs.out new file mode 100644 index 000000000..c9d5325bc --- /dev/null +++ b/tests/testdata/workers/node_worker_message_port_unref.mjs.out @@ -0,0 +1,2 @@ +main says hi! +worker says hi! |