summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
Diffstat (limited to 'cli')
-rw-r--r--cli/schemas/config-file.v1.json7
-rw-r--r--cli/tools/task.rs229
2 files changed, 222 insertions, 14 deletions
diff --git a/cli/schemas/config-file.v1.json b/cli/schemas/config-file.v1.json
index 56a8090f9..3ba803ef8 100644
--- a/cli/schemas/config-file.v1.json
+++ b/cli/schemas/config-file.v1.json
@@ -448,6 +448,13 @@
"type": "string",
"required": true,
"description": "The task to execute"
+ },
+ "dependencies": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "description": "Tasks that should be executed before this task"
}
}
}
diff --git a/cli/tools/task.rs b/cli/tools/task.rs
index a13efbaf4..682dbf814 100644
--- a/cli/tools/task.rs
+++ b/cli/tools/task.rs
@@ -2,6 +2,7 @@
use std::collections::HashMap;
use std::collections::HashSet;
+use std::num::NonZeroUsize;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
@@ -15,6 +16,10 @@ use deno_core::anyhow::anyhow;
use deno_core::anyhow::bail;
use deno_core::anyhow::Context;
use deno_core::error::AnyError;
+use deno_core::futures::future::LocalBoxFuture;
+use deno_core::futures::stream::futures_unordered;
+use deno_core::futures::FutureExt;
+use deno_core::futures::StreamExt;
use deno_core::url::Url;
use deno_path_util::normalize_path;
use deno_runtime::deno_node::NodeResolver;
@@ -68,6 +73,13 @@ pub async fn execute_script(
let node_resolver = factory.node_resolver().await?;
let env_vars = task_runner::real_env_vars();
+ let no_of_concurrent_tasks = if let Ok(value) = std::env::var("DENO_JOBS") {
+ value.parse::<NonZeroUsize>().ok()
+ } else {
+ std::thread::available_parallelism().ok()
+ }
+ .unwrap_or_else(|| NonZeroUsize::new(2).unwrap());
+
let task_runner = TaskRunner {
tasks_config,
task_flags: &task_flags,
@@ -75,7 +87,9 @@ pub async fn execute_script(
node_resolver: node_resolver.as_ref(),
env_vars,
cli_options,
+ concurrency: no_of_concurrent_tasks.into(),
};
+
task_runner.run_task(task_name).await
}
@@ -93,30 +107,156 @@ struct TaskRunner<'a> {
node_resolver: &'a NodeResolver,
env_vars: HashMap<String, String>,
cli_options: &'a CliOptions,
+ concurrency: usize,
}
impl<'a> TaskRunner<'a> {
- async fn run_task(
+ pub async fn run_task(
&self,
- task_name: &String,
+ task_name: &str,
) -> Result<i32, deno_core::anyhow::Error> {
- let Some((dir_url, task_or_script)) = self.tasks_config.task(task_name)
- else {
- if self.task_flags.is_run {
- return Err(anyhow!("Task not found: {}", task_name));
+ match sort_tasks_topo(task_name, &self.tasks_config) {
+ Ok(sorted) => self.run_tasks_in_parallel(sorted).await,
+ Err(err) => match err {
+ TaskError::NotFound(name) => {
+ if self.task_flags.is_run {
+ return Err(anyhow!("Task not found: {}", name));
+ }
+
+ log::error!("Task not found: {}", name);
+ if log::log_enabled!(log::Level::Error) {
+ self.print_available_tasks()?;
+ }
+ Ok(1)
+ }
+ TaskError::TaskDepCycle { path } => {
+ log::error!("Task cycle detected: {}", path.join(" -> "));
+ Ok(1)
+ }
+ },
+ }
+ }
+
+ pub fn print_available_tasks(&self) -> Result<(), std::io::Error> {
+ print_available_tasks(
+ &mut std::io::stderr(),
+ &self.cli_options.start_dir,
+ &self.tasks_config,
+ )
+ }
+
+ async fn run_tasks_in_parallel(
+ &self,
+ task_names: Vec<String>,
+ ) -> Result<i32, deno_core::anyhow::Error> {
+ struct PendingTasksContext {
+ completed: HashSet<String>,
+ running: HashSet<String>,
+ task_names: Vec<String>,
+ }
+
+ impl PendingTasksContext {
+ fn has_remaining_tasks(&self) -> bool {
+ self.completed.len() < self.task_names.len()
}
- log::error!("Task not found: {}", task_name);
- if log::log_enabled!(log::Level::Error) {
- print_available_tasks(
- &mut std::io::stderr(),
- &self.cli_options.start_dir,
- &self.tasks_config,
- )?;
+ fn mark_complete(&mut self, task_name: String) {
+ self.running.remove(&task_name);
+ self.completed.insert(task_name);
}
- return Ok(1);
+
+ fn get_next_task<'a>(
+ &mut self,
+ runner: &'a TaskRunner<'a>,
+ ) -> Option<LocalBoxFuture<'a, Result<(i32, String), AnyError>>> {
+ for name in &self.task_names {
+ if self.completed.contains(name) || self.running.contains(name) {
+ continue;
+ }
+
+ let should_run = if let Ok((_, def)) = runner.get_task(name) {
+ match def {
+ TaskOrScript::Task(_, def) => def
+ .dependencies
+ .iter()
+ .all(|dep| self.completed.contains(dep)),
+ TaskOrScript::Script(_, _) => true,
+ }
+ } else {
+ false
+ };
+
+ if !should_run {
+ continue;
+ }
+
+ self.running.insert(name.clone());
+ let name = name.clone();
+ return Some(
+ async move {
+ runner
+ .run_task_no_dependencies(&name)
+ .await
+ .map(|exit_code| (exit_code, name))
+ }
+ .boxed_local(),
+ );
+ }
+ None
+ }
+ }
+
+ let mut context = PendingTasksContext {
+ completed: HashSet::with_capacity(task_names.len()),
+ running: HashSet::with_capacity(self.concurrency),
+ task_names,
+ };
+
+ let mut queue = futures_unordered::FuturesUnordered::new();
+
+ while context.has_remaining_tasks() {
+ while queue.len() < self.concurrency {
+ if let Some(task) = context.get_next_task(self) {
+ queue.push(task);
+ } else {
+ break;
+ }
+ }
+
+ // If queue is empty at this point, then there are no more tasks in the queue.
+ let Some(result) = queue.next().await else {
+ debug_assert_eq!(context.task_names.len(), 0);
+ break;
+ };
+
+ let (exit_code, name) = result?;
+ if exit_code > 0 {
+ return Ok(exit_code);
+ }
+
+ context.mark_complete(name);
+ }
+
+ Ok(0)
+ }
+
+ fn get_task(
+ &self,
+ task_name: &str,
+ ) -> Result<(&Url, TaskOrScript), TaskError> {
+ let Some(result) = self.tasks_config.task(task_name) else {
+ return Err(TaskError::NotFound(task_name.to_string()));
};
+ Ok(result)
+ }
+
+ async fn run_task_no_dependencies(
+ &self,
+ task_name: &String,
+ ) -> Result<i32, deno_core::anyhow::Error> {
+ let (dir_url, task_or_script) = self.get_task(task_name.as_str()).unwrap();
+
match task_or_script {
TaskOrScript::Task(_tasks, definition) => {
self.run_deno_task(dir_url, task_name, definition).await
@@ -234,6 +374,59 @@ impl<'a> TaskRunner<'a> {
}
}
+#[derive(Debug)]
+enum TaskError {
+ NotFound(String),
+ TaskDepCycle { path: Vec<String> },
+}
+
+fn sort_tasks_topo(
+ name: &str,
+ task_config: &WorkspaceTasksConfig,
+) -> Result<Vec<String>, TaskError> {
+ fn sort_visit<'a>(
+ name: &'a str,
+ sorted: &mut Vec<String>,
+ mut path: Vec<&'a str>,
+ tasks_config: &'a WorkspaceTasksConfig,
+ ) -> Result<(), TaskError> {
+ // Already sorted
+ if sorted.iter().any(|sorted_name| sorted_name == name) {
+ return Ok(());
+ }
+
+ // Graph has a cycle
+ if path.contains(&name) {
+ path.push(name);
+ return Err(TaskError::TaskDepCycle {
+ path: path.iter().map(|s| s.to_string()).collect(),
+ });
+ }
+
+ let Some(def) = tasks_config.task(name) else {
+ return Err(TaskError::NotFound(name.to_string()));
+ };
+
+ if let TaskOrScript::Task(_, actual_def) = def.1 {
+ for dep in &actual_def.dependencies {
+ let mut path = path.clone();
+ path.push(name);
+ sort_visit(dep, sorted, path, tasks_config)?
+ }
+ }
+
+ sorted.push(name.to_string());
+
+ Ok(())
+ }
+
+ let mut sorted: Vec<String> = vec![];
+
+ sort_visit(name, &mut sorted, Vec::new(), task_config)?;
+
+ Ok(sorted)
+}
+
fn output_task(task_name: &str, script: &str) {
log::info!(
"{} {} {}",
@@ -339,6 +532,14 @@ fn print_available_tasks(
)?;
}
writeln!(writer, " {}", desc.task.command)?;
+ if !desc.task.dependencies.is_empty() {
+ writeln!(
+ writer,
+ " {} {}",
+ colors::gray("depends on:"),
+ colors::cyan(desc.task.dependencies.join(", "))
+ )?;
+ }
}
Ok(())