diff options
Diffstat (limited to 'cli/file_watcher.rs')
-rw-r--r-- | cli/file_watcher.rs | 86 |
1 files changed, 33 insertions, 53 deletions
diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs index c7fff4632..84b30dfd2 100644 --- a/cli/file_watcher.rs +++ b/cli/file_watcher.rs @@ -4,9 +4,7 @@ use crate::colors; use crate::fs_util::canonicalize_path; use deno_core::error::AnyError; -use deno_core::futures::stream::{Stream, StreamExt}; use deno_core::futures::Future; -use deno_core::parking_lot::Mutex; use log::info; use notify::event::Event as NotifyEvent; use notify::event::EventKind; @@ -15,57 +13,42 @@ use notify::Error as NotifyError; use notify::RecommendedWatcher; use notify::RecursiveMode; use notify::Watcher; -use pin_project::pin_project; use std::collections::HashSet; use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use std::time::Duration; -use tokio::pin; use tokio::select; +use tokio::sync::mpsc; use tokio::time::sleep; -use tokio::time::Instant; -use tokio::time::Sleep; const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200); -#[pin_project(project = DebounceProjection)] -struct Debounce { - #[pin] - timer: Sleep, - changed_paths: Arc<Mutex<HashSet<PathBuf>>>, +struct DebouncedReceiver { + receiver: mpsc::UnboundedReceiver<Vec<PathBuf>>, } -impl Debounce { - fn new() -> Self { - Self { - timer: sleep(DEBOUNCE_INTERVAL), - changed_paths: Arc::new(Mutex::new(HashSet::new())), - } +impl DebouncedReceiver { + fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) { + let (sender, receiver) = mpsc::unbounded_channel(); + (Arc::new(sender), Self { receiver }) } -} -impl Stream for Debounce { - type Item = Vec<PathBuf>; - - /// Note that this never returns `Poll::Ready(None)`, which means that the - /// file watcher will be alive until the Deno process is terminated. - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll<Option<Self::Item>> { - let mut changed_paths = self.changed_paths.lock(); - if changed_paths.len() > 0 { - Poll::Ready(Some(changed_paths.drain().collect())) - } else { - drop(changed_paths); - let mut timer = self.project().timer; - if timer.as_mut().poll(cx).is_ready() { - timer.reset(Instant::now() + DEBOUNCE_INTERVAL); + async fn recv(&mut self) -> Option<Vec<PathBuf>> { + let mut received_items = self + .receiver + .recv() + .await? + .into_iter() + .collect::<HashSet<_>>(); // prevent duplicates + loop { + tokio::select! { + items = self.receiver.recv() => { + received_items.extend(items?); + } + _ = sleep(DEBOUNCE_INTERVAL) => { + return Some(received_items.into_iter().collect()); + } } - Poll::Pending } } } @@ -91,14 +74,14 @@ pub enum ResolutionResult<T> { async fn next_restart<R, T, F>( resolver: &mut R, - debounce: &mut Pin<&mut Debounce>, + debounced_receiver: &mut DebouncedReceiver, ) -> (Vec<PathBuf>, Result<T, AnyError>) where R: FnMut(Option<Vec<PathBuf>>) -> F, F: Future<Output = ResolutionResult<T>>, { loop { - let changed = debounce.next().await; + let changed = debounced_receiver.recv().await; match resolver(changed).await { ResolutionResult::Ignore => { log::debug!("File change ignored") @@ -140,8 +123,7 @@ where F1: Future<Output = ResolutionResult<T>>, F2: Future<Output = Result<(), AnyError>>, { - let debounce = Debounce::new(); - pin!(debounce); + let (sender, mut receiver) = DebouncedReceiver::new_with_sender(); // Store previous data. If module resolution fails at some point, the watcher will try to // continue watching files using these data. @@ -161,7 +143,7 @@ where colors::intense_blue("Watcher"), ); - let (paths, result) = next_restart(&mut resolver, &mut debounce).await; + let (paths, result) = next_restart(&mut resolver, &mut receiver).await; paths_to_watch = paths; resolution_result = result; } @@ -175,13 +157,13 @@ where }; loop { - let watcher = new_watcher(&paths_to_watch, &debounce)?; + let watcher = new_watcher(&paths_to_watch, sender.clone())?; match resolution_result { Ok(operation_arg) => { let fut = error_handler(operation(operation_arg)); select! { - (paths, result) = next_restart(&mut resolver, &mut debounce) => { + (paths, result) = next_restart(&mut resolver, &mut receiver) => { if result.is_ok() { paths_to_watch = paths; } @@ -207,7 +189,7 @@ where } } - let (paths, result) = next_restart(&mut resolver, &mut debounce).await; + let (paths, result) = next_restart(&mut resolver, &mut receiver).await; if result.is_ok() { paths_to_watch = paths; } @@ -219,10 +201,8 @@ where fn new_watcher( paths: &[PathBuf], - debounce: &Debounce, + sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, ) -> Result<RecommendedWatcher, AnyError> { - let changed_paths = Arc::clone(&debounce.changed_paths); - let mut watcher: RecommendedWatcher = Watcher::new(move |res: Result<NotifyEvent, NotifyError>| { if let Ok(event) = res { @@ -233,9 +213,9 @@ fn new_watcher( let paths = event .paths .iter() - .filter_map(|path| canonicalize_path(path).ok()); - let mut changed_paths = changed_paths.lock(); - changed_paths.extend(paths); + .filter_map(|path| canonicalize_path(path).ok()) + .collect(); + sender.send(paths).unwrap(); } } })?; |