summaryrefslogtreecommitdiff
path: root/tests/unit_node/worker_threads_test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unit_node/worker_threads_test.ts')
-rw-r--r--tests/unit_node/worker_threads_test.ts201
1 files changed, 201 insertions, 0 deletions
diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts
index ac797601f..24a910789 100644
--- a/tests/unit_node/worker_threads_test.ts
+++ b/tests/unit_node/worker_threads_test.ts
@@ -621,3 +621,204 @@ Deno.test({
worker.terminate();
},
});
+
+Deno.test({
+ name: "[node/worker_threads] receiveMessageOnPort doesn't exit receive loop",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { parentPort, receiveMessageOnPort } from "node:worker_threads";
+ parentPort.on("message", (msg) => {
+ const port = msg.port;
+ port.on("message", (msg2) => {
+ if (msg2 === "c") {
+ port.postMessage("done");
+ port.unref();
+ parentPort.unref();
+ }
+ });
+ parentPort.postMessage("ready");
+ const msg2 = receiveMessageOnPort(port);
+ });
+ `,
+ { eval: true },
+ );
+
+ const { port1, port2 } = new workerThreads.MessageChannel();
+
+ worker.postMessage({ port: port2 }, [port2]);
+
+ const done = Promise.withResolvers<boolean>();
+
+ port1.on("message", (msg) => {
+ assertEquals(msg, "done");
+ worker.unref();
+ port1.close();
+ done.resolve(true);
+ });
+ worker.on("message", (msg) => {
+ assertEquals(msg, "ready");
+ port1.postMessage("a");
+ port1.postMessage("b");
+ port1.postMessage("c");
+ });
+
+ const timeout = setTimeout(() => {
+ fail("Test timed out");
+ }, 20_000);
+ try {
+ const result = await done.promise;
+ assertEquals(result, true);
+ } finally {
+ clearTimeout(timeout);
+ }
+ },
+});
+
+Deno.test({
+ name: "[node/worker_threads] MessagePort.unref doesn't exit receive loop",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { parentPort } from "node:worker_threads";
+ const assertEquals = (a, b) => {
+ if (a !== b) {
+ throw new Error();
+ }
+ };
+ let state = 0;
+ parentPort.on("message", (msg) => {
+ const port = msg.port;
+ const expect = ["a", "b", "c"];
+ port.on("message", (msg2) => {
+ assertEquals(msg2, expect[state++]);
+ if (msg2 === "c") {
+ port.postMessage({ type: "done", got: msg2 });
+ parentPort.unref();
+ }
+ });
+ port.unref();
+ parentPort.postMessage("ready");
+ });
+ `,
+ { eval: true },
+ );
+
+ const { port1, port2 } = new workerThreads.MessageChannel();
+
+ const done = Promise.withResolvers<boolean>();
+
+ port1.on("message", (msg) => {
+ assertEquals(msg.type, "done");
+ assertEquals(msg.got, "c");
+ worker.unref();
+ port1.close();
+ done.resolve(true);
+ });
+ worker.on("message", (msg) => {
+ assertEquals(msg, "ready");
+ port1.postMessage("a");
+ port1.postMessage("b");
+ port1.postMessage("c");
+ });
+ worker.postMessage({ port: port2 }, [port2]);
+
+ const timeout = setTimeout(() => {
+ fail("Test timed out");
+ }, 20_000);
+ try {
+ const result = await done.promise;
+ assertEquals(result, true);
+ } finally {
+ clearTimeout(timeout);
+ }
+ },
+});
+
+Deno.test({
+ name: "[node/worker_threads] npm:piscina wait loop hang regression",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { assert, assertEquals } from "@std/assert";
+ import { parentPort, receiveMessageOnPort } from "node:worker_threads";
+
+ assert(parentPort !== null);
+
+ let currentTasks = 0;
+ let lastSeen = 0;
+
+ parentPort.on("message", (msg) => {
+ (async () => {
+ assert(typeof msg === "object" && msg !== null);
+ assert(msg.buf !== undefined);
+ assert(msg.port !== undefined);
+ const { buf, port } = msg;
+ port.postMessage("ready");
+ port.on("message", (msg) => onMessage(msg, buf, port));
+ atomicsWaitLoop(buf, port);
+ })();
+ });
+
+ function onMessage(msg, buf, port) {
+ currentTasks++;
+ (async () => {
+ assert(msg.taskName !== undefined);
+ port.postMessage({ type: "response", taskName: msg.taskName });
+ currentTasks--;
+ atomicsWaitLoop(buf, port);
+ })();
+ }
+
+ function atomicsWaitLoop(buf, port) {
+ while (currentTasks === 0) {
+ Atomics.wait(buf, 0, lastSeen);
+ lastSeen = Atomics.load(buf, 0);
+ let task;
+ while ((task = receiveMessageOnPort(port)) !== undefined) {
+ onMessage(task.message, buf, port);
+ }
+ }
+ }
+ `,
+ { eval: true },
+ );
+
+ const sab = new SharedArrayBuffer(4);
+ const buf = new Int32Array(sab);
+ const { port1, port2 } = new workerThreads.MessageChannel();
+
+ const done = Promise.withResolvers<boolean>();
+
+ port1.unref();
+
+ worker.postMessage({
+ type: "init",
+ buf,
+ port: port2,
+ }, [port2]);
+
+ let count = 0;
+ port1.on("message", (msg) => {
+ if (count++ === 0) {
+ assertEquals(msg, "ready");
+ } else {
+ assertEquals(msg.type, "response");
+ port1.close();
+ done.resolve(true);
+ }
+ });
+
+ port1.postMessage({
+ taskName: "doThing",
+ });
+
+ Atomics.add(buf, 0, 1);
+ Atomics.notify(buf, 0, 1);
+
+ worker.unref();
+
+ const result = await done.promise;
+ assertEquals(result, true);
+ },
+});