diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2022-09-05 09:05:48 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-05 09:05:48 -0400 |
commit | f6636d41450fd6d0d3d5d6fc6f4c6ead2a639814 (patch) | |
tree | 9509290024da61a4885ba0f2332049e7e40d8bbd | |
parent | 1cdd2504b1cebc8ecf50f65f57959765e2a7b328 (diff) |
chore: fix flaky stdout_write_all test (#15772)
-rw-r--r-- | runtime/ops/io.rs | 13 | ||||
-rw-r--r-- | runtime/ops/utils.rs | 148 |
2 files changed, 155 insertions, 6 deletions
diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 0eeab4c24..ce2ed252f 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -29,7 +29,6 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::process; -use tokio::sync::Semaphore; #[cfg(unix)] use std::os::unix::io::FromRawFd; @@ -163,6 +162,8 @@ pub fn init_stdio(stdio: Stdio) -> Extension { #[cfg(unix)] use nix::sys::termios; +use super::utils::TaskQueue; + #[derive(Default)] pub struct TtyMetadata { #[cfg(unix)] @@ -445,8 +446,8 @@ pub struct StdFileResource { // asynchronously one at a time in order cell: RefCell<Option<StdFileResourceCellValue>>, // Used to keep async actions in order and only allow one - // to occurr at a time - cell_async_sempahore: Semaphore, + // to occur at a time + cell_async_task_queue: TaskQueue, } impl StdFileResource { @@ -456,7 +457,7 @@ impl StdFileResource { inner, meta_data: Default::default(), })), - cell_async_sempahore: Semaphore::new(1), + cell_async_task_queue: Default::default(), name: name.to_string(), } } @@ -467,7 +468,7 @@ impl StdFileResource { inner: StdFileResourceInner::file(fs_file), meta_data: Default::default(), })), - cell_async_sempahore: Semaphore::new(1), + cell_async_task_queue: Default::default(), name: "fsFile".to_string(), } } @@ -498,7 +499,7 @@ impl StdFileResource { F: FnOnce(&mut StdFileResourceInner) -> R + Send + 'static, { // we want to restrict this to one async action at a time - let _permit = self.cell_async_sempahore.acquire().await.unwrap(); + let _permit = self.cell_async_task_queue.acquire().await; // we take the value out of the cell, use it on a blocking task, // then put it back into the cell when we're done let mut did_take = false; diff --git a/runtime/ops/utils.rs b/runtime/ops/utils.rs index d950ae9ab..f1414e8bc 100644 --- a/runtime/ops/utils.rs +++ b/runtime/ops/utils.rs @@ -1,6 +1,14 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + use deno_core::error::custom_error; use deno_core::error::AnyError; +use deno_core::futures::task::AtomicWaker; +use deno_core::futures::Future; +use deno_core::parking_lot::Mutex; +use std::collections::LinkedList; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; /// A utility function to map OsStrings to Strings pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { @@ -9,3 +17,143 @@ pub fn into_string(s: std::ffi::OsString) -> Result<String, AnyError> { custom_error("InvalidData", message) }) } + +#[derive(Default)] +struct TaskQueueTaskWaker { + is_ready: AtomicBool, + waker: AtomicWaker, +} + +#[derive(Default)] +struct TaskQueueTasks { + is_running: bool, + wakers: LinkedList<Arc<TaskQueueTaskWaker>>, +} + +/// A queue that executes tasks sequentially one after the other +/// ensuring order and that no task runs at the same time as another. +/// +/// Note that tokio's semaphore doesn't seem to maintain order +/// and so we can't use that in the code that uses this or use +/// that here. +#[derive(Clone, Default)] +pub struct TaskQueue { + tasks: Arc<Mutex<TaskQueueTasks>>, +} + +impl TaskQueue { + /// Alternate API that acquires a permit internally + /// for the duration of the future. + #[cfg(test)] + pub async fn queue<R>(&self, future: impl Future<Output = R>) -> R { + let _permit = self.acquire().await; + future.await + } + + /// Acquires a permit where the tasks are executed one at a time + /// and in the order that they were acquired. + pub async fn acquire(&self) -> TaskQueuePermit { + let acquire = TaskQueuePermitAcquire::new(self.tasks.clone()); + acquire.await; + TaskQueuePermit { + tasks: self.tasks.clone(), + } + } +} + +/// A permit that when dropped will allow another task to proceed. +pub struct TaskQueuePermit { + tasks: Arc<Mutex<TaskQueueTasks>>, +} + +impl Drop for TaskQueuePermit { + fn drop(&mut self) { + let next_item = { + let mut tasks = self.tasks.lock(); + let next_item = tasks.wakers.pop_front(); + tasks.is_running = next_item.is_some(); + next_item + }; + if let Some(next_item) = next_item { + next_item.is_ready.store(true, Ordering::SeqCst); + next_item.waker.wake(); + } + } +} + +struct TaskQueuePermitAcquire { + tasks: Arc<Mutex<TaskQueueTasks>>, + initialized: AtomicBool, + waker: Arc<TaskQueueTaskWaker>, +} + +impl TaskQueuePermitAcquire { + pub fn new(tasks: Arc<Mutex<TaskQueueTasks>>) -> Self { + Self { + tasks, + initialized: Default::default(), + waker: Default::default(), + } + } +} + +impl Future for TaskQueuePermitAcquire { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + // update with the latest waker + self.waker.waker.register(cx.waker()); + + // ensure this is initialized + if !self.initialized.swap(true, Ordering::SeqCst) { + let mut tasks = self.tasks.lock(); + if !tasks.is_running { + tasks.is_running = true; + return std::task::Poll::Ready(()); + } + tasks.wakers.push_back(self.waker.clone()); + return std::task::Poll::Pending; + } + + // check if we're ready to run + if self.waker.is_ready.load(Ordering::SeqCst) { + std::task::Poll::Ready(()) + } else { + std::task::Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use deno_core::futures; + use deno_core::parking_lot::Mutex; + use std::sync::Arc; + + use super::TaskQueue; + + #[tokio::test] + async fn task_queue_runs_one_after_other() { + let task_queue = TaskQueue::default(); + let mut tasks = Vec::new(); + let data = Arc::new(Mutex::new(0)); + for i in 0..100 { + let data = data.clone(); + tasks.push(task_queue.queue(async move { + tokio::task::spawn_blocking(move || { + let mut data = data.lock(); + if *data != i { + panic!("Value was not equal."); + } + *data = i + 1; + }) + .await + .unwrap(); + })); + } + futures::future::join_all(tasks).await; + } +} |