summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/node/polyfills/worker_threads.ts12
-rw-r--r--ext/web/13_message_port.js88
-rw-r--r--ext/web/message_port.rs1
3 files changed, 73 insertions, 28 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 5ff4446f7..d4b75fb30 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -302,8 +302,8 @@ class NodeWorker extends EventEmitter {
if (this.#status !== "TERMINATED") {
this.#status = "TERMINATED";
op_host_terminate_worker(this.#id);
+ this.emit("exit", 0);
}
- this.emit("exit", 0);
return PromiseResolve(0);
}
@@ -422,7 +422,11 @@ internals.__initWorkerThreads = (
parentPort.once = function (this: ParentPort, name, listener) {
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
+ const _listener = (ev: any) => {
+ const message = ev.data;
+ patchMessagePortIfFound(message);
+ return listener(message);
+ };
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
@@ -494,7 +498,9 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
port[MessagePortReceiveMessageOnPortSymbol] = true;
const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
if (data === null) return undefined;
- return { message: deserializeJsMessageData(data)[0] };
+ const message = deserializeJsMessageData(data)[0];
+ patchMessagePortIfFound(message);
+ return { message };
}
class NodeMessageChannel {
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index 04697d6aa..cf72c43e6 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -22,6 +22,7 @@ const {
Symbol,
SymbolFor,
SymbolIterator,
+ PromiseResolve,
SafeArrayIterator,
TypeError,
} = primordials;
@@ -41,7 +42,10 @@ import {
import { isDetachedBuffer } from "./06_streams.js";
import { DOMException } from "./01_dom_exception.js";
-let messageEventListenerCount = 0;
+// counter of how many message ports are actively refed
+// either due to the existence of "message" event listeners or
+// explicit calls to ref/unref (in the case of node message ports)
+let refedMessagePortsCount = 0;
class MessageChannel {
/** @type {MessagePort} */
@@ -93,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol(
);
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
+const _messageEventListenerCount = Symbol("messageEventListenerCount");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
@@ -109,6 +114,9 @@ function createMessagePort(id) {
port[core.hostObjectBrand] = core.hostObjectBrand;
setEventTargetData(port);
port[_id] = id;
+ port[_enabled] = false;
+ port[_messageEventListenerCount] = 0;
+ port[_refed] = false;
return port;
}
@@ -122,12 +130,18 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) {
}
}
+const _isRefed = Symbol("isRefed");
+const _dataPromise = Symbol("dataPromise");
+
class MessagePort extends EventTarget {
/** @type {number | null} */
[_id] = null;
/** @type {boolean} */
[_enabled] = false;
[_refed] = false;
+ /** @type {Promise<any> | undefined} */
+ [_dataPromise] = undefined;
+ [_messageEventListenerCount] = 0;
constructor() {
super();
@@ -193,24 +207,21 @@ class MessagePort extends EventTarget {
this[_enabled] = true;
while (true) {
if (this[_id] === null) break;
- // Exit if no message event listeners are present in Node compat mode.
- if (
- typeof this[nodeWorkerThreadCloseCb] == "function" &&
- messageEventListenerCount === 0
- ) break;
let data;
try {
- data = await op_message_port_recv_message(
+ this[_dataPromise] = op_message_port_recv_message(
this[_id],
);
+ if (
+ typeof this[nodeWorkerThreadCloseCb] === "function" &&
+ !this[_refed]
+ ) {
+ core.unrefOpPromise(this[_dataPromise]);
+ }
+ data = await this[_dataPromise];
+ this[_dataPromise] = undefined;
} catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
- // If we were interrupted, check if the interruption is coming
- // from `receiveMessageOnPort` API from Node compat, if so, continue.
- if (this[MessagePortReceiveMessageOnPortSymbol]) {
- this[MessagePortReceiveMessageOnPortSymbol] = false;
- continue;
- }
break;
}
nodeWorkerThreadMaybeInvokeCloseCb(this);
@@ -246,12 +257,26 @@ class MessagePort extends EventTarget {
}
[refMessagePort](ref) {
- if (ref && !this[_refed]) {
- this[_refed] = true;
- messageEventListenerCount++;
- } else if (!ref && this[_refed]) {
- this[_refed] = false;
- messageEventListenerCount = 0;
+ if (ref) {
+ if (!this[_refed]) {
+ refedMessagePortsCount++;
+ if (
+ this[_dataPromise]
+ ) {
+ core.refOpPromise(this[_dataPromise]);
+ }
+ this[_refed] = true;
+ }
+ } else if (!ref) {
+ if (this[_refed]) {
+ refedMessagePortsCount--;
+ if (
+ this[_dataPromise]
+ ) {
+ core.unrefOpPromise(this[_dataPromise]);
+ }
+ this[_refed] = false;
+ }
}
}
@@ -266,15 +291,20 @@ class MessagePort extends EventTarget {
removeEventListener(...args) {
if (args[0] == "message") {
- messageEventListenerCount--;
+ if (--this[_messageEventListenerCount] === 0 && this[_refed]) {
+ refedMessagePortsCount--;
+ this[_refed] = false;
+ }
}
super.removeEventListener(...new SafeArrayIterator(args));
}
addEventListener(...args) {
if (args[0] == "message") {
- messageEventListenerCount++;
- if (!this[_refed]) this[_refed] = true;
+ if (++this[_messageEventListenerCount] === 1 && !this[_refed]) {
+ refedMessagePortsCount++;
+ this[_refed] = true;
+ }
}
super.addEventListener(...new SafeArrayIterator(args));
}
@@ -295,7 +325,17 @@ class MessagePort extends EventTarget {
}
defineEventHandler(MessagePort.prototype, "message", function (self) {
- self.start();
+ if (self[nodeWorkerThreadCloseCb]) {
+ (async () => {
+ // delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort`
+ // a chance to receive a message first. this is primarily to resolve an issue with
+ // a pattern used in `npm:piscina` that results in an indefinite hang
+ await PromiseResolve();
+ self.start();
+ })();
+ } else {
+ self.start();
+ }
});
defineEventHandler(MessagePort.prototype, "messageerror");
@@ -463,12 +503,12 @@ function structuredClone(value, options) {
export {
deserializeJsMessageData,
MessageChannel,
- messageEventListenerCount,
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
+ refedMessagePortsCount,
serializeJsMessageData,
structuredClone,
};
diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs
index c069037f8..fa299475d 100644
--- a/ext/web/message_port.rs
+++ b/ext/web/message_port.rs
@@ -239,7 +239,6 @@ pub fn op_message_port_recv_message_sync(
#[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
- resource.cancel.cancel();
let mut rx = resource.port.rx.borrow_mut();
match rx.try_recv() {