diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/node/polyfills/worker_threads.ts | 63 | ||||
-rw-r--r-- | ext/web/13_message_port.js | 29 |
2 files changed, 88 insertions, 4 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index ab3834132..49562d892 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -17,6 +17,7 @@ import { MessagePort, MessagePortIdSymbol, MessagePortPrototype, + nodeWorkerThreadCloseCb, serializeJsMessageData, } from "ext:deno_web/13_message_port.js"; import * as webidl from "ext:deno_webidl/00_webidl.js"; @@ -342,6 +343,15 @@ internals.__initWorkerThreads = ( defaultExport.parentPort = parentPort; defaultExport.threadId = threadId; + for (const obj in workerData as Record<string, unknown>) { + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, workerData[obj])) { + workerData[obj] = webMessagePortToNodeMessagePort( + workerData[obj] as MessagePort, + ); + break; + } + } + parentPort.off = parentPort.removeListener = function ( this: ParentPort, name, @@ -357,7 +367,22 @@ internals.__initWorkerThreads = ( listener, ) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); + const _listener = (ev: any) => { + let message = ev.data; + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message)) { + message = webMessagePortToNodeMessagePort(message); + } else { + for (const obj in message) { + if ( + ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message[obj]) + ) { + message[obj] = webMessagePortToNodeMessagePort(message[obj]); + break; + } + } + } + return listener(message); + }; listeners.set(listener, _listener); this.addEventListener(name, _listener); return this; @@ -424,10 +449,42 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined { return { message: deserializeJsMessageData(data)[0] }; } +class NodeMessageChannel { + port1: MessagePort; + port2: MessagePort; + + constructor() { + const { port1, port2 } = new MessageChannel(); + this.port1 = webMessagePortToNodeMessagePort(port1); + this.port2 = webMessagePortToNodeMessagePort(port2); + } +} + +function webMessagePortToNodeMessagePort(port: MessagePort) { + port.on = port.addListener = function (this: MessagePort, name, listener) { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + if (name == "message") { + port.onmessage = _listener; + } else if (name == "messageerror") { + port.onmessageerror = _listener; + } else if (name == "close") { + port.addEventListener("close", _listener); + } else { + throw new Error(`Unknown event: "${name}"`); + } + return this; + }; + port[nodeWorkerThreadCloseCb] = () => { + port.dispatchEvent(new Event("close")); + }; + return port; +} + export { BroadcastChannel, - MessageChannel, MessagePort, + NodeMessageChannel as MessageChannel, NodeWorker as Worker, parentPort, threadId, @@ -439,7 +496,7 @@ const defaultExport = { moveMessagePortToContext, receiveMessageOnPort, MessagePort, - MessageChannel, + MessageChannel: NodeMessageChannel, BroadcastChannel, Worker: NodeWorker, getEnvironmentData, diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index d953c52ed..83470c895 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -18,6 +18,7 @@ const { ArrayPrototypeIncludes, ArrayPrototypePush, ObjectPrototypeIsPrototypeOf, + ObjectDefineProperty, Symbol, SymbolFor, SymbolIterator, @@ -85,6 +86,8 @@ const MessageChannelPrototype = MessageChannel.prototype; const _id = Symbol("id"); const MessagePortIdSymbol = _id; const _enabled = Symbol("enabled"); +const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); +const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked"); /** * @param {number} id @@ -98,6 +101,16 @@ function createMessagePort(id) { return port; } +function nodeWorkerThreadMaybeInvokeCloseCb(port) { + if ( + typeof port[nodeWorkerThreadCloseCb] == "function" && + !port[nodeWorkerThreadCloseCbInvoked] + ) { + port[nodeWorkerThreadCloseCb](); + port[nodeWorkerThreadCloseCbInvoked] = true; + } +} + class MessagePort extends EventTarget { /** @type {number | null} */ [_id] = null; @@ -106,6 +119,14 @@ class MessagePort extends EventTarget { constructor() { super(); + ObjectDefineProperty(this, nodeWorkerThreadCloseCb, { + value: null, + enumerable: false, + }); + ObjectDefineProperty(this, nodeWorkerThreadCloseCbInvoked, { + value: false, + enumerable: false, + }); webidl.illegalConstructor(); } @@ -160,9 +181,13 @@ class MessagePort extends EventTarget { ); } catch (err) { if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break; + nodeWorkerThreadMaybeInvokeCloseCb(this); throw err; } - if (data === null) break; + if (data === null) { + nodeWorkerThreadMaybeInvokeCloseCb(this); + break; + } let message, transferables; try { const v = deserializeJsMessageData(data); @@ -193,6 +218,7 @@ class MessagePort extends EventTarget { if (this[_id] !== null) { core.close(this[_id]); this[_id] = null; + nodeWorkerThreadMaybeInvokeCloseCb(this); } } @@ -383,6 +409,7 @@ export { MessagePort, MessagePortIdSymbol, MessagePortPrototype, + nodeWorkerThreadCloseCb, serializeJsMessageData, structuredClone, }; |