summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/node/polyfills/worker_threads.ts12
-rw-r--r--ext/web/13_message_port.js88
-rw-r--r--ext/web/message_port.rs1
-rw-r--r--runtime/js/99_main.js5
-rw-r--r--tests/unit_node/worker_threads_test.ts201
5 files changed, 278 insertions, 29 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 5ff4446f7..d4b75fb30 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -302,8 +302,8 @@ class NodeWorker extends EventEmitter {
if (this.#status !== "TERMINATED") {
this.#status = "TERMINATED";
op_host_terminate_worker(this.#id);
+ this.emit("exit", 0);
}
- this.emit("exit", 0);
return PromiseResolve(0);
}
@@ -422,7 +422,11 @@ internals.__initWorkerThreads = (
parentPort.once = function (this: ParentPort, name, listener) {
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
+ const _listener = (ev: any) => {
+ const message = ev.data;
+ patchMessagePortIfFound(message);
+ return listener(message);
+ };
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
@@ -494,7 +498,9 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
port[MessagePortReceiveMessageOnPortSymbol] = true;
const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
if (data === null) return undefined;
- return { message: deserializeJsMessageData(data)[0] };
+ const message = deserializeJsMessageData(data)[0];
+ patchMessagePortIfFound(message);
+ return { message };
}
class NodeMessageChannel {
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index 04697d6aa..cf72c43e6 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -22,6 +22,7 @@ const {
Symbol,
SymbolFor,
SymbolIterator,
+ PromiseResolve,
SafeArrayIterator,
TypeError,
} = primordials;
@@ -41,7 +42,10 @@ import {
import { isDetachedBuffer } from "./06_streams.js";
import { DOMException } from "./01_dom_exception.js";
-let messageEventListenerCount = 0;
+// counter of how many message ports are actively refed
+// either due to the existence of "message" event listeners or
+// explicit calls to ref/unref (in the case of node message ports)
+let refedMessagePortsCount = 0;
class MessageChannel {
/** @type {MessagePort} */
@@ -93,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol(
);
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
+const _messageEventListenerCount = Symbol("messageEventListenerCount");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
@@ -109,6 +114,9 @@ function createMessagePort(id) {
port[core.hostObjectBrand] = core.hostObjectBrand;
setEventTargetData(port);
port[_id] = id;
+ port[_enabled] = false;
+ port[_messageEventListenerCount] = 0;
+ port[_refed] = false;
return port;
}
@@ -122,12 +130,18 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) {
}
}
+const _isRefed = Symbol("isRefed");
+const _dataPromise = Symbol("dataPromise");
+
class MessagePort extends EventTarget {
/** @type {number | null} */
[_id] = null;
/** @type {boolean} */
[_enabled] = false;
[_refed] = false;
+ /** @type {Promise<any> | undefined} */
+ [_dataPromise] = undefined;
+ [_messageEventListenerCount] = 0;
constructor() {
super();
@@ -193,24 +207,21 @@ class MessagePort extends EventTarget {
this[_enabled] = true;
while (true) {
if (this[_id] === null) break;
- // Exit if no message event listeners are present in Node compat mode.
- if (
- typeof this[nodeWorkerThreadCloseCb] == "function" &&
- messageEventListenerCount === 0
- ) break;
let data;
try {
- data = await op_message_port_recv_message(
+ this[_dataPromise] = op_message_port_recv_message(
this[_id],
);
+ if (
+ typeof this[nodeWorkerThreadCloseCb] === "function" &&
+ !this[_refed]
+ ) {
+ core.unrefOpPromise(this[_dataPromise]);
+ }
+ data = await this[_dataPromise];
+ this[_dataPromise] = undefined;
} catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
- // If we were interrupted, check if the interruption is coming
- // from `receiveMessageOnPort` API from Node compat, if so, continue.
- if (this[MessagePortReceiveMessageOnPortSymbol]) {
- this[MessagePortReceiveMessageOnPortSymbol] = false;
- continue;
- }
break;
}
nodeWorkerThreadMaybeInvokeCloseCb(this);
@@ -246,12 +257,26 @@ class MessagePort extends EventTarget {
}
[refMessagePort](ref) {
- if (ref && !this[_refed]) {
- this[_refed] = true;
- messageEventListenerCount++;
- } else if (!ref && this[_refed]) {
- this[_refed] = false;
- messageEventListenerCount = 0;
+ if (ref) {
+ if (!this[_refed]) {
+ refedMessagePortsCount++;
+ if (
+ this[_dataPromise]
+ ) {
+ core.refOpPromise(this[_dataPromise]);
+ }
+ this[_refed] = true;
+ }
+ } else if (!ref) {
+ if (this[_refed]) {
+ refedMessagePortsCount--;
+ if (
+ this[_dataPromise]
+ ) {
+ core.unrefOpPromise(this[_dataPromise]);
+ }
+ this[_refed] = false;
+ }
}
}
@@ -266,15 +291,20 @@ class MessagePort extends EventTarget {
removeEventListener(...args) {
if (args[0] == "message") {
- messageEventListenerCount--;
+ if (--this[_messageEventListenerCount] === 0 && this[_refed]) {
+ refedMessagePortsCount--;
+ this[_refed] = false;
+ }
}
super.removeEventListener(...new SafeArrayIterator(args));
}
addEventListener(...args) {
if (args[0] == "message") {
- messageEventListenerCount++;
- if (!this[_refed]) this[_refed] = true;
+ if (++this[_messageEventListenerCount] === 1 && !this[_refed]) {
+ refedMessagePortsCount++;
+ this[_refed] = true;
+ }
}
super.addEventListener(...new SafeArrayIterator(args));
}
@@ -295,7 +325,17 @@ class MessagePort extends EventTarget {
}
defineEventHandler(MessagePort.prototype, "message", function (self) {
- self.start();
+ if (self[nodeWorkerThreadCloseCb]) {
+ (async () => {
+ // delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort`
+ // a chance to receive a message first. this is primarily to resolve an issue with
+ // a pattern used in `npm:piscina` that results in an indefinite hang
+ await PromiseResolve();
+ self.start();
+ })();
+ } else {
+ self.start();
+ }
});
defineEventHandler(MessagePort.prototype, "messageerror");
@@ -463,12 +503,12 @@ function structuredClone(value, options) {
export {
deserializeJsMessageData,
MessageChannel,
- messageEventListenerCount,
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
+ refedMessagePortsCount,
serializeJsMessageData,
structuredClone,
};
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs
index c069037f8..fa299475d 100644
--- a/ext/web/message_port.rs
+++ b/ext/web/message_port.rs
@@ -239,7 +239,6 @@ pub fn op_message_port_recv_message_sync(
#[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
- resource.cancel.cancel();
let mut rx = resource.port.rx.borrow_mut();
match rx.try_recv() {
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index 0da2072b8..56a5b411b 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -169,8 +169,11 @@ let isClosing = false;
let globalDispatchEvent;
function hasMessageEventListener() {
+ // the function name is kind of a misnomer, but we want to behave
+ // as if we have message event listeners if a node message port is explicitly
+ // refed (and the inverse as well)
return event.listenerCount(globalThis, "message") > 0 ||
- messagePort.messageEventListenerCount > 0;
+ messagePort.refedMessagePortsCount > 0;
}
async function pollForMessages() {
diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts
index ac797601f..24a910789 100644
--- a/tests/unit_node/worker_threads_test.ts
+++ b/tests/unit_node/worker_threads_test.ts
@@ -621,3 +621,204 @@ Deno.test({
worker.terminate();
},
});
+
+Deno.test({
+ name: "[node/worker_threads] receiveMessageOnPort doesn't exit receive loop",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { parentPort, receiveMessageOnPort } from "node:worker_threads";
+ parentPort.on("message", (msg) => {
+ const port = msg.port;
+ port.on("message", (msg2) => {
+ if (msg2 === "c") {
+ port.postMessage("done");
+ port.unref();
+ parentPort.unref();
+ }
+ });
+ parentPort.postMessage("ready");
+ const msg2 = receiveMessageOnPort(port);
+ });
+ `,
+ { eval: true },
+ );
+
+ const { port1, port2 } = new workerThreads.MessageChannel();
+
+ worker.postMessage({ port: port2 }, [port2]);
+
+ const done = Promise.withResolvers<boolean>();
+
+ port1.on("message", (msg) => {
+ assertEquals(msg, "done");
+ worker.unref();
+ port1.close();
+ done.resolve(true);
+ });
+ worker.on("message", (msg) => {
+ assertEquals(msg, "ready");
+ port1.postMessage("a");
+ port1.postMessage("b");
+ port1.postMessage("c");
+ });
+
+ const timeout = setTimeout(() => {
+ fail("Test timed out");
+ }, 20_000);
+ try {
+ const result = await done.promise;
+ assertEquals(result, true);
+ } finally {
+ clearTimeout(timeout);
+ }
+ },
+});
+
+Deno.test({
+ name: "[node/worker_threads] MessagePort.unref doesn't exit receive loop",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { parentPort } from "node:worker_threads";
+ const assertEquals = (a, b) => {
+ if (a !== b) {
+ throw new Error();
+ }
+ };
+ let state = 0;
+ parentPort.on("message", (msg) => {
+ const port = msg.port;
+ const expect = ["a", "b", "c"];
+ port.on("message", (msg2) => {
+ assertEquals(msg2, expect[state++]);
+ if (msg2 === "c") {
+ port.postMessage({ type: "done", got: msg2 });
+ parentPort.unref();
+ }
+ });
+ port.unref();
+ parentPort.postMessage("ready");
+ });
+ `,
+ { eval: true },
+ );
+
+ const { port1, port2 } = new workerThreads.MessageChannel();
+
+ const done = Promise.withResolvers<boolean>();
+
+ port1.on("message", (msg) => {
+ assertEquals(msg.type, "done");
+ assertEquals(msg.got, "c");
+ worker.unref();
+ port1.close();
+ done.resolve(true);
+ });
+ worker.on("message", (msg) => {
+ assertEquals(msg, "ready");
+ port1.postMessage("a");
+ port1.postMessage("b");
+ port1.postMessage("c");
+ });
+ worker.postMessage({ port: port2 }, [port2]);
+
+ const timeout = setTimeout(() => {
+ fail("Test timed out");
+ }, 20_000);
+ try {
+ const result = await done.promise;
+ assertEquals(result, true);
+ } finally {
+ clearTimeout(timeout);
+ }
+ },
+});
+
+Deno.test({
+ name: "[node/worker_threads] npm:piscina wait loop hang regression",
+ async fn() {
+ const worker = new workerThreads.Worker(
+ `
+ import { assert, assertEquals } from "@std/assert";
+ import { parentPort, receiveMessageOnPort } from "node:worker_threads";
+
+ assert(parentPort !== null);
+
+ let currentTasks = 0;
+ let lastSeen = 0;
+
+ parentPort.on("message", (msg) => {
+ (async () => {
+ assert(typeof msg === "object" && msg !== null);
+ assert(msg.buf !== undefined);
+ assert(msg.port !== undefined);
+ const { buf, port } = msg;
+ port.postMessage("ready");
+ port.on("message", (msg) => onMessage(msg, buf, port));
+ atomicsWaitLoop(buf, port);
+ })();
+ });
+
+ function onMessage(msg, buf, port) {
+ currentTasks++;
+ (async () => {
+ assert(msg.taskName !== undefined);
+ port.postMessage({ type: "response", taskName: msg.taskName });
+ currentTasks--;
+ atomicsWaitLoop(buf, port);
+ })();
+ }
+
+ function atomicsWaitLoop(buf, port) {
+ while (currentTasks === 0) {
+ Atomics.wait(buf, 0, lastSeen);
+ lastSeen = Atomics.load(buf, 0);
+ let task;
+ while ((task = receiveMessageOnPort(port)) !== undefined) {
+ onMessage(task.message, buf, port);
+ }
+ }
+ }
+ `,
+ { eval: true },
+ );
+
+ const sab = new SharedArrayBuffer(4);
+ const buf = new Int32Array(sab);
+ const { port1, port2 } = new workerThreads.MessageChannel();
+
+ const done = Promise.withResolvers<boolean>();
+
+ port1.unref();
+
+ worker.postMessage({
+ type: "init",
+ buf,
+ port: port2,
+ }, [port2]);
+
+ let count = 0;
+ port1.on("message", (msg) => {
+ if (count++ === 0) {
+ assertEquals(msg, "ready");
+ } else {
+ assertEquals(msg.type, "response");
+ port1.close();
+ done.resolve(true);
+ }
+ });
+
+ port1.postMessage({
+ taskName: "doThing",
+ });
+
+ Atomics.add(buf, 0, 1);
+ Atomics.notify(buf, 0, 1);
+
+ worker.unref();
+
+ const result = await done.promise;
+ assertEquals(result, true);
+ },
+});