summaryrefslogtreecommitdiff
path: root/cli/util/sync
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2024-06-05 15:17:35 -0400
committerGitHub <noreply@github.com>2024-06-05 15:17:35 -0400
commit1b355d8a87a3ad43bf240aa66b88eb98c1cd777f (patch)
tree0422d22a6a0e40873eea4c1ef3eff9d6648f1c51 /cli/util/sync
parent7ed90a20d04982ae15a52ae2378cbffd4b6839df (diff)
refactor(npm): improve locking around updating npm resolution (#24104)
Introduces a `SyncReadAsyncWriteLock` to make it harder to write to the npm resolution without first waiting async in a queue. For the npm resolution, reading synchronously is fine, but when updating, someone should wait async, clone the data, then write the data at the end back.
Diffstat (limited to 'cli/util/sync')
-rw-r--r--cli/util/sync/async_flag.rs20
-rw-r--r--cli/util/sync/atomic_flag.rs35
-rw-r--r--cli/util/sync/mod.rs14
-rw-r--r--cli/util/sync/sync_read_async_write_lock.rs62
-rw-r--r--cli/util/sync/task_queue.rs266
-rw-r--r--cli/util/sync/value_creator.rs213
6 files changed, 610 insertions, 0 deletions
diff --git a/cli/util/sync/async_flag.rs b/cli/util/sync/async_flag.rs
new file mode 100644
index 000000000..2bdff63c0
--- /dev/null
+++ b/cli/util/sync/async_flag.rs
@@ -0,0 +1,20 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use tokio_util::sync::CancellationToken;
+
+#[derive(Debug, Default, Clone)]
+pub struct AsyncFlag(CancellationToken);
+
+impl AsyncFlag {
+ pub fn raise(&self) {
+ self.0.cancel();
+ }
+
+ pub fn is_raised(&self) -> bool {
+ self.0.is_cancelled()
+ }
+
+ pub fn wait_raised(&self) -> impl std::future::Future<Output = ()> + '_ {
+ self.0.cancelled()
+ }
+}
diff --git a/cli/util/sync/atomic_flag.rs b/cli/util/sync/atomic_flag.rs
new file mode 100644
index 000000000..75396dcf4
--- /dev/null
+++ b/cli/util/sync/atomic_flag.rs
@@ -0,0 +1,35 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+
+/// Simplifies the use of an atomic boolean as a flag.
+#[derive(Debug, Default)]
+pub struct AtomicFlag(AtomicBool);
+
+impl AtomicFlag {
+ /// Raises the flag returning if the raise was successful.
+ pub fn raise(&self) -> bool {
+ !self.0.swap(true, Ordering::SeqCst)
+ }
+
+ /// Gets if the flag is raised.
+ pub fn is_raised(&self) -> bool {
+ self.0.load(Ordering::SeqCst)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn atomic_flag_raises() {
+ let flag = AtomicFlag::default();
+ assert!(!flag.is_raised()); // false by default
+ assert!(flag.raise());
+ assert!(flag.is_raised());
+ assert!(!flag.raise());
+ assert!(flag.is_raised());
+ }
+}
diff --git a/cli/util/sync/mod.rs b/cli/util/sync/mod.rs
new file mode 100644
index 000000000..28aab7f47
--- /dev/null
+++ b/cli/util/sync/mod.rs
@@ -0,0 +1,14 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+mod async_flag;
+mod atomic_flag;
+mod sync_read_async_write_lock;
+mod task_queue;
+mod value_creator;
+
+pub use async_flag::AsyncFlag;
+pub use atomic_flag::AtomicFlag;
+pub use sync_read_async_write_lock::SyncReadAsyncWriteLock;
+pub use task_queue::TaskQueue;
+pub use task_queue::TaskQueuePermit;
+pub use value_creator::MultiRuntimeAsyncValueCreator;
diff --git a/cli/util/sync/sync_read_async_write_lock.rs b/cli/util/sync/sync_read_async_write_lock.rs
new file mode 100644
index 000000000..8bd211aa7
--- /dev/null
+++ b/cli/util/sync/sync_read_async_write_lock.rs
@@ -0,0 +1,62 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use deno_core::parking_lot::RwLock;
+use deno_core::parking_lot::RwLockReadGuard;
+use deno_core::parking_lot::RwLockWriteGuard;
+
+use super::TaskQueue;
+use super::TaskQueuePermit;
+
+/// A lock that can be read synchronously at any time (including when
+/// being written to), but must write asynchronously.
+pub struct SyncReadAsyncWriteLockWriteGuard<'a, T: Send + Sync> {
+ _update_permit: TaskQueuePermit<'a>,
+ data: &'a RwLock<T>,
+}
+
+impl<'a, T: Send + Sync> SyncReadAsyncWriteLockWriteGuard<'a, T> {
+ pub fn read(&self) -> RwLockReadGuard<'_, T> {
+ self.data.read()
+ }
+
+ /// Warning: Only `write()` with data you created within this
+ /// write this `SyncReadAsyncWriteLockWriteGuard`.
+ ///
+ /// ```rs
+ /// let mut data = lock.write().await;
+ ///
+ /// let mut data = data.read().clone();
+ /// data.value = 2;
+ /// *data.write() = data;
+ /// ```
+ pub fn write(&self) -> RwLockWriteGuard<'_, T> {
+ self.data.write()
+ }
+}
+
+/// A lock that can only be
+pub struct SyncReadAsyncWriteLock<T: Send + Sync> {
+ data: RwLock<T>,
+ update_queue: TaskQueue,
+}
+
+impl<T: Send + Sync> SyncReadAsyncWriteLock<T> {
+ pub fn new(data: T) -> Self {
+ Self {
+ data: RwLock::new(data),
+ update_queue: TaskQueue::default(),
+ }
+ }
+
+ pub fn read(&self) -> RwLockReadGuard<'_, T> {
+ self.data.read()
+ }
+
+ pub async fn acquire(&self) -> SyncReadAsyncWriteLockWriteGuard<'_, T> {
+ let update_permit = self.update_queue.acquire().await;
+ SyncReadAsyncWriteLockWriteGuard {
+ _update_permit: update_permit,
+ data: &self.data,
+ }
+ }
+}
diff --git a/cli/util/sync/task_queue.rs b/cli/util/sync/task_queue.rs
new file mode 100644
index 000000000..6ef747e1a
--- /dev/null
+++ b/cli/util/sync/task_queue.rs
@@ -0,0 +1,266 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::collections::LinkedList;
+use std::sync::Arc;
+
+use deno_core::futures::task::AtomicWaker;
+use deno_core::futures::Future;
+use deno_core::parking_lot::Mutex;
+
+use super::AtomicFlag;
+
+#[derive(Debug, Default)]
+struct TaskQueueTaskItem {
+ is_ready: AtomicFlag,
+ is_future_dropped: AtomicFlag,
+ waker: AtomicWaker,
+}
+
+#[derive(Debug, Default)]
+struct TaskQueueTasks {
+ is_running: bool,
+ items: LinkedList<Arc<TaskQueueTaskItem>>,
+}
+
+/// A queue that executes tasks sequentially one after the other
+/// ensuring order and that no task runs at the same time as another.
+///
+/// Note that this differs from tokio's semaphore in that the order
+/// is acquired synchronously.
+#[derive(Debug, Default)]
+pub struct TaskQueue {
+ tasks: Mutex<TaskQueueTasks>,
+}
+
+impl TaskQueue {
+ /// Acquires a permit where the tasks are executed one at a time
+ /// and in the order that they were acquired.
+ pub fn acquire(&self) -> TaskQueuePermitAcquireFuture {
+ TaskQueuePermitAcquireFuture::new(self)
+ }
+
+ /// Alternate API that acquires a permit internally
+ /// for the duration of the future.
+ #[allow(unused)]
+ pub fn run<'a, R>(
+ &'a self,
+ future: impl Future<Output = R> + 'a,
+ ) -> impl Future<Output = R> + 'a {
+ let acquire_future = self.acquire();
+ async move {
+ let permit = acquire_future.await;
+ let result = future.await;
+ drop(permit); // explicit for clarity
+ result
+ }
+ }
+
+ fn raise_next(&self) {
+ let front_item = {
+ let mut tasks = self.tasks.lock();
+
+ // clear out any wakers for futures that were dropped
+ while let Some(front_waker) = tasks.items.front() {
+ if front_waker.is_future_dropped.is_raised() {
+ tasks.items.pop_front();
+ } else {
+ break;
+ }
+ }
+ let front_item = tasks.items.pop_front();
+ tasks.is_running = front_item.is_some();
+ front_item
+ };
+
+ // wake up the next waker
+ if let Some(front_item) = front_item {
+ front_item.is_ready.raise();
+ front_item.waker.wake();
+ }
+ }
+}
+
+/// A permit that when dropped will allow another task to proceed.
+pub struct TaskQueuePermit<'a>(&'a TaskQueue);
+
+impl<'a> Drop for TaskQueuePermit<'a> {
+ fn drop(&mut self) {
+ self.0.raise_next();
+ }
+}
+
+pub struct TaskQueuePermitAcquireFuture<'a> {
+ task_queue: Option<&'a TaskQueue>,
+ item: Arc<TaskQueueTaskItem>,
+}
+
+impl<'a> TaskQueuePermitAcquireFuture<'a> {
+ pub fn new(task_queue: &'a TaskQueue) -> Self {
+ // acquire the waker position synchronously
+ let mut tasks = task_queue.tasks.lock();
+ let item = if !tasks.is_running {
+ tasks.is_running = true;
+ let item = Arc::new(TaskQueueTaskItem::default());
+ item.is_ready.raise();
+ item
+ } else {
+ let item = Arc::new(TaskQueueTaskItem::default());
+ tasks.items.push_back(item.clone());
+ item
+ };
+ drop(tasks);
+ Self {
+ task_queue: Some(task_queue),
+ item,
+ }
+ }
+}
+
+impl<'a> Drop for TaskQueuePermitAcquireFuture<'a> {
+ fn drop(&mut self) {
+ if let Some(task_queue) = self.task_queue.take() {
+ if self.item.is_ready.is_raised() {
+ task_queue.raise_next();
+ } else {
+ self.item.is_future_dropped.raise();
+ }
+ }
+ }
+}
+
+impl<'a> Future for TaskQueuePermitAcquireFuture<'a> {
+ type Output = TaskQueuePermit<'a>;
+
+ fn poll(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ if self.item.is_ready.is_raised() {
+ std::task::Poll::Ready(TaskQueuePermit(self.task_queue.take().unwrap()))
+ } else {
+ self.item.waker.register(cx.waker());
+ std::task::Poll::Pending
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use deno_core::futures;
+ use deno_core::parking_lot::Mutex;
+ use std::sync::Arc;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn task_queue_runs_one_after_other() {
+ let task_queue = TaskQueue::default();
+ let mut tasks = Vec::new();
+ let data = Arc::new(Mutex::new(0));
+ for i in 0..100 {
+ let data = data.clone();
+ tasks.push(task_queue.run(async move {
+ deno_core::unsync::spawn_blocking(move || {
+ let mut data = data.lock();
+ assert_eq!(*data, i);
+ *data = i + 1;
+ })
+ .await
+ .unwrap();
+ }));
+ }
+ futures::future::join_all(tasks).await;
+ }
+
+ #[tokio::test]
+ async fn task_queue_run_in_sequence() {
+ let task_queue = TaskQueue::default();
+ let data = Arc::new(Mutex::new(0));
+
+ let first = task_queue.run(async {
+ *data.lock() = 1;
+ });
+ let second = task_queue.run(async {
+ assert_eq!(*data.lock(), 1);
+ *data.lock() = 2;
+ });
+ let _ = tokio::join!(first, second);
+
+ assert_eq!(*data.lock(), 2);
+ }
+
+ #[tokio::test]
+ async fn task_queue_future_dropped_before_poll() {
+ let task_queue = Arc::new(TaskQueue::default());
+
+ // acquire a future, but do not await it
+ let future = task_queue.acquire();
+
+ // this task tries to acquire another permit, but will be blocked by the first permit.
+ let enter_flag = Arc::new(AtomicFlag::default());
+ let delayed_task = deno_core::unsync::spawn({
+ let enter_flag = enter_flag.clone();
+ let task_queue = task_queue.clone();
+ async move {
+ enter_flag.raise();
+ task_queue.acquire().await;
+ true
+ }
+ });
+
+ // ensure the task gets a chance to be scheduled and blocked
+ tokio::task::yield_now().await;
+ assert!(enter_flag.is_raised());
+
+ // now, drop the first future
+ drop(future);
+
+ assert!(delayed_task.await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn task_queue_many_future_dropped_before_poll() {
+ let task_queue = Arc::new(TaskQueue::default());
+
+ // acquire a future, but do not await it
+ let mut futures = Vec::new();
+ for _ in 0..=10_000 {
+ futures.push(task_queue.acquire());
+ }
+
+ // this task tries to acquire another permit, but will be blocked by the first permit.
+ let enter_flag = Arc::new(AtomicFlag::default());
+ let delayed_task = deno_core::unsync::spawn({
+ let task_queue = task_queue.clone();
+ let enter_flag = enter_flag.clone();
+ async move {
+ enter_flag.raise();
+ task_queue.acquire().await;
+ true
+ }
+ });
+
+ // ensure the task gets a chance to be scheduled and blocked
+ tokio::task::yield_now().await;
+ assert!(enter_flag.is_raised());
+
+ // now, drop the futures
+ drop(futures);
+
+ assert!(delayed_task.await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn task_queue_middle_future_dropped_while_permit_acquired() {
+ let task_queue = TaskQueue::default();
+
+ let fut1 = task_queue.acquire();
+ let fut2 = task_queue.acquire();
+ let fut3 = task_queue.acquire();
+
+ // should not hang
+ drop(fut2);
+ drop(fut1.await);
+ drop(fut3.await);
+ }
+}
diff --git a/cli/util/sync/value_creator.rs b/cli/util/sync/value_creator.rs
new file mode 100644
index 000000000..57aabe801
--- /dev/null
+++ b/cli/util/sync/value_creator.rs
@@ -0,0 +1,213 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::Arc;
+
+use deno_core::futures::future::BoxFuture;
+use deno_core::futures::future::LocalBoxFuture;
+use deno_core::futures::future::Shared;
+use deno_core::futures::FutureExt;
+use deno_core::parking_lot::Mutex;
+use tokio::task::JoinError;
+
+type JoinResult<TResult> = Result<TResult, Arc<JoinError>>;
+type CreateFutureFn<TResult> =
+ Box<dyn Fn() -> LocalBoxFuture<'static, TResult> + Send + Sync>;
+
+#[derive(Debug)]
+struct State<TResult> {
+ retry_index: usize,
+ future: Option<Shared<BoxFuture<'static, JoinResult<TResult>>>>,
+}
+
+/// Attempts to create a shared value asynchronously on one tokio runtime while
+/// many runtimes are requesting the value.
+///
+/// This is only useful when the value needs to get created once across
+/// many runtimes.
+///
+/// This handles the case where the tokio runtime creating the value goes down
+/// while another one is waiting on the value.
+pub struct MultiRuntimeAsyncValueCreator<TResult: Send + Clone + 'static> {
+ create_future: CreateFutureFn<TResult>,
+ state: Mutex<State<TResult>>,
+}
+
+impl<TResult: Send + Clone + 'static> std::fmt::Debug
+ for MultiRuntimeAsyncValueCreator<TResult>
+{
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("MultiRuntimeAsyncValueCreator").finish()
+ }
+}
+
+impl<TResult: Send + Clone + 'static> MultiRuntimeAsyncValueCreator<TResult> {
+ pub fn new(create_future: CreateFutureFn<TResult>) -> Self {
+ Self {
+ state: Mutex::new(State {
+ retry_index: 0,
+ future: None,
+ }),
+ create_future,
+ }
+ }
+
+ pub async fn get(&self) -> TResult {
+ let (mut future, mut retry_index) = {
+ let mut state = self.state.lock();
+ let future = match &state.future {
+ Some(future) => future.clone(),
+ None => {
+ let future = self.create_shared_future();
+ state.future = Some(future.clone());
+ future
+ }
+ };
+ (future, state.retry_index)
+ };
+
+ loop {
+ let result = future.await;
+
+ match result {
+ Ok(result) => return result,
+ Err(join_error) => {
+ if join_error.is_cancelled() {
+ let mut state = self.state.lock();
+
+ if state.retry_index == retry_index {
+ // we were the first one to retry, so create a new future
+ // that we'll run from the current runtime
+ state.retry_index += 1;
+ state.future = Some(self.create_shared_future());
+ }
+
+ retry_index = state.retry_index;
+ future = state.future.as_ref().unwrap().clone();
+
+ // just in case we're stuck in a loop
+ if retry_index > 1000 {
+ panic!("Something went wrong.") // should never happen
+ }
+ } else {
+ panic!("{}", join_error);
+ }
+ }
+ }
+ }
+ }
+
+ fn create_shared_future(
+ &self,
+ ) -> Shared<BoxFuture<'static, JoinResult<TResult>>> {
+ let future = (self.create_future)();
+ deno_core::unsync::spawn(future)
+ .map(|result| result.map_err(Arc::new))
+ .boxed()
+ .shared()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use deno_core::unsync::spawn;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn single_runtime() {
+ let value_creator = MultiRuntimeAsyncValueCreator::new(Box::new(|| {
+ async { 1 }.boxed_local()
+ }));
+ let value = value_creator.get().await;
+ assert_eq!(value, 1);
+ }
+
+ #[test]
+ fn multi_runtimes() {
+ let value_creator =
+ Arc::new(MultiRuntimeAsyncValueCreator::new(Box::new(|| {
+ async {
+ tokio::task::yield_now().await;
+ 1
+ }
+ .boxed_local()
+ })));
+ let handles = (0..3)
+ .map(|_| {
+ let value_creator = value_creator.clone();
+ std::thread::spawn(|| {
+ create_runtime().block_on(async move { value_creator.get().await })
+ })
+ })
+ .collect::<Vec<_>>();
+ for handle in handles {
+ assert_eq!(handle.join().unwrap(), 1);
+ }
+ }
+
+ #[test]
+ fn multi_runtimes_first_never_finishes() {
+ let is_first_run = Arc::new(Mutex::new(true));
+ let (tx, rx) = std::sync::mpsc::channel::<()>();
+ let value_creator = Arc::new(MultiRuntimeAsyncValueCreator::new({
+ let is_first_run = is_first_run.clone();
+ Box::new(move || {
+ let is_first_run = is_first_run.clone();
+ let tx = tx.clone();
+ async move {
+ let is_first_run = {
+ let mut is_first_run = is_first_run.lock();
+ let initial_value = *is_first_run;
+ *is_first_run = false;
+ tx.send(()).unwrap();
+ initial_value
+ };
+ if is_first_run {
+ tokio::time::sleep(std::time::Duration::from_millis(30_000)).await;
+ panic!("TIMED OUT"); // should not happen
+ } else {
+ tokio::task::yield_now().await;
+ }
+ 1
+ }
+ .boxed_local()
+ })
+ }));
+ std::thread::spawn({
+ let value_creator = value_creator.clone();
+ let is_first_run = is_first_run.clone();
+ move || {
+ create_runtime().block_on(async {
+ let value_creator = value_creator.clone();
+ // spawn a task that will never complete
+ spawn(async move { value_creator.get().await });
+ // wait for the task to set is_first_run to false
+ while *is_first_run.lock() {
+ tokio::time::sleep(std::time::Duration::from_millis(20)).await;
+ }
+ // now exit the runtime while the value_creator is still pending
+ })
+ }
+ });
+ let handle = {
+ let value_creator = value_creator.clone();
+ std::thread::spawn(|| {
+ create_runtime().block_on(async move {
+ let value_creator = value_creator.clone();
+ rx.recv().unwrap();
+ // even though the other runtime shutdown, this get() should
+ // recover and still get the value
+ value_creator.get().await
+ })
+ })
+ };
+ assert_eq!(handle.join().unwrap(), 1);
+ }
+
+ fn create_runtime() -> tokio::runtime::Runtime {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ }
+}