summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-10-19 07:05:00 +0200
committerGitHub <noreply@github.com>2023-10-19 07:05:00 +0200
commit8d9fef3b8955eadfd4820455b422b5bec1cdad0a (patch)
tree7114c3e3c329c83ce1d9c90e8fab5c7104bd1579
parent5095af78018f1d77374bc06cbb58231e631056b9 (diff)
refactor: add WatcherCommunicator helper struct (#20927)
This commit introduces "WatcherCommunicator" struct that is used facilitate bi-directional communication between CLI file watcher and the watched function. Prerequisite for https://github.com/denoland/deno/pull/20876
-rw-r--r--cli/factory.rs39
-rw-r--r--cli/graph_util.rs12
-rw-r--r--cli/tools/bench/mod.rs9
-rw-r--r--cli/tools/bundle.rs7
-rw-r--r--cli/tools/fmt.rs4
-rw-r--r--cli/tools/lint.rs4
-rw-r--r--cli/tools/run.rs7
-rw-r--r--cli/tools/test/mod.rs9
-rw-r--r--cli/util/file_watcher.rs190
9 files changed, 191 insertions, 90 deletions
diff --git a/cli/factory.rs b/cli/factory.rs
index e4f9b60fe..2841482f8 100644
--- a/cli/factory.rs
+++ b/cli/factory.rs
@@ -40,6 +40,7 @@ use crate::resolver::CliGraphResolver;
use crate::resolver::CliGraphResolverOptions;
use crate::standalone::DenoCompileBinaryWriter;
use crate::tools::check::TypeChecker;
+use crate::util::file_watcher::WatcherCommunicator;
use crate::util::progress_bar::ProgressBar;
use crate::util::progress_bar::ProgressBarStyle;
use crate::worker::CliMainWorkerFactory;
@@ -59,26 +60,18 @@ use deno_runtime::inspector_server::InspectorServer;
use deno_semver::npm::NpmPackageReqReference;
use import_map::ImportMap;
use log::warn;
-use std::cell::RefCell;
use std::future::Future;
-use std::path::PathBuf;
use std::sync::Arc;
pub struct CliFactoryBuilder {
- maybe_sender: Option<tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>>,
+ watcher_communicator: Option<WatcherCommunicator>,
}
impl CliFactoryBuilder {
pub fn new() -> Self {
- Self { maybe_sender: None }
- }
-
- pub fn with_watcher(
- mut self,
- sender: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
- ) -> Self {
- self.maybe_sender = Some(sender);
- self
+ Self {
+ watcher_communicator: None,
+ }
}
pub async fn build_from_flags(
@@ -88,9 +81,18 @@ impl CliFactoryBuilder {
Ok(self.build_from_cli_options(Arc::new(CliOptions::from_flags(flags)?)))
}
+ pub async fn build_from_flags_for_watcher(
+ mut self,
+ flags: Flags,
+ watcher_communicator: WatcherCommunicator,
+ ) -> Result<CliFactory, AnyError> {
+ self.watcher_communicator = Some(watcher_communicator);
+ self.build_from_flags(flags).await
+ }
+
pub fn build_from_cli_options(self, options: Arc<CliOptions>) -> CliFactory {
CliFactory {
- maybe_sender: RefCell::new(self.maybe_sender),
+ watcher_communicator: self.watcher_communicator,
options,
services: Default::default(),
}
@@ -166,8 +168,7 @@ struct CliFactoryServices {
}
pub struct CliFactory {
- maybe_sender:
- RefCell<Option<tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>>>,
+ watcher_communicator: Option<WatcherCommunicator>,
options: Arc<CliOptions>,
services: CliFactoryServices,
}
@@ -384,11 +385,14 @@ impl CliFactory {
}
pub fn maybe_file_watcher_reporter(&self) -> &Option<FileWatcherReporter> {
- let maybe_sender = self.maybe_sender.borrow_mut().take();
+ let maybe_file_watcher_reporter = self
+ .watcher_communicator
+ .as_ref()
+ .map(|i| FileWatcherReporter::new(i.clone()));
self
.services
.maybe_file_watcher_reporter
- .get_or_init(|| maybe_sender.map(FileWatcherReporter::new))
+ .get_or_init(|| maybe_file_watcher_reporter)
}
pub fn emit_cache(&self) -> Result<&EmitCache, AnyError> {
@@ -595,6 +599,7 @@ impl CliFactory {
let npm_resolver = self.npm_resolver().await?;
let fs = self.fs();
let cli_node_resolver = self.cli_node_resolver().await?;
+
Ok(CliMainWorkerFactory::new(
StorageKeyResolver::from_options(&self.options),
npm_resolver.clone(),
diff --git a/cli/graph_util.rs b/cli/graph_util.rs
index 17437ca99..b90581a14 100644
--- a/cli/graph_util.rs
+++ b/cli/graph_util.rs
@@ -13,6 +13,7 @@ use crate::npm::CliNpmResolver;
use crate::resolver::CliGraphResolver;
use crate::tools::check;
use crate::tools::check::TypeChecker;
+use crate::util::file_watcher::WatcherCommunicator;
use crate::util::sync::TaskQueue;
use crate::util::sync::TaskQueuePermit;
@@ -635,14 +636,14 @@ impl<'a> ModuleGraphUpdatePermit<'a> {
#[derive(Clone, Debug)]
pub struct FileWatcherReporter {
- sender: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
+ watcher_communicator: WatcherCommunicator,
file_paths: Arc<Mutex<Vec<PathBuf>>>,
}
impl FileWatcherReporter {
- pub fn new(sender: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>) -> Self {
+ pub fn new(watcher_communicator: WatcherCommunicator) -> Self {
Self {
- sender,
+ watcher_communicator,
file_paths: Default::default(),
}
}
@@ -665,7 +666,10 @@ impl deno_graph::source::Reporter for FileWatcherReporter {
}
if modules_done == modules_total {
- self.sender.send(file_paths.drain(..).collect()).unwrap();
+ self
+ .watcher_communicator
+ .watch_paths(file_paths.drain(..).collect())
+ .unwrap();
}
}
}
diff --git a/cli/tools/bench/mod.rs b/cli/tools/bench/mod.rs
index 454a97126..eb400442e 100644
--- a/cli/tools/bench/mod.rs
+++ b/cli/tools/bench/mod.rs
@@ -417,19 +417,18 @@ pub async fn run_benchmarks_with_watch(
.map(|w| !w.no_clear_screen)
.unwrap_or(true),
},
- move |flags, sender, changed_paths| {
+ move |flags, watcher_communicator, changed_paths| {
let bench_flags = bench_flags.clone();
Ok(async move {
let factory = CliFactoryBuilder::new()
- .with_watcher(sender.clone())
- .build_from_flags(flags)
+ .build_from_flags_for_watcher(flags, watcher_communicator.clone())
.await?;
let cli_options = factory.cli_options();
let bench_options = cli_options.resolve_bench_options(bench_flags)?;
- let _ = sender.send(cli_options.watch_paths());
+ let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
if let Some(include) = &bench_options.files.include {
- let _ = sender.send(include.clone());
+ let _ = watcher_communicator.watch_paths(include.clone());
}
let graph_kind = cli_options.type_check_mode().as_graph_kind();
diff --git a/cli/tools/bundle.rs b/cli/tools/bundle.rs
index 827641b1b..cbde8768f 100644
--- a/cli/tools/bundle.rs
+++ b/cli/tools/bundle.rs
@@ -35,15 +35,14 @@ pub async fn bundle(
job_name: "Bundle".to_string(),
clear_screen: !watch_flags.no_clear_screen,
},
- move |flags, sender, _changed_paths| {
+ move |flags, watcher_communicator, _changed_paths| {
let bundle_flags = bundle_flags.clone();
Ok(async move {
let factory = CliFactoryBuilder::new()
- .with_watcher(sender.clone())
- .build_from_flags(flags)
+ .build_from_flags_for_watcher(flags, watcher_communicator.clone())
.await?;
let cli_options = factory.cli_options();
- let _ = sender.send(cli_options.watch_paths());
+ let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
bundle_action(factory, &bundle_flags).await?;
Ok(())
diff --git a/cli/tools/fmt.rs b/cli/tools/fmt.rs
index 284f20dda..b9525b7b2 100644
--- a/cli/tools/fmt.rs
+++ b/cli/tools/fmt.rs
@@ -68,7 +68,7 @@ pub async fn format(flags: Flags, fmt_flags: FmtFlags) -> Result<(), AnyError> {
job_name: "Fmt".to_string(),
clear_screen: !watch_flags.no_clear_screen,
},
- move |flags, sender, changed_paths| {
+ move |flags, watcher_communicator, changed_paths| {
let fmt_flags = fmt_flags.clone();
Ok(async move {
let factory = CliFactory::from_flags(flags).await?;
@@ -82,7 +82,7 @@ pub async fn format(flags: Flags, fmt_flags: FmtFlags) -> Result<(), AnyError> {
Ok(files)
}
})?;
- _ = sender.send(files.clone());
+ _ = watcher_communicator.watch_paths(files.clone());
let refmt_files = if let Some(paths) = changed_paths {
if fmt_options.check {
// check all files on any changed (https://github.com/denoland/deno/issues/12446)
diff --git a/cli/tools/lint.rs b/cli/tools/lint.rs
index 6a308b599..b7f4a3f0d 100644
--- a/cli/tools/lint.rs
+++ b/cli/tools/lint.rs
@@ -63,7 +63,7 @@ pub async fn lint(flags: Flags, lint_flags: LintFlags) -> Result<(), AnyError> {
job_name: "Lint".to_string(),
clear_screen: !watch_flags.no_clear_screen,
},
- move |flags, sender, changed_paths| {
+ move |flags, watcher_communicator, changed_paths| {
let lint_flags = lint_flags.clone();
Ok(async move {
let factory = CliFactory::from_flags(flags).await?;
@@ -77,7 +77,7 @@ pub async fn lint(flags: Flags, lint_flags: LintFlags) -> Result<(), AnyError> {
Ok(files)
}
})?;
- _ = sender.send(files.clone());
+ _ = watcher_communicator.watch_paths(files.clone());
let lint_paths = if let Some(paths) = changed_paths {
// lint all files on any changed (https://github.com/denoland/deno/issues/12446)
diff --git a/cli/tools/run.rs b/cli/tools/run.rs
index 5fb31a4ad..80e80577e 100644
--- a/cli/tools/run.rs
+++ b/cli/tools/run.rs
@@ -110,18 +110,17 @@ async fn run_with_watch(
job_name: "Process".to_string(),
clear_screen: !watch_flags.no_clear_screen,
},
- move |flags, sender, _changed_paths| {
+ move |flags, watcher_communicator, _changed_paths| {
Ok(async move {
let factory = CliFactoryBuilder::new()
- .with_watcher(sender.clone())
- .build_from_flags(flags)
+ .build_from_flags_for_watcher(flags, watcher_communicator.clone())
.await?;
let cli_options = factory.cli_options();
let main_module = cli_options.resolve_main_module()?;
maybe_npm_install(&factory).await?;
- let _ = sender.send(cli_options.watch_paths());
+ let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
let permissions = PermissionsContainer::new(Permissions::from_options(
&cli_options.permissions_options(),
diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs
index b3aadc1e7..8e29ba2cb 100644
--- a/cli/tools/test/mod.rs
+++ b/cli/tools/test/mod.rs
@@ -1213,19 +1213,18 @@ pub async fn run_tests_with_watch(
.map(|w| !w.no_clear_screen)
.unwrap_or(true),
},
- move |flags, sender, changed_paths| {
+ move |flags, watcher_communicator, changed_paths| {
let test_flags = test_flags.clone();
Ok(async move {
let factory = CliFactoryBuilder::new()
- .with_watcher(sender.clone())
- .build_from_flags(flags)
+ .build_from_flags_for_watcher(flags, watcher_communicator.clone())
.await?;
let cli_options = factory.cli_options();
let test_options = cli_options.resolve_test_options(test_flags)?;
- let _ = sender.send(cli_options.watch_paths());
+ let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
if let Some(include) = &test_options.files.include {
- let _ = sender.send(include.clone());
+ let _ = watcher_communicator.watch_paths(include.clone());
}
let graph_kind = cli_options.type_check_mode().as_graph_kind();
diff --git a/cli/util/file_watcher.rs b/cli/util/file_watcher.rs
index c0eda2d86..8d6b4e8fb 100644
--- a/cli/util/file_watcher.rs
+++ b/cli/util/file_watcher.rs
@@ -7,6 +7,7 @@ use crate::util::fs::canonicalize_path;
use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::Future;
+use deno_core::futures::FutureExt;
use deno_runtime::fmt_errors::format_js_error;
use log::info;
use notify::event::Event as NotifyEvent;
@@ -23,7 +24,6 @@ use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;
-use tokio::sync::mpsc::UnboundedSender;
use tokio::time::sleep;
const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";
@@ -109,26 +109,99 @@ fn create_print_after_restart_fn(clear_screen: bool) -> impl Fn() {
}
}
+/// An interface to interact with Deno's CLI file watcher.
+#[derive(Debug)]
+pub struct WatcherCommunicator {
+ /// Send a list of paths that should be watched for changes.
+ paths_to_watch_tx: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
+
+ /// Listen for a list of paths that were changed.
+ changed_paths_rx: tokio::sync::broadcast::Receiver<Option<Vec<PathBuf>>>,
+
+ /// Send a message to force a restart.
+ restart_tx: tokio::sync::mpsc::UnboundedSender<()>,
+}
+
+impl Clone for WatcherCommunicator {
+ fn clone(&self) -> Self {
+ Self {
+ paths_to_watch_tx: self.paths_to_watch_tx.clone(),
+ changed_paths_rx: self.changed_paths_rx.resubscribe(),
+ restart_tx: self.restart_tx.clone(),
+ }
+ }
+}
+
+impl WatcherCommunicator {
+ pub fn watch_paths(&self, paths: Vec<PathBuf>) -> Result<(), AnyError> {
+ self.paths_to_watch_tx.send(paths).map_err(AnyError::from)
+ }
+}
+
/// Creates a file watcher.
///
/// - `operation` is the actual operation we want to run every time the watcher detects file
/// changes. For example, in the case where we would like to bundle, then `operation` would
/// have the logic for it like bundling the code.
pub async fn watch_func<O, F>(
+ flags: Flags,
+ print_config: PrintConfig,
+ operation: O,
+) -> Result<(), AnyError>
+where
+ O: FnMut(
+ Flags,
+ WatcherCommunicator,
+ Option<Vec<PathBuf>>,
+ ) -> Result<F, AnyError>,
+ F: Future<Output = Result<(), AnyError>>,
+{
+ let fut = watch_recv(
+ flags,
+ print_config,
+ WatcherRestartMode::Automatic,
+ operation,
+ )
+ .boxed_local();
+
+ fut.await
+}
+
+#[derive(Clone, Copy, Debug)]
+pub enum WatcherRestartMode {
+ /// When a file path changes the process is restarted.
+ Automatic,
+
+ /// When a file path changes the caller will trigger a restart, using
+ /// `WatcherCommunicator.restart_tx`.
+ // TODO(bartlomieju): this mode will be used in a follow up PR
+ #[allow(dead_code)]
+ Manual,
+}
+
+/// Creates a file watcher.
+///
+/// - `operation` is the actual operation we want to run every time the watcher detects file
+/// changes. For example, in the case where we would like to bundle, then `operation` would
+/// have the logic for it like bundling the code.
+pub async fn watch_recv<O, F>(
mut flags: Flags,
print_config: PrintConfig,
+ restart_mode: WatcherRestartMode,
mut operation: O,
) -> Result<(), AnyError>
where
O: FnMut(
Flags,
- UnboundedSender<Vec<PathBuf>>,
+ WatcherCommunicator,
Option<Vec<PathBuf>>,
) -> Result<F, AnyError>,
F: Future<Output = Result<(), AnyError>>,
{
- let (paths_to_watch_sender, mut paths_to_watch_receiver) =
+ let (paths_to_watch_tx, mut paths_to_watch_rx) =
tokio::sync::mpsc::unbounded_channel();
+ let (restart_tx, mut restart_rx) = tokio::sync::mpsc::unbounded_channel();
+ let (changed_paths_tx, changed_paths_rx) = tokio::sync::broadcast::channel(4);
let (watcher_sender, mut watcher_receiver) =
DebouncedReceiver::new_with_sender();
@@ -138,29 +211,13 @@ where
} = print_config;
let print_after_restart = create_print_after_restart_fn(clear_screen);
-
+ let watcher_communicator = WatcherCommunicator {
+ paths_to_watch_tx: paths_to_watch_tx.clone(),
+ changed_paths_rx: changed_paths_rx.resubscribe(),
+ restart_tx: restart_tx.clone(),
+ };
info!("{} {} started.", colors::intense_blue("Watcher"), job_name,);
- fn consume_paths_to_watch(
- watcher: &mut RecommendedWatcher,
- receiver: &mut UnboundedReceiver<Vec<PathBuf>>,
- ) {
- loop {
- match receiver.try_recv() {
- Ok(paths) => {
- add_paths_to_watcher(watcher, &paths);
- }
- Err(e) => match e {
- mpsc::error::TryRecvError::Empty => {
- break;
- }
- // there must be at least one receiver alive
- _ => unreachable!(),
- },
- }
- }
- }
-
let mut changed_paths = None;
loop {
// We may need to give the runtime a tick to settle, as cancellations may need to propagate
@@ -171,17 +228,17 @@ where
}
let mut watcher = new_watcher(watcher_sender.clone())?;
- consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
+ consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx);
let receiver_future = async {
loop {
- let maybe_paths = paths_to_watch_receiver.recv().await;
+ let maybe_paths = paths_to_watch_rx.recv().await;
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
}
};
let operation_future = error_handler(operation(
flags.clone(),
- paths_to_watch_sender.clone(),
+ watcher_communicator.clone(),
changed_paths.take(),
)?);
@@ -190,13 +247,26 @@ where
select! {
_ = receiver_future => {},
- received_changed_paths = watcher_receiver.recv() => {
+ _ = restart_rx.recv() => {
print_after_restart();
- changed_paths = received_changed_paths;
continue;
},
+ received_changed_paths = watcher_receiver.recv() => {
+ changed_paths = received_changed_paths.clone();
+
+ match restart_mode {
+ WatcherRestartMode::Automatic => {
+ print_after_restart();
+ continue;
+ },
+ WatcherRestartMode::Manual => {
+ // TODO(bartlomieju): should we fail on sending changed paths?
+ let _ = changed_paths_tx.send(received_changed_paths);
+ }
+ }
+ },
success = operation_future => {
- consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
+ consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx);
// TODO(bartlomieju): print exit code here?
info!(
"{} {} {}. Restarting on file change...",
@@ -213,10 +283,14 @@ where
let receiver_future = async {
loop {
- let maybe_paths = paths_to_watch_receiver.recv().await;
+ let maybe_paths = paths_to_watch_rx.recv().await;
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
}
};
+
+ // If we got this far, it means that the `operation` has finished; let's wait
+ // and see if there are any new paths to watch received or any of the already
+ // watched paths has changed.
select! {
_ = receiver_future => {},
received_changed_paths = watcher_receiver.recv() => {
@@ -231,26 +305,28 @@ where
fn new_watcher(
sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
) -> Result<RecommendedWatcher, AnyError> {
- let watcher = Watcher::new(
+ Ok(Watcher::new(
move |res: Result<NotifyEvent, NotifyError>| {
- if let Ok(event) = res {
- if matches!(
- event.kind,
- EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
- ) {
- let paths = event
- .paths
- .iter()
- .filter_map(|path| canonicalize_path(path).ok())
- .collect();
- sender.send(paths).unwrap();
- }
+ let Ok(event) = res else {
+ return;
+ };
+
+ if !matches!(
+ event.kind,
+ EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
+ ) {
+ return;
}
+
+ let paths = event
+ .paths
+ .iter()
+ .filter_map(|path| canonicalize_path(path).ok())
+ .collect();
+ sender.send(paths).unwrap();
},
Default::default(),
- )?;
-
- Ok(watcher)
+ )?)
}
fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) {
@@ -260,3 +336,23 @@ fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) {
}
log::debug!("Watching paths: {:?}", paths);
}
+
+fn consume_paths_to_watch(
+ watcher: &mut RecommendedWatcher,
+ receiver: &mut UnboundedReceiver<Vec<PathBuf>>,
+) {
+ loop {
+ match receiver.try_recv() {
+ Ok(paths) => {
+ add_paths_to_watcher(watcher, &paths);
+ }
+ Err(e) => match e {
+ mpsc::error::TryRecvError::Empty => {
+ break;
+ }
+ // there must be at least one receiver alive
+ _ => unreachable!(),
+ },
+ }
+ }
+}