summaryrefslogtreecommitdiff
path: root/cli/util
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2024-06-05 15:17:35 -0400
committerGitHub <noreply@github.com>2024-06-05 15:17:35 -0400
commit1b355d8a87a3ad43bf240aa66b88eb98c1cd777f (patch)
tree0422d22a6a0e40873eea4c1ef3eff9d6648f1c51 /cli/util
parent7ed90a20d04982ae15a52ae2378cbffd4b6839df (diff)
refactor(npm): improve locking around updating npm resolution (#24104)
Introduces a `SyncReadAsyncWriteLock` to make it harder to write to the npm resolution without first waiting async in a queue. For the npm resolution, reading synchronously is fine, but when updating, someone should wait async, clone the data, then write the data at the end back.
Diffstat (limited to 'cli/util')
-rw-r--r--cli/util/sync/async_flag.rs20
-rw-r--r--cli/util/sync/atomic_flag.rs35
-rw-r--r--cli/util/sync/mod.rs14
-rw-r--r--cli/util/sync/sync_read_async_write_lock.rs62
-rw-r--r--cli/util/sync/task_queue.rs (renamed from cli/util/sync.rs)46
-rw-r--r--cli/util/sync/value_creator.rs213
6 files changed, 345 insertions, 45 deletions
diff --git a/cli/util/sync/async_flag.rs b/cli/util/sync/async_flag.rs
new file mode 100644
index 000000000..2bdff63c0
--- /dev/null
+++ b/cli/util/sync/async_flag.rs
@@ -0,0 +1,20 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use tokio_util::sync::CancellationToken;
+
+#[derive(Debug, Default, Clone)]
+pub struct AsyncFlag(CancellationToken);
+
+impl AsyncFlag {
+ pub fn raise(&self) {
+ self.0.cancel();
+ }
+
+ pub fn is_raised(&self) -> bool {
+ self.0.is_cancelled()
+ }
+
+ pub fn wait_raised(&self) -> impl std::future::Future<Output = ()> + '_ {
+ self.0.cancelled()
+ }
+}
diff --git a/cli/util/sync/atomic_flag.rs b/cli/util/sync/atomic_flag.rs
new file mode 100644
index 000000000..75396dcf4
--- /dev/null
+++ b/cli/util/sync/atomic_flag.rs
@@ -0,0 +1,35 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+
+/// Simplifies the use of an atomic boolean as a flag.
+#[derive(Debug, Default)]
+pub struct AtomicFlag(AtomicBool);
+
+impl AtomicFlag {
+ /// Raises the flag returning if the raise was successful.
+ pub fn raise(&self) -> bool {
+ !self.0.swap(true, Ordering::SeqCst)
+ }
+
+ /// Gets if the flag is raised.
+ pub fn is_raised(&self) -> bool {
+ self.0.load(Ordering::SeqCst)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn atomic_flag_raises() {
+ let flag = AtomicFlag::default();
+ assert!(!flag.is_raised()); // false by default
+ assert!(flag.raise());
+ assert!(flag.is_raised());
+ assert!(!flag.raise());
+ assert!(flag.is_raised());
+ }
+}
diff --git a/cli/util/sync/mod.rs b/cli/util/sync/mod.rs
new file mode 100644
index 000000000..28aab7f47
--- /dev/null
+++ b/cli/util/sync/mod.rs
@@ -0,0 +1,14 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+mod async_flag;
+mod atomic_flag;
+mod sync_read_async_write_lock;
+mod task_queue;
+mod value_creator;
+
+pub use async_flag::AsyncFlag;
+pub use atomic_flag::AtomicFlag;
+pub use sync_read_async_write_lock::SyncReadAsyncWriteLock;
+pub use task_queue::TaskQueue;
+pub use task_queue::TaskQueuePermit;
+pub use value_creator::MultiRuntimeAsyncValueCreator;
diff --git a/cli/util/sync/sync_read_async_write_lock.rs b/cli/util/sync/sync_read_async_write_lock.rs
new file mode 100644
index 000000000..8bd211aa7
--- /dev/null
+++ b/cli/util/sync/sync_read_async_write_lock.rs
@@ -0,0 +1,62 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::parking_lot::RwLock;
+use deno_core::parking_lot::RwLockReadGuard;
+use deno_core::parking_lot::RwLockWriteGuard;
+
+use super::TaskQueue;
+use super::TaskQueuePermit;
+
+/// A lock that can be read synchronously at any time (including when
+/// being written to), but must write asynchronously.
+pub struct SyncReadAsyncWriteLockWriteGuard<'a, T: Send + Sync> {
+ _update_permit: TaskQueuePermit<'a>,
+ data: &'a RwLock<T>,
+}
+
+impl<'a, T: Send + Sync> SyncReadAsyncWriteLockWriteGuard<'a, T> {
+ pub fn read(&self) -> RwLockReadGuard<'_, T> {
+ self.data.read()
+ }
+
+ /// Warning: Only `write()` with data you created within this
+ /// write this `SyncReadAsyncWriteLockWriteGuard`.
+ ///
+ /// ```rs
+ /// let mut data = lock.write().await;
+ ///
+ /// let mut data = data.read().clone();
+ /// data.value = 2;
+ /// *data.write() = data;
+ /// ```
+ pub fn write(&self) -> RwLockWriteGuard<'_, T> {
+ self.data.write()
+ }
+}
+
+/// A lock that can only be
+pub struct SyncReadAsyncWriteLock<T: Send + Sync> {
+ data: RwLock<T>,
+ update_queue: TaskQueue,
+}
+
+impl<T: Send + Sync> SyncReadAsyncWriteLock<T> {
+ pub fn new(data: T) -> Self {
+ Self {
+ data: RwLock::new(data),
+ update_queue: TaskQueue::default(),
+ }
+ }
+
+ pub fn read(&self) -> RwLockReadGuard<'_, T> {
+ self.data.read()
+ }
+
+ pub async fn acquire(&self) -> SyncReadAsyncWriteLockWriteGuard<'_, T> {
+ let update_permit = self.update_queue.acquire().await;
+ SyncReadAsyncWriteLockWriteGuard {
+ _update_permit: update_permit,
+ data: &self.data,
+ }
+ }
+}
diff --git a/cli/util/sync.rs b/cli/util/sync/task_queue.rs
index 3986c883e..6ef747e1a 100644
--- a/cli/util/sync.rs
+++ b/cli/util/sync/task_queue.rs
@@ -1,30 +1,13 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::collections::LinkedList;
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
use std::sync::Arc;
use deno_core::futures::task::AtomicWaker;
use deno_core::futures::Future;
use deno_core::parking_lot::Mutex;
-use tokio_util::sync::CancellationToken;
-/// Simplifies the use of an atomic boolean as a flag.
-#[derive(Debug, Default)]
-pub struct AtomicFlag(AtomicBool);
-
-impl AtomicFlag {
- /// Raises the flag returning if the raise was successful.
- pub fn raise(&self) -> bool {
- !self.0.swap(true, Ordering::SeqCst)
- }
-
- /// Gets if the flag is raised.
- pub fn is_raised(&self) -> bool {
- self.0.load(Ordering::SeqCst)
- }
-}
+use super::AtomicFlag;
#[derive(Debug, Default)]
struct TaskQueueTaskItem {
@@ -161,23 +144,6 @@ impl<'a> Future for TaskQueuePermitAcquireFuture<'a> {
}
}
-#[derive(Debug, Default, Clone)]
-pub struct AsyncFlag(CancellationToken);
-
-impl AsyncFlag {
- pub fn raise(&self) {
- self.0.cancel();
- }
-
- pub fn is_raised(&self) -> bool {
- self.0.is_cancelled()
- }
-
- pub fn wait_raised(&self) -> impl std::future::Future<Output = ()> + '_ {
- self.0.cancelled()
- }
-}
-
#[cfg(test)]
mod test {
use deno_core::futures;
@@ -186,16 +152,6 @@ mod test {
use super::*;
- #[test]
- fn atomic_flag_raises() {
- let flag = AtomicFlag::default();
- assert!(!flag.is_raised()); // false by default
- assert!(flag.raise());
- assert!(flag.is_raised());
- assert!(!flag.raise());
- assert!(flag.is_raised());
- }
-
#[tokio::test]
async fn task_queue_runs_one_after_other() {
let task_queue = TaskQueue::default();
diff --git a/cli/util/sync/value_creator.rs b/cli/util/sync/value_creator.rs
new file mode 100644
index 000000000..57aabe801
--- /dev/null
+++ b/cli/util/sync/value_creator.rs
@@ -0,0 +1,213 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::Arc;
+
+use deno_core::futures::future::BoxFuture;
+use deno_core::futures::future::LocalBoxFuture;
+use deno_core::futures::future::Shared;
+use deno_core::futures::FutureExt;
+use deno_core::parking_lot::Mutex;
+use tokio::task::JoinError;
+
+type JoinResult<TResult> = Result<TResult, Arc<JoinError>>;
+type CreateFutureFn<TResult> =
+ Box<dyn Fn() -> LocalBoxFuture<'static, TResult> + Send + Sync>;
+
+#[derive(Debug)]
+struct State<TResult> {
+ retry_index: usize,
+ future: Option<Shared<BoxFuture<'static, JoinResult<TResult>>>>,
+}
+
+/// Attempts to create a shared value asynchronously on one tokio runtime while
+/// many runtimes are requesting the value.
+///
+/// This is only useful when the value needs to get created once across
+/// many runtimes.
+///
+/// This handles the case where the tokio runtime creating the value goes down
+/// while another one is waiting on the value.
+pub struct MultiRuntimeAsyncValueCreator<TResult: Send + Clone + 'static> {
+ create_future: CreateFutureFn<TResult>,
+ state: Mutex<State<TResult>>,
+}
+
+impl<TResult: Send + Clone + 'static> std::fmt::Debug
+ for MultiRuntimeAsyncValueCreator<TResult>
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("MultiRuntimeAsyncValueCreator").finish()
+ }
+}
+
+impl<TResult: Send + Clone + 'static> MultiRuntimeAsyncValueCreator<TResult> {
+ pub fn new(create_future: CreateFutureFn<TResult>) -> Self {
+ Self {
+ state: Mutex::new(State {
+ retry_index: 0,
+ future: None,
+ }),
+ create_future,
+ }
+ }
+
+ pub async fn get(&self) -> TResult {
+ let (mut future, mut retry_index) = {
+ let mut state = self.state.lock();
+ let future = match &state.future {
+ Some(future) => future.clone(),
+ None => {
+ let future = self.create_shared_future();
+ state.future = Some(future.clone());
+ future
+ }
+ };
+ (future, state.retry_index)
+ };
+
+ loop {
+ let result = future.await;
+
+ match result {
+ Ok(result) => return result,
+ Err(join_error) => {
+ if join_error.is_cancelled() {
+ let mut state = self.state.lock();
+
+ if state.retry_index == retry_index {
+ // we were the first one to retry, so create a new future
+ // that we'll run from the current runtime
+ state.retry_index += 1;
+ state.future = Some(self.create_shared_future());
+ }
+
+ retry_index = state.retry_index;
+ future = state.future.as_ref().unwrap().clone();
+
+ // just in case we're stuck in a loop
+ if retry_index > 1000 {
+ panic!("Something went wrong.") // should never happen
+ }
+ } else {
+ panic!("{}", join_error);
+ }
+ }
+ }
+ }
+ }
+
+ fn create_shared_future(
+ &self,
+ ) -> Shared<BoxFuture<'static, JoinResult<TResult>>> {
+ let future = (self.create_future)();
+ deno_core::unsync::spawn(future)
+ .map(|result| result.map_err(Arc::new))
+ .boxed()
+ .shared()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use deno_core::unsync::spawn;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn single_runtime() {
+ let value_creator = MultiRuntimeAsyncValueCreator::new(Box::new(|| {
+ async { 1 }.boxed_local()
+ }));
+ let value = value_creator.get().await;
+ assert_eq!(value, 1);
+ }
+
+ #[test]
+ fn multi_runtimes() {
+ let value_creator =
+ Arc::new(MultiRuntimeAsyncValueCreator::new(Box::new(|| {
+ async {
+ tokio::task::yield_now().await;
+ 1
+ }
+ .boxed_local()
+ })));
+ let handles = (0..3)
+ .map(|_| {
+ let value_creator = value_creator.clone();
+ std::thread::spawn(|| {
+ create_runtime().block_on(async move { value_creator.get().await })
+ })
+ })
+ .collect::<Vec<_>>();
+ for handle in handles {
+ assert_eq!(handle.join().unwrap(), 1);
+ }
+ }
+
+ #[test]
+ fn multi_runtimes_first_never_finishes() {
+ let is_first_run = Arc::new(Mutex::new(true));
+ let (tx, rx) = std::sync::mpsc::channel::<()>();
+ let value_creator = Arc::new(MultiRuntimeAsyncValueCreator::new({
+ let is_first_run = is_first_run.clone();
+ Box::new(move || {
+ let is_first_run = is_first_run.clone();
+ let tx = tx.clone();
+ async move {
+ let is_first_run = {
+ let mut is_first_run = is_first_run.lock();
+ let initial_value = *is_first_run;
+ *is_first_run = false;
+ tx.send(()).unwrap();
+ initial_value
+ };
+ if is_first_run {
+ tokio::time::sleep(std::time::Duration::from_millis(30_000)).await;
+ panic!("TIMED OUT"); // should not happen
+ } else {
+ tokio::task::yield_now().await;
+ }
+ 1
+ }
+ .boxed_local()
+ })
+ }));
+ std::thread::spawn({
+ let value_creator = value_creator.clone();
+ let is_first_run = is_first_run.clone();
+ move || {
+ create_runtime().block_on(async {
+ let value_creator = value_creator.clone();
+ // spawn a task that will never complete
+ spawn(async move { value_creator.get().await });
+ // wait for the task to set is_first_run to false
+ while *is_first_run.lock() {
+ tokio::time::sleep(std::time::Duration::from_millis(20)).await;
+ }
+ // now exit the runtime while the value_creator is still pending
+ })
+ }
+ });
+ let handle = {
+ let value_creator = value_creator.clone();
+ std::thread::spawn(|| {
+ create_runtime().block_on(async move {
+ let value_creator = value_creator.clone();
+ rx.recv().unwrap();
+ // even though the other runtime shutdown, this get() should
+ // recover and still get the value
+ value_creator.get().await
+ })
+ })
+ };
+ assert_eq!(handle.join().unwrap(), 1);
+ }
+
+ fn create_runtime() -> tokio::runtime::Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+}