summaryrefslogtreecommitdiff
path: root/cli/file_watcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/file_watcher.rs')
-rw-r--r--cli/file_watcher.rs112
1 files changed, 75 insertions, 37 deletions
diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs
index 1f0107f49..1730e6472 100644
--- a/cli/file_watcher.rs
+++ b/cli/file_watcher.rs
@@ -1,8 +1,9 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::colors;
+use core::task::{Context, Poll};
use deno_core::error::AnyError;
-use deno_core::futures::stream::StreamExt;
+use deno_core::futures::stream::{Stream, StreamExt};
use deno_core::futures::Future;
use notify::event::Event as NotifyEvent;
use notify::event::EventKind;
@@ -13,12 +14,51 @@ use notify::RecursiveMode;
use notify::Watcher;
use std::path::PathBuf;
use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
use tokio::select;
-use tokio::sync::mpsc;
+use tokio::time::{interval, Interval};
+
+const DEBOUNCE_INTERVAL_MS: Duration = Duration::from_millis(200);
// TODO(bartlomieju): rename
type WatchFuture = Pin<Box<dyn Future<Output = Result<(), AnyError>>>>;
+struct Debounce {
+ interval: Interval,
+ event_detected: Arc<AtomicBool>,
+}
+
+impl Debounce {
+ fn new() -> Self {
+ Self {
+ interval: interval(DEBOUNCE_INTERVAL_MS),
+ event_detected: Arc::new(AtomicBool::new(false)),
+ }
+ }
+}
+
+impl Stream for Debounce {
+ type Item = ();
+
+ /// Note that this never returns `Poll::Ready(None)`, which means that file watcher will be alive
+ /// until the Deno process is terminated.
+ fn poll_next(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Option<Self::Item>> {
+ let inner = self.get_mut();
+ if inner.event_detected.load(Ordering::Relaxed) {
+ inner.event_detected.store(false, Ordering::Relaxed);
+ Poll::Ready(Some(()))
+ } else {
+ let _ = inner.interval.poll_tick(cx);
+ Poll::Pending
+ }
+ }
+}
+
async fn error_handler(watch_future: WatchFuture) {
let result = watch_future.await;
if let Err(err) = result {
@@ -28,52 +68,59 @@ async fn error_handler(watch_future: WatchFuture) {
}
pub async fn watch_func<F>(
- watch_paths: &[PathBuf],
+ paths: &[PathBuf],
closure: F,
) -> Result<(), AnyError>
where
F: Fn() -> WatchFuture,
{
+ let mut debounce = Debounce::new();
+ // This binding is required for the watcher to work properly without being dropped.
+ let _watcher = new_watcher(paths, &debounce)?;
+
loop {
let func = error_handler(closure());
let mut is_file_changed = false;
select! {
- _ = file_watcher(watch_paths) => {
- is_file_changed = true;
- info!(
- "{} File change detected! Restarting!",
- colors::intense_blue("Watcher")
- );
- },
- _ = func => { },
- };
+ _ = debounce.next() => {
+ is_file_changed = true;
+ info!(
+ "{} File change detected! Restarting!",
+ colors::intense_blue("Watcher"),
+ );
+ },
+ _ = func => {},
+ }
if !is_file_changed {
info!(
"{} Process terminated! Restarting on file change...",
- colors::intense_blue("Watcher")
+ colors::intense_blue("Watcher"),
);
- file_watcher(watch_paths).await?;
+ debounce.next().await;
info!(
"{} File change detected! Restarting!",
- colors::intense_blue("Watcher")
+ colors::intense_blue("Watcher"),
);
}
}
}
-pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), AnyError> {
- let (sender, mut receiver) =
- mpsc::channel::<Result<NotifyEvent, AnyError>>(16);
- let sender = std::sync::Mutex::new(sender);
+fn new_watcher(
+ paths: &[PathBuf],
+ debounce: &Debounce,
+) -> Result<RecommendedWatcher, AnyError> {
+ let event_detected = Arc::clone(&debounce.event_detected);
- let mut watcher: RecommendedWatcher =
- Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
- let res2 = res.map_err(AnyError::from);
- let mut sender = sender.lock().unwrap();
- // Ignore result, if send failed it means that watcher was already closed,
- // but not all messages have been flushed.
- let _ = sender.try_send(res2);
- })?;
+ let mut watcher: RecommendedWatcher = Watcher::new_immediate(
+ move |res: Result<NotifyEvent, NotifyError>| {
+ if let Ok(event) = res {
+ if matches!(event.kind, EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_))
+ {
+ event_detected.store(true, Ordering::Relaxed);
+ }
+ }
+ },
+ )?;
watcher.configure(Config::PreciseEvents(true)).unwrap();
@@ -81,14 +128,5 @@ pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), AnyError> {
watcher.watch(path, RecursiveMode::NonRecursive)?;
}
- while let Some(result) = receiver.next().await {
- let event = result?;
- match event.kind {
- EventKind::Create(_) => break,
- EventKind::Modify(_) => break,
- EventKind::Remove(_) => break,
- _ => continue,
- }
- }
- Ok(())
+ Ok(watcher)
}