summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/worker_threads.ts
diff options
context:
space:
mode:
authorSatya Rohith <me@satyarohith.com>2024-04-02 17:06:09 +0530
committerGitHub <noreply@github.com>2024-04-02 17:06:09 +0530
commit4d66ec91c1ca23134dc25f58f41da52a99615a38 (patch)
tree96b9de0cf5ac664f14ac8fc35f93d2d3da65a87f /ext/node/polyfills/worker_threads.ts
parent7ad76fd453972e9262985c61840c77b8b8a6dbb7 (diff)
fix(ext/node): MessagePort works (#22999)
Closes https://github.com/denoland/deno/issues/22951 Closes https://github.com/denoland/deno/issues/23001 Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r--ext/node/polyfills/worker_threads.ts63
1 files changed, 60 insertions, 3 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index ab3834132..49562d892 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -17,6 +17,7 @@ import {
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
+ nodeWorkerThreadCloseCb,
serializeJsMessageData,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
@@ -342,6 +343,15 @@ internals.__initWorkerThreads = (
defaultExport.parentPort = parentPort;
defaultExport.threadId = threadId;
+ for (const obj in workerData as Record<string, unknown>) {
+ if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, workerData[obj])) {
+ workerData[obj] = webMessagePortToNodeMessagePort(
+ workerData[obj] as MessagePort,
+ );
+ break;
+ }
+ }
+
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
name,
@@ -357,7 +367,22 @@ internals.__initWorkerThreads = (
listener,
) {
// deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
+ const _listener = (ev: any) => {
+ let message = ev.data;
+ if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message)) {
+ message = webMessagePortToNodeMessagePort(message);
+ } else {
+ for (const obj in message) {
+ if (
+ ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message[obj])
+ ) {
+ message[obj] = webMessagePortToNodeMessagePort(message[obj]);
+ break;
+ }
+ }
+ }
+ return listener(message);
+ };
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
@@ -424,10 +449,42 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
return { message: deserializeJsMessageData(data)[0] };
}
+class NodeMessageChannel {
+ port1: MessagePort;
+ port2: MessagePort;
+
+ constructor() {
+ const { port1, port2 } = new MessageChannel();
+ this.port1 = webMessagePortToNodeMessagePort(port1);
+ this.port2 = webMessagePortToNodeMessagePort(port2);
+ }
+}
+
+function webMessagePortToNodeMessagePort(port: MessagePort) {
+ port.on = port.addListener = function (this: MessagePort, name, listener) {
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ if (name == "message") {
+ port.onmessage = _listener;
+ } else if (name == "messageerror") {
+ port.onmessageerror = _listener;
+ } else if (name == "close") {
+ port.addEventListener("close", _listener);
+ } else {
+ throw new Error(`Unknown event: "${name}"`);
+ }
+ return this;
+ };
+ port[nodeWorkerThreadCloseCb] = () => {
+ port.dispatchEvent(new Event("close"));
+ };
+ return port;
+}
+
export {
BroadcastChannel,
- MessageChannel,
MessagePort,
+ NodeMessageChannel as MessageChannel,
NodeWorker as Worker,
parentPort,
threadId,
@@ -439,7 +496,7 @@ const defaultExport = {
moveMessagePortToContext,
receiveMessageOnPort,
MessagePort,
- MessageChannel,
+ MessageChannel: NodeMessageChannel,
BroadcastChannel,
Worker: NodeWorker,
getEnvironmentData,