diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2023-03-04 20:07:11 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-04 20:07:11 -0500 |
commit | 2f7222da8a26d8be915b9467fc21649a18f54b77 (patch) | |
tree | adb54f49608bd6cdd27e98ad56130532f49a04f5 /core/task_queue.rs | |
parent | 7afa3aceb04e6b2c8820b7326d6f648db6b571c6 (diff) |
refactor: remove `Semaphore::new(1)` and use `TaskQueue` (#18014)
Diffstat (limited to 'core/task_queue.rs')
-rw-r--r-- | core/task_queue.rs | 49 |
1 files changed, 22 insertions, 27 deletions
diff --git a/core/task_queue.rs b/core/task_queue.rs index 839c47655..36a169650 100644 --- a/core/task_queue.rs +++ b/core/task_queue.rs @@ -8,13 +8,13 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -#[derive(Default)] +#[derive(Debug, Default)] struct TaskQueueTaskWaker { is_ready: AtomicBool, waker: AtomicWaker, } -#[derive(Default)] +#[derive(Debug, Default)] struct TaskQueueTasks { is_running: bool, wakers: LinkedList<Arc<TaskQueueTaskWaker>>, @@ -26,40 +26,35 @@ struct TaskQueueTasks { /// Note that tokio's semaphore doesn't seem to maintain order /// and so we can't use that in the code that uses this or use /// that here. -#[derive(Clone, Default)] +#[derive(Debug, Default)] pub struct TaskQueue { - tasks: Arc<Mutex<TaskQueueTasks>>, + tasks: Mutex<TaskQueueTasks>, } impl TaskQueue { + /// Acquires a permit where the tasks are executed one at a time + /// and in the order that they were acquired. + pub async fn acquire(&self) -> TaskQueuePermit { + let acquire = TaskQueuePermitAcquire::new(self); + acquire.await; + TaskQueuePermit(self) + } + /// Alternate API that acquires a permit internally /// for the duration of the future. - #[cfg(test)] pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R { let _permit = self.acquire().await; future.await } - - /// Acquires a permit where the tasks are executed one at a time - /// and in the order that they were acquired. - pub async fn acquire(&self) -> TaskQueuePermit { - let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); - acquire.await; - TaskQueuePermit { - tasks: self.tasks.clone(), - } - } } /// A permit that when dropped will allow another task to proceed. -pub struct TaskQueuePermit { - tasks: Arc<Mutex<TaskQueueTasks>>, -} +pub struct TaskQueuePermit<'a>(&'a TaskQueue); -impl Drop for TaskQueuePermit { +impl<'a> Drop for TaskQueuePermit<'a> { fn drop(&mut self) { let next_item = { - let mut tasks = self.tasks.lock(); + let mut tasks = self.0.tasks.lock(); let next_item = tasks.wakers.pop_front(); tasks.is_running = next_item.is_some(); next_item @@ -71,23 +66,23 @@ impl Drop for TaskQueuePermit { } } -struct TaskQueuePermitAcquire { - tasks: Arc<Mutex<TaskQueueTasks>>, +struct TaskQueuePermitAcquire<'a> { + task_queue: &'a TaskQueue, initialized: AtomicBool, waker: Arc<TaskQueueTaskWaker>, } -impl TaskQueuePermitAcquire { - pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self { +impl<'a> TaskQueuePermitAcquire<'a> { + pub fn new(task_queue: &'a TaskQueue) -> Self { Self { - tasks, + task_queue, initialized: Default::default(), waker: Default::default(), } } } -impl Future for TaskQueuePermitAcquire { +impl<'a> Future for TaskQueuePermitAcquire<'a> { type Output = (); fn poll( @@ -99,7 +94,7 @@ impl Future for TaskQueuePermitAcquire { // ensure this is initialized if !self.initialized.swap(true, Ordering::SeqCst) { - let mut tasks = self.tasks.lock(); + let mut tasks = self.task_queue.tasks.lock(); if !tasks.is_running { tasks.is_running = true; return std::task::Poll::Ready(()); |