From 2f7222da8a26d8be915b9467fc21649a18f54b77 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Sat, 4 Mar 2023 20:07:11 -0500 Subject: refactor: remove `Semaphore::new(1)` and use `TaskQueue` (#18014) --- cli/graph_util.rs | 24 +++++++++--------------- cli/npm/resolution/mod.rs | 11 ++++++----- cli/resolver.rs | 17 +++++++++-------- 3 files changed, 24 insertions(+), 28 deletions(-) (limited to 'cli') diff --git a/cli/graph_util.rs b/cli/graph_util.rs index 31645824d..ecae9ea4e 100644 --- a/cli/graph_util.rs +++ b/cli/graph_util.rs @@ -19,6 +19,8 @@ use deno_core::error::custom_error; use deno_core::error::AnyError; use deno_core::parking_lot::RwLock; use deno_core::ModuleSpecifier; +use deno_core::TaskQueue; +use deno_core::TaskQueuePermit; use deno_graph::Module; use deno_graph::ModuleGraph; use deno_graph::ModuleGraphError; @@ -29,8 +31,6 @@ use import_map::ImportMapError; use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; -use tokio::sync::Semaphore; -use tokio::sync::SemaphorePermit; #[derive(Clone, Copy)] pub struct GraphValidOptions { @@ -318,27 +318,21 @@ struct GraphData { } /// Holds the `ModuleGraph` and what parts of it are type checked. -#[derive(Clone)] +#[derive(Clone, Default)] pub struct ModuleGraphContainer { - update_semaphore: Arc, + // Allow only one request to update the graph data at a time, + // but allow other requests to read from it at any time even + // while another request is updating the data. + update_queue: Arc, graph_data: Arc>, } -impl Default for ModuleGraphContainer { - fn default() -> Self { - Self { - update_semaphore: Arc::new(Semaphore::new(1)), - graph_data: Default::default(), - } - } -} - impl ModuleGraphContainer { /// Acquires a permit to modify the module graph without other code /// having the chance to modify it. In the meantime, other code may /// still read from the existing module graph. pub async fn acquire_update_permit(&self) -> ModuleGraphUpdatePermit { - let permit = self.update_semaphore.acquire().await.unwrap(); + let permit = self.update_queue.acquire().await; ModuleGraphUpdatePermit { permit, graph_data: self.graph_data.clone(), @@ -395,7 +389,7 @@ impl ModuleGraphContainer { /// everything looks fine, calling `.commit()` will store the /// new graph in the ModuleGraphContainer. pub struct ModuleGraphUpdatePermit<'a> { - permit: SemaphorePermit<'a>, + permit: TaskQueuePermit<'a>, graph_data: Arc>, graph: ModuleGraph, } diff --git a/cli/npm/resolution/mod.rs b/cli/npm/resolution/mod.rs index f43f3c5cb..e1e3307c3 100644 --- a/cli/npm/resolution/mod.rs +++ b/cli/npm/resolution/mod.rs @@ -10,6 +10,7 @@ use deno_core::anyhow::Context; use deno_core::error::AnyError; use deno_core::parking_lot::Mutex; use deno_core::parking_lot::RwLock; +use deno_core::TaskQueue; use deno_graph::npm::NpmPackageNv; use deno_graph::npm::NpmPackageNvReference; use deno_graph::npm::NpmPackageReq; @@ -241,7 +242,7 @@ pub struct NpmResolution(Arc); struct NpmResolutionInner { api: NpmRegistryApi, snapshot: RwLock, - update_semaphore: tokio::sync::Semaphore, + update_queue: TaskQueue, maybe_lockfile: Option>>, } @@ -263,7 +264,7 @@ impl NpmResolution { Self(Arc::new(NpmResolutionInner { api, snapshot: RwLock::new(initial_snapshot.unwrap_or_default()), - update_semaphore: tokio::sync::Semaphore::new(1), + update_queue: Default::default(), maybe_lockfile, })) } @@ -275,7 +276,7 @@ impl NpmResolution { let inner = &self.0; // only allow one thread in here at a time - let _permit = inner.update_semaphore.acquire().await?; + let _permit = inner.update_queue.acquire().await; let snapshot = inner.snapshot.read().clone(); let snapshot = add_package_reqs_to_snapshot( @@ -296,7 +297,7 @@ impl NpmResolution { ) -> Result<(), AnyError> { let inner = &self.0; // only allow one thread in here at a time - let _permit = inner.update_semaphore.acquire().await?; + let _permit = inner.update_queue.acquire().await; let snapshot = inner.snapshot.read().clone(); let reqs_set = package_reqs.iter().collect::>(); @@ -326,7 +327,7 @@ impl NpmResolution { pub async fn resolve_pending(&self) -> Result<(), AnyError> { let inner = &self.0; // only allow one thread in here at a time - let _permit = inner.update_semaphore.acquire().await?; + let _permit = inner.update_queue.acquire().await; let snapshot = inner.snapshot.read().clone(); let snapshot = add_package_reqs_to_snapshot( diff --git a/cli/resolver.rs b/cli/resolver.rs index 46ae16a67..b113fc470 100644 --- a/cli/resolver.rs +++ b/cli/resolver.rs @@ -7,6 +7,7 @@ use deno_core::futures::future; use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::FutureExt; use deno_core::ModuleSpecifier; +use deno_core::TaskQueue; use deno_graph::npm::NpmPackageNv; use deno_graph::npm::NpmPackageReq; use deno_graph::source::NpmResolver; @@ -34,7 +35,7 @@ pub struct CliGraphResolver { npm_registry_api: NpmRegistryApi, npm_resolution: NpmResolution, package_json_deps_installer: PackageJsonDepsInstaller, - sync_download_semaphore: Option>, + sync_download_queue: Option>, } impl Default for CliGraphResolver { @@ -52,7 +53,7 @@ impl Default for CliGraphResolver { npm_registry_api, npm_resolution, package_json_deps_installer: Default::default(), - sync_download_semaphore: Self::create_sync_download_semaphore(), + sync_download_queue: Self::create_sync_download_queue(), } } } @@ -77,13 +78,13 @@ impl CliGraphResolver { npm_registry_api, npm_resolution, package_json_deps_installer, - sync_download_semaphore: Self::create_sync_download_semaphore(), + sync_download_queue: Self::create_sync_download_queue(), } } - fn create_sync_download_semaphore() -> Option> { + fn create_sync_download_queue() -> Option> { if crate::npm::should_sync_download() { - Some(Arc::new(tokio::sync::Semaphore::new(1))) + Some(Default::default()) } else { None } @@ -194,10 +195,10 @@ impl NpmResolver for CliGraphResolver { let package_name = package_name.to_string(); let api = self.npm_registry_api.clone(); let deps_installer = self.package_json_deps_installer.clone(); - let maybe_sync_download_semaphore = self.sync_download_semaphore.clone(); + let maybe_sync_download_queue = self.sync_download_queue.clone(); async move { - let permit = if let Some(semaphore) = &maybe_sync_download_semaphore { - Some(semaphore.acquire().await.unwrap()) + let permit = if let Some(task_queue) = &maybe_sync_download_queue { + Some(task_queue.acquire().await) } else { None }; -- cgit v1.2.3