summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/node/polyfills/worker_threads.ts2
-rw-r--r--ext/web/13_message_port.js18
-rw-r--r--ext/web/message_port.rs1
-rw-r--r--tests/unit_node/worker_threads_test.ts22
4 files changed, 42 insertions, 1 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 323095206..f61e7e3e3 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -17,6 +17,7 @@ import {
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
+ MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
refMessagePort,
serializeJsMessageData,
@@ -441,6 +442,7 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
err["code"] = "ERR_INVALID_ARG_TYPE";
throw err;
}
+ port[MessagePortReceiveMessageOnPortSymbol] = true;
const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
if (data === null) return undefined;
return { message: deserializeJsMessageData(data)[0] };
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index 62c0328c3..4e4184f2a 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -88,6 +88,9 @@ const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id");
const MessagePortIdSymbol = _id;
+const MessagePortReceiveMessageOnPortSymbol = Symbol(
+ "MessagePortReceiveMessageOnPort",
+);
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
@@ -128,6 +131,10 @@ class MessagePort extends EventTarget {
constructor() {
super();
+ ObjectDefineProperty(this, MessagePortReceiveMessageOnPortSymbol, {
+ value: false,
+ enumerable: false,
+ });
ObjectDefineProperty(this, nodeWorkerThreadCloseCb, {
value: null,
enumerable: false,
@@ -189,7 +196,15 @@ class MessagePort extends EventTarget {
this[_id],
);
} catch (err) {
- if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break;
+ if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
+ // If we were interrupted, check if the interruption is coming
+ // from `receiveMessageOnPort` API from Node compat, if so, continue.
+ if (this[MessagePortReceiveMessageOnPortSymbol]) {
+ this[MessagePortReceiveMessageOnPortSymbol] = false;
+ continue;
+ }
+ break;
+ }
nodeWorkerThreadMaybeInvokeCloseCb(this);
throw err;
}
@@ -444,6 +459,7 @@ export {
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
+ MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
serializeJsMessageData,
structuredClone,
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs
index 1cd29c64d..ac33145b1 100644
--- a/ext/web/message_port.rs
+++ b/ext/web/message_port.rs
@@ -235,6 +235,7 @@ pub fn op_message_port_recv_message_sync(
#[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
+ resource.cancel.cancel();
let mut rx = resource.port.rx.borrow_mut();
match rx.try_recv() {
diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts
index 2351e1052..bd600469b 100644
--- a/tests/unit_node/worker_threads_test.ts
+++ b/tests/unit_node/worker_threads_test.ts
@@ -414,3 +414,25 @@ Deno.test({
mainPort.close();
},
});
+
+// Regression test for https://github.com/denoland/deno/issues/23362
+Deno.test("[node/worker_threads] receiveMessageOnPort works if there's pending read", function () {
+ const { port1, port2 } = new workerThreads.MessageChannel();
+
+ const message1 = { hello: "world" };
+ const message2 = { foo: "bar" };
+
+ assertEquals(workerThreads.receiveMessageOnPort(port2), undefined);
+ port2.start();
+
+ port1.postMessage(message1);
+ port1.postMessage(message2);
+ assertEquals(workerThreads.receiveMessageOnPort(port2), {
+ message: message1,
+ });
+ assertEquals(workerThreads.receiveMessageOnPort(port2), {
+ message: message2,
+ });
+ port1.close();
+ port2.close();
+});