summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/graph_util.rs4
-rw-r--r--cli/npm/registry.rs4
-rw-r--r--cli/npm/resolution.rs2
-rw-r--r--cli/resolver.rs2
-rw-r--r--cli/util/sync.rs258
5 files changed, 263 insertions, 7 deletions
diff --git a/cli/graph_util.rs b/cli/graph_util.rs
index 6fb4b7790..1a5a293a7 100644
--- a/cli/graph_util.rs
+++ b/cli/graph_util.rs
@@ -13,6 +13,8 @@ use crate::npm::CliNpmResolver;
use crate::resolver::CliGraphResolver;
use crate::tools::check;
use crate::tools::check::TypeChecker;
+use crate::util::sync::TaskQueue;
+use crate::util::sync::TaskQueuePermit;
use deno_core::anyhow::bail;
use deno_core::error::custom_error;
@@ -20,8 +22,6 @@ use deno_core::error::AnyError;
use deno_core::parking_lot::Mutex;
use deno_core::parking_lot::RwLock;
use deno_core::ModuleSpecifier;
-use deno_core::TaskQueue;
-use deno_core::TaskQueuePermit;
use deno_graph::source::Loader;
use deno_graph::GraphKind;
use deno_graph::Module;
diff --git a/cli/npm/registry.rs b/cli/npm/registry.rs
index 907258d3b..ec0647023 100644
--- a/cli/npm/registry.rs
+++ b/cli/npm/registry.rs
@@ -18,7 +18,6 @@ use deno_core::futures::FutureExt;
use deno_core::parking_lot::Mutex;
use deno_core::serde_json;
use deno_core::url::Url;
-use deno_core::TaskQueue;
use deno_npm::registry::NpmPackageInfo;
use deno_npm::registry::NpmRegistryApi;
use deno_npm::registry::NpmRegistryPackageInfoLoadError;
@@ -30,6 +29,7 @@ use crate::http_util::HttpClient;
use crate::util::fs::atomic_write_file;
use crate::util::progress_bar::ProgressBar;
use crate::util::sync::AtomicFlag;
+use crate::util::sync::TaskQueue;
use super::cache::should_sync_download;
use super::cache::NpmCache;
@@ -118,7 +118,7 @@ impl NpmRegistryApi for CliNpmRegistryApi {
let result = if should_sync_download() {
let inner = self.inner().clone();
SYNC_DOWNLOAD_TASK_QUEUE
- .queue(async move { inner.maybe_package_info(name).await })
+ .run(async move { inner.maybe_package_info(name).await })
.await
} else {
self.inner().maybe_package_info(name).await
diff --git a/cli/npm/resolution.rs b/cli/npm/resolution.rs
index 73e27f487..10ff5fd92 100644
--- a/cli/npm/resolution.rs
+++ b/cli/npm/resolution.rs
@@ -7,7 +7,6 @@ use std::sync::Arc;
use deno_core::error::AnyError;
use deno_core::parking_lot::Mutex;
use deno_core::parking_lot::RwLock;
-use deno_core::TaskQueue;
use deno_lockfile::NpmPackageDependencyLockfileInfo;
use deno_lockfile::NpmPackageLockfileInfo;
use deno_npm::registry::NpmPackageInfo;
@@ -32,6 +31,7 @@ use deno_semver::package::PackageReq;
use deno_semver::VersionReq;
use crate::args::Lockfile;
+use crate::util::sync::TaskQueue;
use super::registry::CliNpmRegistryApi;
diff --git a/cli/resolver.rs b/cli/resolver.rs
index 4fb912731..a7b2cd01e 100644
--- a/cli/resolver.rs
+++ b/cli/resolver.rs
@@ -7,7 +7,6 @@ use deno_core::futures::future;
use deno_core::futures::future::LocalBoxFuture;
use deno_core::futures::FutureExt;
use deno_core::ModuleSpecifier;
-use deno_core::TaskQueue;
use deno_graph::source::NpmPackageReqResolution;
use deno_graph::source::NpmResolver;
use deno_graph::source::Resolver;
@@ -27,6 +26,7 @@ use crate::npm::CliNpmRegistryApi;
use crate::npm::NpmResolution;
use crate::npm::PackageJsonDepsInstaller;
use crate::util::sync::AtomicFlag;
+use crate::util::sync::TaskQueue;
/// Result of checking if a specifier is mapped via
/// an import map or package.json.
diff --git a/cli/util/sync.rs b/cli/util/sync.rs
index 6d136abc1..6eff974a7 100644
--- a/cli/util/sync.rs
+++ b/cli/util/sync.rs
@@ -1,7 +1,13 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use std::collections::LinkedList;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+use deno_core::futures::task::AtomicWaker;
+use deno_core::futures::Future;
+use deno_core::parking_lot::Mutex;
/// Simplifies the use of an atomic boolean as a flag.
#[derive(Debug, Default)]
@@ -19,9 +25,147 @@ impl AtomicFlag {
}
}
+#[derive(Debug, Default)]
+struct TaskQueueTaskItem {
+ is_ready: AtomicFlag,
+ is_future_dropped: AtomicFlag,
+ waker: AtomicWaker,
+}
+
+#[derive(Debug, Default)]
+struct TaskQueueTasks {
+ is_running: bool,
+ items: LinkedList<Arc<TaskQueueTaskItem>>,
+}
+
+/// A queue that executes tasks sequentially one after the other
+/// ensuring order and that no task runs at the same time as another.
+///
+/// Note that this differs from tokio's semaphore in that the order
+/// is acquired synchronously.
+#[derive(Debug, Default)]
+pub struct TaskQueue {
+ tasks: Mutex<TaskQueueTasks>,
+}
+
+impl TaskQueue {
+ /// Acquires a permit where the tasks are executed one at a time
+ /// and in the order that they were acquired.
+ pub fn acquire(&self) -> TaskQueuePermitAcquireFuture {
+ TaskQueuePermitAcquireFuture::new(self)
+ }
+
+ /// Alternate API that acquires a permit internally
+ /// for the duration of the future.
+ pub fn run<'a, R>(
+ &'a self,
+ future: impl Future<Output = R> + 'a,
+ ) -> impl Future<Output = R> + 'a {
+ let acquire_future = self.acquire();
+ async move {
+ let permit = acquire_future.await;
+ let result = future.await;
+ drop(permit); // explicit for clarity
+ result
+ }
+ }
+
+ fn raise_next(&self) {
+ let front_item = {
+ let mut tasks = self.tasks.lock();
+
+ // clear out any wakers for futures that were dropped
+ while let Some(front_waker) = tasks.items.front() {
+ if front_waker.is_future_dropped.is_raised() {
+ tasks.items.pop_front();
+ } else {
+ break;
+ }
+ }
+ let front_item = tasks.items.pop_front();
+ tasks.is_running = front_item.is_some();
+ front_item
+ };
+
+ // wake up the next waker
+ if let Some(front_item) = front_item {
+ front_item.is_ready.raise();
+ front_item.waker.wake();
+ }
+ }
+}
+
+/// A permit that when dropped will allow another task to proceed.
+pub struct TaskQueuePermit<'a>(&'a TaskQueue);
+
+impl<'a> Drop for TaskQueuePermit<'a> {
+ fn drop(&mut self) {
+ self.0.raise_next();
+ }
+}
+
+pub struct TaskQueuePermitAcquireFuture<'a> {
+ task_queue: Option<&'a TaskQueue>,
+ item: Arc<TaskQueueTaskItem>,
+}
+
+impl<'a> TaskQueuePermitAcquireFuture<'a> {
+ pub fn new(task_queue: &'a TaskQueue) -> Self {
+ // acquire the waker position synchronously
+ let mut tasks = task_queue.tasks.lock();
+ let item = if !tasks.is_running {
+ tasks.is_running = true;
+ let item = Arc::new(TaskQueueTaskItem::default());
+ item.is_ready.raise();
+ item
+ } else {
+ let item = Arc::new(TaskQueueTaskItem::default());
+ tasks.items.push_back(item.clone());
+ item
+ };
+ drop(tasks);
+ Self {
+ task_queue: Some(task_queue),
+ item,
+ }
+ }
+}
+
+impl<'a> Drop for TaskQueuePermitAcquireFuture<'a> {
+ fn drop(&mut self) {
+ if let Some(task_queue) = self.task_queue.take() {
+ if self.item.is_ready.is_raised() {
+ task_queue.raise_next();
+ } else {
+ self.item.is_future_dropped.raise();
+ }
+ }
+ }
+}
+
+impl<'a> Future for TaskQueuePermitAcquireFuture<'a> {
+ type Output = TaskQueuePermit<'a>;
+
+ fn poll(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ if self.item.is_ready.is_raised() {
+ std::task::Poll::Ready(TaskQueuePermit(self.task_queue.take().unwrap()))
+ } else {
+ self.item.waker.register(cx.waker());
+ std::task::Poll::Pending
+ }
+ }
+}
+
#[cfg(test)]
mod test {
- use super::AtomicFlag;
+ use deno_core::futures;
+ use deno_core::parking_lot::Mutex;
+ use std::sync::Arc;
+
+ use super::*;
#[test]
fn atomic_flag_raises() {
@@ -32,4 +176,116 @@ mod test {
assert!(!flag.raise());
assert!(flag.is_raised());
}
+
+ #[tokio::test]
+ async fn task_queue_runs_one_after_other() {
+ let task_queue = TaskQueue::default();
+ let mut tasks = Vec::new();
+ let data = Arc::new(Mutex::new(0));
+ for i in 0..100 {
+ let data = data.clone();
+ tasks.push(task_queue.run(async move {
+ deno_core::unsync::spawn_blocking(move || {
+ let mut data = data.lock();
+ assert_eq!(*data, i);
+ *data = i + 1;
+ })
+ .await
+ .unwrap();
+ }));
+ }
+ futures::future::join_all(tasks).await;
+ }
+
+ #[tokio::test]
+ async fn task_queue_run_in_sequence() {
+ let task_queue = TaskQueue::default();
+ let data = Arc::new(Mutex::new(0));
+
+ let first = task_queue.run(async {
+ *data.lock() = 1;
+ });
+ let second = task_queue.run(async {
+ assert_eq!(*data.lock(), 1);
+ *data.lock() = 2;
+ });
+ let _ = tokio::join!(first, second);
+
+ assert_eq!(*data.lock(), 2);
+ }
+
+ #[tokio::test]
+ async fn task_queue_future_dropped_before_poll() {
+ let task_queue = Arc::new(TaskQueue::default());
+
+ // acquire a future, but do not await it
+ let future = task_queue.acquire();
+
+ // this task tries to acquire another permit, but will be blocked by the first permit.
+ let enter_flag = Arc::new(AtomicFlag::default());
+ let delayed_task = deno_core::unsync::spawn({
+ let enter_flag = enter_flag.clone();
+ let task_queue = task_queue.clone();
+ async move {
+ enter_flag.raise();
+ task_queue.acquire().await;
+ true
+ }
+ });
+
+ // ensure the task gets a chance to be scheduled and blocked
+ tokio::task::yield_now().await;
+ assert!(enter_flag.is_raised());
+
+ // now, drop the first future
+ drop(future);
+
+ assert!(delayed_task.await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn task_queue_many_future_dropped_before_poll() {
+ let task_queue = Arc::new(TaskQueue::default());
+
+ // acquire a future, but do not await it
+ let mut futures = Vec::new();
+ for _ in 0..=10_000 {
+ futures.push(task_queue.acquire());
+ }
+
+ // this task tries to acquire another permit, but will be blocked by the first permit.
+ let enter_flag = Arc::new(AtomicFlag::default());
+ let delayed_task = deno_core::unsync::spawn({
+ let task_queue = task_queue.clone();
+ let enter_flag = enter_flag.clone();
+ async move {
+ enter_flag.raise();
+ task_queue.acquire().await;
+ true
+ }
+ });
+
+ // ensure the task gets a chance to be scheduled and blocked
+ tokio::task::yield_now().await;
+ assert!(enter_flag.is_raised());
+
+ // now, drop the futures
+ drop(futures);
+
+ assert!(delayed_task.await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn task_queue_middle_future_dropped_while_permit_acquired() {
+ let task_queue = TaskQueue::default();
+
+ let fut1 = task_queue.acquire();
+ let fut2 = task_queue.acquire();
+ let fut3 = task_queue.acquire();
+
+ // should not hang
+ drop(fut2);
+ drop(fut1.await);
+ drop(fut3.await);
+ }
}