summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2024-03-11 22:18:03 +0000
committerGitHub <noreply@github.com>2024-03-11 23:18:03 +0100
commitd69aab62b0789dd54b8c09b54af022a38f060b5b (patch)
treee99a5a3217d6aeee379bc592bfa33702dc2d6de8
parent28b362adfc49324e20af5ecb1530f89eb91c4ed5 (diff)
fix(ext/node): make worker setup synchronous (#22815)
This commit fixes race condition in "node:worker_threads" module were the first message did a setup of "threadId", "workerData" and "environmentData". Now this data is passed explicitly during workers creation and is set up before any user code is executed. Closes https://github.com/denoland/deno/issues/22783 Closes https://github.com/denoland/deno/issues/22672 --------- Co-authored-by: Satya Rohith <me@satyarohith.com>
-rw-r--r--cli/worker.rs1
-rw-r--r--ext/node/polyfills/02_init.js3
-rw-r--r--ext/node/polyfills/worker_threads.ts72
-rw-r--r--runtime/js/99_main.js12
-rw-r--r--runtime/ops/worker_host.rs3
-rw-r--r--runtime/web_worker.rs15
6 files changed, 60 insertions, 46 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index 47658e594..697514477 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -841,6 +841,7 @@ fn create_web_worker_callback(
stdio: stdio.clone(),
cache_storage_dir,
feature_checker,
+ maybe_worker_metadata: args.maybe_worker_metadata,
};
WebWorker::bootstrap_from_options(
diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js
index 84f8a7cdc..04820b837 100644
--- a/ext/node/polyfills/02_init.js
+++ b/ext/node/polyfills/02_init.js
@@ -14,6 +14,7 @@ function initialize(
usesLocalNodeModulesDir,
argv0,
runningOnMainThread,
+ maybeWorkerMetadata,
) {
if (initialized) {
throw Error("Node runtime already initialized");
@@ -38,7 +39,7 @@ function initialize(
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here
// but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
- internals.__initWorkerThreads(runningOnMainThread);
+ internals.__initWorkerThreads(runningOnMainThread, maybeWorkerMetadata);
internals.__setupChildProcessIpcChannel();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl;
diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts
index 2e379cfaf..15b51aeb4 100644
--- a/ext/node/polyfills/worker_threads.ts
+++ b/ext/node/polyfills/worker_threads.ts
@@ -22,7 +22,7 @@ import {
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { log } from "ext:runtime/06_util.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
-import { EventEmitter, once } from "node:events";
+import { EventEmitter } from "node:events";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { isAbsolute, resolve } from "node:path";
@@ -42,7 +42,6 @@ const {
SafeRegExp,
SafeMap,
TypeError,
- PromisePrototypeThen,
} = primordials;
export interface WorkerOptions {
@@ -196,6 +195,13 @@ class NodeWorker extends EventEmitter {
name = "[worker eval]";
}
this.#name = name;
+ this.threadId = ++threads;
+
+ const serializedWorkerMetadata = serializeJsMessageData({
+ workerData: options?.workerData,
+ environmentData: environmentData,
+ threadId: this.threadId,
+ }, options?.transferList ?? []);
const id = op_create_worker(
{
// deno-lint-ignore prefer-primordials
@@ -206,16 +212,11 @@ class NodeWorker extends EventEmitter {
name: this.#name,
workerType: "module",
},
+ serializedWorkerMetadata,
);
this.#id = id;
this.#pollControl();
this.#pollMessages();
-
- this.postMessage({
- environmentData,
- threadId: (this.threadId = ++threads),
- workerData: options?.workerData,
- }, options?.transferList || []);
// https://nodejs.org/api/worker_threads.html#event-online
this.emit("online");
}
@@ -387,7 +388,10 @@ type ParentPort = typeof self & NodeEventTarget;
// deno-lint-ignore no-explicit-any
let parentPort: ParentPort = null as any;
-internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
+internals.__initWorkerThreads = (
+ runningOnMainThread: boolean,
+ maybeWorkerMetadata,
+) => {
isMainThread = runningOnMainThread;
defaultExport.isMainThread = isMainThread;
@@ -409,29 +413,15 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
>();
parentPort = self as ParentPort;
+ if (typeof maybeWorkerMetadata !== "undefined") {
+ const { 0: metadata, 1: _ } = maybeWorkerMetadata;
+ workerData = metadata.workerData;
+ environmentData = metadata.environmentData;
+ threadId = metadata.threadId;
+ }
+ defaultExport.workerData = workerData;
defaultExport.parentPort = parentPort;
-
- const initPromise = PromisePrototypeThen(
- once(
- parentPort,
- "message",
- ),
- (result) => {
- // TODO(bartlomieju): just so we don't error out here. It's still racy,
- // but should be addressed by https://github.com/denoland/deno/issues/22783
- // shortly.
- const data = result[0].data ?? {};
- // TODO(kt3k): The below values are set asynchronously
- // using the first message from the parent.
- // This should be done synchronously.
- threadId = data.threadId;
- workerData = data.workerData;
- environmentData = data.environmentData;
-
- defaultExport.threadId = threadId;
- defaultExport.workerData = workerData;
- },
- );
+ defaultExport.threadId = threadId;
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
@@ -447,22 +437,18 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
name,
listener,
) {
- PromisePrototypeThen(initPromise, () => {
- // deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
- listeners.set(listener, _listener);
- this.addEventListener(name, _listener);
- });
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
return this;
};
parentPort.once = function (this: ParentPort, name, listener) {
- PromisePrototypeThen(initPromise, () => {
- // deno-lint-ignore no-explicit-any
- const _listener = (ev: any) => listener(ev.data);
- listeners.set(listener, _listener);
- this.addEventListener(name, _listener);
- });
+ // deno-lint-ignore no-explicit-any
+ const _listener = (ev: any) => listener(ev.data);
+ listeners.set(listener, _listener);
+ this.addEventListener(name, _listener);
return this;
};
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index 82e444dfd..27ba488e7 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -786,6 +786,7 @@ function bootstrapWorkerRuntime(
runtimeOptions,
name,
internalName,
+ maybeWorkerMetadata,
) {
if (hasBootstrapped) {
throw new Error("Worker runtime already bootstrapped");
@@ -908,8 +909,17 @@ function bootstrapWorkerRuntime(
// existing global `Deno` with `Deno` namespace from "./deno.ts".
ObjectDefineProperty(globalThis, "Deno", core.propReadOnly(finalDenoNs));
+ const workerMetadata = maybeWorkerMetadata
+ ? messagePort.deserializeJsMessageData(maybeWorkerMetadata)
+ : undefined;
+
if (nodeBootstrap) {
- nodeBootstrap(hasNodeModulesDir, argv0, /* runningOnMainThread */ false);
+ nodeBootstrap(
+ hasNodeModulesDir,
+ argv0,
+ /* runningOnMainThread */ false,
+ workerMetadata,
+ );
}
}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index ee9f0dc5e..d1b318f0f 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -35,6 +35,7 @@ pub struct CreateWebWorkerArgs {
pub permissions: PermissionsContainer,
pub main_module: ModuleSpecifier,
pub worker_type: WebWorkerType,
+ pub maybe_worker_metadata: Option<JsMessageData>,
}
pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
@@ -121,6 +122,7 @@ pub struct CreateWorkerArgs {
fn op_create_worker(
state: &mut OpState,
#[serde] args: CreateWorkerArgs,
+ #[serde] maybe_worker_metadata: Option<JsMessageData>,
) -> Result<WorkerId, AnyError> {
let specifier = args.specifier.clone();
let maybe_source_code = if args.has_source_code {
@@ -189,6 +191,7 @@ fn op_create_worker(
permissions: worker_permissions,
main_module: module_specifier.clone(),
worker_type,
+ maybe_worker_metadata,
});
// Send thread safe handle from newly created worker to host thread
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 6571da6c2..f35d38921 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -48,6 +48,7 @@ use deno_terminal::colors;
use deno_tls::RootCertStoreProvider;
use deno_web::create_entangled_message_port;
use deno_web::BlobStore;
+use deno_web::JsMessageData;
use deno_web::MessagePort;
use log::debug;
use std::cell::RefCell;
@@ -331,6 +332,8 @@ pub struct WebWorker {
pub main_module: ModuleSpecifier,
poll_for_messages_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
+ // Consumed when `bootstrap_fn` is called
+ maybe_worker_metadata: Option<JsMessageData>,
}
pub struct WebWorkerOptions {
@@ -356,6 +359,7 @@ pub struct WebWorkerOptions {
pub cache_storage_dir: Option<std::path::PathBuf>,
pub stdio: Stdio,
pub feature_checker: Arc<FeatureChecker>,
+ pub maybe_worker_metadata: Option<JsMessageData>,
}
impl WebWorker {
@@ -601,6 +605,7 @@ impl WebWorker {
main_module,
poll_for_messages_fn: None,
bootstrap_fn_global: Some(bootstrap_fn_global),
+ maybe_worker_metadata: options.maybe_worker_metadata,
},
external_handle,
)
@@ -616,6 +621,10 @@ impl WebWorker {
let bootstrap_fn = self.bootstrap_fn_global.take().unwrap();
let bootstrap_fn = v8::Local::new(scope, bootstrap_fn);
let undefined = v8::undefined(scope);
+ let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
+ if let Some(data) = self.maybe_worker_metadata.take() {
+ worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap();
+ }
let name_str: v8::Local<v8::Value> =
v8::String::new(scope, &self.name).unwrap().into();
let id_str: v8::Local<v8::Value> =
@@ -623,7 +632,11 @@ impl WebWorker {
.unwrap()
.into();
bootstrap_fn
- .call(scope, undefined.into(), &[args, name_str, id_str])
+ .call(
+ scope,
+ undefined.into(),
+ &[args, name_str, id_str, worker_data],
+ )
.unwrap();
}
// TODO(bartlomieju): this could be done using V8 API, without calling `execute_script`.