summaryrefslogtreecommitdiff
path: root/cli/npm/managed/cache/value_creator.rs
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2024-06-05 11:04:16 -0400
committerGitHub <noreply@github.com>2024-06-05 17:04:16 +0200
commit7ed90a20d04982ae15a52ae2378cbffd4b6839df (patch)
tree3297d6f7227fbf1cf80e17a2a376ef4dfa52e6ad /cli/npm/managed/cache/value_creator.rs
parent0544d60012006b1c7799d8b6eafacec9567901ad (diff)
fix: better handling of npm resolution occurring on workers (#24094)
Closes https://github.com/denoland/deno/issues/24063
Diffstat (limited to 'cli/npm/managed/cache/value_creator.rs')
-rw-r--r--cli/npm/managed/cache/value_creator.rs101
1 files changed, 101 insertions, 0 deletions
diff --git a/cli/npm/managed/cache/value_creator.rs b/cli/npm/managed/cache/value_creator.rs
new file mode 100644
index 000000000..38801b3aa
--- /dev/null
+++ b/cli/npm/managed/cache/value_creator.rs
@@ -0,0 +1,101 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::Arc;
+
+use deno_core::error::AnyError;
+use deno_core::futures::future::BoxFuture;
+use deno_core::futures::future::LocalBoxFuture;
+use deno_core::futures::future::Shared;
+use deno_core::futures::FutureExt;
+use deno_core::parking_lot::Mutex;
+use tokio::task::JoinError;
+
+// todo(dsherret): unit test this
+
+type FutureResult<TResult> = Result<TResult, Arc<AnyError>>;
+type JoinResult<TResult> = Result<FutureResult<TResult>, Arc<JoinError>>;
+
+#[derive(Debug)]
+struct State<TResult> {
+ retry_index: usize,
+ future: Shared<BoxFuture<'static, JoinResult<TResult>>>,
+}
+
+/// Attempts to create a shared value asynchronously on one tokio runtime while
+/// many runtimes are requesting the value.
+///
+/// This is only useful when the value needs to get created once across
+/// many runtimes.
+///
+/// This handles the case where one tokio runtime goes down while another
+/// one is still running.
+#[derive(Debug)]
+pub struct MultiRuntimeAsyncValueCreator<TResult: Send + Clone + 'static> {
+ state: Mutex<State<TResult>>,
+}
+
+impl<TResult: Send + Clone + 'static> MultiRuntimeAsyncValueCreator<TResult> {
+ pub fn new(
+ future: LocalBoxFuture<'static, Result<TResult, AnyError>>,
+ ) -> Self {
+ Self {
+ state: Mutex::new(State {
+ retry_index: 0,
+ future: Self::create_shared_future(future),
+ }),
+ }
+ }
+
+ pub async fn get(
+ &self,
+ recreate_future: impl Fn() -> LocalBoxFuture<'static, Result<TResult, AnyError>>,
+ ) -> Result<TResult, Arc<AnyError>> {
+ let (mut future, mut retry_index) = {
+ let state = self.state.lock();
+ (state.future.clone(), state.retry_index)
+ };
+
+ loop {
+ let result = future.await;
+
+ match result {
+ Ok(result) => return result,
+ Err(join_error) => {
+ if join_error.is_cancelled() {
+ let mut state = self.state.lock();
+
+ if state.retry_index == retry_index {
+ // we were the first one to retry, so create a new future
+ // that we'll run from the current runtime
+ state.retry_index += 1;
+ state.future = Self::create_shared_future(recreate_future());
+ }
+
+ retry_index = state.retry_index;
+ future = state.future.clone();
+
+ // just in case we're stuck in a loop
+ if retry_index > 1000 {
+ panic!("Something went wrong.") // should never happen
+ }
+ } else {
+ panic!("{}", join_error);
+ }
+ }
+ }
+ }
+ }
+
+ fn create_shared_future(
+ future: LocalBoxFuture<'static, Result<TResult, AnyError>>,
+ ) -> Shared<BoxFuture<'static, JoinResult<TResult>>> {
+ deno_core::unsync::spawn(future)
+ .map(|result| match result {
+ Ok(Ok(value)) => Ok(Ok(value)),
+ Ok(Err(err)) => Ok(Err(Arc::new(err))),
+ Err(err) => Err(Arc::new(err)),
+ })
+ .boxed()
+ .shared()
+ }
+}