summaryrefslogtreecommitdiff
path: root/cli/file_watcher.rs
blob: 892d0d4a1a739c26922da5e536d8cf3fd1bfa53e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use crate::colors;
use deno_core::ErrBox;
use futures::stream::StreamExt;
use futures::Future;
use notify::event::Event as NotifyEvent;
use notify::event::EventKind;
use notify::Config;
use notify::Error as NotifyError;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use std::path::PathBuf;
use std::pin::Pin;
use tokio::select;
use tokio::sync::mpsc;

// TODO(bartlomieju): rename
/// Boxed, pinned future that resolves to `Ok(())` on success or to an
/// `ErrBox` on failure. This is the type the closure handed to `watch_func`
/// must return and the type `error_handler` awaits.
type WatchFuture =
  Pin<Box<dyn Future<Output = std::result::Result<(), deno_core::ErrBox>>>>;

async fn error_handler(watch_future: WatchFuture) {
  let result = watch_future.await;
  if let Err(err) = result {
    let msg = format!("{}: {}", colors::red_bold("error"), err.to_string(),);
    eprintln!("{}", msg);
  }
}

/// Repeatedly runs the future produced by `closure`, restarting it whenever
/// one of `watch_paths` changes.
///
/// Every iteration races a fresh future from `closure` (wrapped in
/// `error_handler`, so its own errors are printed rather than returned)
/// against `file_watcher`:
///
/// - if a file change is reported first, the loop starts over immediately
///   with a new future from `closure`;
/// - if the future finishes first, the function waits for the next file
///   change before starting over.
///
/// # Errors
///
/// Returns the error produced by `file_watcher` when watching fails. The loop
/// has no other exit, so this function never returns `Ok`.
pub async fn watch_func<F>(
  watch_paths: &[PathBuf],
  closure: F,
) -> Result<(), ErrBox>
where
  F: Fn() -> WatchFuture,
{
  loop {
    let func = error_handler(closure());
    let is_file_changed = select! {
      res = file_watcher(watch_paths) => {
        // A watcher failure is not a file change: propagate it instead of
        // reporting a bogus restart and looping on the same error.
        res?;
        info!(
          "{} File change detected! Restarting!",
          colors::intense_blue("Watcher")
        );
        true
      },
      _ = func => false,
    };
    if !is_file_changed {
      // The watched future ran to completion; stay idle until something
      // changes on disk.
      info!(
        "{} Process terminated! Restarting on file change...",
        colors::intense_blue("Watcher")
      );
      file_watcher(watch_paths).await?;
      info!(
        "{} File change detected! Restarting!",
        colors::intense_blue("Watcher")
      );
    }
  }
}

/// Waits until a create, modify or remove event is reported for one of
/// `paths`.
///
/// A new watcher is set up on every call and each path is registered with
/// `RecursiveMode::NonRecursive`. Events of any other kind are skipped.
///
/// Returns `Ok(())` on the first matching event, or if the event channel is
/// closed before one arrives.
///
/// # Errors
///
/// Returns an error if the watcher cannot be created or configured, if a path
/// cannot be watched, or if the watcher delivers an error instead of an
/// event.
///
/// # Panics
///
/// The event callback panics if the mutex guarding the channel sender has
/// been poisoned.
pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), deno_core::ErrBox> {
  let (sender, mut receiver) = mpsc::channel::<Result<NotifyEvent, ErrBox>>(16);
  // The sender is wrapped in a mutex so the notify callback can obtain
  // mutable access to it when forwarding an event.
  let sender = std::sync::Mutex::new(sender);

  let mut watcher: RecommendedWatcher =
    Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
      let res2 = res.map_err(ErrBox::from);
      let mut sender = sender.lock().unwrap();
      // Ignore result, if send failed it means that watcher was already closed,
      // but not all messages have been flushed.
      let _ = sender.try_send(res2);
    })?;

  // Report a configuration failure to the caller like every other watcher
  // error, rather than panicking.
  watcher.configure(Config::PreciseEvents(true))?;

  for path in paths {
    watcher.watch(path, RecursiveMode::NonRecursive)?;
  }

  while let Some(result) = receiver.next().await {
    let event = result?;
    match event.kind {
      EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => {
        break
      }
      _ => continue,
    }
  }
  Ok(())
}