summaryrefslogtreecommitdiff
path: root/cli/file_watcher.rs
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2022-04-19 10:12:30 -0400
committerGitHub <noreply@github.com>2022-04-19 10:12:30 -0400
commit3d1123f8b09cecfa57a93d8b8b7d19af2b45f070 (patch)
treefb28ba4f3b2eee1d969d140a0326a9e20b731923 /cli/file_watcher.rs
parentc30d95f2e36cb3519e1e23c0934b388ebba6bc2c (diff)
fix: `--watch` was losing items (#14317)
Diffstat (limited to 'cli/file_watcher.rs')
-rw-r--r--cli/file_watcher.rs28
1 files changed, 19 insertions, 9 deletions
diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs
index fd7d4e1a1..c326eb7dd 100644
--- a/cli/file_watcher.rs
+++ b/cli/file_watcher.rs
@@ -25,29 +25,39 @@ const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";
const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
struct DebouncedReceiver {
+ // The `recv()` call could be used in a tokio `select!` macro,
+ // and so we store this state on the struct to ensure we don't
+ // lose items if a `recv()` never completes
+ received_items: HashSet<PathBuf>,
receiver: mpsc::UnboundedReceiver<Vec<PathBuf>>,
}
impl DebouncedReceiver {
fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) {
let (sender, receiver) = mpsc::unbounded_channel();
- (Arc::new(sender), Self { receiver })
+ (
+ Arc::new(sender),
+ Self {
+ receiver,
+ received_items: HashSet::new(),
+ },
+ )
}
async fn recv(&mut self) -> Option<Vec<PathBuf>> {
- let mut received_items = self
- .receiver
- .recv()
- .await?
- .into_iter()
- .collect::<HashSet<_>>(); // prevent duplicates
+ if self.received_items.is_empty() {
+ self
+ .received_items
+ .extend(self.receiver.recv().await?.into_iter());
+ }
+
loop {
tokio::select! {
items = self.receiver.recv() => {
- received_items.extend(items?);
+ self.received_items.extend(items?);
}
_ = sleep(DEBOUNCE_INTERVAL) => {
- return Some(received_items.into_iter().collect());
+ return Some(self.received_items.drain().collect());
}
}
}