summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--cli/Cargo.toml1
-rw-r--r--cli/npm/resolvers/local.rs10
-rw-r--r--cli/util/fs.rs246
-rw-r--r--cli/util/progress_bar/mod.rs36
-rw-r--r--cli/util/progress_bar/renderer.rs11
-rw-r--r--runtime/Cargo.toml2
8 files changed, 301 insertions, 7 deletions
diff --git a/Cargo.lock b/Cargo.lock
index adf3706b7..2d16aa88c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -850,6 +850,7 @@ dependencies = [
"fancy-regex",
"flaky_test",
"flate2",
+ "fs3",
"fwdansi",
"glibc_version",
"http",
diff --git a/Cargo.toml b/Cargo.toml
index 1fbdd5c65..48fe593e7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,6 +88,7 @@ data-url = "=0.2.0"
dlopen = "0.1.8"
encoding_rs = "=0.8.31"
flate2 = "=1.0.24"
+fs3 = "0.5.0"
futures = "0.3.21"
http = "=0.2.8"
hyper = "0.14.18"
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 0ad4896f3..692ac5205 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -73,6 +73,7 @@ env_logger = "=0.9.0"
eszip = "=0.37.0"
fancy-regex = "=0.10.0"
flate2.workspace = true
+fs3.workspace = true
http.workspace = true
import_map = "=0.15.0"
indexmap.workspace = true
diff --git a/cli/npm/resolvers/local.rs b/cli/npm/resolvers/local.rs
index bf5b8529c..52a783823 100644
--- a/cli/npm/resolvers/local.rs
+++ b/cli/npm/resolvers/local.rs
@@ -10,6 +10,7 @@ use std::path::Path;
use std::path::PathBuf;
use crate::util::fs::symlink_dir;
+use crate::util::fs::LaxSingleProcessFsFlag;
use async_trait::async_trait;
use deno_ast::ModuleSpecifier;
use deno_core::anyhow::bail;
@@ -236,6 +237,13 @@ async fn sync_resolution_with_fs(
format!("Creating '{}'", deno_local_registry_dir.display())
})?;
+ let single_process_lock = LaxSingleProcessFsFlag::lock(
+ deno_local_registry_dir.join(".deno.lock"),
+ // similar message used by cargo build
+ "waiting for file lock on node_modules directory",
+ )
+ .await;
+
// 1. Write all the packages out the .deno directory.
//
// Copy (hardlink in future) <global_registry_cache>/<package_id>/ to
@@ -394,6 +402,8 @@ async fn sync_resolution_with_fs(
}
}
+ drop(single_process_lock);
+
Ok(())
}
diff --git a/cli/util/fs.rs b/cli/util/fs.rs
index 777b22c5f..4ac57eac0 100644
--- a/cli/util/fs.rs
+++ b/cli/util/fs.rs
@@ -14,10 +14,14 @@ use std::io::ErrorKind;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
+use std::sync::Arc;
use std::time::Duration;
use walkdir::WalkDir;
use crate::args::FilesConfig;
+use crate::util::progress_bar::ProgressBar;
+use crate::util::progress_bar::ProgressBarStyle;
+use crate::util::progress_bar::ProgressMessagePrompt;
use super::path::specifier_to_file_path;
@@ -471,11 +475,167 @@ pub fn dir_size(path: &Path) -> std::io::Result<u64> {
Ok(total)
}
+struct LaxSingleProcessFsFlagInner {
+ file_path: PathBuf,
+ fs_file: std::fs::File,
+ finished_token: Arc<tokio_util::sync::CancellationToken>,
+}
+
+impl Drop for LaxSingleProcessFsFlagInner {
+ fn drop(&mut self) {
+ use fs3::FileExt;
+ // kill the poll thread
+ self.finished_token.cancel();
+ // release the file lock
+ if let Err(err) = self.fs_file.unlock() {
+ log::debug!(
+ "Failed releasing lock for {}. {:#}",
+ self.file_path.display(),
+ err
+ );
+ }
+ }
+}
+
+/// A file system based flag that will attempt to synchronize multiple
+/// processes so they go one after the other. In scenarios where
+/// synchronization cannot be achieved, it will allow the current process
+/// to proceed.
+///
+/// This should only be used in places where it's ideal for multiple
+/// processes to not update something on the file system at the same time,
+/// but it's not that big of a deal.
+pub struct LaxSingleProcessFsFlag(Option<LaxSingleProcessFsFlagInner>);
+
+impl LaxSingleProcessFsFlag {
+ pub async fn lock(file_path: PathBuf, long_wait_message: &str) -> Self {
+ log::debug!("Acquiring file lock at {}", file_path.display());
+ use fs3::FileExt;
+ let last_updated_path = file_path.with_extension("lock.poll");
+ let start_instant = std::time::Instant::now();
+ let open_result = std::fs::OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create(true)
+ .open(&file_path);
+
+ match open_result {
+ Ok(fs_file) => {
+ let mut pb_update_guard = None;
+ let mut error_count = 0;
+ while error_count < 10 {
+ let lock_result = fs_file.try_lock_exclusive();
+ let poll_file_update_ms = 100;
+ match lock_result {
+ Ok(_) => {
+ log::debug!("Acquired file lock at {}", file_path.display());
+ let _ignore = std::fs::write(&last_updated_path, "");
+ let token = Arc::new(tokio_util::sync::CancellationToken::new());
+
+ // Spawn a blocking task that will continually update a file
+ // signalling the lock is alive. This is a fail safe for when
+ // a file lock is never released. For example, on some operating
+ // systems, if a process does not release the lock (say it's
+ // killed), then the OS may release it at an indeterminate time
+ //
+ // This uses a blocking task because we use a single threaded
+ // runtime and this is time sensitive so we don't want it to update
+ // at the whims of of whatever is occurring on the runtime thread.
+ tokio::task::spawn_blocking({
+ let token = token.clone();
+ let last_updated_path = last_updated_path.clone();
+ move || {
+ let mut i = 0;
+ while !token.is_cancelled() {
+ i += 1;
+ let _ignore =
+ std::fs::write(&last_updated_path, i.to_string());
+ std::thread::sleep(Duration::from_millis(
+ poll_file_update_ms,
+ ));
+ }
+ }
+ });
+
+ return Self(Some(LaxSingleProcessFsFlagInner {
+ file_path,
+ fs_file,
+ finished_token: token,
+ }));
+ }
+ Err(_) => {
+ // show a message if it's been a while
+ if pb_update_guard.is_none()
+ && start_instant.elapsed().as_millis() > 1_000
+ {
+ let pb = ProgressBar::new(ProgressBarStyle::TextOnly);
+ let guard = pb.update_with_prompt(
+ ProgressMessagePrompt::Blocking,
+ long_wait_message,
+ );
+ pb_update_guard = Some((guard, pb));
+ }
+
+ // sleep for a little bit
+ tokio::time::sleep(Duration::from_millis(20)).await;
+
+ // Poll the last updated path to check if it's stopped updating,
+ // which is an indication that the file lock is claimed, but
+ // was never properly released.
+ match std::fs::metadata(&last_updated_path)
+ .and_then(|p| p.modified())
+ {
+ Ok(last_updated_time) => {
+ let current_time = std::time::SystemTime::now();
+ match current_time.duration_since(last_updated_time) {
+ Ok(duration) => {
+ if duration.as_millis()
+ > (poll_file_update_ms * 2) as u128
+ {
+ // the other process hasn't updated this file in a long time
+ // so maybe it was killed and the operating system hasn't
+ // released the file lock yet
+ return Self(None);
+ } else {
+ error_count = 0; // reset
+ }
+ }
+ Err(_) => {
+ error_count += 1;
+ }
+ }
+ }
+ Err(_) => {
+ error_count += 1;
+ }
+ }
+ }
+ }
+ }
+
+ drop(pb_update_guard); // explicit for clarity
+ Self(None)
+ }
+ Err(err) => {
+ log::debug!(
+ "Failed to open file lock at {}. {:#}",
+ file_path.display(),
+ err
+ );
+ Self(None) // let the process through
+ }
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
+ use deno_core::futures;
+ use deno_core::parking_lot::Mutex;
use pretty_assertions::assert_eq;
use test_util::TempDir;
+ use tokio::sync::Notify;
#[test]
fn resolve_from_cwd_child() {
@@ -793,4 +953,90 @@ mod tests {
);
}
}
+
+ #[tokio::test]
+ async fn lax_fs_lock() {
+ let temp_dir = TempDir::new();
+ let lock_path = temp_dir.path().join("file.lock");
+ let signal1 = Arc::new(Notify::new());
+ let signal2 = Arc::new(Notify::new());
+ let signal3 = Arc::new(Notify::new());
+ let signal4 = Arc::new(Notify::new());
+ tokio::spawn({
+ let lock_path = lock_path.clone();
+ let signal1 = signal1.clone();
+ let signal2 = signal2.clone();
+ let signal3 = signal3.clone();
+ let signal4 = signal4.clone();
+ let temp_dir = temp_dir.clone();
+ async move {
+ let flag =
+ LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await;
+ signal1.notify_one();
+ signal2.notified().await;
+ tokio::time::sleep(Duration::from_millis(10)).await; // give the other thread time to acquire the lock
+ temp_dir.write("file.txt", "update1");
+ signal3.notify_one();
+ signal4.notified().await;
+ drop(flag);
+ }
+ });
+ let signal5 = Arc::new(Notify::new());
+ tokio::spawn({
+ let temp_dir = temp_dir.clone();
+ let signal5 = signal5.clone();
+ async move {
+ signal1.notified().await;
+ signal2.notify_one();
+ let flag = LaxSingleProcessFsFlag::lock(lock_path, "waiting").await;
+ temp_dir.write("file.txt", "update2");
+ signal5.notify_one();
+ drop(flag);
+ }
+ });
+
+ signal3.notified().await;
+ assert_eq!(temp_dir.read_to_string("file.txt"), "update1");
+ signal4.notify_one();
+ signal5.notified().await;
+ assert_eq!(temp_dir.read_to_string("file.txt"), "update2");
+ }
+
+ #[tokio::test]
+ async fn lax_fs_lock_ordered() {
+ let temp_dir = TempDir::new();
+ let lock_path = temp_dir.path().join("file.lock");
+ let output_path = temp_dir.path().join("output");
+ let expected_order = Arc::new(Mutex::new(Vec::new()));
+ let count = 10;
+ let mut tasks = Vec::with_capacity(count);
+
+ std::fs::write(&output_path, "").unwrap();
+
+ for i in 0..count {
+ let lock_path = lock_path.clone();
+ let output_path = output_path.clone();
+ let expected_order = expected_order.clone();
+ tasks.push(tokio::spawn(async move {
+ let flag =
+ LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await;
+ expected_order.lock().push(i.to_string());
+ // be extremely racy
+ let mut output = std::fs::read_to_string(&output_path).unwrap();
+ if !output.is_empty() {
+ output.push('\n');
+ }
+ output.push_str(&i.to_string());
+ std::fs::write(&output_path, output).unwrap();
+ drop(flag);
+ }));
+ }
+
+ futures::future::join_all(tasks).await;
+ let expected_output = expected_order.lock().join("\n");
+ assert_eq!(
+ std::fs::read_to_string(output_path).unwrap(),
+ expected_output
+ );
+ }
}
diff --git a/cli/util/progress_bar/mod.rs b/cli/util/progress_bar/mod.rs
index 004b48b2f..256871079 100644
--- a/cli/util/progress_bar/mod.rs
+++ b/cli/util/progress_bar/mod.rs
@@ -23,6 +23,21 @@ mod renderer;
// Inspired by Indicatif, but this custom implementation allows
// for more control over what's going on under the hood.
+#[derive(Debug, Clone, Copy)]
+pub enum ProgressMessagePrompt {
+ Download,
+ Blocking,
+}
+
+impl ProgressMessagePrompt {
+ pub fn as_text(&self) -> String {
+ match self {
+ ProgressMessagePrompt::Download => colors::green("Download").to_string(),
+ ProgressMessagePrompt::Blocking => colors::cyan("Blocking").to_string(),
+ }
+ }
+}
+
#[derive(Debug)]
pub struct UpdateGuard {
maybe_entry: Option<ProgressBarEntry>,
@@ -59,6 +74,7 @@ pub enum ProgressBarStyle {
#[derive(Clone, Debug)]
struct ProgressBarEntry {
id: usize,
+ prompt: ProgressMessagePrompt,
pub message: String,
pos: Arc<AtomicU64>,
total_size: Arc<AtomicU64>,
@@ -128,11 +144,16 @@ impl ProgressBarInner {
}
}
- pub fn add_entry(&self, message: String) -> ProgressBarEntry {
+ pub fn add_entry(
+ &self,
+ kind: ProgressMessagePrompt,
+ message: String,
+ ) -> ProgressBarEntry {
let mut internal_state = self.state.lock();
let id = internal_state.total_entries;
let entry = ProgressBarEntry {
id,
+ prompt: kind,
message,
pos: Default::default(),
total_size: Default::default(),
@@ -208,6 +229,7 @@ impl DrawThreadRenderer for ProgressBarInner {
pending_entries: state.entries.len(),
total_entries: state.total_entries,
display_entry: ProgressDataDisplayEntry {
+ prompt: preferred_entry.prompt,
message: preferred_entry.message.clone(),
position: preferred_entry.position(),
total_size: preferred_entry.total_size(),
@@ -255,9 +277,17 @@ impl ProgressBar {
}
pub fn update(&self, msg: &str) -> UpdateGuard {
+ self.update_with_prompt(ProgressMessagePrompt::Download, msg)
+ }
+
+ pub fn update_with_prompt(
+ &self,
+ kind: ProgressMessagePrompt,
+ msg: &str,
+ ) -> UpdateGuard {
match &self.inner {
Some(inner) => {
- let entry = inner.add_entry(msg.to_string());
+ let entry = inner.add_entry(kind, msg.to_string());
UpdateGuard {
maybe_entry: Some(entry),
}
@@ -265,7 +295,7 @@ impl ProgressBar {
None => {
// if we're not running in TTY, fallback to using logger crate
if !msg.is_empty() {
- log::log!(log::Level::Info, "{} {}", colors::green("Download"), msg);
+ log::log!(log::Level::Info, "{} {}", kind.as_text(), msg);
}
UpdateGuard { maybe_entry: None }
}
diff --git a/cli/util/progress_bar/renderer.rs b/cli/util/progress_bar/renderer.rs
index 0ea275e77..5635ad316 100644
--- a/cli/util/progress_bar/renderer.rs
+++ b/cli/util/progress_bar/renderer.rs
@@ -6,8 +6,11 @@ use deno_runtime::colors;
use crate::util::display::human_download_size;
+use super::ProgressMessagePrompt;
+
#[derive(Clone)]
pub struct ProgressDataDisplayEntry {
+ pub prompt: ProgressMessagePrompt,
pub message: String,
pub position: u64,
pub total_size: u64,
@@ -142,7 +145,7 @@ impl ProgressBarRenderer for TextOnlyProgressBarRenderer {
format!(
"{} {}{}{}",
- colors::green("Download"),
+ data.display_entry.prompt.as_text(),
data.display_entry.message,
colors::gray(bytes_text),
colors::gray(total_text),
@@ -195,6 +198,7 @@ mod test {
let renderer = BarProgressBarRenderer;
let mut data = ProgressData {
display_entry: ProgressDataDisplayEntry {
+ prompt: ProgressMessagePrompt::Download,
message: "data".to_string(),
position: 0,
total_size: 10 * BYTES_TO_KIB,
@@ -251,6 +255,7 @@ mod test {
let renderer = TextOnlyProgressBarRenderer;
let mut data = ProgressData {
display_entry: ProgressDataDisplayEntry {
+ prompt: ProgressMessagePrompt::Blocking,
message: "data".to_string(),
position: 0,
total_size: 10 * BYTES_TO_KIB,
@@ -263,7 +268,7 @@ mod test {
};
let text = renderer.render(data.clone());
let text = test_util::strip_ansi_codes(&text);
- assert_eq!(text, "Download data 0.00KiB/10.00KiB (2/3)");
+ assert_eq!(text, "Blocking data 0.00KiB/10.00KiB (2/3)");
data.pending_entries = 0;
data.total_entries = 1;
@@ -271,6 +276,6 @@ mod test {
data.display_entry.total_size = 0;
let text = renderer.render(data);
let text = test_util::strip_ansi_codes(&text);
- assert_eq!(text, "Download data");
+ assert_eq!(text, "Blocking data");
}
}
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 112156203..0ba8f8d3a 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -89,7 +89,7 @@ atty.workspace = true
dlopen.workspace = true
encoding_rs.workspace = true
filetime = "0.2.16"
-fs3 = "0.5.0"
+fs3.workspace = true
http.workspace = true
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
libc.workspace = true