summaryrefslogtreecommitdiff
path: root/cli/tools/registry/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tools/registry/mod.rs')
-rw-r--r--cli/tools/registry/mod.rs351
1 file changed, 216 insertions, 135 deletions
diff --git a/cli/tools/registry/mod.rs b/cli/tools/registry/mod.rs
index 8c53fd5cd..0e58e601e 100644
--- a/cli/tools/registry/mod.rs
+++ b/cli/tools/registry/mod.rs
@@ -1,9 +1,8 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use std::collections::HashMap;
use std::fmt::Write;
use std::io::IsTerminal;
-use std::path::Path;
-use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
@@ -16,6 +15,8 @@ use deno_core::anyhow::Context;
use deno_core::error::AnyError;
use deno_core::serde_json;
use deno_core::serde_json::json;
+use deno_core::unsync::JoinHandle;
+use deno_core::unsync::JoinSet;
use deno_runtime::colors;
use deno_runtime::deno_fetch::reqwest;
use http::header::AUTHORIZATION;
@@ -34,6 +35,9 @@ use crate::factory::CliFactory;
use crate::http_util::HttpClient;
use crate::util::import_map::ImportMapUnfurler;
+use self::publish_order::PublishOrderGraph;
+
+mod publish_order;
mod tar;
enum AuthMethod {
@@ -75,31 +79,22 @@ static SUGGESTED_ENTRYPOINTS: [&str; 4] =
["mod.ts", "mod.js", "index.ts", "index.js"];
async fn prepare_publish(
- initial_cwd: &Path,
- directory: PathBuf,
- import_map: &ImportMap,
+ deno_json: &ConfigFile,
+ import_map: Arc<ImportMap>,
) -> Result<PreparedPublishPackage, AnyError> {
- let directory_path = initial_cwd.join(directory);
- // TODO: doesn't handle jsonc
- let deno_json_path = directory_path.join("deno.json");
- let deno_json = ConfigFile::read(&deno_json_path).with_context(|| {
- format!(
- "Failed to read deno configuration file at {}",
- deno_json_path.display()
- )
- })?;
-
+ let config_path = deno_json.specifier.to_file_path().unwrap();
+ let dir_path = config_path.parent().unwrap().to_path_buf();
let Some(version) = deno_json.json.version.clone() else {
- bail!("{} is missing 'version' field", deno_json_path.display());
+ bail!("{} is missing 'version' field", deno_json.specifier);
};
let Some(name) = deno_json.json.name.clone() else {
- bail!("{} is missing 'name' field", deno_json_path.display());
+ bail!("{} is missing 'name' field", deno_json.specifier);
};
if deno_json.json.exports.is_none() {
let mut suggested_entrypoint = None;
for entrypoint in SUGGESTED_ENTRYPOINTS {
- if directory_path.join(entrypoint).exists() {
+ if dir_path.join(entrypoint).exists() {
suggested_entrypoint = Some(entrypoint);
break;
}
@@ -119,7 +114,7 @@ async fn prepare_publish(
bail!(
"You did not specify an entrypoint to \"{}\" package in {}. Add `exports` mapping in the configuration file, eg:\n{}",
name,
- deno_json_path.display(),
+ deno_json.specifier,
exports_content
);
}
@@ -130,11 +125,12 @@ async fn prepare_publish(
bail!("Invalid package name, use '@<scope_name>/<package_name> format");
};
- let unfurler = ImportMapUnfurler::new(import_map);
-
- let (tarball, diagnostics) =
- tar::create_gzipped_tarball(directory_path, unfurler)
- .context("Failed to create a tarball")?;
+ let (tarball, diagnostics) = deno_core::unsync::spawn_blocking(move || {
+ let unfurler = ImportMapUnfurler::new(&import_map);
+ tar::create_gzipped_tarball(&dir_path, unfurler)
+ .context("Failed to create a tarball")
+ })
+ .await??;
let tarball_hash_bytes: Vec<u8> =
sha2::Sha256::digest(&tarball).iter().cloned().collect();
@@ -298,12 +294,14 @@ fn print_diagnostics(diagnostics: Vec<String>) {
async fn perform_publish(
http_client: &Arc<HttpClient>,
- packages: Vec<PreparedPublishPackage>,
+ mut publish_order_graph: PublishOrderGraph,
+ mut prepared_package_by_name: HashMap<String, PreparedPublishPackage>,
auth_method: AuthMethod,
) -> Result<(), AnyError> {
let client = http_client.client()?;
let registry_url = deno_registry_api_url().to_string();
+ let packages = prepared_package_by_name.values().collect::<Vec<_>>();
let diagnostics = packages
.iter()
.flat_map(|p| p.diagnostics.clone())
@@ -320,7 +318,9 @@ async fn perform_publish(
})
.collect::<Vec<_>>();
- let authorizations = match auth_method {
+ let mut authorizations = HashMap::with_capacity(packages.len());
+
+ match auth_method {
AuthMethod::Interactive => {
let verifier = uuid::Uuid::new_v4().to_string();
let challenge = BASE64_STANDARD.encode(sha2::Sha256::digest(&verifier));
@@ -376,11 +376,13 @@ async fn perform_publish(
colors::cyan(res.user.name)
);
let authorization: Rc<str> = format!("Bearer {}", res.token).into();
- let mut authorizations = Vec::new();
- for _ in &packages {
- authorizations.push(authorization.clone());
+ for pkg in &packages {
+ authorizations.insert(
+ (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()),
+ authorization.clone(),
+ );
}
- break authorizations;
+ break;
}
Err(err) => {
if err.code == "authorizationPending" {
@@ -394,14 +396,15 @@ async fn perform_publish(
}
AuthMethod::Token(token) => {
let authorization: Rc<str> = format!("Bearer {}", token).into();
- let mut authorizations = Vec::new();
- for _ in &packages {
- authorizations.push(authorization.clone());
+ for pkg in &packages {
+ authorizations.insert(
+ (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()),
+ authorization.clone(),
+ );
}
- authorizations
}
AuthMethod::Oidc(oidc_config) => {
- let mut authorizations = Vec::new();
+ let mut chunked_packages = packages.chunks(16);
for permissions in permissions.chunks(16) {
let audience = json!({ "permissions": permissions }).to_string();
let url = format!(
@@ -439,110 +442,151 @@ async fn perform_publish(
})?;
let authorization: Rc<str> = format!("githuboidc {}", value).into();
- for _ in permissions {
- authorizations.push(authorization.clone());
+ for pkg in chunked_packages.next().unwrap() {
+ authorizations.insert(
+ (pkg.scope.clone(), pkg.package.clone(), pkg.version.clone()),
+ authorization.clone(),
+ );
}
}
- authorizations
}
};
- assert_eq!(packages.len(), authorizations.len());
- for (package, authorization) in
- packages.into_iter().zip(authorizations.into_iter())
- {
- println!(
- "{} @{}/{}@{} ...",
- colors::intense_blue("Publishing"),
- package.scope,
- package.package,
- package.version
- );
-
- let url = format!(
- "{}scopes/{}/packages/{}/versions/{}",
- registry_url, package.scope, package.package, package.version
- );
-
- let response = client
- .post(url)
- .header(AUTHORIZATION, &*authorization)
- .header(CONTENT_ENCODING, "gzip")
- .body(package.tarball)
- .send()
- .await?;
+ assert_eq!(prepared_package_by_name.len(), authorizations.len());
+ let mut futures: JoinSet<Result<String, AnyError>> = JoinSet::default();
+ loop {
+ let next_batch = publish_order_graph.next();
+
+ for package_name in next_batch {
+ let package = prepared_package_by_name.remove(&package_name).unwrap();
+ let authorization = authorizations
+ .remove(&(
+ package.scope.clone(),
+ package.package.clone(),
+ package.version.clone(),
+ ))
+ .unwrap();
+ let registry_url = registry_url.clone();
+ let http_client = http_client.clone();
+ futures.spawn(async move {
+ let display_name =
+ format!("@{}/{}@{}", package.scope, package.package, package.version);
+ publish_package(&http_client, package, &registry_url, &authorization)
+ .await
+ .with_context(|| format!("Failed to publish {}", display_name))?;
+ Ok(package_name)
+ });
+ }
- let res = parse_response::<PublishingTask>(response).await;
- let mut task = match res {
- Ok(task) => task,
- Err(err) if err.code == "duplicateVersionPublish" => {
- println!(
- "{} @{}/{}@{}",
- colors::yellow("Skipping, already published"),
- package.scope,
- package.package,
- package.version
- );
- continue;
- }
- Err(err) => {
- return Err(err).with_context(|| {
- format!(
- "Failed to publish @{}/{} at {}",
- package.scope, package.package, package.version
- )
- })
- }
+ let Some(result) = futures.join_next().await else {
+ // done, ensure no circular dependency
+ publish_order_graph.ensure_no_pending()?;
+ break;
};
- let interval = std::time::Duration::from_secs(2);
- while task.status != "success" && task.status != "failure" {
- tokio::time::sleep(interval).await;
- let resp = client
- .get(format!("{}publish_status/{}", registry_url, task.id))
- .send()
- .await
- .with_context(|| {
- format!(
- "Failed to get publishing status for @{}/{} at {}",
- package.scope, package.package, package.version
- )
- })?;
- task =
- parse_response::<PublishingTask>(resp)
- .await
- .with_context(|| {
- format!(
- "Failed to get publishing status for @{}/{} at {}",
- package.scope, package.package, package.version
- )
- })?;
- }
+ let package_name = result??;
+ publish_order_graph.finish_package(&package_name);
+ }
- if let Some(error) = task.error {
- bail!(
- "{} @{}/{} at {}: {}",
- colors::red("Failed to publish"),
+ Ok(())
+}
+
+async fn publish_package(
+ http_client: &HttpClient,
+ package: PreparedPublishPackage,
+ registry_url: &str,
+ authorization: &str,
+) -> Result<(), AnyError> {
+ let client = http_client.client()?;
+ println!(
+ "{} @{}/{}@{} ...",
+ colors::intense_blue("Publishing"),
+ package.scope,
+ package.package,
+ package.version
+ );
+
+ let url = format!(
+ "{}scopes/{}/packages/{}/versions/{}",
+ registry_url, package.scope, package.package, package.version
+ );
+
+ let response = client
+ .post(url)
+ .header(AUTHORIZATION, authorization)
+ .header(CONTENT_ENCODING, "gzip")
+ .body(package.tarball)
+ .send()
+ .await?;
+
+ let res = parse_response::<PublishingTask>(response).await;
+ let mut task = match res {
+ Ok(task) => task,
+ Err(err) if err.code == "duplicateVersionPublish" => {
+ println!(
+ "{} @{}/{}@{}",
+ colors::yellow("Skipping, already published"),
package.scope,
package.package,
- package.version,
- error.message
+ package.version
);
+ return Ok(());
}
+ Err(err) => {
+ return Err(err).with_context(|| {
+ format!(
+ "Failed to publish @{}/{} at {}",
+ package.scope, package.package, package.version
+ )
+ })
+ }
+ };
- println!(
- "{} @{}/{}@{}",
- colors::green("Successfully published"),
+ let interval = std::time::Duration::from_secs(2);
+ while task.status != "success" && task.status != "failure" {
+ tokio::time::sleep(interval).await;
+ let resp = client
+ .get(format!("{}publish_status/{}", registry_url, task.id))
+ .send()
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to get publishing status for @{}/{} at {}",
+ package.scope, package.package, package.version
+ )
+ })?;
+ task = parse_response::<PublishingTask>(resp)
+ .await
+ .with_context(|| {
+ format!(
+ "Failed to get publishing status for @{}/{} at {}",
+ package.scope, package.package, package.version
+ )
+ })?;
+ }
+
+ if let Some(error) = task.error {
+ bail!(
+ "{} @{}/{} at {}: {}",
+ colors::red("Failed to publish"),
package.scope,
package.package,
- package.version
- );
- println!(
- "{}@{}/{}/{}_meta.json",
- registry_url, package.scope, package.package, package.version
+ package.version,
+ error.message
);
}
+ println!(
+ "{} @{}/{}@{}",
+ colors::green("Successfully published"),
+ package.scope,
+ package.package,
+ package.version
+ );
+ println!(
+ "{}@{}/{}/{}_meta.json",
+ registry_url, package.scope, package.package, package.version
+ );
Ok(())
}
@@ -601,25 +645,62 @@ pub async fn publish(
)
})?;
- let mut packages =
- Vec::with_capacity(std::cmp::max(1, deno_json.json.workspaces.len()));
+ let workspace_config = deno_json.to_workspace_config()?;
+
+ let (publish_order_graph, prepared_package_by_name) = match workspace_config {
+ Some(workspace_config) => {
+ println!("Publishing a workspace...");
+ let mut prepared_package_by_name =
+ HashMap::with_capacity(workspace_config.members.len());
+ let publish_order_graph = publish_order::build_publish_graph(
+ &workspace_config,
+ cli_factory.module_graph_builder().await?.as_ref(),
+ )
+ .await?;
- let members = &deno_json.json.workspaces;
- if members.is_empty() {
- packages
- .push(prepare_publish(&initial_cwd, directory_path, &import_map).await?);
- } else {
- println!("Publishing a workspace...");
- for member in members {
- let member_dir = directory_path.join(member);
- packages
- .push(prepare_publish(&initial_cwd, member_dir, &import_map).await?);
+ let results = workspace_config
+ .members
+ .iter()
+ .cloned()
+ .map(|member| {
+ let import_map = import_map.clone();
+ deno_core::unsync::spawn(async move {
+ let package = prepare_publish(&member.config_file, import_map)
+ .await
+ .with_context(|| {
+ format!("Failed preparing '{}'.", member.package_name)
+ })?;
+ Ok((member.package_name, package))
+ })
+ })
+ .collect::<Vec<JoinHandle<Result<(String, PreparedPublishPackage), AnyError>>>>();
+ let results = deno_core::futures::future::join_all(results).await;
+ for result in results {
+ let (package_name, package) = result??;
+ prepared_package_by_name.insert(package_name, package);
+ }
+ (publish_order_graph, prepared_package_by_name)
}
- }
+ None => {
+ let mut prepared_package_by_name = HashMap::with_capacity(1);
+ let package = prepare_publish(&deno_json, import_map).await?;
+ let package_name = package.package.clone();
+ let publish_order_graph =
+ PublishOrderGraph::new_single(package_name.clone());
+ prepared_package_by_name.insert(package_name, package);
+ (publish_order_graph, prepared_package_by_name)
+ }
+ };
- if packages.is_empty() {
+ if prepared_package_by_name.is_empty() {
bail!("No packages to publish");
}
- perform_publish(cli_factory.http_client(), packages, auth_method).await
+ perform_publish(
+ cli_factory.http_client(),
+ publish_order_graph,
+ prepared_package_by_name,
+ auth_method,
+ )
+ .await
}