summaryrefslogtreecommitdiff
path: root/cli/tools/task.rs
diff options
context:
space:
mode:
authorhaturau <135221985+haturatu@users.noreply.github.com>2024-11-20 01:20:47 +0900
committerGitHub <noreply@github.com>2024-11-20 01:20:47 +0900
commit85719a67e59c7aa45bead26e4942d7df8b1b42d4 (patch)
treeface0aecaac53e93ce2f23b53c48859bcf1a36ec /cli/tools/task.rs
parent67697bc2e4a62a9670699fd18ad0dd8efc5bd955 (diff)
parent186b52731c6bb326c4d32905c5e732d082e83465 (diff)
Merge branch 'denoland:main' into main
Diffstat (limited to 'cli/tools/task.rs')
-rw-r--r--cli/tools/task.rs631
1 files changed, 438 insertions, 193 deletions
diff --git a/cli/tools/task.rs b/cli/tools/task.rs
index 502b09d2c..682dbf814 100644
--- a/cli/tools/task.rs
+++ b/cli/tools/task.rs
@@ -1,14 +1,14 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
-use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
+use std::num::NonZeroUsize;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
-use deno_config::deno_json::Task;
+use deno_config::workspace::TaskDefinition;
use deno_config::workspace::TaskOrScript;
use deno_config::workspace::WorkspaceDirectory;
use deno_config::workspace::WorkspaceTasksConfig;
@@ -16,8 +16,15 @@ use deno_core::anyhow::anyhow;
use deno_core::anyhow::bail;
use deno_core::anyhow::Context;
use deno_core::error::AnyError;
+use deno_core::futures::future::LocalBoxFuture;
+use deno_core::futures::stream::futures_unordered;
+use deno_core::futures::FutureExt;
+use deno_core::futures::StreamExt;
+use deno_core::url::Url;
use deno_path_util::normalize_path;
+use deno_runtime::deno_node::NodeResolver;
use deno_task_shell::ShellCommand;
+use indexmap::IndexMap;
use crate::args::CliOptions;
use crate::args::Flags;
@@ -48,155 +55,376 @@ pub async fn execute_script(
v == "1"
})
.unwrap_or(false);
- let tasks_config = start_dir.to_tasks_config()?;
- let tasks_config = if force_use_pkg_json {
- tasks_config.with_only_pkg_json()
- } else {
- tasks_config
- };
+ let mut tasks_config = start_dir.to_tasks_config()?;
+ if force_use_pkg_json {
+ tasks_config = tasks_config.with_only_pkg_json()
+ }
- let task_name = match &task_flags.task {
- Some(task) => task,
- None => {
- print_available_tasks(
- &mut std::io::stdout(),
- &cli_options.start_dir,
- &tasks_config,
- )?;
- return Ok(0);
- }
+ let Some(task_name) = &task_flags.task else {
+ print_available_tasks(
+ &mut std::io::stdout(),
+ &cli_options.start_dir,
+ &tasks_config,
+ )?;
+ return Ok(0);
};
let npm_resolver = factory.npm_resolver().await?;
let node_resolver = factory.node_resolver().await?;
let env_vars = task_runner::real_env_vars();
- match tasks_config.task(task_name) {
- Some((dir_url, task_or_script)) => match task_or_script {
- TaskOrScript::Task(_tasks, script) => {
- let cwd = match task_flags.cwd {
- Some(path) => canonicalize_path(&PathBuf::from(path))
- .context("failed canonicalizing --cwd")?,
- None => normalize_path(dir_url.to_file_path().unwrap()),
- };
-
- let custom_commands = task_runner::resolve_custom_commands(
- npm_resolver.as_ref(),
- node_resolver,
- )?;
- run_task(RunTaskOptions {
- task_name,
- script,
- cwd: &cwd,
- env_vars,
- custom_commands,
- npm_resolver: npm_resolver.as_ref(),
- cli_options,
- })
- .await
- }
- TaskOrScript::Script(scripts, _script) => {
- // ensure the npm packages are installed if using a managed resolver
- if let Some(npm_resolver) = npm_resolver.as_managed() {
- npm_resolver.ensure_top_level_package_json_install().await?;
+ let no_of_concurrent_tasks = if let Ok(value) = std::env::var("DENO_JOBS") {
+ value.parse::<NonZeroUsize>().ok()
+ } else {
+ std::thread::available_parallelism().ok()
+ }
+ .unwrap_or_else(|| NonZeroUsize::new(2).unwrap());
+
+ let task_runner = TaskRunner {
+ tasks_config,
+ task_flags: &task_flags,
+ npm_resolver: npm_resolver.as_ref(),
+ node_resolver: node_resolver.as_ref(),
+ env_vars,
+ cli_options,
+ concurrency: no_of_concurrent_tasks.into(),
+ };
+
+ task_runner.run_task(task_name).await
+}
+
+struct RunSingleOptions<'a> {
+ task_name: &'a str,
+ script: &'a str,
+ cwd: &'a Path,
+ custom_commands: HashMap<String, Rc<dyn ShellCommand>>,
+}
+
+struct TaskRunner<'a> {
+ tasks_config: WorkspaceTasksConfig,
+ task_flags: &'a TaskFlags,
+ npm_resolver: &'a dyn CliNpmResolver,
+ node_resolver: &'a NodeResolver,
+ env_vars: HashMap<String, String>,
+ cli_options: &'a CliOptions,
+ concurrency: usize,
+}
+
+impl<'a> TaskRunner<'a> {
+ pub async fn run_task(
+ &self,
+ task_name: &str,
+ ) -> Result<i32, deno_core::anyhow::Error> {
+ match sort_tasks_topo(task_name, &self.tasks_config) {
+ Ok(sorted) => self.run_tasks_in_parallel(sorted).await,
+ Err(err) => match err {
+ TaskError::NotFound(name) => {
+ if self.task_flags.is_run {
+ return Err(anyhow!("Task not found: {}", name));
+ }
+
+ log::error!("Task not found: {}", name);
+ if log::log_enabled!(log::Level::Error) {
+ self.print_available_tasks()?;
+ }
+ Ok(1)
+ }
+ TaskError::TaskDepCycle { path } => {
+ log::error!("Task cycle detected: {}", path.join(" -> "));
+ Ok(1)
}
+ },
+ }
+ }
+
+ pub fn print_available_tasks(&self) -> Result<(), std::io::Error> {
+ print_available_tasks(
+ &mut std::io::stderr(),
+ &self.cli_options.start_dir,
+ &self.tasks_config,
+ )
+ }
- let cwd = match task_flags.cwd {
- Some(path) => canonicalize_path(&PathBuf::from(path))?,
- None => normalize_path(dir_url.to_file_path().unwrap()),
- };
-
- // At this point we already checked if the task name exists in package.json.
- // We can therefore check for "pre" and "post" scripts too, since we're only
- // dealing with package.json here and not deno.json
- let task_names = vec![
- format!("pre{}", task_name),
- task_name.clone(),
- format!("post{}", task_name),
- ];
- let custom_commands = task_runner::resolve_custom_commands(
- npm_resolver.as_ref(),
- node_resolver,
- )?;
- for task_name in &task_names {
- if let Some(script) = scripts.get(task_name) {
- let exit_code = run_task(RunTaskOptions {
- task_name,
- script,
- cwd: &cwd,
- env_vars: env_vars.clone(),
- custom_commands: custom_commands.clone(),
- npm_resolver: npm_resolver.as_ref(),
- cli_options,
- })
- .await?;
- if exit_code > 0 {
- return Ok(exit_code);
+ async fn run_tasks_in_parallel(
+ &self,
+ task_names: Vec<String>,
+ ) -> Result<i32, deno_core::anyhow::Error> {
+ struct PendingTasksContext {
+ completed: HashSet<String>,
+ running: HashSet<String>,
+ task_names: Vec<String>,
+ }
+
+ impl PendingTasksContext {
+ fn has_remaining_tasks(&self) -> bool {
+ self.completed.len() < self.task_names.len()
+ }
+
+ fn mark_complete(&mut self, task_name: String) {
+ self.running.remove(&task_name);
+ self.completed.insert(task_name);
+ }
+
+ fn get_next_task<'a>(
+ &mut self,
+ runner: &'a TaskRunner<'a>,
+ ) -> Option<LocalBoxFuture<'a, Result<(i32, String), AnyError>>> {
+ for name in &self.task_names {
+ if self.completed.contains(name) || self.running.contains(name) {
+ continue;
+ }
+
+ let should_run = if let Ok((_, def)) = runner.get_task(name) {
+ match def {
+ TaskOrScript::Task(_, def) => def
+ .dependencies
+ .iter()
+ .all(|dep| self.completed.contains(dep)),
+ TaskOrScript::Script(_, _) => true,
}
+ } else {
+ false
+ };
+
+ if !should_run {
+ continue;
}
+
+ self.running.insert(name.clone());
+ let name = name.clone();
+ return Some(
+ async move {
+ runner
+ .run_task_no_dependencies(&name)
+ .await
+ .map(|exit_code| (exit_code, name))
+ }
+ .boxed_local(),
+ );
}
+ None
+ }
+ }
- Ok(0)
+ let mut context = PendingTasksContext {
+ completed: HashSet::with_capacity(task_names.len()),
+ running: HashSet::with_capacity(self.concurrency),
+ task_names,
+ };
+
+ let mut queue = futures_unordered::FuturesUnordered::new();
+
+ while context.has_remaining_tasks() {
+ while queue.len() < self.concurrency {
+ if let Some(task) = context.get_next_task(self) {
+ queue.push(task);
+ } else {
+ break;
+ }
}
- },
- None => {
- if task_flags.is_run {
- return Err(anyhow!("Task not found: {}", task_name));
+
+ // If queue is empty at this point, then there are no more tasks in the queue.
+ let Some(result) = queue.next().await else {
+ debug_assert_eq!(context.task_names.len(), 0);
+ break;
+ };
+
+ let (exit_code, name) = result?;
+ if exit_code > 0 {
+ return Ok(exit_code);
}
- log::error!("Task not found: {}", task_name);
- if log::log_enabled!(log::Level::Error) {
- print_available_tasks(
- &mut std::io::stderr(),
- &cli_options.start_dir,
- &tasks_config,
- )?;
+
+ context.mark_complete(name);
+ }
+
+ Ok(0)
+ }
+
+ fn get_task(
+ &self,
+ task_name: &str,
+ ) -> Result<(&Url, TaskOrScript), TaskError> {
+ let Some(result) = self.tasks_config.task(task_name) else {
+ return Err(TaskError::NotFound(task_name.to_string()));
+ };
+
+ Ok(result)
+ }
+
+ async fn run_task_no_dependencies(
+ &self,
+ task_name: &String,
+ ) -> Result<i32, deno_core::anyhow::Error> {
+ let (dir_url, task_or_script) = self.get_task(task_name.as_str()).unwrap();
+
+ match task_or_script {
+ TaskOrScript::Task(_tasks, definition) => {
+ self.run_deno_task(dir_url, task_name, definition).await
+ }
+ TaskOrScript::Script(scripts, _script) => {
+ self.run_npm_script(dir_url, task_name, scripts).await
}
- Ok(1)
}
}
-}
-struct RunTaskOptions<'a> {
- task_name: &'a str,
- script: &'a str,
- cwd: &'a Path,
- env_vars: HashMap<String, String>,
- custom_commands: HashMap<String, Rc<dyn ShellCommand>>,
- npm_resolver: &'a dyn CliNpmResolver,
- cli_options: &'a CliOptions,
-}
+ async fn run_deno_task(
+ &self,
+ dir_url: &Url,
+ task_name: &String,
+ definition: &TaskDefinition,
+ ) -> Result<i32, deno_core::anyhow::Error> {
+ let cwd = match &self.task_flags.cwd {
+ Some(path) => canonicalize_path(&PathBuf::from(path))
+ .context("failed canonicalizing --cwd")?,
+ None => normalize_path(dir_url.to_file_path().unwrap()),
+ };
-async fn run_task(opts: RunTaskOptions<'_>) -> Result<i32, AnyError> {
- let RunTaskOptions {
- task_name,
- script,
- cwd,
- env_vars,
- custom_commands,
- npm_resolver,
- cli_options,
- } = opts;
+ let custom_commands = task_runner::resolve_custom_commands(
+ self.npm_resolver,
+ self.node_resolver,
+ )?;
+ self
+ .run_single(RunSingleOptions {
+ task_name,
+ script: &definition.command,
+ cwd: &cwd,
+ custom_commands,
+ })
+ .await
+ }
- output_task(
- opts.task_name,
- &task_runner::get_script_with_args(script, cli_options.argv()),
- );
+ async fn run_npm_script(
+ &self,
+ dir_url: &Url,
+ task_name: &String,
+ scripts: &IndexMap<String, String>,
+ ) -> Result<i32, deno_core::anyhow::Error> {
+ // ensure the npm packages are installed if using a managed resolver
+ if let Some(npm_resolver) = self.npm_resolver.as_managed() {
+ npm_resolver.ensure_top_level_package_json_install().await?;
+ }
+
+ let cwd = match &self.task_flags.cwd {
+ Some(path) => canonicalize_path(&PathBuf::from(path))?,
+ None => normalize_path(dir_url.to_file_path().unwrap()),
+ };
- Ok(
- task_runner::run_task(task_runner::RunTaskOptions {
+ // At this point we already checked if the task name exists in package.json.
+ // We can therefore check for "pre" and "post" scripts too, since we're only
+ // dealing with package.json here and not deno.json
+ let task_names = vec![
+ format!("pre{}", task_name),
+ task_name.clone(),
+ format!("post{}", task_name),
+ ];
+ let custom_commands = task_runner::resolve_custom_commands(
+ self.npm_resolver,
+ self.node_resolver,
+ )?;
+ for task_name in &task_names {
+ if let Some(script) = scripts.get(task_name) {
+ let exit_code = self
+ .run_single(RunSingleOptions {
+ task_name,
+ script,
+ cwd: &cwd,
+ custom_commands: custom_commands.clone(),
+ })
+ .await?;
+ if exit_code > 0 {
+ return Ok(exit_code);
+ }
+ }
+ }
+
+ Ok(0)
+ }
+
+ async fn run_single(
+ &self,
+ opts: RunSingleOptions<'_>,
+ ) -> Result<i32, AnyError> {
+ let RunSingleOptions {
task_name,
script,
cwd,
- env_vars,
custom_commands,
- init_cwd: opts.cli_options.initial_cwd(),
- argv: cli_options.argv(),
- root_node_modules_dir: npm_resolver.root_node_modules_path(),
- stdio: None,
- })
- .await?
- .exit_code,
- )
+ } = opts;
+
+ output_task(
+ opts.task_name,
+ &task_runner::get_script_with_args(script, self.cli_options.argv()),
+ );
+
+ Ok(
+ task_runner::run_task(task_runner::RunTaskOptions {
+ task_name,
+ script,
+ cwd,
+ env_vars: self.env_vars.clone(),
+ custom_commands,
+ init_cwd: self.cli_options.initial_cwd(),
+ argv: self.cli_options.argv(),
+ root_node_modules_dir: self.npm_resolver.root_node_modules_path(),
+ stdio: None,
+ })
+ .await?
+ .exit_code,
+ )
+ }
+}
+
+#[derive(Debug)]
+enum TaskError {
+ NotFound(String),
+ TaskDepCycle { path: Vec<String> },
+}
+
+fn sort_tasks_topo(
+ name: &str,
+ task_config: &WorkspaceTasksConfig,
+) -> Result<Vec<String>, TaskError> {
+ fn sort_visit<'a>(
+ name: &'a str,
+ sorted: &mut Vec<String>,
+ mut path: Vec<&'a str>,
+ tasks_config: &'a WorkspaceTasksConfig,
+ ) -> Result<(), TaskError> {
+ // Already sorted
+ if sorted.iter().any(|sorted_name| sorted_name == name) {
+ return Ok(());
+ }
+
+ // Graph has a cycle
+ if path.contains(&name) {
+ path.push(name);
+ return Err(TaskError::TaskDepCycle {
+ path: path.iter().map(|s| s.to_string()).collect(),
+ });
+ }
+
+ let Some(def) = tasks_config.task(name) else {
+ return Err(TaskError::NotFound(name.to_string()));
+ };
+
+ if let TaskOrScript::Task(_, actual_def) = def.1 {
+ for dep in &actual_def.dependencies {
+ let mut path = path.clone();
+ path.push(name);
+ sort_visit(dep, sorted, path, tasks_config)?
+ }
+ }
+
+ sorted.push(name.to_string());
+
+ Ok(())
+ }
+
+ let mut sorted: Vec<String> = vec![];
+
+ sort_visit(name, &mut sorted, Vec::new(), task_config)?;
+
+ Ok(sorted)
}
fn output_task(task_name: &str, script: &str) {
@@ -222,80 +450,97 @@ fn print_available_tasks(
" {}",
colors::red("No tasks found in configuration file")
)?;
- } else {
- let mut seen_task_names =
- HashSet::with_capacity(tasks_config.tasks_count());
- for maybe_config in [&tasks_config.member, &tasks_config.root] {
- let Some(config) = maybe_config else {
- continue;
- };
- for (is_root, is_deno, (key, task)) in config
- .deno_json
- .as_ref()
- .map(|config| {
- let is_root = !is_cwd_root_dir
- && config.folder_url
- == *workspace_dir.workspace.root_dir().as_ref();
- config
- .tasks
- .iter()
- .map(move |(k, t)| (is_root, true, (k, Cow::Borrowed(t))))
- })
- .into_iter()
- .flatten()
- .chain(
- config
- .package_json
- .as_ref()
- .map(|config| {
- let is_root = !is_cwd_root_dir
- && config.folder_url
- == *workspace_dir.workspace.root_dir().as_ref();
- config.tasks.iter().map(move |(k, v)| {
- (is_root, false, (k, Cow::Owned(Task::Definition(v.clone()))))
- })
- })
- .into_iter()
- .flatten(),
- )
- {
- if !seen_task_names.insert(key) {
+ return Ok(());
+ }
+
+ struct AvailableTaskDescription {
+ is_root: bool,
+ is_deno: bool,
+ name: String,
+ task: TaskDefinition,
+ }
+ let mut seen_task_names = HashSet::with_capacity(tasks_config.tasks_count());
+ let mut task_descriptions = Vec::with_capacity(tasks_config.tasks_count());
+
+ for maybe_config in [&tasks_config.member, &tasks_config.root] {
+ let Some(config) = maybe_config else {
+ continue;
+ };
+
+ if let Some(config) = config.deno_json.as_ref() {
+ let is_root = !is_cwd_root_dir
+ && config.folder_url == *workspace_dir.workspace.root_dir().as_ref();
+
+ for (name, definition) in &config.tasks {
+ if !seen_task_names.insert(name) {
continue; // already seen
}
- writeln!(
- writer,
- "- {}{}",
- colors::cyan(key),
- if is_root {
- if is_deno {
- format!(" {}", colors::italic_gray("(workspace)"))
- } else {
- format!(" {}", colors::italic_gray("(workspace package.json)"))
- }
- } else if is_deno {
- "".to_string()
- } else {
- format!(" {}", colors::italic_gray("(package.json)"))
- }
- )?;
- let definition = match task.as_ref() {
- Task::Definition(definition) => definition,
- Task::Commented { definition, .. } => definition,
- };
- if let Task::Commented { comments, .. } = task.as_ref() {
- let slash_slash = colors::italic_gray("//");
- for comment in comments {
- writeln!(
- writer,
- " {slash_slash} {}",
- colors::italic_gray(comment)
- )?;
- }
+ task_descriptions.push(AvailableTaskDescription {
+ is_root,
+ is_deno: true,
+ name: name.to_string(),
+ task: definition.clone(),
+ });
+ }
+ }
+
+ if let Some(config) = config.package_json.as_ref() {
+ let is_root = !is_cwd_root_dir
+ && config.folder_url == *workspace_dir.workspace.root_dir().as_ref();
+ for (name, script) in &config.tasks {
+ if !seen_task_names.insert(name) {
+ continue; // already seen
}
- writeln!(writer, " {definition}")?;
+
+ task_descriptions.push(AvailableTaskDescription {
+ is_root,
+ is_deno: false,
+ name: name.to_string(),
+ task: deno_config::deno_json::TaskDefinition {
+ command: script.to_string(),
+ dependencies: vec![],
+ description: None,
+ },
+ });
}
}
}
+ for desc in task_descriptions {
+ writeln!(
+ writer,
+ "- {}{}",
+ colors::cyan(desc.name),
+ if desc.is_root {
+ if desc.is_deno {
+ format!(" {}", colors::italic_gray("(workspace)"))
+ } else {
+ format!(" {}", colors::italic_gray("(workspace package.json)"))
+ }
+ } else if desc.is_deno {
+ "".to_string()
+ } else {
+ format!(" {}", colors::italic_gray("(package.json)"))
+ }
+ )?;
+ if let Some(description) = &desc.task.description {
+ let slash_slash = colors::italic_gray("//");
+ writeln!(
+ writer,
+ " {slash_slash} {}",
+ colors::italic_gray(description)
+ )?;
+ }
+ writeln!(writer, " {}", desc.task.command)?;
+ if !desc.task.dependencies.is_empty() {
+ writeln!(
+ writer,
+ " {} {}",
+ colors::gray("depends on:"),
+ colors::cyan(desc.task.dependencies.join(", "))
+ )?;
+ }
+ }
+
Ok(())
}