diff options
Diffstat (limited to 'ext')
| -rw-r--r-- | ext/node/polyfills/worker_threads.ts | 12 | ||||
| -rw-r--r-- | ext/web/13_message_port.js | 88 | ||||
| -rw-r--r-- | ext/web/message_port.rs | 1 |
3 files changed, 73 insertions, 28 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 5ff4446f7..d4b75fb30 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -302,8 +302,8 @@ class NodeWorker extends EventEmitter { if (this.#status !== "TERMINATED") { this.#status = "TERMINATED"; op_host_terminate_worker(this.#id); + this.emit("exit", 0); } - this.emit("exit", 0); return PromiseResolve(0); } @@ -422,7 +422,11 @@ internals.__initWorkerThreads = ( parentPort.once = function (this: ParentPort, name, listener) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); + const _listener = (ev: any) => { + const message = ev.data; + patchMessagePortIfFound(message); + return listener(message); + }; listeners.set(listener, _listener); this.addEventListener(name, _listener); return this; @@ -494,7 +498,9 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined { port[MessagePortReceiveMessageOnPortSymbol] = true; const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]); if (data === null) return undefined; - return { message: deserializeJsMessageData(data)[0] }; + const message = deserializeJsMessageData(data)[0]; + patchMessagePortIfFound(message); + return { message }; } class NodeMessageChannel { diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index 04697d6aa..cf72c43e6 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -22,6 +22,7 @@ const { Symbol, SymbolFor, SymbolIterator, + PromiseResolve, SafeArrayIterator, TypeError, } = primordials; @@ -41,7 +42,10 @@ import { import { isDetachedBuffer } from "./06_streams.js"; import { DOMException } from "./01_dom_exception.js"; -let messageEventListenerCount = 0; +// counter of how many message ports are actively refed +// either due to the existence of "message" event listeners or +// explicit calls to ref/unref (in the case of node message ports) +let refedMessagePortsCount = 0; class MessageChannel { /** @type {MessagePort} */ @@ -93,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol( ); const _enabled = Symbol("enabled"); const _refed = Symbol("refed"); +const _messageEventListenerCount = Symbol("messageEventListenerCount"); const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked"); export const refMessagePort = Symbol("refMessagePort"); @@ -109,6 +114,9 @@ function createMessagePort(id) { port[core.hostObjectBrand] = core.hostObjectBrand; setEventTargetData(port); port[_id] = id; + port[_enabled] = false; + port[_messageEventListenerCount] = 0; + port[_refed] = false; return port; } @@ -122,12 +130,18 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) { } } +const _isRefed = Symbol("isRefed"); +const _dataPromise = Symbol("dataPromise"); + class MessagePort extends EventTarget { /** @type {number | null} */ [_id] = null; /** @type {boolean} */ [_enabled] = false; [_refed] = false; + /** @type {Promise<any> | undefined} */ + [_dataPromise] = undefined; + [_messageEventListenerCount] = 0; constructor() { super(); @@ -193,24 +207,21 @@ class MessagePort extends EventTarget { this[_enabled] = true; while (true) { if (this[_id] === null) break; - // Exit if no message event listeners are present in Node compat mode. - if ( - typeof this[nodeWorkerThreadCloseCb] == "function" && - messageEventListenerCount === 0 - ) break; let data; try { - data = await op_message_port_recv_message( + this[_dataPromise] = op_message_port_recv_message( this[_id], ); + if ( + typeof this[nodeWorkerThreadCloseCb] === "function" && + !this[_refed] + ) { + core.unrefOpPromise(this[_dataPromise]); + } + data = await this[_dataPromise]; + this[_dataPromise] = undefined; } catch (err) { if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) { - // If we were interrupted, check if the interruption is coming - // from `receiveMessageOnPort` API from Node compat, if so, continue. - if (this[MessagePortReceiveMessageOnPortSymbol]) { - this[MessagePortReceiveMessageOnPortSymbol] = false; - continue; - } break; } nodeWorkerThreadMaybeInvokeCloseCb(this); @@ -246,12 +257,26 @@ class MessagePort extends EventTarget { } [refMessagePort](ref) { - if (ref && !this[_refed]) { - this[_refed] = true; - messageEventListenerCount++; - } else if (!ref && this[_refed]) { - this[_refed] = false; - messageEventListenerCount = 0; + if (ref) { + if (!this[_refed]) { + refedMessagePortsCount++; + if ( + this[_dataPromise] + ) { + core.refOpPromise(this[_dataPromise]); + } + this[_refed] = true; + } + } else if (!ref) { + if (this[_refed]) { + refedMessagePortsCount--; + if ( + this[_dataPromise] + ) { + core.unrefOpPromise(this[_dataPromise]); + } + this[_refed] = false; + } } } @@ -266,15 +291,20 @@ class MessagePort extends EventTarget { removeEventListener(...args) { if (args[0] == "message") { - messageEventListenerCount--; + if (--this[_messageEventListenerCount] === 0 && this[_refed]) { + refedMessagePortsCount--; + this[_refed] = false; + } } super.removeEventListener(...new SafeArrayIterator(args)); } addEventListener(...args) { if (args[0] == "message") { - messageEventListenerCount++; - if (!this[_refed]) this[_refed] = true; + if (++this[_messageEventListenerCount] === 1 && !this[_refed]) { + refedMessagePortsCount++; + this[_refed] = true; + } } super.addEventListener(...new SafeArrayIterator(args)); } @@ -295,7 +325,17 @@ class MessagePort extends EventTarget { } defineEventHandler(MessagePort.prototype, "message", function (self) { - self.start(); + if (self[nodeWorkerThreadCloseCb]) { + (async () => { + // delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort` + // a chance to receive a message first. this is primarily to resolve an issue with + // a pattern used in `npm:piscina` that results in an indefinite hang + await PromiseResolve(); + self.start(); + })(); + } else { + self.start(); + } }); defineEventHandler(MessagePort.prototype, "messageerror"); @@ -463,12 +503,12 @@ function structuredClone(value, options) { export { deserializeJsMessageData, MessageChannel, - messageEventListenerCount, MessagePort, MessagePortIdSymbol, MessagePortPrototype, MessagePortReceiveMessageOnPortSymbol, nodeWorkerThreadCloseCb, + refedMessagePortsCount, serializeJsMessageData, structuredClone, }; diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index c069037f8..fa299475d 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -239,7 +239,6 @@ pub fn op_message_port_recv_message_sync( #[smi] rid: ResourceId, ) -> Result<Option<JsMessageData>, AnyError> { let resource = state.resource_table.get::<MessagePortResource>(rid)?; - resource.cancel.cancel(); let mut rx = resource.port.rx.borrow_mut(); match rx.try_recv() { |
