diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2024-03-11 22:18:03 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-11 23:18:03 +0100 |
commit | d69aab62b0789dd54b8c09b54af022a38f060b5b (patch) | |
tree | e99a5a3217d6aeee379bc592bfa33702dc2d6de8 | |
parent | 28b362adfc49324e20af5ecb1530f89eb91c4ed5 (diff) |
fix(ext/node): make worker setup synchronous (#22815)
This commit fixes race condition in "node:worker_threads" module were
the first message did a setup of "threadId", "workerData" and
"environmentData".
Now this data is passed explicitly during workers creation and is set up
before any user code is executed.
Closes https://github.com/denoland/deno/issues/22783
Closes https://github.com/denoland/deno/issues/22672
---------
Co-authored-by: Satya Rohith <me@satyarohith.com>
-rw-r--r-- | cli/worker.rs | 1 | ||||
-rw-r--r-- | ext/node/polyfills/02_init.js | 3 | ||||
-rw-r--r-- | ext/node/polyfills/worker_threads.ts | 72 | ||||
-rw-r--r-- | runtime/js/99_main.js | 12 | ||||
-rw-r--r-- | runtime/ops/worker_host.rs | 3 | ||||
-rw-r--r-- | runtime/web_worker.rs | 15 |
6 files changed, 60 insertions, 46 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 47658e594..697514477 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -841,6 +841,7 @@ fn create_web_worker_callback( stdio: stdio.clone(), cache_storage_dir, feature_checker, + maybe_worker_metadata: args.maybe_worker_metadata, }; WebWorker::bootstrap_from_options( diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js index 84f8a7cdc..04820b837 100644 --- a/ext/node/polyfills/02_init.js +++ b/ext/node/polyfills/02_init.js @@ -14,6 +14,7 @@ function initialize( usesLocalNodeModulesDir, argv0, runningOnMainThread, + maybeWorkerMetadata, ) { if (initialized) { throw Error("Node runtime already initialized"); @@ -38,7 +39,7 @@ function initialize( // FIXME(bartlomieju): not nice to depend on `Deno` namespace here // but it's the only way to get `args` and `version` and this point. internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); - internals.__initWorkerThreads(runningOnMainThread); + internals.__initWorkerThreads(runningOnMainThread, maybeWorkerMetadata); internals.__setupChildProcessIpcChannel(); // `Deno[Deno.internal].requireImpl` will be unreachable after this line. delete internals.requireImpl; diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 2e379cfaf..15b51aeb4 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -22,7 +22,7 @@ import { import * as webidl from "ext:deno_webidl/00_webidl.js"; import { log } from "ext:runtime/06_util.js"; import { notImplemented } from "ext:deno_node/_utils.ts"; -import { EventEmitter, once } from "node:events"; +import { EventEmitter } from "node:events"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { isAbsolute, resolve } from "node:path"; @@ -42,7 +42,6 @@ const { SafeRegExp, SafeMap, TypeError, - PromisePrototypeThen, } = primordials; export interface WorkerOptions { @@ -196,6 +195,13 @@ class NodeWorker extends EventEmitter { name = "[worker eval]"; } this.#name = name; + this.threadId = ++threads; + + const serializedWorkerMetadata = serializeJsMessageData({ + workerData: options?.workerData, + environmentData: environmentData, + threadId: this.threadId, + }, options?.transferList ?? []); const id = op_create_worker( { // deno-lint-ignore prefer-primordials @@ -206,16 +212,11 @@ class NodeWorker extends EventEmitter { name: this.#name, workerType: "module", }, + serializedWorkerMetadata, ); this.#id = id; this.#pollControl(); this.#pollMessages(); - - this.postMessage({ - environmentData, - threadId: (this.threadId = ++threads), - workerData: options?.workerData, - }, options?.transferList || []); // https://nodejs.org/api/worker_threads.html#event-online this.emit("online"); } @@ -387,7 +388,10 @@ type ParentPort = typeof self & NodeEventTarget; // deno-lint-ignore no-explicit-any let parentPort: ParentPort = null as any; -internals.__initWorkerThreads = (runningOnMainThread: boolean) => { +internals.__initWorkerThreads = ( + runningOnMainThread: boolean, + maybeWorkerMetadata, +) => { isMainThread = runningOnMainThread; defaultExport.isMainThread = isMainThread; @@ -409,29 +413,15 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => { >(); parentPort = self as ParentPort; + if (typeof maybeWorkerMetadata !== "undefined") { + const { 0: metadata, 1: _ } = maybeWorkerMetadata; + workerData = metadata.workerData; + environmentData = metadata.environmentData; + threadId = metadata.threadId; + } + defaultExport.workerData = workerData; defaultExport.parentPort = parentPort; - - const initPromise = PromisePrototypeThen( - once( - parentPort, - "message", - ), - (result) => { - // TODO(bartlomieju): just so we don't error out here. It's still racy, - // but should be addressed by https://github.com/denoland/deno/issues/22783 - // shortly. - const data = result[0].data ?? {}; - // TODO(kt3k): The below values are set asynchronously - // using the first message from the parent. - // This should be done synchronously. - threadId = data.threadId; - workerData = data.workerData; - environmentData = data.environmentData; - - defaultExport.threadId = threadId; - defaultExport.workerData = workerData; - }, - ); + defaultExport.threadId = threadId; parentPort.off = parentPort.removeListener = function ( this: ParentPort, @@ -447,22 +437,18 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => { name, listener, ) { - PromisePrototypeThen(initPromise, () => { - // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - }); + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); return this; }; parentPort.once = function (this: ParentPort, name, listener) { - PromisePrototypeThen(initPromise, () => { - // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - }); + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); return this; }; diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 82e444dfd..27ba488e7 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -786,6 +786,7 @@ function bootstrapWorkerRuntime( runtimeOptions, name, internalName, + maybeWorkerMetadata, ) { if (hasBootstrapped) { throw new Error("Worker runtime already bootstrapped"); @@ -908,8 +909,17 @@ function bootstrapWorkerRuntime( // existing global `Deno` with `Deno` namespace from "./deno.ts". ObjectDefineProperty(globalThis, "Deno", core.propReadOnly(finalDenoNs)); + const workerMetadata = maybeWorkerMetadata + ? messagePort.deserializeJsMessageData(maybeWorkerMetadata) + : undefined; + if (nodeBootstrap) { - nodeBootstrap(hasNodeModulesDir, argv0, /* runningOnMainThread */ false); + nodeBootstrap( + hasNodeModulesDir, + argv0, + /* runningOnMainThread */ false, + workerMetadata, + ); } } diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index ee9f0dc5e..d1b318f0f 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -35,6 +35,7 @@ pub struct CreateWebWorkerArgs { pub permissions: PermissionsContainer, pub main_module: ModuleSpecifier, pub worker_type: WebWorkerType, + pub maybe_worker_metadata: Option<JsMessageData>, } pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle) @@ -121,6 +122,7 @@ pub struct CreateWorkerArgs { fn op_create_worker( state: &mut OpState, #[serde] args: CreateWorkerArgs, + #[serde] maybe_worker_metadata: Option<JsMessageData>, ) -> Result<WorkerId, AnyError> { let specifier = args.specifier.clone(); let maybe_source_code = if args.has_source_code { @@ -189,6 +191,7 @@ fn op_create_worker( permissions: worker_permissions, main_module: module_specifier.clone(), worker_type, + maybe_worker_metadata, }); // Send thread safe handle from newly created worker to host thread diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 6571da6c2..f35d38921 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -48,6 +48,7 @@ use deno_terminal::colors; use deno_tls::RootCertStoreProvider; use deno_web::create_entangled_message_port; use deno_web::BlobStore; +use deno_web::JsMessageData; use deno_web::MessagePort; use log::debug; use std::cell::RefCell; @@ -331,6 +332,8 @@ pub struct WebWorker { pub main_module: ModuleSpecifier, poll_for_messages_fn: Option<v8::Global<v8::Value>>, bootstrap_fn_global: Option<v8::Global<v8::Function>>, + // Consumed when `bootstrap_fn` is called + maybe_worker_metadata: Option<JsMessageData>, } pub struct WebWorkerOptions { @@ -356,6 +359,7 @@ pub struct WebWorkerOptions { pub cache_storage_dir: Option<std::path::PathBuf>, pub stdio: Stdio, pub feature_checker: Arc<FeatureChecker>, + pub maybe_worker_metadata: Option<JsMessageData>, } impl WebWorker { @@ -601,6 +605,7 @@ impl WebWorker { main_module, poll_for_messages_fn: None, bootstrap_fn_global: Some(bootstrap_fn_global), + maybe_worker_metadata: options.maybe_worker_metadata, }, external_handle, ) @@ -616,6 +621,10 @@ impl WebWorker { let bootstrap_fn = self.bootstrap_fn_global.take().unwrap(); let bootstrap_fn = v8::Local::new(scope, bootstrap_fn); let undefined = v8::undefined(scope); + let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into(); + if let Some(data) = self.maybe_worker_metadata.take() { + worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap(); + } let name_str: v8::Local<v8::Value> = v8::String::new(scope, &self.name).unwrap().into(); let id_str: v8::Local<v8::Value> = @@ -623,7 +632,11 @@ impl WebWorker { .unwrap() .into(); bootstrap_fn - .call(scope, undefined.into(), &[args, name_str, id_str]) + .call( + scope, + undefined.into(), + &[args, name_str, id_str, worker_data], + ) .unwrap(); } // TODO(bartlomieju): this could be done using V8 API, without calling `execute_script`. |