diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2024-06-05 11:04:16 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-05 17:04:16 +0200 |
commit | 7ed90a20d04982ae15a52ae2378cbffd4b6839df (patch) | |
tree | 3297d6f7227fbf1cf80e17a2a376ef4dfa52e6ad /cli/npm/managed/cache | |
parent | 0544d60012006b1c7799d8b6eafacec9567901ad (diff) |
fix: better handling of npm resolution occurring on workers (#24094)
Closes https://github.com/denoland/deno/issues/24063
Diffstat (limited to 'cli/npm/managed/cache')
-rw-r--r-- | cli/npm/managed/cache/mod.rs | 1 | ||||
-rw-r--r-- | cli/npm/managed/cache/registry_info.rs | 107 | ||||
-rw-r--r-- | cli/npm/managed/cache/tarball.rs | 92 | ||||
-rw-r--r-- | cli/npm/managed/cache/value_creator.rs | 101 |
4 files changed, 195 insertions, 106 deletions
diff --git a/cli/npm/managed/cache/mod.rs b/cli/npm/managed/cache/mod.rs index f409744b9..531fd0988 100644 --- a/cli/npm/managed/cache/mod.rs +++ b/cli/npm/managed/cache/mod.rs @@ -28,6 +28,7 @@ use crate::util::fs::hard_link_dir_recursive; mod registry_info; mod tarball; mod tarball_extract; +mod value_creator; pub use registry_info::RegistryInfoDownloader; pub use tarball::TarballCache; diff --git a/cli/npm/managed/cache/registry_info.rs b/cli/npm/managed/cache/registry_info.rs index 24f0a12e7..131b93192 100644 --- a/cli/npm/managed/cache/registry_info.rs +++ b/cli/npm/managed/cache/registry_info.rs @@ -8,8 +8,7 @@ use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::BoxFuture; -use deno_core::futures::future::Shared; +use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::FutureExt; use deno_core::parking_lot::Mutex; use deno_core::serde_json; @@ -23,6 +22,7 @@ use crate::http_util::HttpClientProvider; use crate::npm::common::maybe_auth_header_for_npm_registry; use crate::util::progress_bar::ProgressBar; +use super::value_creator::MultiRuntimeAsyncValueCreator; use super::NpmCache; // todo(dsherret): create seams and unit test this @@ -30,7 +30,7 @@ use super::NpmCache; #[derive(Debug, Clone)] enum MemoryCacheItem { /// The cache item hasn't loaded yet. - PendingFuture(Shared<PendingRegistryLoadFuture>), + Pending(Arc<MultiRuntimeAsyncValueCreator<FutureResult>>), /// The item has loaded in the past and was stored in the file system cache. /// There is no reason to request this package from the npm registry again /// for the duration of execution. @@ -48,7 +48,7 @@ enum FutureResult { } type PendingRegistryLoadFuture = - BoxFuture<'static, Result<FutureResult, Arc<AnyError>>>; + LocalBoxFuture<'static, Result<FutureResult, AnyError>>; /// Downloads packuments from the npm registry. 
/// @@ -79,7 +79,7 @@ impl RegistryInfoDownloader { } pub async fn load_package_info( - &self, + self: &Arc<Self>, name: &str, ) -> Result<Option<Arc<NpmPackageInfo>>, AnyError> { let registry_url = self.npmrc.get_registry_url(name); @@ -98,7 +98,7 @@ impl RegistryInfoDownloader { } async fn load_package_info_inner( - &self, + self: &Arc<Self>, name: &str, registry_url: &Url, registry_config: &RegistryConfig, @@ -112,18 +112,20 @@ impl RegistryInfoDownloader { )); } - let (created, cache_item) = { + let cache_item = { let mut mem_cache = self.memory_cache.lock(); if let Some(cache_item) = mem_cache.get(name) { - (false, cache_item.clone()) + cache_item.clone() } else { let future = self.create_load_future(name, registry_url, registry_config); - let cache_item = MemoryCacheItem::PendingFuture(future); + let value_creator = MultiRuntimeAsyncValueCreator::new(future); + let cache_item = MemoryCacheItem::Pending(Arc::new(value_creator)); mem_cache.insert(name.to_string(), cache_item.clone()); - (true, cache_item) + cache_item } }; + match cache_item { MemoryCacheItem::FsCached => { // this struct previously loaded from the registry, so we can load it from the file system cache @@ -135,40 +137,35 @@ impl RegistryInfoDownloader { MemoryCacheItem::MemoryCached(maybe_info) => { maybe_info.clone().map_err(|e| anyhow!("{}", e)) } - MemoryCacheItem::PendingFuture(future) => { - if created { - match future.await { - Ok(FutureResult::SavedFsCache(info)) => { - // return back the future and mark this package as having - // been saved in the cache for next time it's requested - *self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::FsCached; - Ok(Some(info)) - } - Ok(FutureResult::ErroredFsCache(info)) => { - // since saving to the fs cache failed, keep the package information in memory - *self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::MemoryCached(Ok(Some(info.clone()))); - Ok(Some(info)) - } - Ok(FutureResult::PackageNotExists) => { - 
*self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::MemoryCached(Ok(None)); - Ok(None) - } - Err(err) => { - let return_err = anyhow!("{}", err); - *self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::MemoryCached(Err(err)); - Err(return_err) - } + MemoryCacheItem::Pending(value_creator) => { + let downloader = self.clone(); + let future = value_creator.get(move || { + downloader.create_load_future(name, registry_url, registry_config) + }); + match future.await { + Ok(FutureResult::SavedFsCache(info)) => { + // return back the future and mark this package as having + // been saved in the cache for next time it's requested + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::FsCached; + Ok(Some(info)) + } + Ok(FutureResult::ErroredFsCache(info)) => { + // since saving to the fs cache failed, keep the package information in memory + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::MemoryCached(Ok(Some(info.clone()))); + Ok(Some(info)) } - } else { - match future.await { - Ok(FutureResult::SavedFsCache(info)) => Ok(Some(info)), - Ok(FutureResult::ErroredFsCache(info)) => Ok(Some(info)), - Ok(FutureResult::PackageNotExists) => Ok(None), - Err(err) => Err(anyhow!("{}", err)), + Ok(FutureResult::PackageNotExists) => { + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::MemoryCached(Ok(None)); + Ok(None) + } + Err(err) => { + let return_err = anyhow!("{}", err); + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::MemoryCached(Err(err)); + Err(return_err) } } } @@ -203,23 +200,19 @@ impl RegistryInfoDownloader { } fn create_load_future( - &self, + self: &Arc<Self>, name: &str, registry_url: &Url, registry_config: &RegistryConfig, - ) -> Shared<PendingRegistryLoadFuture> { + ) -> PendingRegistryLoadFuture { + let downloader = self.clone(); let package_url = self.get_package_url(name, registry_url); let maybe_auth_header = 
maybe_auth_header_for_npm_registry(registry_config); let guard = self.progress_bar.update(package_url.as_str()); - let cache = self.cache.clone(); - let http_client_provider = self.http_client_provider.clone(); let name = name.to_string(); - // force this future to be polled on the current runtime because it's not - // safe to share `HttpClient`s across runtimes and because a restart of - // npm resolution might cause this package not to be resolved again - // causing the future to never be polled - deno_core::unsync::spawn(async move { - let maybe_bytes = http_client_provider + async move { + let maybe_bytes = downloader + .http_client_provider .get_or_create()? .download_with_progress(package_url, maybe_auth_header, &guard) .await?; @@ -228,7 +221,7 @@ impl RegistryInfoDownloader { let future_result = deno_core::unsync::spawn_blocking( move || -> Result<FutureResult, AnyError> { let package_info = serde_json::from_slice(&bytes)?; - match cache.save_package_info(&name, &package_info) { + match downloader.cache.save_package_info(&name, &package_info) { Ok(()) => { Ok(FutureResult::SavedFsCache(Arc::new(package_info))) } @@ -248,10 +241,8 @@ impl RegistryInfoDownloader { } None => Ok(FutureResult::PackageNotExists), } - }) - .map(|result| result.unwrap().map_err(Arc::new)) - .boxed() - .shared() + } + .boxed_local() } fn get_package_url(&self, name: &str, registry_url: &Url) -> Url { diff --git a/cli/npm/managed/cache/tarball.rs b/cli/npm/managed/cache/tarball.rs index a116ad1cf..e65578ff3 100644 --- a/cli/npm/managed/cache/tarball.rs +++ b/cli/npm/managed/cache/tarball.rs @@ -8,8 +8,7 @@ use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::BoxFuture; -use deno_core::futures::future::Shared; +use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::FutureExt; use deno_core::parking_lot::Mutex; use deno_npm::npm_rc::ResolvedNpmRc; @@ -24,6 +23,7 
@@ use crate::util::progress_bar::ProgressBar; use super::tarball_extract::verify_and_extract_tarball; use super::tarball_extract::TarballExtractionMode; +use super::value_creator::MultiRuntimeAsyncValueCreator; use super::NpmCache; // todo(dsherret): create seams and unit test this @@ -31,7 +31,7 @@ use super::NpmCache; #[derive(Debug, Clone)] enum MemoryCacheItem { /// The cache item hasn't finished yet. - PendingFuture(Shared<BoxFuture<'static, Result<(), Arc<AnyError>>>>), + Pending(Arc<MultiRuntimeAsyncValueCreator<()>>), /// The result errored. Errored(Arc<AnyError>), /// This package has already been cached. @@ -71,7 +71,7 @@ impl TarballCache { } pub async fn ensure_package( - &self, + self: &Arc<Self>, package: &PackageNv, dist: &NpmPackageVersionDistInfo, ) -> Result<(), AnyError> { @@ -82,69 +82,67 @@ impl TarballCache { } async fn ensure_package_inner( - &self, + self: &Arc<Self>, package_nv: &PackageNv, dist: &NpmPackageVersionDistInfo, ) -> Result<(), AnyError> { - let (created, cache_item) = { + let cache_item = { let mut mem_cache = self.memory_cache.lock(); if let Some(cache_item) = mem_cache.get(package_nv) { - (false, cache_item.clone()) + cache_item.clone() } else { let future = self.create_setup_future(package_nv.clone(), dist.clone()); - let cache_item = MemoryCacheItem::PendingFuture(future); + let value_creator = MultiRuntimeAsyncValueCreator::new(future); + let cache_item = MemoryCacheItem::Pending(Arc::new(value_creator)); mem_cache.insert(package_nv.clone(), cache_item.clone()); - (true, cache_item) + cache_item } }; match cache_item { MemoryCacheItem::Cached => Ok(()), MemoryCacheItem::Errored(err) => Err(anyhow!("{}", err)), - MemoryCacheItem::PendingFuture(future) => { - if created { - match future.await { - Ok(_) => { - *self.memory_cache.lock().get_mut(package_nv).unwrap() = - MemoryCacheItem::Cached; - Ok(()) - } - Err(err) => { - let result_err = anyhow!("{}", err); - *self.memory_cache.lock().get_mut(package_nv).unwrap() = - 
MemoryCacheItem::Errored(err); - Err(result_err) - } + MemoryCacheItem::Pending(creator) => { + let tarball_cache = self.clone(); + let result = creator + .get(move || { + tarball_cache.create_setup_future(package_nv.clone(), dist.clone()) + }) + .await; + match result { + Ok(_) => { + *self.memory_cache.lock().get_mut(package_nv).unwrap() = + MemoryCacheItem::Cached; + Ok(()) + } + Err(err) => { + let result_err = anyhow!("{}", err); + *self.memory_cache.lock().get_mut(package_nv).unwrap() = + MemoryCacheItem::Errored(err); + Err(result_err) } - } else { - future.await.map_err(|err| anyhow!("{}", err)) } } } } fn create_setup_future( - &self, + self: &Arc<Self>, package_nv: PackageNv, dist: NpmPackageVersionDistInfo, - ) -> Shared<BoxFuture<'static, Result<(), Arc<AnyError>>>> { - let registry_url = self.npmrc.get_registry_url(&package_nv.name); - let registry_config = - self.npmrc.get_registry_config(&package_nv.name).clone(); - - let cache = self.cache.clone(); - let fs = self.fs.clone(); - let progress_bar = self.progress_bar.clone(); - let package_folder = - cache.package_folder_for_nv_and_url(&package_nv, registry_url); - let http_client_provider = self.http_client_provider.clone(); - - deno_core::unsync::spawn(async move { - let should_use_cache = cache.should_use_cache_for_package(&package_nv); - let package_folder_exists = fs.exists_sync(&package_folder); + ) -> LocalBoxFuture<'static, Result<(), AnyError>> { + let tarball_cache = self.clone(); + async move { + let registry_url = tarball_cache.npmrc.get_registry_url(&package_nv.name); + let registry_config = + tarball_cache.npmrc.get_registry_config(&package_nv.name).clone(); + let package_folder = + tarball_cache.cache.package_folder_for_nv_and_url(&package_nv, registry_url); + let should_use_cache = tarball_cache.cache.should_use_cache_for_package(&package_nv); + let package_folder_exists = tarball_cache.fs.exists_sync(&package_folder); if should_use_cache && package_folder_exists { return Ok(()); - } 
else if cache.cache_setting() == &CacheSetting::Only { + } else if tarball_cache.cache.cache_setting() == &CacheSetting::Only { return Err(custom_error( "NotCached", format!( @@ -162,8 +160,9 @@ impl TarballCache { let maybe_auth_header = maybe_auth_header_for_npm_registry(&registry_config); - let guard = progress_bar.update(&dist.tarball); - let maybe_bytes = http_client_provider.get_or_create()? + let guard = tarball_cache.progress_bar.update(&dist.tarball); + let maybe_bytes = tarball_cache.http_client_provider + .get_or_create()? .download_with_progress(&dist.tarball, maybe_auth_header, &guard) .await?; match maybe_bytes { @@ -198,9 +197,6 @@ impl TarballCache { bail!("Could not find npm package tarball at: {}", dist.tarball); } } - }) - .map(|result| result.unwrap().map_err(Arc::new)) - .boxed() - .shared() + }.boxed_local() } } diff --git a/cli/npm/managed/cache/value_creator.rs b/cli/npm/managed/cache/value_creator.rs new file mode 100644 index 000000000..38801b3aa --- /dev/null +++ b/cli/npm/managed/cache/value_creator.rs @@ -0,0 +1,101 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use std::sync::Arc; + +use deno_core::error::AnyError; +use deno_core::futures::future::BoxFuture; +use deno_core::futures::future::LocalBoxFuture; +use deno_core::futures::future::Shared; +use deno_core::futures::FutureExt; +use deno_core::parking_lot::Mutex; +use tokio::task::JoinError; + +// todo(dsherret): unit test this + +type FutureResult<TResult> = Result<TResult, Arc<AnyError>>; +type JoinResult<TResult> = Result<FutureResult<TResult>, Arc<JoinError>>; + +#[derive(Debug)] +struct State<TResult> { + retry_index: usize, + future: Shared<BoxFuture<'static, JoinResult<TResult>>>, +} + +/// Attempts to create a shared value asynchronously on one tokio runtime while +/// many runtimes are requesting the value. +/// +/// This is only useful when the value needs to get created once across +/// many runtimes. 
+/// +/// This handles the case where one tokio runtime goes down while another +/// one is still running. +#[derive(Debug)] +pub struct MultiRuntimeAsyncValueCreator<TResult: Send + Clone + 'static> { + state: Mutex<State<TResult>>, +} + +impl<TResult: Send + Clone + 'static> MultiRuntimeAsyncValueCreator<TResult> { + pub fn new( + future: LocalBoxFuture<'static, Result<TResult, AnyError>>, + ) -> Self { + Self { + state: Mutex::new(State { + retry_index: 0, + future: Self::create_shared_future(future), + }), + } + } + + pub async fn get( + &self, + recreate_future: impl Fn() -> LocalBoxFuture<'static, Result<TResult, AnyError>>, + ) -> Result<TResult, Arc<AnyError>> { + let (mut future, mut retry_index) = { + let state = self.state.lock(); + (state.future.clone(), state.retry_index) + }; + + loop { + let result = future.await; + + match result { + Ok(result) => return result, + Err(join_error) => { + if join_error.is_cancelled() { + let mut state = self.state.lock(); + + if state.retry_index == retry_index { + // we were the first one to retry, so create a new future + // that we'll run from the current runtime + state.retry_index += 1; + state.future = Self::create_shared_future(recreate_future()); + } + + retry_index = state.retry_index; + future = state.future.clone(); + + // just in case we're stuck in a loop + if retry_index > 1000 { + panic!("Something went wrong.") // should never happen + } + } else { + panic!("{}", join_error); + } + } + } + } + } + + fn create_shared_future( + future: LocalBoxFuture<'static, Result<TResult, AnyError>>, + ) -> Shared<BoxFuture<'static, JoinResult<TResult>>> { + deno_core::unsync::spawn(future) + .map(|result| match result { + Ok(Ok(value)) => Ok(Ok(value)), + Ok(Err(err)) => Ok(Err(Arc::new(err))), + Err(err) => Err(Arc::new(err)), + }) + .boxed() + .shared() + } +} |