diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-10-19 07:05:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-19 07:05:00 +0200 |
commit | 8d9fef3b8955eadfd4820455b422b5bec1cdad0a (patch) | |
tree | 7114c3e3c329c83ce1d9c90e8fab5c7104bd1579 /cli/util/file_watcher.rs | |
parent | 5095af78018f1d77374bc06cbb58231e631056b9 (diff) |
refactor: add WatcherCommunicator helper struct (#20927)
This commit introduces "WatcherCommunicator" struct that
is used facilitate bi-directional communication between CLI
file watcher and the watched function.
Prerequisite for https://github.com/denoland/deno/pull/20876
Diffstat (limited to 'cli/util/file_watcher.rs')
-rw-r--r-- | cli/util/file_watcher.rs | 190 |
1 files changed, 143 insertions, 47 deletions
diff --git a/cli/util/file_watcher.rs b/cli/util/file_watcher.rs index c0eda2d86..8d6b4e8fb 100644 --- a/cli/util/file_watcher.rs +++ b/cli/util/file_watcher.rs @@ -7,6 +7,7 @@ use crate::util::fs::canonicalize_path; use deno_core::error::AnyError; use deno_core::error::JsError; use deno_core::futures::Future; +use deno_core::futures::FutureExt; use deno_runtime::fmt_errors::format_js_error; use log::info; use notify::event::Event as NotifyEvent; @@ -23,7 +24,6 @@ use std::time::Duration; use tokio::select; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::mpsc::UnboundedSender; use tokio::time::sleep; const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H"; @@ -109,26 +109,99 @@ fn create_print_after_restart_fn(clear_screen: bool) -> impl Fn() { } } +/// An interface to interact with Deno's CLI file watcher. +#[derive(Debug)] +pub struct WatcherCommunicator { + /// Send a list of paths that should be watched for changes. + paths_to_watch_tx: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>, + + /// Listen for a list of paths that were changed. + changed_paths_rx: tokio::sync::broadcast::Receiver<Option<Vec<PathBuf>>>, + + /// Send a message to force a restart. + restart_tx: tokio::sync::mpsc::UnboundedSender<()>, +} + +impl Clone for WatcherCommunicator { + fn clone(&self) -> Self { + Self { + paths_to_watch_tx: self.paths_to_watch_tx.clone(), + changed_paths_rx: self.changed_paths_rx.resubscribe(), + restart_tx: self.restart_tx.clone(), + } + } +} + +impl WatcherCommunicator { + pub fn watch_paths(&self, paths: Vec<PathBuf>) -> Result<(), AnyError> { + self.paths_to_watch_tx.send(paths).map_err(AnyError::from) + } +} + /// Creates a file watcher. /// /// - `operation` is the actual operation we want to run every time the watcher detects file /// changes. For example, in the case where we would like to bundle, then `operation` would /// have the logic for it like bundling the code. pub async fn watch_func<O, F>( + flags: Flags, + print_config: PrintConfig, + operation: O, +) -> Result<(), AnyError> +where + O: FnMut( + Flags, + WatcherCommunicator, + Option<Vec<PathBuf>>, + ) -> Result<F, AnyError>, + F: Future<Output = Result<(), AnyError>>, +{ + let fut = watch_recv( + flags, + print_config, + WatcherRestartMode::Automatic, + operation, + ) + .boxed_local(); + + fut.await +} + +#[derive(Clone, Copy, Debug)] +pub enum WatcherRestartMode { + /// When a file path changes the process is restarted. + Automatic, + + /// When a file path changes the caller will trigger a restart, using + /// `WatcherCommunicator.restart_tx`. + // TODO(bartlomieju): this mode will be used in a follow up PR + #[allow(dead_code)] + Manual, +} + +/// Creates a file watcher. +/// +/// - `operation` is the actual operation we want to run every time the watcher detects file +/// changes. For example, in the case where we would like to bundle, then `operation` would +/// have the logic for it like bundling the code. +pub async fn watch_recv<O, F>( mut flags: Flags, print_config: PrintConfig, + restart_mode: WatcherRestartMode, mut operation: O, ) -> Result<(), AnyError> where O: FnMut( Flags, - UnboundedSender<Vec<PathBuf>>, + WatcherCommunicator, Option<Vec<PathBuf>>, ) -> Result<F, AnyError>, F: Future<Output = Result<(), AnyError>>, { - let (paths_to_watch_sender, mut paths_to_watch_receiver) = + let (paths_to_watch_tx, mut paths_to_watch_rx) = tokio::sync::mpsc::unbounded_channel(); + let (restart_tx, mut restart_rx) = tokio::sync::mpsc::unbounded_channel(); + let (changed_paths_tx, changed_paths_rx) = tokio::sync::broadcast::channel(4); let (watcher_sender, mut watcher_receiver) = DebouncedReceiver::new_with_sender(); @@ -138,29 +211,13 @@ where } = print_config; let print_after_restart = create_print_after_restart_fn(clear_screen); - + let watcher_communicator = WatcherCommunicator { + paths_to_watch_tx: paths_to_watch_tx.clone(), + changed_paths_rx: changed_paths_rx.resubscribe(), + restart_tx: restart_tx.clone(), + }; info!("{} {} started.", colors::intense_blue("Watcher"), job_name,); - fn consume_paths_to_watch( - watcher: &mut RecommendedWatcher, - receiver: &mut UnboundedReceiver<Vec<PathBuf>>, - ) { - loop { - match receiver.try_recv() { - Ok(paths) => { - add_paths_to_watcher(watcher, &paths); - } - Err(e) => match e { - mpsc::error::TryRecvError::Empty => { - break; - } - // there must be at least one receiver alive - _ => unreachable!(), - }, - } - } - } - let mut changed_paths = None; loop { // We may need to give the runtime a tick to settle, as cancellations may need to propagate @@ -171,17 +228,17 @@ where } let mut watcher = new_watcher(watcher_sender.clone())?; - consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); + consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx); let receiver_future = async { loop { - let maybe_paths = paths_to_watch_receiver.recv().await; + let maybe_paths = paths_to_watch_rx.recv().await; add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); } }; let operation_future = error_handler(operation( flags.clone(), - paths_to_watch_sender.clone(), + watcher_communicator.clone(), changed_paths.take(), )?); @@ -190,13 +247,26 @@ where select! { _ = receiver_future => {}, - received_changed_paths = watcher_receiver.recv() => { + _ = restart_rx.recv() => { print_after_restart(); - changed_paths = received_changed_paths; continue; }, + received_changed_paths = watcher_receiver.recv() => { + changed_paths = received_changed_paths.clone(); + + match restart_mode { + WatcherRestartMode::Automatic => { + print_after_restart(); + continue; + }, + WatcherRestartMode::Manual => { + // TODO(bartlomieju): should we fail on sending changed paths? + let _ = changed_paths_tx.send(received_changed_paths); + } + } + }, success = operation_future => { - consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); + consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx); // TODO(bartlomieju): print exit code here? info!( "{} {} {}. Restarting on file change...", @@ -213,10 +283,14 @@ where let receiver_future = async { loop { - let maybe_paths = paths_to_watch_receiver.recv().await; + let maybe_paths = paths_to_watch_rx.recv().await; add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); } }; + + // If we got this far, it means that the `operation` has finished; let's wait + // and see if there are any new paths to watch received or any of the already + // watched paths has changed. select! { _ = receiver_future => {}, received_changed_paths = watcher_receiver.recv() => { @@ -231,26 +305,28 @@ where fn new_watcher( sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, ) -> Result<RecommendedWatcher, AnyError> { - let watcher = Watcher::new( + Ok(Watcher::new( move |res: Result<NotifyEvent, NotifyError>| { - if let Ok(event) = res { - if matches!( - event.kind, - EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) - ) { - let paths = event - .paths - .iter() - .filter_map(|path| canonicalize_path(path).ok()) - .collect(); - sender.send(paths).unwrap(); - } + let Ok(event) = res else { + return; + }; + + if !matches!( + event.kind, + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) + ) { + return; } + + let paths = event + .paths + .iter() + .filter_map(|path| canonicalize_path(path).ok()) + .collect(); + sender.send(paths).unwrap(); }, Default::default(), - )?; - - Ok(watcher) + )?) } fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) { @@ -260,3 +336,23 @@ fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) { } log::debug!("Watching paths: {:?}", paths); } + +fn consume_paths_to_watch( + watcher: &mut RecommendedWatcher, + receiver: &mut UnboundedReceiver<Vec<PathBuf>>, +) { + loop { + match receiver.try_recv() { + Ok(paths) => { + add_paths_to_watcher(watcher, &paths); + } + Err(e) => match e { + mpsc::error::TryRecvError::Empty => { + break; + } + // there must be at least one receiver alive + _ => unreachable!(), + }, + } + } +} |