Diffstat (limited to 'cli/npm')
-rw-r--r--  cli/npm/managed/cache/mod.rs              1
-rw-r--r--  cli/npm/managed/cache/registry_info.rs   75
-rw-r--r--  cli/npm/managed/cache/tarball.rs         30
-rw-r--r--  cli/npm/managed/cache/value_creator.rs  101
-rw-r--r--  cli/npm/managed/resolution.rs            39
5 files changed, 69 insertions, 177 deletions
diff --git a/cli/npm/managed/cache/mod.rs b/cli/npm/managed/cache/mod.rs
index 531fd0988..f409744b9 100644
--- a/cli/npm/managed/cache/mod.rs
+++ b/cli/npm/managed/cache/mod.rs
@@ -28,7 +28,6 @@ use crate::util::fs::hard_link_dir_recursive;
 mod registry_info;
 mod tarball;
 mod tarball_extract;
-mod value_creator;

 pub use registry_info::RegistryInfoDownloader;
 pub use tarball::TarballCache;
diff --git a/cli/npm/managed/cache/registry_info.rs b/cli/npm/managed/cache/registry_info.rs
index 131b93192..d7675a34f 100644
--- a/cli/npm/managed/cache/registry_info.rs
+++ b/cli/npm/managed/cache/registry_info.rs
@@ -13,7 +13,6 @@ use deno_core::futures::FutureExt;
 use deno_core::parking_lot::Mutex;
 use deno_core::serde_json;
 use deno_core::url::Url;
-use deno_npm::npm_rc::RegistryConfig;
 use deno_npm::npm_rc::ResolvedNpmRc;
 use deno_npm::registry::NpmPackageInfo;

@@ -21,16 +20,26 @@ use crate::args::CacheSetting;
 use crate::http_util::HttpClientProvider;
 use crate::npm::common::maybe_auth_header_for_npm_registry;
 use crate::util::progress_bar::ProgressBar;
+use crate::util::sync::MultiRuntimeAsyncValueCreator;

-use super::value_creator::MultiRuntimeAsyncValueCreator;
 use super::NpmCache;

 // todo(dsherret): create seams and unit test this

+type LoadResult = Result<FutureResult, Arc<AnyError>>;
+type LoadFuture = LocalBoxFuture<'static, LoadResult>;
+
+#[derive(Debug, Clone)]
+enum FutureResult {
+  PackageNotExists,
+  SavedFsCache(Arc<NpmPackageInfo>),
+  ErroredFsCache(Arc<NpmPackageInfo>),
+}
+
 #[derive(Debug, Clone)]
 enum MemoryCacheItem {
   /// The cache item hasn't loaded yet.
-  Pending(Arc<MultiRuntimeAsyncValueCreator<FutureResult>>),
+  Pending(Arc<MultiRuntimeAsyncValueCreator<LoadResult>>),
   /// The item has loaded in the past and was stored in the file system cache.
   /// There is no reason to request this package from the npm registry again
   /// for the duration of execution.
@@ -40,16 +49,6 @@ enum MemoryCacheItem {
   MemoryCached(Result<Option<Arc<NpmPackageInfo>>, Arc<AnyError>>),
 }

-#[derive(Debug, Clone)]
-enum FutureResult {
-  PackageNotExists,
-  SavedFsCache(Arc<NpmPackageInfo>),
-  ErroredFsCache(Arc<NpmPackageInfo>),
-}
-
-type PendingRegistryLoadFuture =
-  LocalBoxFuture<'static, Result<FutureResult, AnyError>>;
-
 /// Downloads packuments from the npm registry.
 ///
 /// This is shared amongst all the workers.
@@ -82,26 +81,18 @@ impl RegistryInfoDownloader {
     self: &Arc<Self>,
     name: &str,
   ) -> Result<Option<Arc<NpmPackageInfo>>, AnyError> {
-    let registry_url = self.npmrc.get_registry_url(name);
-    let registry_config = self.npmrc.get_registry_config(name);
-
-    self
-      .load_package_info_inner(name, registry_url, registry_config)
-      .await
-      .with_context(|| {
-        format!(
-          "Error getting response at {} for package \"{}\"",
-          self.get_package_url(name, registry_url),
-          name
-        )
-      })
+    self.load_package_info_inner(name).await.with_context(|| {
+      format!(
+        "Error getting response at {} for package \"{}\"",
+        self.get_package_url(name),
+        name
+      )
+    })
   }

   async fn load_package_info_inner(
     self: &Arc<Self>,
     name: &str,
-    registry_url: &Url,
-    registry_config: &RegistryConfig,
   ) -> Result<Option<Arc<NpmPackageInfo>>, AnyError> {
     if *self.cache.cache_setting() == CacheSetting::Only {
       return Err(custom_error(
@@ -117,9 +108,11 @@ impl RegistryInfoDownloader {
       if let Some(cache_item) = mem_cache.get(name) {
         cache_item.clone()
       } else {
-        let future =
-          self.create_load_future(name, registry_url, registry_config);
-        let value_creator = MultiRuntimeAsyncValueCreator::new(future);
+        let value_creator = MultiRuntimeAsyncValueCreator::new({
+          let downloader = self.clone();
+          let name = name.to_string();
+          Box::new(move || downloader.create_load_future(&name))
+        });
         let cache_item = MemoryCacheItem::Pending(Arc::new(value_creator));
         mem_cache.insert(name.to_string(), cache_item.clone());
         cache_item
@@ -138,11 +131,7 @@ impl RegistryInfoDownloader {
         maybe_info.clone().map_err(|e| anyhow!("{}", e))
       }
       MemoryCacheItem::Pending(value_creator) => {
-        let downloader = self.clone();
-        let future = value_creator.get(move || {
-          downloader.create_load_future(name, registry_url, registry_config)
-        });
-        match future.await {
+        match value_creator.get().await {
           Ok(FutureResult::SavedFsCache(info)) => {
             // return back the future and mark this package as having
             // been saved in the cache for next time it's requested
@@ -199,14 +188,10 @@ impl RegistryInfoDownloader {
     }
   }

-  fn create_load_future(
-    self: &Arc<Self>,
-    name: &str,
-    registry_url: &Url,
-    registry_config: &RegistryConfig,
-  ) -> PendingRegistryLoadFuture {
+  fn create_load_future(self: &Arc<Self>, name: &str) -> LoadFuture {
     let downloader = self.clone();
-    let package_url = self.get_package_url(name, registry_url);
+    let package_url = self.get_package_url(name);
+    let registry_config = self.npmrc.get_registry_config(name);
     let maybe_auth_header = maybe_auth_header_for_npm_registry(registry_config);
     let guard = self.progress_bar.update(package_url.as_str());
     let name = name.to_string();
@@ -242,10 +227,12 @@ impl RegistryInfoDownloader {
         None => Ok(FutureResult::PackageNotExists),
       }
     }
+    .map(|r| r.map_err(Arc::new))
     .boxed_local()
   }

-  fn get_package_url(&self, name: &str, registry_url: &Url) -> Url {
+  fn get_package_url(&self, name: &str) -> Url {
+    let registry_url = self.npmrc.get_registry_url(name);
     // list of all characters used in npm packages:
     //  !, ', (, ), *, -, ., /, [0-9], @, [A-Za-z], _, ~
     const ASCII_SET: percent_encoding::AsciiSet =
diff --git a/cli/npm/managed/cache/tarball.rs b/cli/npm/managed/cache/tarball.rs
index e65578ff3..042c3cbb2 100644
--- a/cli/npm/managed/cache/tarball.rs
+++ b/cli/npm/managed/cache/tarball.rs
@@ -20,18 +20,21 @@ use crate::args::CacheSetting;
 use crate::http_util::HttpClientProvider;
 use crate::npm::common::maybe_auth_header_for_npm_registry;
 use crate::util::progress_bar::ProgressBar;
+use crate::util::sync::MultiRuntimeAsyncValueCreator;

 use super::tarball_extract::verify_and_extract_tarball;
 use super::tarball_extract::TarballExtractionMode;
-use super::value_creator::MultiRuntimeAsyncValueCreator;
 use super::NpmCache;

 // todo(dsherret): create seams and unit test this

+type LoadResult = Result<(), Arc<AnyError>>;
+type LoadFuture = LocalBoxFuture<'static, LoadResult>;
+
 #[derive(Debug, Clone)]
 enum MemoryCacheItem {
   /// The cache item hasn't finished yet.
-  Pending(Arc<MultiRuntimeAsyncValueCreator<()>>),
+  Pending(Arc<MultiRuntimeAsyncValueCreator<LoadResult>>),
   /// The result errored.
   Errored(Arc<AnyError>),
   /// This package has already been cached.
@@ -91,8 +94,14 @@ impl TarballCache {
       if let Some(cache_item) = mem_cache.get(package_nv) {
         cache_item.clone()
       } else {
-        let future = self.create_setup_future(package_nv.clone(), dist.clone());
-        let value_creator = MultiRuntimeAsyncValueCreator::new(future);
+        let value_creator = MultiRuntimeAsyncValueCreator::new({
+          let tarball_cache = self.clone();
+          let package_nv = package_nv.clone();
+          let dist = dist.clone();
+          Box::new(move || {
+            tarball_cache.create_setup_future(package_nv.clone(), dist.clone())
+          })
+        });
         let cache_item = MemoryCacheItem::Pending(Arc::new(value_creator));
         mem_cache.insert(package_nv.clone(), cache_item.clone());
         cache_item
@@ -103,12 +112,7 @@ impl TarballCache {
       MemoryCacheItem::Cached => Ok(()),
       MemoryCacheItem::Errored(err) => Err(anyhow!("{}", err)),
       MemoryCacheItem::Pending(creator) => {
-        let tarball_cache = self.clone();
-        let result = creator
-          .get(move || {
-            tarball_cache.create_setup_future(package_nv.clone(), dist.clone())
-          })
-          .await;
+        let result = creator.get().await;
         match result {
           Ok(_) => {
             *self.memory_cache.lock().get_mut(package_nv).unwrap() =
@@ -130,7 +134,7 @@ impl TarballCache {
     self: &Arc<Self>,
     package_nv: PackageNv,
     dist: NpmPackageVersionDistInfo,
-  ) -> LocalBoxFuture<'static, Result<(), AnyError>> {
+  ) -> LoadFuture {
     let tarball_cache = self.clone();
     async move {
       let registry_url = tarball_cache.npmrc.get_registry_url(&package_nv.name);
@@ -197,6 +201,8 @@ impl TarballCache {
           bail!("Could not find npm package tarball at: {}", dist.tarball);
         }
       }
-    }.boxed_local()
+    }
+    .map(|r| r.map_err(Arc::new))
+    .boxed_local()
   }
 }
diff --git a/cli/npm/managed/cache/value_creator.rs b/cli/npm/managed/cache/value_creator.rs
deleted file mode 100644
index 38801b3aa..000000000
--- a/cli/npm/managed/cache/value_creator.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
-
-use std::sync::Arc;
-
-use deno_core::error::AnyError;
-use deno_core::futures::future::BoxFuture;
-use deno_core::futures::future::LocalBoxFuture;
-use deno_core::futures::future::Shared;
-use deno_core::futures::FutureExt;
-use deno_core::parking_lot::Mutex;
-use tokio::task::JoinError;
-
-// todo(dsherret): unit test this
-
-type FutureResult<TResult> = Result<TResult, Arc<AnyError>>;
-type JoinResult<TResult> = Result<FutureResult<TResult>, Arc<JoinError>>;
-
-#[derive(Debug)]
-struct State<TResult> {
-  retry_index: usize,
-  future: Shared<BoxFuture<'static, JoinResult<TResult>>>,
-}
-
-/// Attempts to create a shared value asynchronously on one tokio runtime while
-/// many runtimes are requesting the value.
-///
-/// This is only useful when the value needs to get created once across
-/// many runtimes.
-///
-/// This handles the case where one tokio runtime goes down while another
-/// one is still running.
-#[derive(Debug)]
-pub struct MultiRuntimeAsyncValueCreator<TResult: Send + Clone + 'static> {
-  state: Mutex<State<TResult>>,
-}
-
-impl<TResult: Send + Clone + 'static> MultiRuntimeAsyncValueCreator<TResult> {
-  pub fn new(
-    future: LocalBoxFuture<'static, Result<TResult, AnyError>>,
-  ) -> Self {
-    Self {
-      state: Mutex::new(State {
-        retry_index: 0,
-        future: Self::create_shared_future(future),
-      }),
-    }
-  }
-
-  pub async fn get(
-    &self,
-    recreate_future: impl Fn() -> LocalBoxFuture<'static, Result<TResult, AnyError>>,
-  ) -> Result<TResult, Arc<AnyError>> {
-    let (mut future, mut retry_index) = {
-      let state = self.state.lock();
-      (state.future.clone(), state.retry_index)
-    };
-
-    loop {
-      let result = future.await;
-
-      match result {
-        Ok(result) => return result,
-        Err(join_error) => {
-          if join_error.is_cancelled() {
-            let mut state = self.state.lock();
-
-            if state.retry_index == retry_index {
-              // we were the first one to retry, so create a new future
-              // that we'll run from the current runtime
-              state.retry_index += 1;
-              state.future = Self::create_shared_future(recreate_future());
-            }
-
-            retry_index = state.retry_index;
-            future = state.future.clone();
-
-            // just in case we're stuck in a loop
-            if retry_index > 1000 {
-              panic!("Something went wrong.") // should never happen
-            }
-          } else {
-            panic!("{}", join_error);
-          }
-        }
-      }
-    }
-  }
-
-  fn create_shared_future(
-    future: LocalBoxFuture<'static, Result<TResult, AnyError>>,
-  ) -> Shared<BoxFuture<'static, JoinResult<TResult>>> {
-    deno_core::unsync::spawn(future)
-      .map(|result| match result {
-        Ok(Ok(value)) => Ok(Ok(value)),
-        Ok(Err(err)) => Ok(Err(Arc::new(err))),
-        Err(err) => Err(Arc::new(err)),
-      })
-      .boxed()
-      .shared()
-  }
-}
diff --git a/cli/npm/managed/resolution.rs b/cli/npm/managed/resolution.rs
index 3562d5aff..7c2756749 100644
--- a/cli/npm/managed/resolution.rs
+++ b/cli/npm/managed/resolution.rs
@@ -6,7 +6,6 @@ use std::sync::Arc;

 use deno_core::error::AnyError;
 use deno_core::parking_lot::Mutex;
-use deno_core::parking_lot::RwLock;
 use deno_lockfile::NpmPackageDependencyLockfileInfo;
 use deno_lockfile::NpmPackageLockfileInfo;
 use deno_npm::registry::NpmPackageInfo;
@@ -31,7 +30,7 @@ use deno_semver::package::PackageReq;
 use deno_semver::VersionReq;

 use crate::args::Lockfile;
-use crate::util::sync::TaskQueue;
+use crate::util::sync::SyncReadAsyncWriteLock;

 use super::CliNpmRegistryApi;

@@ -42,8 +41,7 @@ use super::CliNpmRegistryApi;
 /// This does not interact with the file system.
 pub struct NpmResolution {
   api: Arc<CliNpmRegistryApi>,
-  snapshot: RwLock<NpmResolutionSnapshot>,
-  update_queue: TaskQueue,
+  snapshot: SyncReadAsyncWriteLock<NpmResolutionSnapshot>,
   maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
 }

@@ -74,8 +72,7 @@ impl NpmResolution {
   ) -> Self {
     Self {
       api,
-      snapshot: RwLock::new(initial_snapshot),
-      update_queue: Default::default(),
+      snapshot: SyncReadAsyncWriteLock::new(initial_snapshot),
       maybe_lockfile,
     }
   }
@@ -85,16 +82,16 @@ impl NpmResolution {
     package_reqs: &[PackageReq],
   ) -> Result<(), AnyError> {
     // only allow one thread in here at a time
-    let _permit = self.update_queue.acquire().await;
+    let snapshot_lock = self.snapshot.acquire().await;
     let snapshot = add_package_reqs_to_snapshot(
       &self.api,
       package_reqs,
       self.maybe_lockfile.clone(),
-      || self.snapshot.read().clone(),
+      || snapshot_lock.read().clone(),
     )
     .await?;

-    *self.snapshot.write() = snapshot;
+    *snapshot_lock.write() = snapshot;

     Ok(())
   }
@@ -103,7 +100,7 @@ impl NpmResolution {
     package_reqs: &[PackageReq],
   ) -> Result<(), AnyError> {
     // only allow one thread in here at a time
-    let _permit = self.update_queue.acquire().await;
+    let snapshot_lock = self.snapshot.acquire().await;

     let reqs_set = package_reqs.iter().collect::<HashSet<_>>();
     let snapshot = add_package_reqs_to_snapshot(
@@ -111,7 +108,7 @@ impl NpmResolution {
       package_reqs,
       self.maybe_lockfile.clone(),
       || {
-        let snapshot = self.snapshot.read().clone();
+        let snapshot = snapshot_lock.read().clone();
         let has_removed_package = !snapshot
           .package_reqs()
           .keys()
@@ -126,24 +123,24 @@ impl NpmResolution {
     )
     .await?;

-    *self.snapshot.write() = snapshot;
+    *snapshot_lock.write() = snapshot;

     Ok(())
   }

   pub async fn resolve_pending(&self) -> Result<(), AnyError> {
     // only allow one thread in here at a time
-    let _permit = self.update_queue.acquire().await;
+    let snapshot_lock = self.snapshot.acquire().await;

     let snapshot = add_package_reqs_to_snapshot(
       &self.api,
       &Vec::new(),
       self.maybe_lockfile.clone(),
-      || self.snapshot.read().clone(),
+      || snapshot_lock.read().clone(),
     )
     .await?;

-    *self.snapshot.write() = snapshot;
+    *snapshot_lock.write() = snapshot;

     Ok(())
   }
@@ -229,8 +226,10 @@ impl NpmResolution {
     pkg_info: &NpmPackageInfo,
   ) -> Result<PackageNv, NpmPackageVersionResolutionError> {
     debug_assert_eq!(pkg_req.name, pkg_info.name);
-    let _permit = self.update_queue.acquire().await;
-    let mut snapshot = self.snapshot.write();
+    // only allow one thread in here at a time
+    let snapshot_lock = self.snapshot.acquire().await;
+
+    let mut snapshot = snapshot_lock.write();
     let pending_resolver = get_npm_pending_resolver(&self.api);
     let nv = pending_resolver.resolve_package_req_as_pending(
       &mut snapshot,
@@ -244,8 +243,10 @@ impl NpmResolution {
     &self,
     reqs_with_pkg_infos: &[(&PackageReq, Arc<NpmPackageInfo>)],
   ) -> Vec<Result<PackageNv, NpmPackageVersionResolutionError>> {
-    let _permit = self.update_queue.acquire().await;
-    let mut snapshot = self.snapshot.write();
+    // only allow one thread in here at a time
+    let snapshot_lock = self.snapshot.acquire().await;
+
+    let mut snapshot = snapshot_lock.write();
     let pending_resolver = get_npm_pending_resolver(&self.api);
     let mut results = Vec::with_capacity(reqs_with_pkg_infos.len());
     for (pkg_req, pkg_info) in reqs_with_pkg_infos {
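
Call-site note: after this change both caches hand MultiRuntimeAsyncValueCreator::new a boxed factory closure up front and simply await get(), instead of passing a recreate closure to every get() call. The relocated type in cli/util/sync is not shown in this diff, so the stand-in below is only a minimal sketch of that calling pattern (single runtime, no cancellation retry); the real type also re-runs the factory when the tokio runtime that started the work shuts down.

// Hypothetical stand-in illustrating the new calling convention only; the
// real MultiRuntimeAsyncValueCreator lives in cli/util/sync and additionally
// retries on another runtime if the spawning runtime is shut down.
use std::sync::Arc;

use futures::future::LocalBoxFuture;
use futures::FutureExt;

type LoadResult = Result<String, Arc<String>>;
type LoadFuture = LocalBoxFuture<'static, LoadResult>;

struct MultiRuntimeAsyncValueCreator {
  // The factory is stored at construction so the creator can re-run it later.
  create_future: Box<dyn Fn() -> LoadFuture>,
}

impl MultiRuntimeAsyncValueCreator {
  pub fn new(create_future: Box<dyn Fn() -> LoadFuture>) -> Self {
    Self { create_future }
  }

  // Callers now just await get(); no per-call closure is passed anymore.
  pub async fn get(&self) -> LoadResult {
    (self.create_future)().await
  }
}

fn main() {
  futures::executor::block_on(async {
    let creator = MultiRuntimeAsyncValueCreator::new(Box::new(|| {
      async { Ok::<String, Arc<String>>("fake packument".to_string()) }
        .boxed_local()
    }));
    // Errors are Arc-wrapped (see the added `.map_err(Arc::new)` calls in the
    // diff) so a single failure can be handed out to every waiter.
    println!("{:?}", creator.get().await);
  });
}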
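
Similarly, NpmResolution now goes through a single SyncReadAsyncWriteLock instead of a separate TaskQueue plus RwLock. Its definition in cli/util/sync is not part of this diff; the sketch below is an assumed simplification of the pattern visible at the call sites: updaters serialize on an async permit via acquire().await, then take short synchronous read()/write() locks through the returned guard.

// Assumed, simplified stand-in for the SyncReadAsyncWriteLock usage shown in
// resolution.rs above; the real type is internal to cli/util/sync.
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use tokio::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};

struct SyncReadAsyncWriteLock<T> {
  data: RwLock<T>,
  update_queue: AsyncMutex<()>,
}

struct UpdateGuard<'a, T> {
  // Holding the async permit keeps other updaters out until this guard drops.
  _permit: AsyncMutexGuard<'a, ()>,
  data: &'a RwLock<T>,
}

impl<T> SyncReadAsyncWriteLock<T> {
  pub fn new(value: T) -> Self {
    Self {
      data: RwLock::new(value),
      update_queue: AsyncMutex::new(()),
    }
  }

  // "only allow one thread in here at a time": one update guard at a time.
  pub async fn acquire(&self) -> UpdateGuard<'_, T> {
    UpdateGuard {
      _permit: self.update_queue.lock().await,
      data: &self.data,
    }
  }
}

impl<'a, T> UpdateGuard<'a, T> {
  pub fn read(&self) -> RwLockReadGuard<'_, T> {
    self.data.read().unwrap()
  }

  pub fn write(&self) -> RwLockWriteGuard<'_, T> {
    self.data.write().unwrap()
  }
}

#[tokio::main]
async fn main() {
  let snapshot = SyncReadAsyncWriteLock::new(0u32);
  let lock = snapshot.acquire().await;
  let next = *lock.read() + 1; // cheap sync read while holding the permit
  *lock.write() = next; // publish the updated snapshot
  println!("{}", *lock.read());
}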