summaryrefslogtreecommitdiff
path: root/ext/web/13_message_port.js
diff options
context:
space:
mode:
Diffstat (limited to 'ext/web/13_message_port.js')
-rw-r--r--ext/web/13_message_port.js88
1 files changed, 64 insertions, 24 deletions
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js
index 04697d6aa..cf72c43e6 100644
--- a/ext/web/13_message_port.js
+++ b/ext/web/13_message_port.js
@@ -22,6 +22,7 @@ const {
Symbol,
SymbolFor,
SymbolIterator,
+ PromiseResolve,
SafeArrayIterator,
TypeError,
} = primordials;
@@ -41,7 +42,10 @@ import {
import { isDetachedBuffer } from "./06_streams.js";
import { DOMException } from "./01_dom_exception.js";
-let messageEventListenerCount = 0;
+// counter of how many message ports are actively refed
+// either due to the existence of "message" event listeners or
+// explicit calls to ref/unref (in the case of node message ports)
+let refedMessagePortsCount = 0;
class MessageChannel {
/** @type {MessagePort} */
@@ -93,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol(
);
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
+const _messageEventListenerCount = Symbol("messageEventListenerCount");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
@@ -109,6 +114,9 @@ function createMessagePort(id) {
port[core.hostObjectBrand] = core.hostObjectBrand;
setEventTargetData(port);
port[_id] = id;
+ port[_enabled] = false;
+ port[_messageEventListenerCount] = 0;
+ port[_refed] = false;
return port;
}
@@ -122,12 +130,18 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) {
}
}
+const _isRefed = Symbol("isRefed");
+const _dataPromise = Symbol("dataPromise");
+
class MessagePort extends EventTarget {
/** @type {number | null} */
[_id] = null;
/** @type {boolean} */
[_enabled] = false;
[_refed] = false;
+ /** @type {Promise<any> | undefined} */
+ [_dataPromise] = undefined;
+ [_messageEventListenerCount] = 0;
constructor() {
super();
@@ -193,24 +207,21 @@ class MessagePort extends EventTarget {
this[_enabled] = true;
while (true) {
if (this[_id] === null) break;
- // Exit if no message event listeners are present in Node compat mode.
- if (
- typeof this[nodeWorkerThreadCloseCb] == "function" &&
- messageEventListenerCount === 0
- ) break;
let data;
try {
- data = await op_message_port_recv_message(
+ this[_dataPromise] = op_message_port_recv_message(
this[_id],
);
+ if (
+ typeof this[nodeWorkerThreadCloseCb] === "function" &&
+ !this[_refed]
+ ) {
+ core.unrefOpPromise(this[_dataPromise]);
+ }
+ data = await this[_dataPromise];
+ this[_dataPromise] = undefined;
} catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
- // If we were interrupted, check if the interruption is coming
- // from `receiveMessageOnPort` API from Node compat, if so, continue.
- if (this[MessagePortReceiveMessageOnPortSymbol]) {
- this[MessagePortReceiveMessageOnPortSymbol] = false;
- continue;
- }
break;
}
nodeWorkerThreadMaybeInvokeCloseCb(this);
@@ -246,12 +257,26 @@ class MessagePort extends EventTarget {
}
[refMessagePort](ref) {
- if (ref && !this[_refed]) {
- this[_refed] = true;
- messageEventListenerCount++;
- } else if (!ref && this[_refed]) {
- this[_refed] = false;
- messageEventListenerCount = 0;
+ if (ref) {
+ if (!this[_refed]) {
+ refedMessagePortsCount++;
+ if (
+ this[_dataPromise]
+ ) {
+ core.refOpPromise(this[_dataPromise]);
+ }
+ this[_refed] = true;
+ }
+ } else if (!ref) {
+ if (this[_refed]) {
+ refedMessagePortsCount--;
+ if (
+ this[_dataPromise]
+ ) {
+ core.unrefOpPromise(this[_dataPromise]);
+ }
+ this[_refed] = false;
+ }
}
}
@@ -266,15 +291,20 @@ class MessagePort extends EventTarget {
removeEventListener(...args) {
if (args[0] == "message") {
- messageEventListenerCount--;
+ if (--this[_messageEventListenerCount] === 0 && this[_refed]) {
+ refedMessagePortsCount--;
+ this[_refed] = false;
+ }
}
super.removeEventListener(...new SafeArrayIterator(args));
}
addEventListener(...args) {
if (args[0] == "message") {
- messageEventListenerCount++;
- if (!this[_refed]) this[_refed] = true;
+ if (++this[_messageEventListenerCount] === 1 && !this[_refed]) {
+ refedMessagePortsCount++;
+ this[_refed] = true;
+ }
}
super.addEventListener(...new SafeArrayIterator(args));
}
@@ -295,7 +325,17 @@ class MessagePort extends EventTarget {
}
defineEventHandler(MessagePort.prototype, "message", function (self) {
- self.start();
+ if (self[nodeWorkerThreadCloseCb]) {
+ (async () => {
+ // delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort`
+ // a chance to receive a message first. this is primarily to resolve an issue with
+ // a pattern used in `npm:piscina` that results in an indefinite hang
+ await PromiseResolve();
+ self.start();
+ })();
+ } else {
+ self.start();
+ }
});
defineEventHandler(MessagePort.prototype, "messageerror");
@@ -463,12 +503,12 @@ function structuredClone(value, options) {
export {
deserializeJsMessageData,
MessageChannel,
- messageEventListenerCount,
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
+ refedMessagePortsCount,
serializeJsMessageData,
structuredClone,
};