diff options
Diffstat (limited to 'cli/tools/registry/mod.rs')
-rw-r--r-- | cli/tools/registry/mod.rs | 351 |
1 files changed, 216 insertions, 135 deletions
diff --git a/cli/tools/registry/mod.rs b/cli/tools/registry/mod.rs index 8c53fd5cd..0e58e601e 100644 --- a/cli/tools/registry/mod.rs +++ b/cli/tools/registry/mod.rs @@ -1,9 +1,8 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use std::collections::HashMap; use std::fmt::Write; use std::io::IsTerminal; -use std::path::Path; -use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; @@ -16,6 +15,8 @@ use deno_core::anyhow::Context; use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::serde_json::json; +use deno_core::unsync::JoinHandle; +use deno_core::unsync::JoinSet; use deno_runtime::colors; use deno_runtime::deno_fetch::reqwest; use http::header::AUTHORIZATION; @@ -34,6 +35,9 @@ use crate::factory::CliFactory; use crate::http_util::HttpClient; use crate::util::import_map::ImportMapUnfurler; +use self::publish_order::PublishOrderGraph; + +mod publish_order; mod tar; enum AuthMethod { @@ -75,31 +79,22 @@ static SUGGESTED_ENTRYPOINTS: [&str; 4] = ["mod.ts", "mod.js", "index.ts", "index.js"]; async fn prepare_publish( - initial_cwd: &Path, - directory: PathBuf, - import_map: &ImportMap, + deno_json: &ConfigFile, + import_map: Arc<ImportMap>, ) -> Result<PreparedPublishPackage, AnyError> { - let directory_path = initial_cwd.join(directory); - // TODO: doesn't handle jsonc - let deno_json_path = directory_path.join("deno.json"); - let deno_json = ConfigFile::read(&deno_json_path).with_context(|| { - format!( - "Failed to read deno configuration file at {}", - deno_json_path.display() - ) - })?; - + let config_path = deno_json.specifier.to_file_path().unwrap(); + let dir_path = config_path.parent().unwrap().to_path_buf(); let Some(version) = deno_json.json.version.clone() else { - bail!("{} is missing 'version' field", deno_json_path.display()); + bail!("{} is missing 'version' field", deno_json.specifier); }; let Some(name) = deno_json.json.name.clone() else { - bail!("{} is missing 'name' field", deno_json_path.display()); + bail!("{} is missing 'name' field", deno_json.specifier); }; if deno_json.json.exports.is_none() { let mut suggested_entrypoint = None; for entrypoint in SUGGESTED_ENTRYPOINTS { - if directory_path.join(entrypoint).exists() { + if dir_path.join(entrypoint).exists() { suggested_entrypoint = Some(entrypoint); break; } @@ -119,7 +114,7 @@ async fn prepare_publish( bail!( "You did not specify an entrypoint to \"{}\" package in {}. Add `exports` mapping in the configuration file, eg:\n{}", name, - deno_json_path.display(), + deno_json.specifier, exports_content ); } @@ -130,11 +125,12 @@ async fn prepare_publish( bail!("Invalid package name, use '@<scope_name>/<package_name> format"); }; - let unfurler = ImportMapUnfurler::new(import_map); - - let (tarball, diagnostics) = - tar::create_gzipped_tarball(directory_path, unfurler) - .context("Failed to create a tarball")?; + let (tarball, diagnostics) = deno_core::unsync::spawn_blocking(move || { + let unfurler = ImportMapUnfurler::new(&import_map); + tar::create_gzipped_tarball(&dir_path, unfurler) + .context("Failed to create a tarball") + }) + .await??; let tarball_hash_bytes: Vec<u8> = sha2::Sha256::digest(&tarball).iter().cloned().collect(); @@ -298,12 +294,14 @@ fn print_diagnostics(diagnostics: Vec<String>) { async fn perform_publish( http_client: &Arc<HttpClient>, - packages: Vec<PreparedPublishPackage>, + mut publish_order_graph: PublishOrderGraph, + mut prepared_package_by_name: HashMap<String, PreparedPublishPackage>, auth_method: AuthMethod, ) -> Result<(), AnyError> { let client = http_client.client()?; let registry_url = deno_registry_api_url().to_string(); + let packages = prepared_package_by_name.values().collect::<Vec<_>>(); let diagnostics = packages .iter() .flat_map(|p| p.diagnostics.clone()) @@ -320,7 +318,9 @@ async fn perform_publish( }) .collect::<Vec<_>>(); - let authorizations = match auth_method { + let mut authorizations = HashMap::with_capacity(packages.len()); + + match auth_method { AuthMethod::Interactive => { let verifier = uuid::Uuid::new_v4().to_string(); let challenge = BASE64_STANDARD.encode(sha2::Sha256::digest(&verifier)); @@ -376,11 +376,13 @@ async fn perform_publish( colors::cyan(res.user.name) ); let authorization: Rc<str> = format!("Bearer {}", res.token).into(); - let mut authorizations = Vec::new(); - for _ in &packages { - authorizations.push(authorization.clone()); + for pkg in &packages { + authorizations.insert( + (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()), + authorization.clone(), + ); } - break authorizations; + break; } Err(err) => { if err.code == "authorizationPending" { @@ -394,14 +396,15 @@ async fn perform_publish( } AuthMethod::Token(token) => { let authorization: Rc<str> = format!("Bearer {}", token).into(); - let mut authorizations = Vec::new(); - for _ in &packages { - authorizations.push(authorization.clone()); + for pkg in &packages { + authorizations.insert( + (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()), + authorization.clone(), + ); } - authorizations } AuthMethod::Oidc(oidc_config) => { - let mut authorizations = Vec::new(); + let mut chunked_packages = packages.chunks(16); for permissions in permissions.chunks(16) { let audience = json!({ "permissions": permissions }).to_string(); let url = format!( @@ -439,110 +442,151 @@ async fn perform_publish( })?; let authorization: Rc<str> = format!("githuboidc {}", value).into(); - for _ in permissions { - authorizations.push(authorization.clone()); + for pkg in chunked_packages.next().unwrap() { + authorizations.insert( + (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()), + authorization.clone(), + ); } } - authorizations } }; - assert_eq!(packages.len(), authorizations.len()); - for (package, authorization) in - packages.into_iter().zip(authorizations.into_iter()) - { - println!( - "{} @{}/{}@{} ...", - colors::intense_blue("Publishing"), - package.scope, - package.package, - package.version - ); - - let url = format!( - "{}scopes/{}/packages/{}/versions/{}", - registry_url, package.scope, package.package, package.version - ); - - let response = client - .post(url) - .header(AUTHORIZATION, &*authorization) - .header(CONTENT_ENCODING, "gzip") - .body(package.tarball) - .send() - .await?; + assert_eq!(prepared_package_by_name.len(), authorizations.len()); + let mut futures: JoinSet<Result<String, AnyError>> = JoinSet::default(); + loop { + let next_batch = publish_order_graph.next(); + + for package_name in next_batch { + let package = prepared_package_by_name.remove(&package_name).unwrap(); + let authorization = authorizations + .remove(&( + package.scope.clone(), + package.package.clone(), + package.version.clone(), + )) + .unwrap(); + let registry_url = registry_url.clone(); + let http_client = http_client.clone(); + futures.spawn(async move { + let display_name = + format!("@{}/{}@{}", package.scope, package.package, package.version); + publish_package(&http_client, package, ®istry_url, &authorization) + .await + .with_context(|| format!("Failed to publish {}", display_name))?; + Ok(package_name) + }); + } - let res = parse_response::<PublishingTask>(response).await; - let mut task = match res { - Ok(task) => task, - Err(err) if err.code == "duplicateVersionPublish" => { - println!( - "{} @{}/{}@{}", - colors::yellow("Skipping, already published"), - package.scope, - package.package, - package.version - ); - continue; - } - Err(err) => { - return Err(err).with_context(|| { - format!( - "Failed to publish @{}/{} at {}", - package.scope, package.package, package.version - ) - }) - } + let Some(result) = futures.join_next().await else { + // done, ensure no circular dependency + publish_order_graph.ensure_no_pending()?; + break; }; - let interval = std::time::Duration::from_secs(2); - while task.status != "success" && task.status != "failure" { - tokio::time::sleep(interval).await; - let resp = client - .get(format!("{}publish_status/{}", registry_url, task.id)) - .send() - .await - .with_context(|| { - format!( - "Failed to get publishing status for @{}/{} at {}", - package.scope, package.package, package.version - ) - })?; - task = - parse_response::<PublishingTask>(resp) - .await - .with_context(|| { - format!( - "Failed to get publishing status for @{}/{} at {}", - package.scope, package.package, package.version - ) - })?; - } + let package_name = result??; + publish_order_graph.finish_package(&package_name); + } - if let Some(error) = task.error { - bail!( - "{} @{}/{} at {}: {}", - colors::red("Failed to publish"), + Ok(()) +} + +async fn publish_package( + http_client: &HttpClient, + package: PreparedPublishPackage, + registry_url: &str, + authorization: &str, +) -> Result<(), AnyError> { + let client = http_client.client()?; + println!( + "{} @{}/{}@{} ...", + colors::intense_blue("Publishing"), + package.scope, + package.package, + package.version + ); + + let url = format!( + "{}scopes/{}/packages/{}/versions/{}", + registry_url, package.scope, package.package, package.version + ); + + let response = client + .post(url) + .header(AUTHORIZATION, authorization) + .header(CONTENT_ENCODING, "gzip") + .body(package.tarball) + .send() + .await?; + + let res = parse_response::<PublishingTask>(response).await; + let mut task = match res { + Ok(task) => task, + Err(err) if err.code == "duplicateVersionPublish" => { + println!( + "{} @{}/{}@{}", + colors::yellow("Skipping, already published"), package.scope, package.package, - package.version, - error.message + package.version ); + return Ok(()); } + Err(err) => { + return Err(err).with_context(|| { + format!( + "Failed to publish @{}/{} at {}", + package.scope, package.package, package.version + ) + }) + } + }; - println!( - "{} @{}/{}@{}", - colors::green("Successfully published"), + let interval = std::time::Duration::from_secs(2); + while task.status != "success" && task.status != "failure" { + tokio::time::sleep(interval).await; + let resp = client + .get(format!("{}publish_status/{}", registry_url, task.id)) + .send() + .await + .with_context(|| { + format!( + "Failed to get publishing status for @{}/{} at {}", + package.scope, package.package, package.version + ) + })?; + task = parse_response::<PublishingTask>(resp) + .await + .with_context(|| { + format!( + "Failed to get publishing status for @{}/{} at {}", + package.scope, package.package, package.version + ) + })?; + } + + if let Some(error) = task.error { + bail!( + "{} @{}/{} at {}: {}", + colors::red("Failed to publish"), package.scope, package.package, - package.version - ); - println!( - "{}@{}/{}/{}_meta.json", - registry_url, package.scope, package.package, package.version + package.version, + error.message ); } + println!( + "{} @{}/{}@{}", + colors::green("Successfully published"), + package.scope, + package.package, + package.version + ); + println!( + "{}@{}/{}/{}_meta.json", + registry_url, package.scope, package.package, package.version + ); Ok(()) } @@ -601,25 +645,62 @@ pub async fn publish( ) })?; - let mut packages = - Vec::with_capacity(std::cmp::max(1, deno_json.json.workspaces.len())); + let workspace_config = deno_json.to_workspace_config()?; + + let (publish_order_graph, prepared_package_by_name) = match workspace_config { + Some(workspace_config) => { + println!("Publishing a workspace..."); + let mut prepared_package_by_name = + HashMap::with_capacity(workspace_config.members.len()); + let publish_order_graph = publish_order::build_publish_graph( + &workspace_config, + cli_factory.module_graph_builder().await?.as_ref(), + ) + .await?; - let members = &deno_json.json.workspaces; - if members.is_empty() { - packages - .push(prepare_publish(&initial_cwd, directory_path, &import_map).await?); - } else { - println!("Publishing a workspace..."); - for member in members { - let member_dir = directory_path.join(member); - packages - .push(prepare_publish(&initial_cwd, member_dir, &import_map).await?); + let results = workspace_config + .members + .iter() + .cloned() + .map(|member| { + let import_map = import_map.clone(); + deno_core::unsync::spawn(async move { + let package = prepare_publish(&member.config_file, import_map) + .await + .with_context(|| { + format!("Failed preparing '{}'.", member.package_name) + })?; + Ok((member.package_name, package)) + }) + }) + .collect::<Vec<JoinHandle<Result<(String, PreparedPublishPackage), AnyError>>>>(); + let results = deno_core::futures::future::join_all(results).await; + for result in results { + let (package_name, package) = result??; + prepared_package_by_name.insert(package_name, package); + } + (publish_order_graph, prepared_package_by_name) } - } + None => { + let mut prepared_package_by_name = HashMap::with_capacity(1); + let package = prepare_publish(&deno_json, import_map).await?; + let package_name = package.package.clone(); + let publish_order_graph = + PublishOrderGraph::new_single(package_name.clone()); + prepared_package_by_name.insert(package_name, package); + (publish_order_graph, prepared_package_by_name) + } + }; - if packages.is_empty() { + if prepared_package_by_name.is_empty() { bail!("No packages to publish"); } - perform_publish(cli_factory.http_client(), packages, auth_method).await + perform_publish( + cli_factory.http_client(), + publish_order_graph, + prepared_package_by_name, + auth_method, + ) + .await } |