summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2021-12-09 20:24:37 -0500
committerGitHub <noreply@github.com>2021-12-09 20:24:37 -0500
commitf530189c500b9f12c4c94e88eca23eab9ae0d970 (patch)
treee830ad2fa3432d65239922079f86960a98c7500e
parent616ff1d482894f5742463b89fbc8496196c82a64 (diff)
fix(watch): mitigate race condition between file write by other process and watch read (#13038)
-rw-r--r--cli/file_watcher.rs86
-rw-r--r--cli/ops/runtime_compiler.rs4
-rw-r--r--cli/tests/integration/watcher_tests.rs115
-rw-r--r--cli/tools/fmt.rs4
4 files changed, 89 insertions, 120 deletions
diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs
index c7fff4632..84b30dfd2 100644
--- a/cli/file_watcher.rs
+++ b/cli/file_watcher.rs
@@ -4,9 +4,7 @@ use crate::colors;
use crate::fs_util::canonicalize_path;
use deno_core::error::AnyError;
-use deno_core::futures::stream::{Stream, StreamExt};
use deno_core::futures::Future;
-use deno_core::parking_lot::Mutex;
use log::info;
use notify::event::Event as NotifyEvent;
use notify::event::EventKind;
@@ -15,57 +13,42 @@ use notify::Error as NotifyError;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
-use pin_project::pin_project;
use std::collections::HashSet;
use std::path::PathBuf;
-use std::pin::Pin;
use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
use std::time::Duration;
-use tokio::pin;
use tokio::select;
+use tokio::sync::mpsc;
use tokio::time::sleep;
-use tokio::time::Instant;
-use tokio::time::Sleep;
const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
-#[pin_project(project = DebounceProjection)]
-struct Debounce {
- #[pin]
- timer: Sleep,
- changed_paths: Arc<Mutex<HashSet<PathBuf>>>,
+struct DebouncedReceiver {
+ receiver: mpsc::UnboundedReceiver<Vec<PathBuf>>,
}
-impl Debounce {
- fn new() -> Self {
- Self {
- timer: sleep(DEBOUNCE_INTERVAL),
- changed_paths: Arc::new(Mutex::new(HashSet::new())),
- }
+impl DebouncedReceiver {
+ fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) {
+ let (sender, receiver) = mpsc::unbounded_channel();
+ (Arc::new(sender), Self { receiver })
}
-}
-impl Stream for Debounce {
- type Item = Vec<PathBuf>;
-
- /// Note that this never returns `Poll::Ready(None)`, which means that the
- /// file watcher will be alive until the Deno process is terminated.
- fn poll_next(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Option<Self::Item>> {
- let mut changed_paths = self.changed_paths.lock();
- if changed_paths.len() > 0 {
- Poll::Ready(Some(changed_paths.drain().collect()))
- } else {
- drop(changed_paths);
- let mut timer = self.project().timer;
- if timer.as_mut().poll(cx).is_ready() {
- timer.reset(Instant::now() + DEBOUNCE_INTERVAL);
+ async fn recv(&mut self) -> Option<Vec<PathBuf>> {
+ let mut received_items = self
+ .receiver
+ .recv()
+ .await?
+ .into_iter()
+ .collect::<HashSet<_>>(); // prevent duplicates
+ loop {
+ tokio::select! {
+ items = self.receiver.recv() => {
+ received_items.extend(items?);
+ }
+ _ = sleep(DEBOUNCE_INTERVAL) => {
+ return Some(received_items.into_iter().collect());
+ }
}
- Poll::Pending
}
}
}
@@ -91,14 +74,14 @@ pub enum ResolutionResult<T> {
async fn next_restart<R, T, F>(
resolver: &mut R,
- debounce: &mut Pin<&mut Debounce>,
+ debounced_receiver: &mut DebouncedReceiver,
) -> (Vec<PathBuf>, Result<T, AnyError>)
where
R: FnMut(Option<Vec<PathBuf>>) -> F,
F: Future<Output = ResolutionResult<T>>,
{
loop {
- let changed = debounce.next().await;
+ let changed = debounced_receiver.recv().await;
match resolver(changed).await {
ResolutionResult::Ignore => {
log::debug!("File change ignored")
@@ -140,8 +123,7 @@ where
F1: Future<Output = ResolutionResult<T>>,
F2: Future<Output = Result<(), AnyError>>,
{
- let debounce = Debounce::new();
- pin!(debounce);
+ let (sender, mut receiver) = DebouncedReceiver::new_with_sender();
// Store previous data. If module resolution fails at some point, the watcher will try to
// continue watching files using these data.
@@ -161,7 +143,7 @@ where
colors::intense_blue("Watcher"),
);
- let (paths, result) = next_restart(&mut resolver, &mut debounce).await;
+ let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
paths_to_watch = paths;
resolution_result = result;
}
@@ -175,13 +157,13 @@ where
};
loop {
- let watcher = new_watcher(&paths_to_watch, &debounce)?;
+ let watcher = new_watcher(&paths_to_watch, sender.clone())?;
match resolution_result {
Ok(operation_arg) => {
let fut = error_handler(operation(operation_arg));
select! {
- (paths, result) = next_restart(&mut resolver, &mut debounce) => {
+ (paths, result) = next_restart(&mut resolver, &mut receiver) => {
if result.is_ok() {
paths_to_watch = paths;
}
@@ -207,7 +189,7 @@ where
}
}
- let (paths, result) = next_restart(&mut resolver, &mut debounce).await;
+ let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
if result.is_ok() {
paths_to_watch = paths;
}
@@ -219,10 +201,8 @@ where
fn new_watcher(
paths: &[PathBuf],
- debounce: &Debounce,
+ sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
) -> Result<RecommendedWatcher, AnyError> {
- let changed_paths = Arc::clone(&debounce.changed_paths);
-
let mut watcher: RecommendedWatcher =
Watcher::new(move |res: Result<NotifyEvent, NotifyError>| {
if let Ok(event) = res {
@@ -233,9 +213,9 @@ fn new_watcher(
let paths = event
.paths
.iter()
- .filter_map(|path| canonicalize_path(path).ok());
- let mut changed_paths = changed_paths.lock();
- changed_paths.extend(paths);
+ .filter_map(|path| canonicalize_path(path).ok())
+ .collect();
+ sender.send(paths).unwrap();
}
}
})?;
diff --git a/cli/ops/runtime_compiler.rs b/cli/ops/runtime_compiler.rs
index d79a5fe31..af99f12ca 100644
--- a/cli/ops/runtime_compiler.rs
+++ b/cli/ops/runtime_compiler.rs
@@ -166,7 +166,9 @@ async fn op_emit(
args.import_map_path
{
let import_map_specifier = resolve_url_or_path(&import_map_str)
- .context(format!("Bad URL (\"{}\") for import map.", import_map_str))?;
+ .with_context(|| {
+ format!("Bad URL (\"{}\") for import map.", import_map_str)
+ })?;
let import_map = if let Some(value) = args.import_map {
ImportMap::from_json(import_map_specifier.as_str(), &value.to_string())?
} else {
diff --git a/cli/tests/integration/watcher_tests.rs b/cli/tests/integration/watcher_tests.rs
index 42ca4b520..c9a5632c0 100644
--- a/cli/tests/integration/watcher_tests.rs
+++ b/cli/tests/integration/watcher_tests.rs
@@ -18,7 +18,7 @@ macro_rules! assert_contains {
// Helper function to skip watcher output that contains "Restarting"
// phrase.
fn skip_restarting_line(
- mut stderr_lines: impl Iterator<Item = String>,
+ stderr_lines: &mut impl Iterator<Item = String>,
) -> String {
loop {
let msg = stderr_lines.next().unwrap();
@@ -69,10 +69,18 @@ fn child_lines(
) -> (impl Iterator<Item = String>, impl Iterator<Item = String>) {
let stdout_lines = std::io::BufReader::new(child.stdout.take().unwrap())
.lines()
- .map(|r| r.unwrap());
+ .map(|r| {
+ let line = r.unwrap();
+ eprintln!("STDOUT: {}", line);
+ line
+ });
let stderr_lines = std::io::BufReader::new(child.stderr.take().unwrap())
.lines()
- .map(|r| r.unwrap());
+ .map(|r| {
+ let line = r.unwrap();
+ eprintln!("STERR: {}", line);
+ line
+ });
(stdout_lines, stderr_lines)
}
@@ -106,13 +114,7 @@ fn lint_watch_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn script");
- let mut stderr = child.stderr.as_mut().unwrap();
- let mut stderr_lines = std::io::BufReader::new(&mut stderr)
- .lines()
- .map(|r| r.unwrap());
-
- // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
- std::thread::sleep(std::time::Duration::from_secs(1));
+ let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
let mut output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_output).unwrap();
@@ -130,7 +132,6 @@ fn lint_watch_test() {
// Change content of the file again to be badly-linted1
std::fs::copy(&badly_linted_fixed2, &badly_linted)
.expect("Failed to copy file");
- std::thread::sleep(std::time::Duration::from_secs(1));
output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_fixed2_output).unwrap();
@@ -172,13 +173,7 @@ fn lint_watch_without_args_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn script");
- let mut stderr = child.stderr.as_mut().unwrap();
- let mut stderr_lines = std::io::BufReader::new(&mut stderr)
- .lines()
- .map(|r| r.unwrap());
-
- // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
- std::thread::sleep(std::time::Duration::from_secs(1));
+ let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
let mut output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_output).unwrap();
@@ -187,7 +182,6 @@ fn lint_watch_without_args_test() {
// Change content of the file again to be badly-linted1
std::fs::copy(&badly_linted_fixed1, &badly_linted)
.expect("Failed to copy file");
- std::thread::sleep(std::time::Duration::from_secs(1));
output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_fixed1_output).unwrap();
@@ -236,18 +230,12 @@ fn lint_all_files_on_each_change_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn script");
- let mut stderr = child.stderr.as_mut().unwrap();
- let mut stderr_lines = std::io::BufReader::new(&mut stderr)
- .lines()
- .map(|r| r.unwrap());
-
- std::thread::sleep(std::time::Duration::from_secs(1));
+ let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 2 files");
std::fs::copy(&badly_linted_fixed2, &badly_linted_2)
.expect("Failed to copy file");
- std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 2 files");
@@ -276,12 +264,13 @@ fn fmt_watch_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
- let (_stdout_lines, stderr_lines) = child_lines(&mut child);
-
- // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
- std::thread::sleep(std::time::Duration::from_secs(1));
+ let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
- assert!(skip_restarting_line(stderr_lines).contains("badly_formatted.js"));
+ assert_contains!(
+ skip_restarting_line(&mut stderr_lines),
+ "badly_formatted.js"
+ );
+ assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
let expected = std::fs::read_to_string(fixed.clone()).unwrap();
let actual = std::fs::read_to_string(badly_formatted.clone()).unwrap();
@@ -289,7 +278,12 @@ fn fmt_watch_test() {
// Change content of the file again to be badly formatted
std::fs::copy(&badly_formatted_original, &badly_formatted).unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
+ assert_contains!(
+ skip_restarting_line(&mut stderr_lines),
+ "badly_formatted.js"
+ );
+ assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
// Check if file has been automatically formatted by watcher
let expected = std::fs::read_to_string(fixed).unwrap();
@@ -316,12 +310,13 @@ fn fmt_watch_without_args_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
- let (_stdout_lines, stderr_lines) = child_lines(&mut child);
-
- // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
- std::thread::sleep(std::time::Duration::from_secs(1));
+ let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
- assert!(skip_restarting_line(stderr_lines).contains("badly_formatted.js"));
+ assert_contains!(
+ skip_restarting_line(&mut stderr_lines),
+ "badly_formatted.js"
+ );
+ assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
let expected = std::fs::read_to_string(fixed.clone()).unwrap();
let actual = std::fs::read_to_string(badly_formatted.clone()).unwrap();
@@ -329,7 +324,11 @@ fn fmt_watch_without_args_test() {
// Change content of the file again to be badly formatted
std::fs::copy(&badly_formatted_original, &badly_formatted).unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+ assert_contains!(
+ skip_restarting_line(&mut stderr_lines),
+ "badly_formatted.js"
+ );
+ assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
// Check if file has been automatically formatted by watcher
let expected = std::fs::read_to_string(fixed).unwrap();
@@ -361,9 +360,6 @@ fn fmt_check_all_files_on_each_change_test() {
.unwrap();
let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
- // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
- std::thread::sleep(std::time::Duration::from_secs(1));
-
assert_contains!(
read_line("error", &mut stderr_lines),
"Found 2 not formatted files in 2 files"
@@ -372,8 +368,6 @@ fn fmt_check_all_files_on_each_change_test() {
// Change content of the file again to be badly formatted
std::fs::copy(&badly_formatted_original, &badly_formatted_1).unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
-
assert_contains!(
read_line("error", &mut stderr_lines),
"Found 2 not formatted files in 2 files"
@@ -407,7 +401,6 @@ fn bundle_js_watch() {
let (_stdout_lines, mut stderr_lines) = child_lines(&mut deno);
- std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Check");
assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js");
assert_contains!(stderr_lines.next().unwrap(), "mod6.bundle.js");
@@ -416,7 +409,7 @@ fn bundle_js_watch() {
wait_for("Bundle finished", &mut stderr_lines);
write(&file_to_watch, "console.log('Hello world2');").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Check");
assert_contains!(stderr_lines.next().unwrap(), "File change detected!");
assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js");
@@ -427,7 +420,7 @@ fn bundle_js_watch() {
// Confirm that the watcher keeps on working even if the file is updated and has invalid syntax
write(&file_to_watch, "syntax error ^^").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "File change detected!");
assert_contains!(stderr_lines.next().unwrap(), "error: ");
wait_for("Bundle failed", &mut stderr_lines);
@@ -456,7 +449,6 @@ fn bundle_watch_not_exit() {
.unwrap();
let (_stdout_lines, mut stderr_lines) = child_lines(&mut deno);
- std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "error:");
assert_contains!(stderr_lines.next().unwrap(), "Bundle failed");
// the target file hasn't been created yet
@@ -464,12 +456,14 @@ fn bundle_watch_not_exit() {
// Make sure the watcher actually restarts and works fine with the proper syntax
write(&file_to_watch, "console.log(42);").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Check");
assert_contains!(stderr_lines.next().unwrap(), "File change detected!");
assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js");
assert_contains!(stderr_lines.next().unwrap(), "target.js");
+
wait_for("Bundle finished", &mut stderr_lines);
+
// bundled file is created
assert!(target_file.is_file());
check_alive_then_kill(deno);
@@ -497,13 +491,8 @@ fn run_watch() {
assert_contains!(stdout_lines.next().unwrap(), "Hello world");
wait_for("Process finished", &mut stderr_lines);
- // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
- std::thread::sleep(std::time::Duration::from_secs(1));
-
// Change content of the file
write(&file_to_watch, "console.log('Hello world2');").unwrap();
- // Events from the file watcher is "debounced", so we need to wait for the next execution to start
- std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "Hello world2");
@@ -517,21 +506,21 @@ fn run_watch() {
"import { foo } from './another_file.js'; console.log(foo);",
)
.unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), '0');
wait_for("Process finished", &mut stderr_lines);
// Confirm that restarting occurs when a new file is updated
write(&another_file, "export const foo = 42;").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "42");
wait_for("Process finished", &mut stderr_lines);
// Confirm that the watcher keeps on working even if the file is updated and has invalid syntax
write(&file_to_watch, "syntax error ^^").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stderr_lines.next().unwrap(), "error:");
wait_for("Process failed", &mut stderr_lines);
@@ -542,21 +531,21 @@ fn run_watch() {
"import { foo } from './another_file.js'; console.log(foo);",
)
.unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "42");
wait_for("Process finished", &mut stderr_lines);
// Update the content of the imported file with invalid syntax
write(&another_file, "syntax error ^^").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stderr_lines.next().unwrap(), "error:");
wait_for("Process failed", &mut stderr_lines);
// Modify the imported file and make sure that restarting occurs
write(&another_file, "export const foo = 'modified!';").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "modified!");
wait_for("Process finished", &mut stderr_lines);
@@ -613,9 +602,6 @@ fn run_watch_load_unload_events() {
)
.unwrap();
- // Events from the file watcher is "debounced", so we need to wait for the next execution to start
- std::thread::sleep(std::time::Duration::from_secs(1));
-
// Wait for the restart
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
@@ -650,13 +636,12 @@ fn run_watch_not_exit() {
.unwrap();
let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);
- std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "error:");
assert_contains!(stderr_lines.next().unwrap(), "Process failed");
// Make sure the watcher actually restarts and works fine with the proper syntax
write(&file_to_watch, "console.log(42);").unwrap();
- std::thread::sleep(std::time::Duration::from_secs(1));
+
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "42");
wait_for("Process finished", &mut stderr_lines);
@@ -905,6 +890,6 @@ fn test_watch_doc() {
// We only need to scan for a Check file://.../foo.ts$3-6 line that
// corresponds to the documentation block being type-checked.
- assert_contains!(skip_restarting_line(stderr_lines), "foo.ts$3-6");
+ assert_contains!(skip_restarting_line(&mut stderr_lines), "foo.ts$3-6");
check_alive_then_kill(child);
}
diff --git a/cli/tools/fmt.rs b/cli/tools/fmt.rs
index 2149552cd..9172421b5 100644
--- a/cli/tools/fmt.rs
+++ b/cli/tools/fmt.rs
@@ -19,6 +19,7 @@ use crate::fs_util::specifier_to_file_path;
use crate::fs_util::{collect_files, get_extension, is_supported_ext_fmt};
use crate::text_encoding;
use deno_ast::ParsedSource;
+use deno_core::anyhow::Context;
use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::futures;
@@ -525,7 +526,8 @@ struct FileContents {
}
fn read_file_contents(file_path: &Path) -> Result<FileContents, AnyError> {
- let file_bytes = fs::read(&file_path)?;
+ let file_bytes = fs::read(&file_path)
+ .with_context(|| format!("Error reading {}", file_path.display()))?;
let charset = text_encoding::detect_charset(&file_bytes);
let file_text = text_encoding::convert_to_utf8(&file_bytes, charset)?;
let had_bom = file_text.starts_with(text_encoding::BOM_CHAR);