summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-03-04 20:10:31 -0400
committerGitHub <noreply@github.com>2023-03-05 00:10:31 +0000
commit4894e500cf8c60c2971d186d6a21b994bf36e7d1 (patch)
tree79dabdc836b7485652f85a3a87de47fa57f54fd0
parent19bb23b60a102ff5b0c06e302487a964edd68db1 (diff)
refactor: move TaskQueue from deno_runtime to deno_core (#18016)
This utility is useful in several contexts so it seems reasonable to have it in `deno_core`.
-rw-r--r--core/lib.rs2
-rw-r--r--core/task_queue.rs148
-rw-r--r--runtime/ops/io.rs3
-rw-r--r--runtime/ops/utils.rs147
4 files changed, 151 insertions, 149 deletions
diff --git a/core/lib.rs b/core/lib.rs
index 51a03493d..7ec40e311 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -20,6 +20,7 @@ mod resources;
mod runtime;
pub mod snapshot_util;
mod source_map;
+mod task_queue;
// Re-exports
pub use anyhow;
@@ -116,6 +117,7 @@ pub use crate::runtime::Snapshot;
pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX;
pub use crate::runtime::V8_WRAPPER_TYPE_INDEX;
pub use crate::source_map::SourceMapGetter;
+pub use crate::task_queue::TaskQueue;
pub fn v8_version() -> &'static str {
v8::V8::get_version()
diff --git a/core/task_queue.rs b/core/task_queue.rs
new file mode 100644
index 000000000..839c47655
--- /dev/null
+++ b/core/task_queue.rs
@@ -0,0 +1,148 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use futures::task::AtomicWaker;
+use futures::Future;
+use parking_lot::Mutex;
+use std::collections::LinkedList;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+#[derive(Default)]
+struct TaskQueueTaskWaker {
+ is_ready: AtomicBool,
+ waker: AtomicWaker,
+}
+
+#[derive(Default)]
+struct TaskQueueTasks {
+ is_running: bool,
+ wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
+}
+
+/// A queue that executes tasks sequentially one after the other
+/// ensuring order and that no task runs at the same time as another.
+///
+/// Note that tokio's semaphore doesn't seem to maintain order
+/// and so we can't use that in the code that uses this or use
+/// that here.
+#[derive(Clone, Default)]
+pub struct TaskQueue {
+ tasks: Arc<Mutex<TaskQueueTasks>>,
+}
+
+impl TaskQueue {
+ /// Alternate API that acquires a permit internally
+ /// for the duration of the future.
+ #[cfg(test)]
+ pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
+ let _permit = self.acquire().await;
+ future.await
+ }
+
+ /// Acquires a permit where the tasks are executed one at a time
+ /// and in the order that they were acquired.
+ pub async fn acquire(&self) -> TaskQueuePermit {
+ let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
+ acquire.await;
+ TaskQueuePermit {
+ tasks: self.tasks.clone(),
+ }
+ }
+}
+
+/// A permit that when dropped will allow another task to proceed.
+pub struct TaskQueuePermit {
+ tasks: Arc<Mutex<TaskQueueTasks>>,
+}
+
+impl Drop for TaskQueuePermit {
+ fn drop(&mut self) {
+ let next_item = {
+ let mut tasks = self.tasks.lock();
+ let next_item = tasks.wakers.pop_front();
+ tasks.is_running = next_item.is_some();
+ next_item
+ };
+ if let Some(next_item) = next_item {
+ next_item.is_ready.store(true, Ordering::SeqCst);
+ next_item.waker.wake();
+ }
+ }
+}
+
+struct TaskQueuePermitAcquire {
+ tasks: Arc<Mutex<TaskQueueTasks>>,
+ initialized: AtomicBool,
+ waker: Arc<TaskQueueTaskWaker>,
+}
+
+impl TaskQueuePermitAcquire {
+ pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
+ Self {
+ tasks,
+ initialized: Default::default(),
+ waker: Default::default(),
+ }
+ }
+}
+
+impl Future for TaskQueuePermitAcquire {
+ type Output = ();
+
+ fn poll(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ // update with the latest waker
+ self.waker.waker.register(cx.waker());
+
+ // ensure this is initialized
+ if !self.initialized.swap(true, Ordering::SeqCst) {
+ let mut tasks = self.tasks.lock();
+ if !tasks.is_running {
+ tasks.is_running = true;
+ return std::task::Poll::Ready(());
+ }
+ tasks.wakers.push_back(self.waker.clone());
+ return std::task::Poll::Pending;
+ }
+
+ // check if we're ready to run
+ if self.waker.is_ready.load(Ordering::SeqCst) {
+ std::task::Poll::Ready(())
+ } else {
+ std::task::Poll::Pending
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use parking_lot::Mutex;
+ use std::sync::Arc;
+
+ use super::TaskQueue;
+
+ #[tokio::test]
+ async fn task_queue_runs_one_after_other() {
+ let task_queue = TaskQueue::default();
+ let mut tasks = Vec::new();
+ let data = Arc::new(Mutex::new(0));
+ for i in 0..100 {
+ let data = data.clone();
+ tasks.push(task_queue.queue(async move {
+ tokio::task::spawn_blocking(move || {
+ let mut data = data.lock();
+ if *data != i {
+ panic!("Value was not equal.");
+ }
+ *data = i + 1;
+ })
+ .await
+ .unwrap();
+ }));
+ }
+ futures::future::join_all(tasks).await;
+ }
+}
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs
index 55eae6373..d0ee116a5 100644
--- a/runtime/ops/io.rs
+++ b/runtime/ops/io.rs
@@ -16,6 +16,7 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
+use deno_core::TaskQueue;
use once_cell::sync::Lazy;
use std::borrow::Cow;
use std::cell::RefCell;
@@ -170,8 +171,6 @@ pub fn init_stdio(stdio: Stdio) -> Extension {
#[cfg(unix)]
use nix::sys::termios;
-use super::utils::TaskQueue;
-
#[derive(Default)]
pub struct TtyMetadata {
#[cfg(unix)]
diff --git a/runtime/ops/utils.rs b/runtime/ops/utils.rs
index bdbe7f6d0..e7bf02c10 100644
--- a/runtime/ops/utils.rs
+++ b/runtime/ops/utils.rs
@@ -2,13 +2,6 @@
use deno_core::error::custom_error;
use deno_core::error::AnyError;
-use deno_core::futures::task::AtomicWaker;
-use deno_core::futures::Future;
-use deno_core::parking_lot::Mutex;
-use std::collections::LinkedList;
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
/// A utility function to map OsStrings to Strings
pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
@@ -17,143 +10,3 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> {
custom_error("InvalidData", message)
})
}
-
-#[derive(Default)]
-struct TaskQueueTaskWaker {
- is_ready: AtomicBool,
- waker: AtomicWaker,
-}
-
-#[derive(Default)]
-struct TaskQueueTasks {
- is_running: bool,
- wakers: LinkedList<Arc<TaskQueueTaskWaker>>,
-}
-
-/// A queue that executes tasks sequentially one after the other
-/// ensuring order and that no task runs at the same time as another.
-///
-/// Note that tokio's semaphore doesn't seem to maintain order
-/// and so we can't use that in the code that uses this or use
-/// that here.
-#[derive(Clone, Default)]
-pub struct TaskQueue {
- tasks: Arc<Mutex<TaskQueueTasks>>,
-}
-
-impl TaskQueue {
- /// Alternate API that acquires a permit internally
- /// for the duration of the future.
- #[cfg(test)]
- pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R {
- let _permit = self.acquire().await;
- future.await
- }
-
- /// Acquires a permit where the tasks are executed one at a time
- /// and in the order that they were acquired.
- pub async fn acquire(&self) -> TaskQueuePermit {
- let acquire = TaskQueuePermitAcquire::new(self.tasks.clone());
- acquire.await;
- TaskQueuePermit {
- tasks: self.tasks.clone(),
- }
- }
-}
-
-/// A permit that when dropped will allow another task to proceed.
-pub struct TaskQueuePermit {
- tasks: Arc<Mutex<TaskQueueTasks>>,
-}
-
-impl Drop for TaskQueuePermit {
- fn drop(&mut self) {
- let next_item = {
- let mut tasks = self.tasks.lock();
- let next_item = tasks.wakers.pop_front();
- tasks.is_running = next_item.is_some();
- next_item
- };
- if let Some(next_item) = next_item {
- next_item.is_ready.store(true, Ordering::SeqCst);
- next_item.waker.wake();
- }
- }
-}
-
-struct TaskQueuePermitAcquire {
- tasks: Arc<Mutex<TaskQueueTasks>>,
- initialized: AtomicBool,
- waker: Arc<TaskQueueTaskWaker>,
-}
-
-impl TaskQueuePermitAcquire {
- pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self {
- Self {
- tasks,
- initialized: Default::default(),
- waker: Default::default(),
- }
- }
-}
-
-impl Future for TaskQueuePermitAcquire {
- type Output = ();
-
- fn poll(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Self::Output> {
- // update with the latest waker
- self.waker.waker.register(cx.waker());
-
- // ensure this is initialized
- if !self.initialized.swap(true, Ordering::SeqCst) {
- let mut tasks = self.tasks.lock();
- if !tasks.is_running {
- tasks.is_running = true;
- return std::task::Poll::Ready(());
- }
- tasks.wakers.push_back(self.waker.clone());
- return std::task::Poll::Pending;
- }
-
- // check if we're ready to run
- if self.waker.is_ready.load(Ordering::SeqCst) {
- std::task::Poll::Ready(())
- } else {
- std::task::Poll::Pending
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use deno_core::futures;
- use deno_core::parking_lot::Mutex;
- use std::sync::Arc;
-
- use super::TaskQueue;
-
- #[tokio::test]
- async fn task_queue_runs_one_after_other() {
- let task_queue = TaskQueue::default();
- let mut tasks = Vec::new();
- let data = Arc::new(Mutex::new(0));
- for i in 0..100 {
- let data = data.clone();
- tasks.push(task_queue.queue(async move {
- tokio::task::spawn_blocking(move || {
- let mut data = data.lock();
- if *data != i {
- panic!("Value was not equal.");
- }
- *data = i + 1;
- })
- .await
- .unwrap();
- }));
- }
- futures::future::join_all(tasks).await;
- }
-}