diff options
Diffstat (limited to 'tests/unit_node')
-rw-r--r-- | tests/unit_node/worker_threads_test.ts | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index ac797601f..24a910789 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -621,3 +621,204 @@ Deno.test({ worker.terminate(); }, }); + +Deno.test({ + name: "[node/worker_threads] receiveMessageOnPort doesn't exit receive loop", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort, receiveMessageOnPort } from "node:worker_threads"; + parentPort.on("message", (msg) => { + const port = msg.port; + port.on("message", (msg2) => { + if (msg2 === "c") { + port.postMessage("done"); + port.unref(); + parentPort.unref(); + } + }); + parentPort.postMessage("ready"); + const msg2 = receiveMessageOnPort(port); + }); + `, + { eval: true }, + ); + + const { port1, port2 } = new workerThreads.MessageChannel(); + + worker.postMessage({ port: port2 }, [port2]); + + const done = Promise.withResolvers<boolean>(); + + port1.on("message", (msg) => { + assertEquals(msg, "done"); + worker.unref(); + port1.close(); + done.resolve(true); + }); + worker.on("message", (msg) => { + assertEquals(msg, "ready"); + port1.postMessage("a"); + port1.postMessage("b"); + port1.postMessage("c"); + }); + + const timeout = setTimeout(() => { + fail("Test timed out"); + }, 20_000); + try { + const result = await done.promise; + assertEquals(result, true); + } finally { + clearTimeout(timeout); + } + }, +}); + +Deno.test({ + name: "[node/worker_threads] MessagePort.unref doesn't exit receive loop", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort } from "node:worker_threads"; + const assertEquals = (a, b) => { + if (a !== b) { + throw new Error(); + } + }; + let state = 0; + parentPort.on("message", (msg) => { + const port = msg.port; + const expect = ["a", "b", "c"]; + port.on("message", (msg2) => { + assertEquals(msg2, expect[state++]); + if (msg2 === "c") { + port.postMessage({ type: "done", got: msg2 }); + parentPort.unref(); + } + }); + port.unref(); + parentPort.postMessage("ready"); + }); + `, + { eval: true }, + ); + + const { port1, port2 } = new workerThreads.MessageChannel(); + + const done = Promise.withResolvers<boolean>(); + + port1.on("message", (msg) => { + assertEquals(msg.type, "done"); + assertEquals(msg.got, "c"); + worker.unref(); + port1.close(); + done.resolve(true); + }); + worker.on("message", (msg) => { + assertEquals(msg, "ready"); + port1.postMessage("a"); + port1.postMessage("b"); + port1.postMessage("c"); + }); + worker.postMessage({ port: port2 }, [port2]); + + const timeout = setTimeout(() => { + fail("Test timed out"); + }, 20_000); + try { + const result = await done.promise; + assertEquals(result, true); + } finally { + clearTimeout(timeout); + } + }, +}); + +Deno.test({ + name: "[node/worker_threads] npm:piscina wait loop hang regression", + async fn() { + const worker = new workerThreads.Worker( + ` + import { assert, assertEquals } from "@std/assert"; + import { parentPort, receiveMessageOnPort } from "node:worker_threads"; + + assert(parentPort !== null); + + let currentTasks = 0; + let lastSeen = 0; + + parentPort.on("message", (msg) => { + (async () => { + assert(typeof msg === "object" && msg !== null); + assert(msg.buf !== undefined); + assert(msg.port !== undefined); + const { buf, port } = msg; + port.postMessage("ready"); + port.on("message", (msg) => onMessage(msg, buf, port)); + atomicsWaitLoop(buf, port); + })(); + }); + + function onMessage(msg, buf, port) { + currentTasks++; + (async () => { + assert(msg.taskName !== undefined); + port.postMessage({ type: "response", taskName: msg.taskName }); + currentTasks--; + atomicsWaitLoop(buf, port); + })(); + } + + function atomicsWaitLoop(buf, port) { + while (currentTasks === 0) { + Atomics.wait(buf, 0, lastSeen); + lastSeen = Atomics.load(buf, 0); + let task; + while ((task = receiveMessageOnPort(port)) !== undefined) { + onMessage(task.message, buf, port); + } + } + } + `, + { eval: true }, + ); + + const sab = new SharedArrayBuffer(4); + const buf = new Int32Array(sab); + const { port1, port2 } = new workerThreads.MessageChannel(); + + const done = Promise.withResolvers<boolean>(); + + port1.unref(); + + worker.postMessage({ + type: "init", + buf, + port: port2, + }, [port2]); + + let count = 0; + port1.on("message", (msg) => { + if (count++ === 0) { + assertEquals(msg, "ready"); + } else { + assertEquals(msg.type, "response"); + port1.close(); + done.resolve(true); + } + }); + + port1.postMessage({ + taskName: "doThing", + }); + + Atomics.add(buf, 0, 1); + Atomics.notify(buf, 0, 1); + + worker.unref(); + + const result = await done.promise; + assertEquals(result, true); + }, +}); |