diff options
author | Satya Rohith <me@satyarohith.com> | 2024-04-02 17:06:09 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-02 17:06:09 +0530 |
commit | 4d66ec91c1ca23134dc25f58f41da52a99615a38 (patch) | |
tree | 96b9de0cf5ac664f14ac8fc35f93d2d3da65a87f /ext/node/polyfills/worker_threads.ts | |
parent | 7ad76fd453972e9262985c61840c77b8b8a6dbb7 (diff) |
fix(ext/node): MessagePort works (#22999)
Closes https://github.com/denoland/deno/issues/22951
Closes https://github.com/denoland/deno/issues/23001
Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r-- | ext/node/polyfills/worker_threads.ts | 63 |
1 files changed, 60 insertions, 3 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index ab3834132..49562d892 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -17,6 +17,7 @@ import { MessagePort, MessagePortIdSymbol, MessagePortPrototype, + nodeWorkerThreadCloseCb, serializeJsMessageData, } from "ext:deno_web/13_message_port.js"; import * as webidl from "ext:deno_webidl/00_webidl.js"; @@ -342,6 +343,15 @@ internals.__initWorkerThreads = ( defaultExport.parentPort = parentPort; defaultExport.threadId = threadId; + for (const obj in workerData as Record<string, unknown>) { + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, workerData[obj])) { + workerData[obj] = webMessagePortToNodeMessagePort( + workerData[obj] as MessagePort, + ); + break; + } + } + parentPort.off = parentPort.removeListener = function ( this: ParentPort, name, @@ -357,7 +367,22 @@ internals.__initWorkerThreads = ( listener, ) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); + const _listener = (ev: any) => { + let message = ev.data; + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message)) { + message = webMessagePortToNodeMessagePort(message); + } else { + for (const obj in message) { + if ( + ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message[obj]) + ) { + message[obj] = webMessagePortToNodeMessagePort(message[obj]); + break; + } + } + } + return listener(message); + }; listeners.set(listener, _listener); this.addEventListener(name, _listener); return this; @@ -424,10 +449,42 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined { return { message: deserializeJsMessageData(data)[0] }; } +class NodeMessageChannel { + port1: MessagePort; + port2: MessagePort; + + constructor() { + const { port1, port2 } = new MessageChannel(); + this.port1 = webMessagePortToNodeMessagePort(port1); + this.port2 = webMessagePortToNodeMessagePort(port2); + } +} + +function webMessagePortToNodeMessagePort(port: MessagePort) { + port.on = port.addListener = function (this: MessagePort, name, listener) { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + if (name == "message") { + port.onmessage = _listener; + } else if (name == "messageerror") { + port.onmessageerror = _listener; + } else if (name == "close") { + port.addEventListener("close", _listener); + } else { + throw new Error(`Unknown event: "${name}"`); + } + return this; + }; + port[nodeWorkerThreadCloseCb] = () => { + port.dispatchEvent(new Event("close")); + }; + return port; +} + export { BroadcastChannel, - MessageChannel, MessagePort, + NodeMessageChannel as MessageChannel, NodeWorker as Worker, parentPort, threadId, @@ -439,7 +496,7 @@ const defaultExport = { moveMessagePortToContext, receiveMessageOnPort, MessagePort, - MessageChannel, + MessageChannel: NodeMessageChannel, BroadcastChannel, Worker: NodeWorker, getEnvironmentData, |