summaryrefslogtreecommitdiff
path: root/core/task_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/task_queue.rs')
-rw-r--r--core/task_queue.rs49
1 files changed, 22 insertions, 27 deletions
diff --git a/core/task_queue.rs b/core/task_queue.rs
index 839c47655..36a169650 100644
--- a/core/task_queue.rs
+++ b/core/task_queue.rs
@@ -8,13 +8,13 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
-#[derive(Default)]
+#[derive(Debug, Default)]
struct TaskQueueTaskWaker {
is_ready: AtomicBool,
waker: AtomicWaker,
}
-#[derive(Default)]
+#[derive(Debug, Default)]
struct TaskQueueTasks {
is_running: bool,
wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
@@ -26,40 +26,35 @@ struct TaskQueueTasks {
/// Note that tokio's semaphore doesn't seem to maintain order
/// and so we can't use that in the code that uses this or use
/// that here.
-#[derive(Clone, Default)]
+#[derive(Debug, Default)]
pub struct TaskQueue {
- tasks: Arc<Mutex<TaskQueueTasks>>,
+ tasks: Mutex<TaskQueueTasks>,
}
impl TaskQueue {
+ /// Acquires a permit where the tasks are executed one at a time
+ /// and in the order that they were acquired.
+ pub async fn acquire(&self) -> TaskQueuePermit {
+ let acquire = TaskQueuePermitAcquire::new(self);
+ acquire.await;
+ TaskQueuePermit(self)
+ }
+
/// Alternate API that acquires a permit internally
/// for the duration of the future.
- #[cfg(test)]
pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
let _permit = self.acquire().await;
future.await
}
-
- /// Acquires a permit where the tasks are executed one at a time
- /// and in the order that they were acquired.
- pub async fn acquire(&self) -> TaskQueuePermit {
- let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
- acquire.await;
- TaskQueuePermit {
- tasks: self.tasks.clone(),
- }
- }
}
/// A permit that when dropped will allow another task to proceed.
-pub struct TaskQueuePermit {
- tasks: Arc<Mutex<TaskQueueTasks>>,
-}
+pub struct TaskQueuePermit<'a>(&'a TaskQueue);
-impl Drop for TaskQueuePermit {
+impl<'a> Drop for TaskQueuePermit<'a> {
fn drop(&mut self) {
let next_item = {
- let mut tasks = self.tasks.lock();
+ let mut tasks = self.0.tasks.lock();
let next_item = tasks.wakers.pop_front();
tasks.is_running = next_item.is_some();
next_item
@@ -71,23 +66,23 @@ impl Drop for TaskQueuePermit {
}
}
-struct TaskQueuePermitAcquire {
- tasks: Arc<Mutex<TaskQueueTasks>>,
+struct TaskQueuePermitAcquire<'a> {
+ task_queue: &'a TaskQueue,
initialized: AtomicBool,
waker: Arc<TaskQueueTaskWaker>,
}
-impl TaskQueuePermitAcquire {
- pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
+impl<'a> TaskQueuePermitAcquire<'a> {
+ pub fn new(task_queue: &'a TaskQueue) -> Self {
Self {
- tasks,
+ task_queue,
initialized: Default::default(),
waker: Default::default(),
}
}
}
-impl Future for TaskQueuePermitAcquire {
+impl<'a> Future for TaskQueuePermitAcquire<'a> {
type Output = ();
fn poll(
@@ -99,7 +94,7 @@ impl Future for TaskQueuePermitAcquire {
// ensure this is initialized
if !self.initialized.swap(true, Ordering::SeqCst) {
- let mut tasks = self.tasks.lock();
+ let mut tasks = self.task_queue.tasks.lock();
if !tasks.is_running {
tasks.is_running = true;
return std::task::Poll::Ready(());