From 4894e500cf8c60c2971d186d6a21b994bf36e7d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 4 Mar 2023 20:10:31 -0400 Subject: refactor: move TaskQueue from deno_runtime to deno_core (#18016) This utility is useful in several contexts so it seems reasonable to have it in `deno_core`. --- runtime/ops/utils.rs | 147 --------------------------------------------------- 1 file changed, 147 deletions(-) (limited to 'runtime/ops/utils.rs') diff --git a/runtime/ops/utils.rs b/runtime/ops/utils.rs index bdbe7f6d0..e7bf02c10 100644 --- a/runtime/ops/utils.rs +++ b/runtime/ops/utils.rs @@ -2,13 +2,6 @@ use deno_core::error::custom_error; use deno_core::error::AnyError; -use deno_core::futures::task::AtomicWaker; -use deno_core::futures::Future; -use deno_core::parking_lot::Mutex; -use std::collections::LinkedList; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::sync::Arc; /// A utility function to map OsStrings to Strings pub fn into_string(s: std::ffi::OsString) -> Result { @@ -17,143 +10,3 @@ pub fn into_string(s: std::ffi::OsString) -> Result { custom_error("InvalidData", message) }) } - -#[derive(Default)] -struct TaskQueueTaskWaker { - is_ready: AtomicBool, - waker: AtomicWaker, -} - -#[derive(Default)] -struct TaskQueueTasks { - is_running: bool, - wakers: LinkedList>, -} - -/// A queue that executes tasks sequentially one after the other -/// ensuring order and that no task runs at the same time as another. -/// -/// Note that tokio's semaphore doesn't seem to maintain order -/// and so we can't use that in the code that uses this or use -/// that here. -#[derive(Clone, Default)] -pub struct TaskQueue { - tasks: Arc>, -} - -impl TaskQueue { - /// Alternate API that acquires a permit internally - /// for the duration of the future. - #[cfg(test)] - pub async fn queue(&self, future: impl Future) -> R { - let _permit = self.acquire().await; - future.await - } - - /// Acquires a permit where the tasks are executed one at a time - /// and in the order that they were acquired. - pub async fn acquire(&self) -> TaskQueuePermit { - let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); - acquire.await; - TaskQueuePermit { - tasks: self.tasks.clone(), - } - } -} - -/// A permit that when dropped will allow another task to proceed. -pub struct TaskQueuePermit { - tasks: Arc>, -} - -impl Drop for TaskQueuePermit { - fn drop(&mut self) { - let next_item = { - let mut tasks = self.tasks.lock(); - let next_item = tasks.wakers.pop_front(); - tasks.is_running = next_item.is_some(); - next_item - }; - if let Some(next_item) = next_item { - next_item.is_ready.store(true, Ordering::SeqCst); - next_item.waker.wake(); - } - } -} - -struct TaskQueuePermitAcquire { - tasks: Arc>, - initialized: AtomicBool, - waker: Arc, -} - -impl TaskQueuePermitAcquire { - pub fn new(tasks: Arc>) -> Self { - Self { - tasks, - initialized: Default::default(), - waker: Default::default(), - } - } -} - -impl Future for TaskQueuePermitAcquire { - type Output = (); - - fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - // update with the latest waker - self.waker.waker.register(cx.waker()); - - // ensure this is initialized - if !self.initialized.swap(true, Ordering::SeqCst) { - let mut tasks = self.tasks.lock(); - if !tasks.is_running { - tasks.is_running = true; - return std::task::Poll::Ready(()); - } - tasks.wakers.push_back(self.waker.clone()); - return std::task::Poll::Pending; - } - - // check if we're ready to run - if self.waker.is_ready.load(Ordering::SeqCst) { - std::task::Poll::Ready(()) - } else { - std::task::Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use deno_core::futures; - use deno_core::parking_lot::Mutex; - use std::sync::Arc; - - use super::TaskQueue; - - #[tokio::test] - async fn task_queue_runs_one_after_other() { - let task_queue = TaskQueue::default(); - let mut tasks = Vec::new(); - let data = Arc::new(Mutex::new(0)); - for i in 0..100 { - let data = data.clone(); - tasks.push(task_queue.queue(async move { - tokio::task::spawn_blocking(move || { - let mut data = data.lock(); - if *data != i { - panic!("Value was not equal."); - } - *data = i + 1; - }) - .await - .unwrap(); - })); - } - futures::future::join_all(tasks).await; - } -} -- cgit v1.2.3