summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tools/test/channel.rs48
1 files changed, 39 insertions, 9 deletions
diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs
index ff1d4f8f2..611310538 100644
--- a/cli/tools/test/channel.rs
+++ b/cli/tools/test/channel.rs
@@ -5,16 +5,21 @@ use super::TestStdioStream;
use deno_core::futures::future::poll_fn;
use deno_core::parking_lot;
use deno_core::parking_lot::lock_api::RawMutex;
+use deno_core::parking_lot::lock_api::RawMutexTimed;
use deno_runtime::deno_io::pipe;
use deno_runtime::deno_io::AsyncPipeRead;
use deno_runtime::deno_io::PipeRead;
use deno_runtime::deno_io::PipeWrite;
use std::fmt::Display;
+use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
+use std::task::ready;
+use std::task::Poll;
+use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::ReadBuf;
@@ -143,17 +148,23 @@ impl TestStream {
self.read_opt.is_some()
}
+ /// Cancellation-safe.
+ #[inline]
+ fn pipe(&mut self) -> impl Future<Output = ()> + '_ {
+ poll_fn(|cx| self.poll_pipe(cx))
+ }
+
/// Attempt to read from a given stream, pushing all of the data in it into the given
/// [`UnboundedSender`] before returning.
- async fn pipe(&mut self) {
+ fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> {
let mut buffer = [0_u8; BUFFER_SIZE];
let mut buf = ReadBuf::new(&mut buffer);
let res = {
- // No more stream, so just return.
+ // No more stream, we shouldn't hit this case.
let Some(stream) = &mut self.read_opt else {
- return;
+ unreachable!();
};
- poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await
+ ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf))
};
match res {
Ok(_) => {
@@ -173,6 +184,7 @@ impl TestStream {
self.read_opt.take();
}
}
+ Poll::Ready(())
}
/// Read and "block" until the sync markers have been read.
@@ -249,11 +261,21 @@ impl TestEventSenderFactory {
let mut test_stderr =
TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;
+ // This ensures that the stdout and stderr streams in the select! loop below cannot starve each
+ // other.
+ let mut alternate_stream_priority = false;
+
// This function will be woken whenever a stream or the receiver is ready
loop {
+ alternate_stream_priority = !alternate_stream_priority;
+ let (a, b) = if alternate_stream_priority {
+ (&mut test_stdout, &mut test_stderr)
+ } else {
+ (&mut test_stderr, &mut test_stdout)
+ };
+
tokio::select! {
- _ = test_stdout.pipe(), if test_stdout.is_alive() => {},
- _ = test_stderr.pipe(), if test_stdout.is_alive() => {},
+ biased; // We actually want to poll the channel first
recv = sync_receiver.recv() => {
match recv {
// If the channel closed, we assume that all important data from the streams was synced,
@@ -273,6 +295,10 @@ impl TestEventSenderFactory {
}
}
}
+ // Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first.
+ // This is necessary because of the `biased` flag above to avoid starvation.
+ _ = a.pipe(), if a.is_alive() => {},
+ _ = b.pipe(), if b.is_alive() => {},
}
}
@@ -377,7 +403,12 @@ impl TestEventSender {
let mutex = parking_lot::RawMutex::INIT;
mutex.lock();
self.sync_sender.send(SendMutex(&mutex as _))?;
- mutex.lock();
+ if !mutex.try_lock_for(Duration::from_secs(30)) {
+ panic!(
+ "Test flush deadlock, sender closed = {}",
+ self.sync_sender.is_closed()
+ );
+ }
Ok(())
}
}
@@ -444,10 +475,9 @@ mod tests {
}
/// Test that flushing a large number of times doesn't hang.
- #[ignore]
#[tokio::test]
async fn test_flush_lots() {
- test_util::timeout!(60);
+ test_util::timeout!(240);
let (mut worker, mut receiver) = create_single_test_event_channel();
let recv_handle = spawn(async move {
let mut queue = vec![];