summaryrefslogtreecommitdiff
path: root/cli/file_watcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/file_watcher.rs')
-rw-r--r--cli/file_watcher.rs86
1 files changed, 33 insertions, 53 deletions
diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs
index c7fff4632..84b30dfd2 100644
--- a/cli/file_watcher.rs
+++ b/cli/file_watcher.rs
@@ -4,9 +4,7 @@ use crate::colors;
use crate::fs_util::canonicalize_path;
use deno_core::error::AnyError;
-use deno_core::futures::stream::{Stream, StreamExt};
use deno_core::futures::Future;
-use deno_core::parking_lot::Mutex;
use log::info;
use notify::event::Event as NotifyEvent;
use notify::event::EventKind;
@@ -15,57 +13,42 @@ use notify::Error as NotifyError;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
-use pin_project::pin_project;
use std::collections::HashSet;
use std::path::PathBuf;
-use std::pin::Pin;
use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
use std::time::Duration;
-use tokio::pin;
use tokio::select;
+use tokio::sync::mpsc;
use tokio::time::sleep;
-use tokio::time::Instant;
-use tokio::time::Sleep;
const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
-#[pin_project(project = DebounceProjection)]
-struct Debounce {
- #[pin]
- timer: Sleep,
- changed_paths: Arc<Mutex<HashSet<PathBuf>>>,
+struct DebouncedReceiver {
+ receiver: mpsc::UnboundedReceiver<Vec<PathBuf>>,
}
-impl Debounce {
- fn new() -> Self {
- Self {
- timer: sleep(DEBOUNCE_INTERVAL),
- changed_paths: Arc::new(Mutex::new(HashSet::new())),
- }
+impl DebouncedReceiver {
+ fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) {
+ let (sender, receiver) = mpsc::unbounded_channel();
+ (Arc::new(sender), Self { receiver })
}
-}
-impl Stream for Debounce {
- type Item = Vec<PathBuf>;
-
- /// Note that this never returns `Poll::Ready(None)`, which means that the
- /// file watcher will be alive until the Deno process is terminated.
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Option<Self::Item>> {
- let mut changed_paths = self.changed_paths.lock();
- if changed_paths.len() > 0 {
- Poll::Ready(Some(changed_paths.drain().collect()))
- } else {
- drop(changed_paths);
- let mut timer = self.project().timer;
- if timer.as_mut().poll(cx).is_ready() {
- timer.reset(Instant::now() + DEBOUNCE_INTERVAL);
+ async fn recv(&mut self) -> Option<Vec<PathBuf>> {
+ let mut received_items = self
+ .receiver
+ .recv()
+ .await?
+ .into_iter()
+ .collect::<HashSet<_>>(); // prevent duplicates
+ loop {
+ tokio::select! {
+ items = self.receiver.recv() => {
+ received_items.extend(items?);
+ }
+ _ = sleep(DEBOUNCE_INTERVAL) => {
+ return Some(received_items.into_iter().collect());
+ }
}
- Poll::Pending
}
}
}
@@ -91,14 +74,14 @@ pub enum ResolutionResult<T> {
async fn next_restart<R, T, F>(
resolver: &mut R,
- debounce: &mut Pin<&mut Debounce>,
+ debounced_receiver: &mut DebouncedReceiver,
) -> (Vec<PathBuf>, Result<T, AnyError>)
where
R: FnMut(Option<Vec<PathBuf>>) -> F,
F: Future<Output = ResolutionResult<T>>,
{
loop {
- let changed = debounce.next().await;
+ let changed = debounced_receiver.recv().await;
match resolver(changed).await {
ResolutionResult::Ignore => {
log::debug!("File change ignored")
@@ -140,8 +123,7 @@ where
F1: Future<Output = ResolutionResult<T>>,
F2: Future<Output = Result<(), AnyError>>,
{
- let debounce = Debounce::new();
- pin!(debounce);
+ let (sender, mut receiver) = DebouncedReceiver::new_with_sender();
// Store previous data. If module resolution fails at some point, the watcher will try to
// continue watching files using these data.
@@ -161,7 +143,7 @@ where
colors::intense_blue("Watcher"),
);
- let (paths, result) = next_restart(&mut resolver, &mut debounce).await;
+ let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
paths_to_watch = paths;
resolution_result = result;
}
@@ -175,13 +157,13 @@ where
};
loop {
- let watcher = new_watcher(&paths_to_watch, &debounce)?;
+ let watcher = new_watcher(&paths_to_watch, sender.clone())?;
match resolution_result {
Ok(operation_arg) => {
let fut = error_handler(operation(operation_arg));
select! {
- (paths, result) = next_restart(&mut resolver, &mut debounce) => {
+ (paths, result) = next_restart(&mut resolver, &mut receiver) => {
if result.is_ok() {
paths_to_watch = paths;
}
@@ -207,7 +189,7 @@ where
}
}
- let (paths, result) = next_restart(&mut resolver, &mut debounce).await;
+ let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
if result.is_ok() {
paths_to_watch = paths;
}
@@ -219,10 +201,8 @@ where
fn new_watcher(
paths: &[PathBuf],
- debounce: &Debounce,
+ sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
) -> Result<RecommendedWatcher, AnyError> {
- let changed_paths = Arc::clone(&debounce.changed_paths);
-
let mut watcher: RecommendedWatcher =
Watcher::new(move |res: Result<NotifyEvent, NotifyError>| {
if let Ok(event) = res {
@@ -233,9 +213,9 @@ fn new_watcher(
let paths = event
.paths
.iter()
- .filter_map(|path| canonicalize_path(path).ok());
- let mut changed_paths = changed_paths.lock();
- changed_paths.extend(paths);
+ .filter_map(|path| canonicalize_path(path).ok())
+ .collect();
+ sender.send(paths).unwrap();
}
}
})?;