summaryrefslogtreecommitdiff
path: root/cli/util/file_watcher.rs
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2022-11-28 17:28:54 -0500
committerGitHub <noreply@github.com>2022-11-28 17:28:54 -0500
commit2d4c46c975eb916dc622cc729a1a8d397582a76f (patch)
tree445e819117acd2f94ffc9d7da7ed8e3e604435d0 /cli/util/file_watcher.rs
parentf526513d74d34ac254aa40ef9b73238cb21c395b (diff)
refactor: create util folder, move nap_sym to napi/sym, move http_cache to cache folder (#16857)
Diffstat (limited to 'cli/util/file_watcher.rs')
-rw-r--r--cli/util/file_watcher.rs374
1 files changed, 374 insertions, 0 deletions
diff --git a/cli/util/file_watcher.rs b/cli/util/file_watcher.rs
new file mode 100644
index 000000000..5158437a0
--- /dev/null
+++ b/cli/util/file_watcher.rs
@@ -0,0 +1,374 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use crate::colors;
+use crate::util::fs::canonicalize_path;
+
+use deno_core::error::AnyError;
+use deno_core::error::JsError;
+use deno_core::futures::Future;
+use deno_runtime::fmt_errors::format_js_error;
+use log::info;
+use notify::event::Event as NotifyEvent;
+use notify::event::EventKind;
+use notify::Error as NotifyError;
+use notify::RecommendedWatcher;
+use notify::RecursiveMode;
+use notify::Watcher;
+use std::collections::HashSet;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::select;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::time::sleep;
+
+const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";
+const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
+
+struct DebouncedReceiver {
+ // The `recv()` call could be used in a tokio `select!` macro,
+ // and so we store this state on the struct to ensure we don't
+ // lose items if a `recv()` never completes
+ received_items: HashSet<PathBuf>,
+ receiver: UnboundedReceiver<Vec<PathBuf>>,
+}
+
+impl DebouncedReceiver {
+ fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) {
+ let (sender, receiver) = mpsc::unbounded_channel();
+ (
+ Arc::new(sender),
+ Self {
+ receiver,
+ received_items: HashSet::new(),
+ },
+ )
+ }
+
+ async fn recv(&mut self) -> Option<Vec<PathBuf>> {
+ if self.received_items.is_empty() {
+ self
+ .received_items
+ .extend(self.receiver.recv().await?.into_iter());
+ }
+
+ loop {
+ select! {
+ items = self.receiver.recv() => {
+ self.received_items.extend(items?);
+ }
+ _ = sleep(DEBOUNCE_INTERVAL) => {
+ return Some(self.received_items.drain().collect());
+ }
+ }
+ }
+ }
+}
+
+async fn error_handler<F>(watch_future: F)
+where
+ F: Future<Output = Result<(), AnyError>>,
+{
+ let result = watch_future.await;
+ if let Err(err) = result {
+ let error_string = match err.downcast_ref::<JsError>() {
+ Some(e) => format_js_error(e),
+ None => format!("{:?}", err),
+ };
+ eprintln!(
+ "{}: {}",
+ colors::red_bold("error"),
+ error_string.trim_start_matches("error: ")
+ );
+ }
+}
+
+pub enum ResolutionResult<T> {
+ Restart {
+ paths_to_watch: Vec<PathBuf>,
+ result: Result<T, AnyError>,
+ },
+ Ignore,
+}
+
+async fn next_restart<R, T, F>(
+ resolver: &mut R,
+ debounced_receiver: &mut DebouncedReceiver,
+) -> (Vec<PathBuf>, Result<T, AnyError>)
+where
+ R: FnMut(Option<Vec<PathBuf>>) -> F,
+ F: Future<Output = ResolutionResult<T>>,
+{
+ loop {
+ let changed = debounced_receiver.recv().await;
+ match resolver(changed).await {
+ ResolutionResult::Ignore => {
+ log::debug!("File change ignored")
+ }
+ ResolutionResult::Restart {
+ paths_to_watch,
+ result,
+ } => {
+ return (paths_to_watch, result);
+ }
+ }
+ }
+}
+
+pub struct PrintConfig {
+ /// printing watcher status to terminal.
+ pub job_name: String,
+ /// determine whether to clear the terminal screen
+ pub clear_screen: bool,
+}
+
+fn create_print_after_restart_fn(clear_screen: bool) -> impl Fn() {
+ move || {
+ if clear_screen {
+ eprint!("{}", CLEAR_SCREEN);
+ }
+ info!(
+ "{} File change detected! Restarting!",
+ colors::intense_blue("Watcher"),
+ );
+ }
+}
+
+/// Creates a file watcher, which will call `resolver` with every file change.
+///
+/// - `resolver` is used for resolving file paths to be watched at every restarting
+/// of the watcher, and can also return a value to be passed to `operation`.
+/// It returns a [`ResolutionResult`], which can either instruct the watcher to restart or ignore the change.
+/// This always contains paths to watch;
+///
+/// - `operation` is the actual operation we want to run every time the watcher detects file
+/// changes. For example, in the case where we would like to bundle, then `operation` would
+/// have the logic for it like bundling the code.
+pub async fn watch_func<R, O, T, F1, F2>(
+ mut resolver: R,
+ mut operation: O,
+ print_config: PrintConfig,
+) -> Result<(), AnyError>
+where
+ R: FnMut(Option<Vec<PathBuf>>) -> F1,
+ O: FnMut(T) -> F2,
+ F1: Future<Output = ResolutionResult<T>>,
+ F2: Future<Output = Result<(), AnyError>>,
+{
+ let (sender, mut receiver) = DebouncedReceiver::new_with_sender();
+
+ let PrintConfig {
+ job_name,
+ clear_screen,
+ } = print_config;
+
+ // Store previous data. If module resolution fails at some point, the watcher will try to
+ // continue watching files using these data.
+ let mut paths_to_watch;
+ let mut resolution_result;
+
+ let print_after_restart = create_print_after_restart_fn(clear_screen);
+
+ match resolver(None).await {
+ ResolutionResult::Ignore => {
+ // The only situation where it makes sense to ignore the initial 'change'
+ // is if the command isn't supposed to do anything until something changes,
+ // e.g. a variant of `deno test` which doesn't run the entire test suite to start with,
+ // but instead does nothing until you make a change.
+ //
+ // In that case, this is probably the correct output.
+ info!(
+ "{} Waiting for file changes...",
+ colors::intense_blue("Watcher"),
+ );
+
+ let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
+ paths_to_watch = paths;
+ resolution_result = result;
+
+ print_after_restart();
+ }
+ ResolutionResult::Restart {
+ paths_to_watch: paths,
+ result,
+ } => {
+ paths_to_watch = paths;
+ resolution_result = result;
+ }
+ };
+
+ info!("{} {} started.", colors::intense_blue("Watcher"), job_name,);
+
+ loop {
+ let mut watcher = new_watcher(sender.clone())?;
+ add_paths_to_watcher(&mut watcher, &paths_to_watch);
+
+ match resolution_result {
+ Ok(operation_arg) => {
+ let fut = error_handler(operation(operation_arg));
+ select! {
+ (paths, result) = next_restart(&mut resolver, &mut receiver) => {
+ if result.is_ok() {
+ paths_to_watch = paths;
+ }
+ resolution_result = result;
+
+ print_after_restart();
+ continue;
+ },
+ _ = fut => {},
+ };
+
+ info!(
+ "{} {} finished. Restarting on file change...",
+ colors::intense_blue("Watcher"),
+ job_name,
+ );
+ }
+ Err(error) => {
+ eprintln!("{}: {}", colors::red_bold("error"), error);
+ info!(
+ "{} {} failed. Restarting on file change...",
+ colors::intense_blue("Watcher"),
+ job_name,
+ );
+ }
+ }
+
+ let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
+ if result.is_ok() {
+ paths_to_watch = paths;
+ }
+ resolution_result = result;
+
+ print_after_restart();
+
+ drop(watcher);
+ }
+}
+
+/// Creates a file watcher.
+///
+/// - `operation` is the actual operation we want to run every time the watcher detects file
+/// changes. For example, in the case where we would like to bundle, then `operation` would
+/// have the logic for it like bundling the code.
+pub async fn watch_func2<T: Clone, O, F>(
+ mut paths_to_watch_receiver: UnboundedReceiver<Vec<PathBuf>>,
+ mut operation: O,
+ operation_args: T,
+ print_config: PrintConfig,
+) -> Result<(), AnyError>
+where
+ O: FnMut(T) -> Result<F, AnyError>,
+ F: Future<Output = Result<(), AnyError>>,
+{
+ let (watcher_sender, mut watcher_receiver) =
+ DebouncedReceiver::new_with_sender();
+
+ let PrintConfig {
+ job_name,
+ clear_screen,
+ } = print_config;
+
+ let print_after_restart = create_print_after_restart_fn(clear_screen);
+
+ info!("{} {} started.", colors::intense_blue("Watcher"), job_name,);
+
+ fn consume_paths_to_watch(
+ watcher: &mut RecommendedWatcher,
+ receiver: &mut UnboundedReceiver<Vec<PathBuf>>,
+ ) {
+ loop {
+ match receiver.try_recv() {
+ Ok(paths) => {
+ add_paths_to_watcher(watcher, &paths);
+ }
+ Err(e) => match e {
+ mpsc::error::TryRecvError::Empty => {
+ break;
+ }
+ // there must be at least one receiver alive
+ _ => unreachable!(),
+ },
+ }
+ }
+ }
+
+ loop {
+ let mut watcher = new_watcher(watcher_sender.clone())?;
+ consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
+
+ let receiver_future = async {
+ loop {
+ let maybe_paths = paths_to_watch_receiver.recv().await;
+ add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
+ }
+ };
+ let operation_future = error_handler(operation(operation_args.clone())?);
+
+ select! {
+ _ = receiver_future => {},
+ _ = watcher_receiver.recv() => {
+ print_after_restart();
+ continue;
+ },
+ _ = operation_future => {
+ // TODO(bartlomieju): print exit code here?
+ info!(
+ "{} {} finished. Restarting on file change...",
+ colors::intense_blue("Watcher"),
+ job_name,
+ );
+ consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
+ },
+ };
+
+ let receiver_future = async {
+ loop {
+ let maybe_paths = paths_to_watch_receiver.recv().await;
+ add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
+ }
+ };
+ select! {
+ _ = receiver_future => {},
+ _ = watcher_receiver.recv() => {
+ print_after_restart();
+ continue;
+ },
+ };
+ }
+}
+
+fn new_watcher(
+ sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
+) -> Result<RecommendedWatcher, AnyError> {
+ let watcher = Watcher::new(
+ move |res: Result<NotifyEvent, NotifyError>| {
+ if let Ok(event) = res {
+ if matches!(
+ event.kind,
+ EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
+ ) {
+ let paths = event
+ .paths
+ .iter()
+ .filter_map(|path| canonicalize_path(path).ok())
+ .collect();
+ sender.send(paths).unwrap();
+ }
+ }
+ },
+ Default::default(),
+ )?;
+
+ Ok(watcher)
+}
+
+fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) {
+ // Ignore any error e.g. `PathNotFound`
+ for path in paths {
+ let _ = watcher.watch(path, RecursiveMode::Recursive);
+ }
+ log::debug!("Watching paths: {:?}", paths);
+}