diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2024-06-03 17:17:08 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-03 21:17:08 +0000 |
commit | 3341c50b6ae676cdc8f7b1c44221aa633f2bde68 (patch) | |
tree | e051e384d722403ea0a04402679a358ef61002dc /cli/tools/registry/mod.rs | |
parent | 72088f2f52d65b2948155a11e7b56722bf6c10f9 (diff) |
refactor: don't share `reqwest::HttpClient` across tokio runtimes (#24092)
This also fixes several issues where we weren't properly creating http
clients with the user's settings.
Diffstat (limited to 'cli/tools/registry/mod.rs')
-rw-r--r-- | cli/tools/registry/mod.rs | 113 |
1 files changed, 57 insertions, 56 deletions
diff --git a/cli/tools/registry/mod.rs b/cli/tools/registry/mod.rs index 23e8f4313..d300e5eaf 100644 --- a/cli/tools/registry/mod.rs +++ b/cli/tools/registry/mod.rs @@ -17,11 +17,13 @@ use deno_config::WorkspaceMemberConfig; use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::AnyError; +use deno_core::futures::future::LocalBoxFuture; +use deno_core::futures::stream::FuturesUnordered; use deno_core::futures::FutureExt; +use deno_core::futures::StreamExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; -use deno_core::unsync::JoinSet; use deno_runtime::deno_fetch::reqwest; use deno_runtime::deno_fs::FileSystem; use deno_terminal::colors; @@ -154,7 +156,7 @@ pub async fn publish( } perform_publish( - cli_factory.http_client(), + &cli_factory.http_client_provider().get_or_create()?, prepared_data.publish_order_graph, prepared_data.package_by_name, auth_method, @@ -523,9 +525,9 @@ pub enum Permission<'s> { } async fn get_auth_headers( - client: &reqwest::Client, - registry_url: String, - packages: Vec<Rc<PreparedPublishPackage>>, + client: &HttpClient, + registry_url: &Url, + packages: &[Rc<PreparedPublishPackage>], auth_method: AuthMethod, ) -> Result<HashMap<(String, String, String), Rc<str>>, AnyError> { let permissions = packages @@ -600,7 +602,7 @@ async fn get_auth_headers( colors::cyan(res.user.name) ); let authorization: Rc<str> = format!("Bearer {}", res.token).into(); - for pkg in &packages { + for pkg in packages { authorizations.insert( (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()), authorization.clone(), @@ -620,7 +622,7 @@ async fn get_auth_headers( } AuthMethod::Token(token) => { let authorization: Rc<str> = format!("Bearer {}", token).into(); - for pkg in &packages { + for pkg in packages { authorizations.insert( (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()), authorization.clone(), @@ -682,9 +684,9 @@ async fn get_auth_headers( /// Check if both `scope` and `package` already exist, if not return /// a URL to the management panel to create them. async fn check_if_scope_and_package_exist( - client: &reqwest::Client, - registry_api_url: &str, - registry_manage_url: &str, + client: &HttpClient, + registry_api_url: &Url, + registry_manage_url: &Url, scope: &str, package: &str, ) -> Result<Option<String>, AnyError> { @@ -714,18 +716,18 @@ async fn check_if_scope_and_package_exist( } async fn ensure_scopes_and_packages_exist( - client: &reqwest::Client, - registry_api_url: String, - registry_manage_url: String, - packages: Vec<Rc<PreparedPublishPackage>>, + client: &HttpClient, + registry_api_url: &Url, + registry_manage_url: &Url, + packages: &[Rc<PreparedPublishPackage>], ) -> Result<(), AnyError> { if !std::io::stdin().is_terminal() { let mut missing_packages_lines = vec![]; for package in packages { let maybe_create_package_url = check_if_scope_and_package_exist( client, - ®istry_api_url, - ®istry_manage_url, + registry_api_url, + registry_manage_url, &package.scope, &package.package, ) @@ -748,8 +750,8 @@ async fn ensure_scopes_and_packages_exist( for package in packages { let maybe_create_package_url = check_if_scope_and_package_exist( client, - ®istry_api_url, - ®istry_manage_url, + registry_api_url, + registry_manage_url, &package.scope, &package.package, ) @@ -770,7 +772,7 @@ async fn ensure_scopes_and_packages_exist( let _ = open::that_detached(&create_package_url); let package_api_url = api::get_package_api_url( - ®istry_api_url, + registry_api_url, &package.scope, &package.package, ); @@ -790,15 +792,14 @@ async fn ensure_scopes_and_packages_exist( } async fn perform_publish( - http_client: &Arc<HttpClient>, + http_client: &HttpClient, mut publish_order_graph: PublishOrderGraph, mut prepared_package_by_name: HashMap<String, Rc<PreparedPublishPackage>>, auth_method: AuthMethod, provenance: bool, ) -> Result<(), AnyError> { - let client = http_client.client()?; - let registry_api_url = jsr_api_url().to_string(); - let registry_url = jsr_url().to_string(); + let registry_api_url = jsr_api_url(); + let registry_url = jsr_url(); let packages = prepared_package_by_name .values() @@ -806,19 +807,20 @@ async fn perform_publish( .collect::<Vec<_>>(); ensure_scopes_and_packages_exist( - client, - registry_api_url.clone(), - registry_url.clone(), - packages.clone(), + http_client, + registry_api_url, + registry_url, + &packages, ) .await?; let mut authorizations = - get_auth_headers(client, registry_api_url.clone(), packages, auth_method) + get_auth_headers(http_client, registry_api_url, &packages, auth_method) .await?; assert_eq!(prepared_package_by_name.len(), authorizations.len()); - let mut futures: JoinSet<Result<String, AnyError>> = JoinSet::default(); + let mut futures: FuturesUnordered<LocalBoxFuture<Result<String, AnyError>>> = + Default::default(); loop { let next_batch = publish_order_graph.next(); @@ -844,32 +846,32 @@ async fn perform_publish( package.version.clone(), )) .unwrap(); - let registry_api_url = registry_api_url.clone(); - let registry_url = registry_url.clone(); - let http_client = http_client.clone(); - futures.spawn(async move { - let display_name = package.display_name(); - publish_package( - &http_client, - package, - ®istry_api_url, - ®istry_url, - &authorization, - provenance, - ) - .await - .with_context(|| format!("Failed to publish {}", display_name))?; - Ok(package_name) - }); + futures.push( + async move { + let display_name = package.display_name(); + publish_package( + http_client, + package, + registry_api_url, + registry_url, + &authorization, + provenance, + ) + .await + .with_context(|| format!("Failed to publish {}", display_name))?; + Ok(package_name) + } + .boxed_local(), + ); } - let Some(result) = futures.join_next().await else { + let Some(result) = futures.next().await else { // done, ensure no circular dependency publish_order_graph.ensure_no_pending()?; break; }; - let package_name = result??; + let package_name = result?; publish_order_graph.finish_package(&package_name); } @@ -879,12 +881,11 @@ async fn perform_publish( async fn publish_package( http_client: &HttpClient, package: Rc<PreparedPublishPackage>, - registry_api_url: &str, - registry_url: &str, + registry_api_url: &Url, + registry_url: &Url, authorization: &str, provenance: bool, ) -> Result<(), AnyError> { - let client = http_client.client()?; log::info!( "{} @{}/{}@{} ...", colors::intense_blue("Publishing"), @@ -902,7 +903,7 @@ async fn publish_package( package.config ); - let response = client + let response = http_client .post(url) .header(reqwest::header::AUTHORIZATION, authorization) .header(reqwest::header::CONTENT_ENCODING, "gzip") @@ -950,7 +951,7 @@ async fn publish_package( let interval = std::time::Duration::from_secs(2); while task.status != "success" && task.status != "failure" { tokio::time::sleep(interval).await; - let resp = client + let resp = http_client .get(format!("{}publish_status/{}", registry_api_url, task.id)) .send() .await @@ -1000,7 +1001,7 @@ async fn publish_package( package.scope, package.package, package.version ))?; - let meta_bytes = client.get(meta_url).send().await?.bytes().await?; + let meta_bytes = http_client.get(meta_url).send().await?.bytes().await?; if std::env::var("DISABLE_JSR_MANIFEST_VERIFICATION_FOR_TESTING").is_err() { verify_version_manifest(&meta_bytes, &package)?; @@ -1015,7 +1016,7 @@ async fn publish_package( sha256: faster_hex::hex_string(&sha2::Sha256::digest(&meta_bytes)), }, }; - let bundle = provenance::generate_provenance(subject).await?; + let bundle = provenance::generate_provenance(http_client, subject).await?; let tlog_entry = &bundle.verification_material.tlog_entries[0]; log::info!("{}", @@ -1030,7 +1031,7 @@ async fn publish_package( "{}scopes/{}/packages/{}/versions/{}/provenance", registry_api_url, package.scope, package.package, package.version ); - client + http_client .post(provenance_url) .header(reqwest::header::AUTHORIZATION, authorization) .json(&json!({ "bundle": bundle })) |