diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2024-06-05 11:04:16 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-05 17:04:16 +0200 |
commit | 7ed90a20d04982ae15a52ae2378cbffd4b6839df (patch) | |
tree | 3297d6f7227fbf1cf80e17a2a376ef4dfa52e6ad /cli/npm/managed/cache/registry_info.rs | |
parent | 0544d60012006b1c7799d8b6eafacec9567901ad (diff) |
fix: better handling of npm resolution occurring on workers (#24094)
Closes https://github.com/denoland/deno/issues/24063
Diffstat (limited to 'cli/npm/managed/cache/registry_info.rs')
-rw-r--r-- | cli/npm/managed/cache/registry_info.rs | 107 |
1 files changed, 49 insertions, 58 deletions
diff --git a/cli/npm/managed/cache/registry_info.rs b/cli/npm/managed/cache/registry_info.rs index 24f0a12e7..131b93192 100644 --- a/cli/npm/managed/cache/registry_info.rs +++ b/cli/npm/managed/cache/registry_info.rs @@ -8,8 +8,7 @@ use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::future::BoxFuture; -use deno_core::futures::future::Shared; +use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::FutureExt; use deno_core::parking_lot::Mutex; use deno_core::serde_json; @@ -23,6 +22,7 @@ use crate::http_util::HttpClientProvider; use crate::npm::common::maybe_auth_header_for_npm_registry; use crate::util::progress_bar::ProgressBar; +use super::value_creator::MultiRuntimeAsyncValueCreator; use super::NpmCache; // todo(dsherret): create seams and unit test this @@ -30,7 +30,7 @@ use super::NpmCache; #[derive(Debug, Clone)] enum MemoryCacheItem { /// The cache item hasn't loaded yet. - PendingFuture(Shared<PendingRegistryLoadFuture>), + Pending(Arc<MultiRuntimeAsyncValueCreator<FutureResult>>), /// The item has loaded in the past and was stored in the file system cache. /// There is no reason to request this package from the npm registry again /// for the duration of execution. @@ -48,7 +48,7 @@ enum FutureResult { } type PendingRegistryLoadFuture = - BoxFuture<'static, Result<FutureResult, Arc<AnyError>>>; + LocalBoxFuture<'static, Result<FutureResult, AnyError>>; /// Downloads packuments from the npm registry. /// @@ -79,7 +79,7 @@ impl RegistryInfoDownloader { } pub async fn load_package_info( - &self, + self: &Arc<Self>, name: &str, ) -> Result<Option<Arc<NpmPackageInfo>>, AnyError> { let registry_url = self.npmrc.get_registry_url(name); @@ -98,7 +98,7 @@ impl RegistryInfoDownloader { } async fn load_package_info_inner( - &self, + self: &Arc<Self>, name: &str, registry_url: &Url, registry_config: &RegistryConfig, @@ -112,18 +112,20 @@ impl RegistryInfoDownloader { )); } - let (created, cache_item) = { + let cache_item = { let mut mem_cache = self.memory_cache.lock(); if let Some(cache_item) = mem_cache.get(name) { - (false, cache_item.clone()) + cache_item.clone() } else { let future = self.create_load_future(name, registry_url, registry_config); - let cache_item = MemoryCacheItem::PendingFuture(future); + let value_creator = MultiRuntimeAsyncValueCreator::new(future); + let cache_item = MemoryCacheItem::Pending(Arc::new(value_creator)); mem_cache.insert(name.to_string(), cache_item.clone()); - (true, cache_item) + cache_item } }; + match cache_item { MemoryCacheItem::FsCached => { // this struct previously loaded from the registry, so we can load it from the file system cache @@ -135,40 +137,35 @@ impl RegistryInfoDownloader { MemoryCacheItem::MemoryCached(maybe_info) => { maybe_info.clone().map_err(|e| anyhow!("{}", e)) } - MemoryCacheItem::PendingFuture(future) => { - if created { - match future.await { - Ok(FutureResult::SavedFsCache(info)) => { - // return back the future and mark this package as having - // been saved in the cache for next time it's requested - *self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::FsCached; - Ok(Some(info)) - } - Ok(FutureResult::ErroredFsCache(info)) => { - // since saving to the fs cache failed, keep the package information in memory - *self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::MemoryCached(Ok(Some(info.clone()))); - Ok(Some(info)) - } - Ok(FutureResult::PackageNotExists) => { - *self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::MemoryCached(Ok(None)); - Ok(None) - } - Err(err) => { - let return_err = anyhow!("{}", err); - *self.memory_cache.lock().get_mut(name).unwrap() = - MemoryCacheItem::MemoryCached(Err(err)); - Err(return_err) - } + MemoryCacheItem::Pending(value_creator) => { + let downloader = self.clone(); + let future = value_creator.get(move || { + downloader.create_load_future(name, registry_url, registry_config) + }); + match future.await { + Ok(FutureResult::SavedFsCache(info)) => { + // return back the future and mark this package as having + // been saved in the cache for next time it's requested + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::FsCached; + Ok(Some(info)) + } + Ok(FutureResult::ErroredFsCache(info)) => { + // since saving to the fs cache failed, keep the package information in memory + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::MemoryCached(Ok(Some(info.clone()))); + Ok(Some(info)) } - } else { - match future.await { - Ok(FutureResult::SavedFsCache(info)) => Ok(Some(info)), - Ok(FutureResult::ErroredFsCache(info)) => Ok(Some(info)), - Ok(FutureResult::PackageNotExists) => Ok(None), - Err(err) => Err(anyhow!("{}", err)), + Ok(FutureResult::PackageNotExists) => { + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::MemoryCached(Ok(None)); + Ok(None) + } + Err(err) => { + let return_err = anyhow!("{}", err); + *self.memory_cache.lock().get_mut(name).unwrap() = + MemoryCacheItem::MemoryCached(Err(err)); + Err(return_err) } } } @@ -203,23 +200,19 @@ impl RegistryInfoDownloader { } fn create_load_future( - &self, + self: &Arc<Self>, name: &str, registry_url: &Url, registry_config: &RegistryConfig, - ) -> Shared<PendingRegistryLoadFuture> { + ) -> PendingRegistryLoadFuture { + let downloader = self.clone(); let package_url = self.get_package_url(name, registry_url); let maybe_auth_header = maybe_auth_header_for_npm_registry(registry_config); let guard = self.progress_bar.update(package_url.as_str()); - let cache = self.cache.clone(); - let http_client_provider = self.http_client_provider.clone(); let name = name.to_string(); - // force this future to be polled on the current runtime because it's not - // safe to share `HttpClient`s across runtimes and because a restart of - // npm resolution might cause this package not to be resolved again - // causing the future to never be polled - deno_core::unsync::spawn(async move { - let maybe_bytes = http_client_provider + async move { + let maybe_bytes = downloader + .http_client_provider .get_or_create()? .download_with_progress(package_url, maybe_auth_header, &guard) .await?; @@ -228,7 +221,7 @@ impl RegistryInfoDownloader { let future_result = deno_core::unsync::spawn_blocking( move || -> Result<FutureResult, AnyError> { let package_info = serde_json::from_slice(&bytes)?; - match cache.save_package_info(&name, &package_info) { + match downloader.cache.save_package_info(&name, &package_info) { Ok(()) => { Ok(FutureResult::SavedFsCache(Arc::new(package_info))) } @@ -248,10 +241,8 @@ impl RegistryInfoDownloader { } None => Ok(FutureResult::PackageNotExists), } - }) - .map(|result| result.unwrap().map_err(Arc::new)) - .boxed() - .shared() + } + .boxed_local() } fn get_package_url(&self, name: &str, registry_url: &Url) -> Url { |