diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2024-11-19 07:45:09 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-19 12:45:09 +0000 |
commit | 661aa22c03f829489e2d9289dee0a8292a95227a (patch) | |
tree | b16f4a37f2f221540f1b22f986f5c58eb19cc913 /cli | |
parent | 069bc15030225393f7d05521505316066464bdbd (diff) |
feat(task): dependencies (#26467)
This commit adds support for "dependencies" in `deno task` subcommand:
```jsonc
{
"tasks": {
"build": "deno run -RW build.ts",
"generate": "deno run -RW generate.ts",
"serve": {
"command": "deno run -RN server.ts",
"dependencies": ["build", "generate"]
}
}
}
```
Executing `deno task serve` will first execute `build` and `generate`
tasks (in parallel) and once both complete the `serve` task will be executed.
Number of tasks run in parallel is equal to the no of cores on the
machine, and respects `DENO_JOBS` env var if one is specified.
Part of https://github.com/denoland/deno/issues/26462
---------
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
Co-authored-by: Marvin Hagemeister <marvin@deno.com>
Diffstat (limited to 'cli')
-rw-r--r-- | cli/schemas/config-file.v1.json | 7 | ||||
-rw-r--r-- | cli/tools/task.rs | 229 |
2 files changed, 222 insertions, 14 deletions
diff --git a/cli/schemas/config-file.v1.json b/cli/schemas/config-file.v1.json index 56a8090f9..3ba803ef8 100644 --- a/cli/schemas/config-file.v1.json +++ b/cli/schemas/config-file.v1.json @@ -448,6 +448,13 @@ "type": "string", "required": true, "description": "The task to execute" + }, + "dependencies": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Tasks that should be executed before this task" } } } diff --git a/cli/tools/task.rs b/cli/tools/task.rs index a13efbaf4..682dbf814 100644 --- a/cli/tools/task.rs +++ b/cli/tools/task.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::collections::HashSet; +use std::num::NonZeroUsize; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; @@ -15,6 +16,10 @@ use deno_core::anyhow::anyhow; use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::AnyError; +use deno_core::futures::future::LocalBoxFuture; +use deno_core::futures::stream::futures_unordered; +use deno_core::futures::FutureExt; +use deno_core::futures::StreamExt; use deno_core::url::Url; use deno_path_util::normalize_path; use deno_runtime::deno_node::NodeResolver; @@ -68,6 +73,13 @@ pub async fn execute_script( let node_resolver = factory.node_resolver().await?; let env_vars = task_runner::real_env_vars(); + let no_of_concurrent_tasks = if let Ok(value) = std::env::var("DENO_JOBS") { + value.parse::<NonZeroUsize>().ok() + } else { + std::thread::available_parallelism().ok() + } + .unwrap_or_else(|| NonZeroUsize::new(2).unwrap()); + let task_runner = TaskRunner { tasks_config, task_flags: &task_flags, @@ -75,7 +87,9 @@ pub async fn execute_script( node_resolver: node_resolver.as_ref(), env_vars, cli_options, + concurrency: no_of_concurrent_tasks.into(), }; + task_runner.run_task(task_name).await } @@ -93,30 +107,156 @@ struct TaskRunner<'a> { node_resolver: &'a NodeResolver, env_vars: HashMap<String, String>, cli_options: &'a CliOptions, + concurrency: usize, } impl<'a> TaskRunner<'a> { - async fn run_task( + pub async fn run_task( &self, - task_name: &String, + task_name: &str, ) -> Result<i32, deno_core::anyhow::Error> { - let Some((dir_url, task_or_script)) = self.tasks_config.task(task_name) - else { - if self.task_flags.is_run { - return Err(anyhow!("Task not found: {}", task_name)); + match sort_tasks_topo(task_name, &self.tasks_config) { + Ok(sorted) => self.run_tasks_in_parallel(sorted).await, + Err(err) => match err { + TaskError::NotFound(name) => { + if self.task_flags.is_run { + return Err(anyhow!("Task not found: {}", name)); + } + + log::error!("Task not found: {}", name); + if log::log_enabled!(log::Level::Error) { + self.print_available_tasks()?; + } + Ok(1) + } + TaskError::TaskDepCycle { path } => { + log::error!("Task cycle detected: {}", path.join(" -> ")); + Ok(1) + } + }, + } + } + + pub fn print_available_tasks(&self) -> Result<(), std::io::Error> { + print_available_tasks( + &mut std::io::stderr(), + &self.cli_options.start_dir, + &self.tasks_config, + ) + } + + async fn run_tasks_in_parallel( + &self, + task_names: Vec<String>, + ) -> Result<i32, deno_core::anyhow::Error> { + struct PendingTasksContext { + completed: HashSet<String>, + running: HashSet<String>, + task_names: Vec<String>, + } + + impl PendingTasksContext { + fn has_remaining_tasks(&self) -> bool { + self.completed.len() < self.task_names.len() } - log::error!("Task not found: {}", task_name); - if log::log_enabled!(log::Level::Error) { - print_available_tasks( - &mut std::io::stderr(), - &self.cli_options.start_dir, - &self.tasks_config, - )?; + fn mark_complete(&mut self, task_name: String) { + self.running.remove(&task_name); + self.completed.insert(task_name); } - return Ok(1); + + fn get_next_task<'a>( + &mut self, + runner: &'a TaskRunner<'a>, + ) -> Option<LocalBoxFuture<'a, Result<(i32, String), AnyError>>> { + for name in &self.task_names { + if self.completed.contains(name) || self.running.contains(name) { + continue; + } + + let should_run = if let Ok((_, def)) = runner.get_task(name) { + match def { + TaskOrScript::Task(_, def) => def + .dependencies + .iter() + .all(|dep| self.completed.contains(dep)), + TaskOrScript::Script(_, _) => true, + } + } else { + false + }; + + if !should_run { + continue; + } + + self.running.insert(name.clone()); + let name = name.clone(); + return Some( + async move { + runner + .run_task_no_dependencies(&name) + .await + .map(|exit_code| (exit_code, name)) + } + .boxed_local(), + ); + } + None + } + } + + let mut context = PendingTasksContext { + completed: HashSet::with_capacity(task_names.len()), + running: HashSet::with_capacity(self.concurrency), + task_names, + }; + + let mut queue = futures_unordered::FuturesUnordered::new(); + + while context.has_remaining_tasks() { + while queue.len() < self.concurrency { + if let Some(task) = context.get_next_task(self) { + queue.push(task); + } else { + break; + } + } + + // If queue is empty at this point, then there are no more tasks in the queue. + let Some(result) = queue.next().await else { + debug_assert_eq!(context.task_names.len(), 0); + break; + }; + + let (exit_code, name) = result?; + if exit_code > 0 { + return Ok(exit_code); + } + + context.mark_complete(name); + } + + Ok(0) + } + + fn get_task( + &self, + task_name: &str, + ) -> Result<(&Url, TaskOrScript), TaskError> { + let Some(result) = self.tasks_config.task(task_name) else { + return Err(TaskError::NotFound(task_name.to_string())); }; + Ok(result) + } + + async fn run_task_no_dependencies( + &self, + task_name: &String, + ) -> Result<i32, deno_core::anyhow::Error> { + let (dir_url, task_or_script) = self.get_task(task_name.as_str()).unwrap(); + match task_or_script { TaskOrScript::Task(_tasks, definition) => { self.run_deno_task(dir_url, task_name, definition).await @@ -234,6 +374,59 @@ impl<'a> TaskRunner<'a> { } } +#[derive(Debug)] +enum TaskError { + NotFound(String), + TaskDepCycle { path: Vec<String> }, +} + +fn sort_tasks_topo( + name: &str, + task_config: &WorkspaceTasksConfig, +) -> Result<Vec<String>, TaskError> { + fn sort_visit<'a>( + name: &'a str, + sorted: &mut Vec<String>, + mut path: Vec<&'a str>, + tasks_config: &'a WorkspaceTasksConfig, + ) -> Result<(), TaskError> { + // Already sorted + if sorted.iter().any(|sorted_name| sorted_name == name) { + return Ok(()); + } + + // Graph has a cycle + if path.contains(&name) { + path.push(name); + return Err(TaskError::TaskDepCycle { + path: path.iter().map(|s| s.to_string()).collect(), + }); + } + + let Some(def) = tasks_config.task(name) else { + return Err(TaskError::NotFound(name.to_string())); + }; + + if let TaskOrScript::Task(_, actual_def) = def.1 { + for dep in &actual_def.dependencies { + let mut path = path.clone(); + path.push(name); + sort_visit(dep, sorted, path, tasks_config)? + } + } + + sorted.push(name.to_string()); + + Ok(()) + } + + let mut sorted: Vec<String> = vec![]; + + sort_visit(name, &mut sorted, Vec::new(), task_config)?; + + Ok(sorted) +} + fn output_task(task_name: &str, script: &str) { log::info!( "{} {} {}", @@ -339,6 +532,14 @@ fn print_available_tasks( )?; } writeln!(writer, " {}", desc.task.command)?; + if !desc.task.dependencies.is_empty() { + writeln!( + writer, + " {} {}", + colors::gray("depends on:"), + colors::cyan(desc.task.dependencies.join(", ")) + )?; + } } Ok(()) |