summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/worker_threads.ts
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/worker_threads.ts')
-rw-r--r--ext/node/polyfills/worker_threads.ts37
1 files changed, 34 insertions, 3 deletions
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index eaabe9cd7..24bcbe057 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -52,6 +52,16 @@ function debugWT(...args) {
}
}
+interface WorkerOnlineMsg {
+ type: "WORKER_ONLINE";
+}
+
+function isWorkerOnlineMsg(data: unknown): data is WorkerOnlineMsg {
+ return typeof data === "object" && data !== null &&
+ ObjectHasOwn(data, "type") &&
+ (data as { "type": unknown })["type"] === "WORKER_ONLINE";
+}
+
export interface WorkerOptions {
// only for typings
argv?: unknown[];
@@ -81,6 +91,7 @@ class NodeWorker extends EventEmitter {
#refCount = 1;
#messagePromise = undefined;
#controlPromise = undefined;
+ #workerOnline = false;
// "RUNNING" | "CLOSED" | "TERMINATED"
// "TERMINATED" means that any controls or messages received will be
// discarded. "CLOSED" means that we have received a control
@@ -141,6 +152,7 @@ class NodeWorker extends EventEmitter {
workerData: options?.workerData,
environmentData: environmentData,
env: env_,
+ isWorkerThread: true,
}, options?.transferList ?? []);
const id = op_create_worker(
{
@@ -159,8 +171,6 @@ class NodeWorker extends EventEmitter {
this.threadId = id;
this.#pollControl();
this.#pollMessages();
- // https://nodejs.org/api/worker_threads.html#event-online
- this.emit("online");
}
[privateWorkerRef](ref) {
@@ -243,7 +253,17 @@ class NodeWorker extends EventEmitter {
this.emit("messageerror", err);
return;
}
- this.emit("message", message);
+ if (
+ // only emit "online" event once, and since the message
+ // has to come before user messages, we are safe to assume
+ // it came from us
+ !this.#workerOnline && isWorkerOnlineMsg(message)
+ ) {
+ this.#workerOnline = true;
+ this.emit("online");
+ } else {
+ this.emit("message", message);
+ }
}
};
@@ -358,10 +378,12 @@ internals.__initWorkerThreads = (
parentPort = globalThis as ParentPort;
threadId = workerId;
+ let isWorkerThread = false;
if (maybeWorkerMetadata) {
const { 0: metadata, 1: _ } = maybeWorkerMetadata;
workerData = metadata.workerData;
environmentData = metadata.environmentData;
+ isWorkerThread = metadata.isWorkerThread;
const env = metadata.env;
if (env) {
process.env = env;
@@ -425,6 +447,15 @@ internals.__initWorkerThreads = (
parentPort.ref = () => {
parentPort[unrefPollForMessages] = false;
};
+
+ if (isWorkerThread) {
+ // Notify the host that the worker is online
+ parentPort.postMessage(
+ {
+ type: "WORKER_ONLINE",
+ } satisfies WorkerOnlineMsg,
+ );
+ }
}
};