From 2d4c46c975eb916dc622cc729a1a8d397582a76f Mon Sep 17 00:00:00 2001 From: David Sherret Date: Mon, 28 Nov 2022 17:28:54 -0500 Subject: refactor: create util folder, move nap_sym to napi/sym, move http_cache to cache folder (#16857) --- cli/file_watcher.rs | 374 ---------------------------------------------------- 1 file changed, 374 deletions(-) delete mode 100644 cli/file_watcher.rs (limited to 'cli/file_watcher.rs') diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs deleted file mode 100644 index 78a5e7a82..000000000 --- a/cli/file_watcher.rs +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. - -use crate::colors; -use crate::fs_util::canonicalize_path; - -use deno_core::error::AnyError; -use deno_core::error::JsError; -use deno_core::futures::Future; -use deno_runtime::fmt_errors::format_js_error; -use log::info; -use notify::event::Event as NotifyEvent; -use notify::event::EventKind; -use notify::Error as NotifyError; -use notify::RecommendedWatcher; -use notify::RecursiveMode; -use notify::Watcher; -use std::collections::HashSet; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; -use tokio::select; -use tokio::sync::mpsc; -use tokio::sync::mpsc::UnboundedReceiver; -use tokio::time::sleep; - -const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H"; -const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200); - -struct DebouncedReceiver { - // The `recv()` call could be used in a tokio `select!` macro, - // and so we store this state on the struct to ensure we don't - // lose items if a `recv()` never completes - received_items: HashSet, - receiver: UnboundedReceiver>, -} - -impl DebouncedReceiver { - fn new_with_sender() -> (Arc>>, Self) { - let (sender, receiver) = mpsc::unbounded_channel(); - ( - Arc::new(sender), - Self { - receiver, - received_items: HashSet::new(), - }, - ) - } - - async fn recv(&mut self) -> Option> { - if self.received_items.is_empty() { - self - .received_items - .extend(self.receiver.recv().await?.into_iter()); - } - - loop { - select! { - items = self.receiver.recv() => { - self.received_items.extend(items?); - } - _ = sleep(DEBOUNCE_INTERVAL) => { - return Some(self.received_items.drain().collect()); - } - } - } - } -} - -async fn error_handler(watch_future: F) -where - F: Future>, -{ - let result = watch_future.await; - if let Err(err) = result { - let error_string = match err.downcast_ref::() { - Some(e) => format_js_error(e), - None => format!("{:?}", err), - }; - eprintln!( - "{}: {}", - colors::red_bold("error"), - error_string.trim_start_matches("error: ") - ); - } -} - -pub enum ResolutionResult { - Restart { - paths_to_watch: Vec, - result: Result, - }, - Ignore, -} - -async fn next_restart( - resolver: &mut R, - debounced_receiver: &mut DebouncedReceiver, -) -> (Vec, Result) -where - R: FnMut(Option>) -> F, - F: Future>, -{ - loop { - let changed = debounced_receiver.recv().await; - match resolver(changed).await { - ResolutionResult::Ignore => { - log::debug!("File change ignored") - } - ResolutionResult::Restart { - paths_to_watch, - result, - } => { - return (paths_to_watch, result); - } - } - } -} - -pub struct PrintConfig { - /// printing watcher status to terminal. - pub job_name: String, - /// determine whether to clear the terminal screen - pub clear_screen: bool, -} - -fn create_print_after_restart_fn(clear_screen: bool) -> impl Fn() { - move || { - if clear_screen { - eprint!("{}", CLEAR_SCREEN); - } - info!( - "{} File change detected! Restarting!", - colors::intense_blue("Watcher"), - ); - } -} - -/// Creates a file watcher, which will call `resolver` with every file change. -/// -/// - `resolver` is used for resolving file paths to be watched at every restarting -/// of the watcher, and can also return a value to be passed to `operation`. -/// It returns a [`ResolutionResult`], which can either instruct the watcher to restart or ignore the change. -/// This always contains paths to watch; -/// -/// - `operation` is the actual operation we want to run every time the watcher detects file -/// changes. For example, in the case where we would like to bundle, then `operation` would -/// have the logic for it like bundling the code. -pub async fn watch_func( - mut resolver: R, - mut operation: O, - print_config: PrintConfig, -) -> Result<(), AnyError> -where - R: FnMut(Option>) -> F1, - O: FnMut(T) -> F2, - F1: Future>, - F2: Future>, -{ - let (sender, mut receiver) = DebouncedReceiver::new_with_sender(); - - let PrintConfig { - job_name, - clear_screen, - } = print_config; - - // Store previous data. If module resolution fails at some point, the watcher will try to - // continue watching files using these data. - let mut paths_to_watch; - let mut resolution_result; - - let print_after_restart = create_print_after_restart_fn(clear_screen); - - match resolver(None).await { - ResolutionResult::Ignore => { - // The only situation where it makes sense to ignore the initial 'change' - // is if the command isn't supposed to do anything until something changes, - // e.g. a variant of `deno test` which doesn't run the entire test suite to start with, - // but instead does nothing until you make a change. - // - // In that case, this is probably the correct output. - info!( - "{} Waiting for file changes...", - colors::intense_blue("Watcher"), - ); - - let (paths, result) = next_restart(&mut resolver, &mut receiver).await; - paths_to_watch = paths; - resolution_result = result; - - print_after_restart(); - } - ResolutionResult::Restart { - paths_to_watch: paths, - result, - } => { - paths_to_watch = paths; - resolution_result = result; - } - }; - - info!("{} {} started.", colors::intense_blue("Watcher"), job_name,); - - loop { - let mut watcher = new_watcher(sender.clone())?; - add_paths_to_watcher(&mut watcher, &paths_to_watch); - - match resolution_result { - Ok(operation_arg) => { - let fut = error_handler(operation(operation_arg)); - select! { - (paths, result) = next_restart(&mut resolver, &mut receiver) => { - if result.is_ok() { - paths_to_watch = paths; - } - resolution_result = result; - - print_after_restart(); - continue; - }, - _ = fut => {}, - }; - - info!( - "{} {} finished. Restarting on file change...", - colors::intense_blue("Watcher"), - job_name, - ); - } - Err(error) => { - eprintln!("{}: {}", colors::red_bold("error"), error); - info!( - "{} {} failed. Restarting on file change...", - colors::intense_blue("Watcher"), - job_name, - ); - } - } - - let (paths, result) = next_restart(&mut resolver, &mut receiver).await; - if result.is_ok() { - paths_to_watch = paths; - } - resolution_result = result; - - print_after_restart(); - - drop(watcher); - } -} - -/// Creates a file watcher. -/// -/// - `operation` is the actual operation we want to run every time the watcher detects file -/// changes. For example, in the case where we would like to bundle, then `operation` would -/// have the logic for it like bundling the code. -pub async fn watch_func2( - mut paths_to_watch_receiver: UnboundedReceiver>, - mut operation: O, - operation_args: T, - print_config: PrintConfig, -) -> Result<(), AnyError> -where - O: FnMut(T) -> Result, - F: Future>, -{ - let (watcher_sender, mut watcher_receiver) = - DebouncedReceiver::new_with_sender(); - - let PrintConfig { - job_name, - clear_screen, - } = print_config; - - let print_after_restart = create_print_after_restart_fn(clear_screen); - - info!("{} {} started.", colors::intense_blue("Watcher"), job_name,); - - fn consume_paths_to_watch( - watcher: &mut RecommendedWatcher, - receiver: &mut UnboundedReceiver>, - ) { - loop { - match receiver.try_recv() { - Ok(paths) => { - add_paths_to_watcher(watcher, &paths); - } - Err(e) => match e { - mpsc::error::TryRecvError::Empty => { - break; - } - // there must be at least one receiver alive - _ => unreachable!(), - }, - } - } - } - - loop { - let mut watcher = new_watcher(watcher_sender.clone())?; - consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); - - let receiver_future = async { - loop { - let maybe_paths = paths_to_watch_receiver.recv().await; - add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); - } - }; - let operation_future = error_handler(operation(operation_args.clone())?); - - select! { - _ = receiver_future => {}, - _ = watcher_receiver.recv() => { - print_after_restart(); - continue; - }, - _ = operation_future => { - // TODO(bartlomieju): print exit code here? - info!( - "{} {} finished. Restarting on file change...", - colors::intense_blue("Watcher"), - job_name, - ); - consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); - }, - }; - - let receiver_future = async { - loop { - let maybe_paths = paths_to_watch_receiver.recv().await; - add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); - } - }; - select! { - _ = receiver_future => {}, - _ = watcher_receiver.recv() => { - print_after_restart(); - continue; - }, - }; - } -} - -fn new_watcher( - sender: Arc>>, -) -> Result { - let watcher = Watcher::new( - move |res: Result| { - if let Ok(event) = res { - if matches!( - event.kind, - EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) - ) { - let paths = event - .paths - .iter() - .filter_map(|path| canonicalize_path(path).ok()) - .collect(); - sender.send(paths).unwrap(); - } - } - }, - Default::default(), - )?; - - Ok(watcher) -} - -fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) { - // Ignore any error e.g. `PathNotFound` - for path in paths { - let _ = watcher.watch(path, RecursiveMode::Recursive); - } - log::debug!("Watching paths: {:?}", paths); -} -- cgit v1.2.3