diff options
Diffstat (limited to 'ext/web/13_message_port.js')
-rw-r--r-- | ext/web/13_message_port.js | 88 |
1 files changed, 64 insertions, 24 deletions
diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index 04697d6aa..cf72c43e6 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -22,6 +22,7 @@ const { Symbol, SymbolFor, SymbolIterator, + PromiseResolve, SafeArrayIterator, TypeError, } = primordials; @@ -41,7 +42,10 @@ import { import { isDetachedBuffer } from "./06_streams.js"; import { DOMException } from "./01_dom_exception.js"; -let messageEventListenerCount = 0; +// counter of how many message ports are actively refed +// either due to the existence of "message" event listeners or +// explicit calls to ref/unref (in the case of node message ports) +let refedMessagePortsCount = 0; class MessageChannel { /** @type {MessagePort} */ @@ -93,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol( ); const _enabled = Symbol("enabled"); const _refed = Symbol("refed"); +const _messageEventListenerCount = Symbol("messageEventListenerCount"); const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked"); export const refMessagePort = Symbol("refMessagePort"); @@ -109,6 +114,9 @@ function createMessagePort(id) { port[core.hostObjectBrand] = core.hostObjectBrand; setEventTargetData(port); port[_id] = id; + port[_enabled] = false; + port[_messageEventListenerCount] = 0; + port[_refed] = false; return port; } @@ -122,12 +130,18 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) { } } +const _isRefed = Symbol("isRefed"); +const _dataPromise = Symbol("dataPromise"); + class MessagePort extends EventTarget { /** @type {number | null} */ [_id] = null; /** @type {boolean} */ [_enabled] = false; [_refed] = false; + /** @type {Promise<any> | undefined} */ + [_dataPromise] = undefined; + [_messageEventListenerCount] = 0; constructor() { super(); @@ -193,24 +207,21 @@ class MessagePort extends EventTarget { this[_enabled] = true; while (true) { if (this[_id] === null) break; - // Exit if no message event listeners are present in Node compat mode. - if ( - typeof this[nodeWorkerThreadCloseCb] == "function" && - messageEventListenerCount === 0 - ) break; let data; try { - data = await op_message_port_recv_message( + this[_dataPromise] = op_message_port_recv_message( this[_id], ); + if ( + typeof this[nodeWorkerThreadCloseCb] === "function" && + !this[_refed] + ) { + core.unrefOpPromise(this[_dataPromise]); + } + data = await this[_dataPromise]; + this[_dataPromise] = undefined; } catch (err) { if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) { - // If we were interrupted, check if the interruption is coming - // from `receiveMessageOnPort` API from Node compat, if so, continue. - if (this[MessagePortReceiveMessageOnPortSymbol]) { - this[MessagePortReceiveMessageOnPortSymbol] = false; - continue; - } break; } nodeWorkerThreadMaybeInvokeCloseCb(this); @@ -246,12 +257,26 @@ class MessagePort extends EventTarget { } [refMessagePort](ref) { - if (ref && !this[_refed]) { - this[_refed] = true; - messageEventListenerCount++; - } else if (!ref && this[_refed]) { - this[_refed] = false; - messageEventListenerCount = 0; + if (ref) { + if (!this[_refed]) { + refedMessagePortsCount++; + if ( + this[_dataPromise] + ) { + core.refOpPromise(this[_dataPromise]); + } + this[_refed] = true; + } + } else if (!ref) { + if (this[_refed]) { + refedMessagePortsCount--; + if ( + this[_dataPromise] + ) { + core.unrefOpPromise(this[_dataPromise]); + } + this[_refed] = false; + } } } @@ -266,15 +291,20 @@ class MessagePort extends EventTarget { removeEventListener(...args) { if (args[0] == "message") { - messageEventListenerCount--; + if (--this[_messageEventListenerCount] === 0 && this[_refed]) { + refedMessagePortsCount--; + this[_refed] = false; + } } super.removeEventListener(...new SafeArrayIterator(args)); } addEventListener(...args) { if (args[0] == "message") { - messageEventListenerCount++; - if (!this[_refed]) this[_refed] = true; + if (++this[_messageEventListenerCount] === 1 && !this[_refed]) { + refedMessagePortsCount++; + this[_refed] = true; + } } super.addEventListener(...new SafeArrayIterator(args)); } @@ -295,7 +325,17 @@ class MessagePort extends EventTarget { } defineEventHandler(MessagePort.prototype, "message", function (self) { - self.start(); + if (self[nodeWorkerThreadCloseCb]) { + (async () => { + // delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort` + // a chance to receive a message first. this is primarily to resolve an issue with + // a pattern used in `npm:piscina` that results in an indefinite hang + await PromiseResolve(); + self.start(); + })(); + } else { + self.start(); + } }); defineEventHandler(MessagePort.prototype, "messageerror"); @@ -463,12 +503,12 @@ function structuredClone(value, options) { export { deserializeJsMessageData, MessageChannel, - messageEventListenerCount, MessagePort, MessagePortIdSymbol, MessagePortPrototype, MessagePortReceiveMessageOnPortSymbol, nodeWorkerThreadCloseCb, + refedMessagePortsCount, serializeJsMessageData, structuredClone, }; |