author | David Sherret <dsherret@users.noreply.github.com> | 2024-06-05 15:17:35 -0400
committer | GitHub <noreply@github.com> | 2024-06-05 15:17:35 -0400
commit | 1b355d8a87a3ad43bf240aa66b88eb98c1cd777f (patch)
tree | 0422d22a6a0e40873eea4c1ef3eff9d6648f1c51 /cli/util
parent | 7ed90a20d04982ae15a52ae2378cbffd4b6839df (diff)
refactor(npm): improve locking around updating npm resolution (#24104)
Introduces a `SyncReadAsyncWriteLock` to make it harder to write to the
npm resolution without first waiting asynchronously in a queue. For the
npm resolution, reading synchronously is fine, but when updating, a
caller should wait asynchronously, clone the data, and then write the
data back at the end.
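The update pattern this enables is sketched below (illustrative only, not code from this commit; `Snapshot` and `add_package` are hypothetical names, and `SyncReadAsyncWriteLock` is assumed to be in scope from the new `cli/util/sync` module):

```rust
// Minimal sketch of the read-sync/write-async pattern, assuming the
// `SyncReadAsyncWriteLock` added in this commit is in scope.
#[derive(Clone, Default)]
struct Snapshot {
  // hypothetical field standing in for the npm resolution data
  package_count: usize,
}

async fn add_package(lock: &SyncReadAsyncWriteLock<Snapshot>) {
  // Synchronous reads never wait in the update queue.
  let _count = lock.read().package_count;

  // Updates first wait asynchronously for a permit in the task queue...
  let guard = lock.acquire().await;
  // ...then clone the data while holding the guard, mutate the clone,
  // and write it back at the end.
  let mut updated = guard.read().clone();
  updated.package_count += 1;
  *guard.write() = updated;
}
```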
Diffstat (limited to 'cli/util')
-rw-r--r-- | cli/util/sync/async_flag.rs | 20
-rw-r--r-- | cli/util/sync/atomic_flag.rs | 35
-rw-r--r-- | cli/util/sync/mod.rs | 14
-rw-r--r-- | cli/util/sync/sync_read_async_write_lock.rs | 62
-rw-r--r-- | cli/util/sync/task_queue.rs (renamed from cli/util/sync.rs) | 46
-rw-r--r-- | cli/util/sync/value_creator.rs | 213
6 files changed, 345 insertions, 45 deletions
diff --git a/cli/util/sync/async_flag.rs b/cli/util/sync/async_flag.rs
new file mode 100644
index 000000000..2bdff63c0
--- /dev/null
+++ b/cli/util/sync/async_flag.rs
@@ -0,0 +1,20 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use tokio_util::sync::CancellationToken;
+
+#[derive(Debug, Default, Clone)]
+pub struct AsyncFlag(CancellationToken);
+
+impl AsyncFlag {
+  pub fn raise(&self) {
+    self.0.cancel();
+  }
+
+  pub fn is_raised(&self) -> bool {
+    self.0.is_cancelled()
+  }
+
+  pub fn wait_raised(&self) -> impl std::future::Future<Output = ()> + '_ {
+    self.0.cancelled()
+  }
+}
diff --git a/cli/util/sync/atomic_flag.rs b/cli/util/sync/atomic_flag.rs
new file mode 100644
index 000000000..75396dcf4
--- /dev/null
+++ b/cli/util/sync/atomic_flag.rs
@@ -0,0 +1,35 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+
+/// Simplifies the use of an atomic boolean as a flag.
+#[derive(Debug, Default)]
+pub struct AtomicFlag(AtomicBool);
+
+impl AtomicFlag {
+  /// Raises the flag returning if the raise was successful.
+  pub fn raise(&self) -> bool {
+    !self.0.swap(true, Ordering::SeqCst)
+  }
+
+  /// Gets if the flag is raised.
+  pub fn is_raised(&self) -> bool {
+    self.0.load(Ordering::SeqCst)
+  }
+}
+
+#[cfg(test)]
+mod test {
+  use super::*;
+
+  #[test]
+  fn atomic_flag_raises() {
+    let flag = AtomicFlag::default();
+    assert!(!flag.is_raised()); // false by default
+    assert!(flag.raise());
+    assert!(flag.is_raised());
+    assert!(!flag.raise());
+    assert!(flag.is_raised());
+  }
+}
diff --git a/cli/util/sync/mod.rs b/cli/util/sync/mod.rs
new file mode 100644
index 000000000..28aab7f47
--- /dev/null
+++ b/cli/util/sync/mod.rs
@@ -0,0 +1,14 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+mod async_flag;
+mod atomic_flag;
+mod sync_read_async_write_lock;
+mod task_queue;
+mod value_creator;
+
+pub use async_flag::AsyncFlag;
+pub use atomic_flag::AtomicFlag;
+pub use sync_read_async_write_lock::SyncReadAsyncWriteLock;
+pub use task_queue::TaskQueue;
+pub use task_queue::TaskQueuePermit;
+pub use value_creator::MultiRuntimeAsyncValueCreator;
diff --git a/cli/util/sync/sync_read_async_write_lock.rs b/cli/util/sync/sync_read_async_write_lock.rs
new file mode 100644
index 000000000..8bd211aa7
--- /dev/null
+++ b/cli/util/sync/sync_read_async_write_lock.rs
@@ -0,0 +1,62 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::parking_lot::RwLock;
+use deno_core::parking_lot::RwLockReadGuard;
+use deno_core::parking_lot::RwLockWriteGuard;
+
+use super::TaskQueue;
+use super::TaskQueuePermit;
+
+/// A lock that can be read synchronously at any time (including when
+/// being written to), but must write asynchronously.
+pub struct SyncReadAsyncWriteLockWriteGuard<'a, T: Send + Sync> {
+  _update_permit: TaskQueuePermit<'a>,
+  data: &'a RwLock<T>,
+}
+
+impl<'a, T: Send + Sync> SyncReadAsyncWriteLockWriteGuard<'a, T> {
+  pub fn read(&self) -> RwLockReadGuard<'_, T> {
+    self.data.read()
+  }
+
+  /// Warning: Only `write()` with data you created within
+  /// this `SyncReadAsyncWriteLockWriteGuard`.
+  ///
+  /// ```rs
+  /// let mut data = lock.write().await;
+  ///
+  /// let mut data = data.read().clone();
+  /// data.value = 2;
+  /// *data.write() = data;
+  /// ```
+  pub fn write(&self) -> RwLockWriteGuard<'_, T> {
+    self.data.write()
+  }
+}
+
+/// A lock that can only be written to asynchronously, but read synchronously.
+pub struct SyncReadAsyncWriteLock<T: Send + Sync> {
+  data: RwLock<T>,
+  update_queue: TaskQueue,
+}
+
+impl<T: Send + Sync> SyncReadAsyncWriteLock<T> {
+  pub fn new(data: T) -> Self {
+    Self {
+      data: RwLock::new(data),
+      update_queue: TaskQueue::default(),
+    }
+  }
+
+  pub fn read(&self) -> RwLockReadGuard<'_, T> {
+    self.data.read()
+  }
+
+  pub async fn acquire(&self) -> SyncReadAsyncWriteLockWriteGuard<'_, T> {
+    let update_permit = self.update_queue.acquire().await;
+    SyncReadAsyncWriteLockWriteGuard {
+      _update_permit: update_permit,
+      data: &self.data,
+    }
+  }
+}
diff --git a/cli/util/sync.rs b/cli/util/sync/task_queue.rs
index 3986c883e..6ef747e1a 100644
--- a/cli/util/sync.rs
+++ b/cli/util/sync/task_queue.rs
@@ -1,30 +1,13 @@
 // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
 
 use std::collections::LinkedList;
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
 use std::sync::Arc;
 
 use deno_core::futures::task::AtomicWaker;
 use deno_core::futures::Future;
 use deno_core::parking_lot::Mutex;
-use tokio_util::sync::CancellationToken;
 
-/// Simplifies the use of an atomic boolean as a flag.
-#[derive(Debug, Default)]
-pub struct AtomicFlag(AtomicBool);
-
-impl AtomicFlag {
-  /// Raises the flag returning if the raise was successful.
-  pub fn raise(&self) -> bool {
-    !self.0.swap(true, Ordering::SeqCst)
-  }
-
-  /// Gets if the flag is raised.
-  pub fn is_raised(&self) -> bool {
-    self.0.load(Ordering::SeqCst)
-  }
-}
+use super::AtomicFlag;
 
 #[derive(Debug, Default)]
 struct TaskQueueTaskItem {
@@ -161,23 +144,6 @@ impl<'a> Future for TaskQueuePermitAcquireFuture<'a> {
   }
 }
 
-#[derive(Debug, Default, Clone)]
-pub struct AsyncFlag(CancellationToken);
-
-impl AsyncFlag {
-  pub fn raise(&self) {
-    self.0.cancel();
-  }
-
-  pub fn is_raised(&self) -> bool {
-    self.0.is_cancelled()
-  }
-
-  pub fn wait_raised(&self) -> impl std::future::Future<Output = ()> + '_ {
-    self.0.cancelled()
-  }
-}
-
 #[cfg(test)]
 mod test {
   use deno_core::futures;
@@ -186,16 +152,6 @@ mod test {
 
   use super::*;
 
-  #[test]
-  fn atomic_flag_raises() {
-    let flag = AtomicFlag::default();
-    assert!(!flag.is_raised()); // false by default
-    assert!(flag.raise());
-    assert!(flag.is_raised());
-    assert!(!flag.raise());
-    assert!(flag.is_raised());
-  }
-
   #[tokio::test]
   async fn task_queue_runs_one_after_other() {
     let task_queue = TaskQueue::default();
diff --git a/cli/util/sync/value_creator.rs b/cli/util/sync/value_creator.rs
new file mode 100644
index 000000000..57aabe801
--- /dev/null
+++ b/cli/util/sync/value_creator.rs
@@ -0,0 +1,213 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::Arc;
+
+use deno_core::futures::future::BoxFuture;
+use deno_core::futures::future::LocalBoxFuture;
+use deno_core::futures::future::Shared;
+use deno_core::futures::FutureExt;
+use deno_core::parking_lot::Mutex;
+use tokio::task::JoinError;
+
+type JoinResult<TResult> = Result<TResult, Arc<JoinError>>;
+type CreateFutureFn<TResult> =
+  Box<dyn Fn() -> LocalBoxFuture<'static, TResult> + Send + Sync>;
+
+#[derive(Debug)]
+struct State<TResult> {
+  retry_index: usize,
+  future: Option<Shared<BoxFuture<'static, JoinResult<TResult>>>>,
+}
+
+/// Attempts to create a shared value asynchronously on one tokio runtime while
+/// many runtimes are requesting the value.
+///
+/// This is only useful when the value needs to get created once across
+/// many runtimes.
+///
+/// This handles the case where the tokio runtime creating the value goes down
+/// while another one is waiting on the value.
+pub struct MultiRuntimeAsyncValueCreator<TResult: Send + Clone + 'static> {
+  create_future: CreateFutureFn<TResult>,
+  state: Mutex<State<TResult>>,
+}
+
+impl<TResult: Send + Clone + 'static> std::fmt::Debug
+  for MultiRuntimeAsyncValueCreator<TResult>
+{
+  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    f.debug_struct("MultiRuntimeAsyncValueCreator").finish()
+  }
+}
+
+impl<TResult: Send + Clone + 'static> MultiRuntimeAsyncValueCreator<TResult> {
+  pub fn new(create_future: CreateFutureFn<TResult>) -> Self {
+    Self {
+      state: Mutex::new(State {
+        retry_index: 0,
+        future: None,
+      }),
+      create_future,
+    }
+  }
+
+  pub async fn get(&self) -> TResult {
+    let (mut future, mut retry_index) = {
+      let mut state = self.state.lock();
+      let future = match &state.future {
+        Some(future) => future.clone(),
+        None => {
+          let future = self.create_shared_future();
+          state.future = Some(future.clone());
+          future
+        }
+      };
+      (future, state.retry_index)
+    };
+
+    loop {
+      let result = future.await;
+
+      match result {
+        Ok(result) => return result,
+        Err(join_error) => {
+          if join_error.is_cancelled() {
+            let mut state = self.state.lock();
+
+            if state.retry_index == retry_index {
+              // we were the first one to retry, so create a new future
+              // that we'll run from the current runtime
+              state.retry_index += 1;
+              state.future = Some(self.create_shared_future());
+            }
+
+            retry_index = state.retry_index;
+            future = state.future.as_ref().unwrap().clone();
+
+            // just in case we're stuck in a loop
+            if retry_index > 1000 {
+              panic!("Something went wrong.") // should never happen
+            }
+          } else {
+            panic!("{}", join_error);
+          }
+        }
+      }
+    }
+  }
+
+  fn create_shared_future(
+    &self,
+  ) -> Shared<BoxFuture<'static, JoinResult<TResult>>> {
+    let future = (self.create_future)();
+    deno_core::unsync::spawn(future)
+      .map(|result| result.map_err(Arc::new))
+      .boxed()
+      .shared()
+  }
+}
+
+#[cfg(test)]
+mod test {
+  use deno_core::unsync::spawn;
+
+  use super::*;
+
+  #[tokio::test]
+  async fn single_runtime() {
+    let value_creator = MultiRuntimeAsyncValueCreator::new(Box::new(|| {
+      async { 1 }.boxed_local()
+    }));
+    let value = value_creator.get().await;
+    assert_eq!(value, 1);
+  }
+
+  #[test]
+  fn multi_runtimes() {
+    let value_creator =
+      Arc::new(MultiRuntimeAsyncValueCreator::new(Box::new(|| {
+        async {
+          tokio::task::yield_now().await;
+          1
+        }
+        .boxed_local()
+      })));
+    let handles = (0..3)
+      .map(|_| {
+        let value_creator = value_creator.clone();
+        std::thread::spawn(|| {
+          create_runtime().block_on(async move { value_creator.get().await })
+        })
+      })
+      .collect::<Vec<_>>();
+    for handle in handles {
+      assert_eq!(handle.join().unwrap(), 1);
+    }
+  }
+
+  #[test]
+  fn multi_runtimes_first_never_finishes() {
+    let is_first_run = Arc::new(Mutex::new(true));
+    let (tx, rx) = std::sync::mpsc::channel::<()>();
+    let value_creator = Arc::new(MultiRuntimeAsyncValueCreator::new({
+      let is_first_run = is_first_run.clone();
+      Box::new(move || {
+        let is_first_run = is_first_run.clone();
+        let tx = tx.clone();
+        async move {
+          let is_first_run = {
+            let mut is_first_run = is_first_run.lock();
+            let initial_value = *is_first_run;
+            *is_first_run = false;
+            tx.send(()).unwrap();
+            initial_value
+          };
+          if is_first_run {
+            tokio::time::sleep(std::time::Duration::from_millis(30_000)).await;
+            panic!("TIMED OUT"); // should not happen
+          } else {
+            tokio::task::yield_now().await;
+          }
+          1
+        }
+        .boxed_local()
+      })
+    }));
+    std::thread::spawn({
+      let value_creator = value_creator.clone();
+      let is_first_run = is_first_run.clone();
+      move || {
+        create_runtime().block_on(async {
+          let value_creator = value_creator.clone();
+          // spawn a task that will never complete
+          spawn(async move { value_creator.get().await });
+          // wait for the task to set is_first_run to false
+          while *is_first_run.lock() {
+            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
+          }
+          // now exit the runtime while the value_creator is still pending
        })
+      }
+    });
+    let handle = {
+      let value_creator = value_creator.clone();
+      std::thread::spawn(|| {
+        create_runtime().block_on(async move {
+          let value_creator = value_creator.clone();
+          rx.recv().unwrap();
+          // even though the other runtime shutdown, this get() should
+          // recover and still get the value
+          value_creator.get().await
+        })
+      })
+    };
+    assert_eq!(handle.join().unwrap(), 1);
+  }
+
+  fn create_runtime() -> tokio::runtime::Runtime {
+    tokio::runtime::Builder::new_current_thread()
+      .enable_all()
+      .build()
+      .unwrap()
+  }
+}
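For reference, the `MultiRuntimeAsyncValueCreator` added above is consumed the same way its tests do: callers await `get()`, the creation future runs to completion only once, and the shared result is handed to every caller, even across tokio runtimes. A minimal sketch of that usage (illustrative only; `example` is a hypothetical function and the type is assumed to be in scope from the new `cli/util/sync` module):

```rust
use std::sync::Arc;

use deno_core::futures::FutureExt;

// Assumes something like: use crate::util::sync::MultiRuntimeAsyncValueCreator;
async fn example() -> u64 {
  // The boxed closure produces the creation future; its result is cached
  // and shared, so the expensive work runs only once per successful attempt.
  let creator = Arc::new(MultiRuntimeAsyncValueCreator::new(Box::new(|| {
    async {
      // expensive one-time initialization would go here
      42u64
    }
    .boxed_local()
  })));

  creator.get().await
}
```

If the runtime that started the creation future shuts down before it finishes, the next caller's `get()` retries the future on its own runtime, which is the failure mode exercised by `multi_runtimes_first_never_finishes` above.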