summaryrefslogtreecommitdiff
path: root/cli/tools/test
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2024-04-04 12:06:58 -0600
committerGitHub <noreply@github.com>2024-04-04 18:06:58 +0000
commit7cc584ed79da3addeb7758cf0c6480000a00a112 (patch)
treebb422fc02171429709d7cb25c614fe1959d8247f /cli/tools/test
parent207349cfb7080493592f61f9b465c25892531171 (diff)
fix(cli): fix deadlock in test writer when test pipe is full (#23210)
The tests would deadlock if we tried to write the sync marker into a pipe that was full because one test streamed just enough data to fill the pipe, so when we went to actually write the sync marker we blocked when nobody was reading. We use a two-phase lock for sync markers now: one to indicate "ready to sync" and the second to indicate that the sync bytes have been received.
Diffstat (limited to 'cli/tools/test')
-rw-r--r--cli/tools/test/channel.rs137
1 files changed, 118 insertions, 19 deletions
diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs
index 611310538..a024393bb 100644
--- a/cli/tools/test/channel.rs
+++ b/cli/tools/test/channel.rs
@@ -227,10 +227,10 @@ impl TestEventSenderFactory {
/// Create a [`TestEventWorkerSender`], along with a stdout/stderr stream.
pub fn worker(&self) -> TestEventWorkerSender {
let id = self.worker_id.fetch_add(1, Ordering::AcqRel);
- let (stdout_reader, mut stdout_writer) = pipe().unwrap();
- let (stderr_reader, mut stderr_writer) = pipe().unwrap();
+ let (stdout_reader, stdout_writer) = pipe().unwrap();
+ let (stderr_reader, stderr_writer) = pipe().unwrap();
let (sync_sender, mut sync_receiver) =
- tokio::sync::mpsc::unbounded_channel::<SendMutex>();
+ tokio::sync::mpsc::unbounded_channel::<(SendMutex, SendMutex)>();
let stdout = stdout_writer.try_clone().unwrap();
let stderr = stderr_writer.try_clone().unwrap();
let sender = self.sender.clone();
@@ -281,17 +281,17 @@ impl TestEventSenderFactory {
// If the channel closed, we assume that all important data from the streams was synced,
// so we just end this task immediately.
None => { break },
- Some(mutex) => {
- // If we fail to write the sync marker for flush (likely in the case where the runtime is shutting down),
- // we instead just release the mutex and bail.
- let success = stdout_writer.write_all(SYNC_MARKER).is_ok()
- && stderr_writer.write_all(SYNC_MARKER).is_ok();
- if success {
- for stream in [&mut test_stdout, &mut test_stderr] {
+ Some((mutex1, mutex2)) => {
+ // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
+ // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
+ // when the pipe is too full to accept the sync marker.
+ drop(mutex1);
+ for stream in [&mut test_stdout, &mut test_stderr] {
+ if stream.is_alive() {
stream.read_until_sync_marker().await;
}
}
- drop(mutex);
+ drop(mutex2);
}
}
}
@@ -313,6 +313,8 @@ impl TestEventSenderFactory {
ref_count: Default::default(),
sender: self.sender.clone(),
sync_sender,
+ stdout_writer,
+ stderr_writer,
};
TestEventWorkerSender {
@@ -373,7 +375,9 @@ pub struct TestEventSender {
pub id: usize,
ref_count: Arc<()>,
sender: UnboundedSender<(usize, TestEvent)>,
- sync_sender: UnboundedSender<SendMutex>,
+ sync_sender: UnboundedSender<(SendMutex, SendMutex)>,
+ stdout_writer: PipeWrite,
+ stderr_writer: PipeWrite,
}
impl Clone for TestEventSender {
@@ -383,6 +387,8 @@ impl Clone for TestEventSender {
ref_count: self.ref_count.clone(),
sender: self.sender.clone(),
sync_sender: self.sync_sender.clone(),
+ stdout_writer: self.stdout_writer.try_clone().unwrap(),
+ stderr_writer: self.stderr_writer.try_clone().unwrap(),
}
}
}
@@ -400,12 +406,27 @@ impl TestEventSender {
/// Ensure that all output has been fully flushed by writing a sync marker into the
/// stdout and stderr streams and waiting for it on the other side.
pub fn flush(&mut self) -> Result<(), ChannelClosedError> {
- let mutex = parking_lot::RawMutex::INIT;
- mutex.lock();
- self.sync_sender.send(SendMutex(&mutex as _))?;
- if !mutex.try_lock_for(Duration::from_secs(30)) {
+ // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
+ // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
+ // when the pipe is too full to accept the sync marker.
+ let mutex1 = parking_lot::RawMutex::INIT;
+ mutex1.lock();
+ let mutex2 = parking_lot::RawMutex::INIT;
+ mutex2.lock();
+ self
+ .sync_sender
+ .send((SendMutex(&mutex1 as _), SendMutex(&mutex2 as _)))?;
+ if !mutex1.try_lock_for(Duration::from_secs(30)) {
panic!(
- "Test flush deadlock, sender closed = {}",
+ "Test flush deadlock 1, sender closed = {}",
+ self.sync_sender.is_closed()
+ );
+ }
+ _ = self.stdout_writer.write_all(SYNC_MARKER);
+ _ = self.stderr_writer.write_all(SYNC_MARKER);
+ if !mutex2.try_lock_for(Duration::from_secs(30)) {
+ panic!(
+ "Test flush deadlock 2, sender closed = {}",
self.sync_sender.is_closed()
);
}
@@ -415,9 +436,8 @@ impl TestEventSender {
#[cfg(test)]
mod tests {
- use crate::tools::test::TestResult;
-
use super::*;
+ use crate::tools::test::TestResult;
use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
@@ -499,6 +519,85 @@ mod tests {
assert_eq!(messages.len(), 100000);
}
+ /// Test that flushing a large number of times doesn't hang.
+ #[tokio::test]
+ async fn test_flush_large() {
+ test_util::timeout!(240);
+ let (mut worker, mut receiver) = create_single_test_event_channel();
+ let recv_handle = spawn(async move {
+ let mut queue = vec![];
+ while let Some((_, message)) = receiver.recv().await {
+ if let TestEvent::StepWait(..) = message {
+ queue.push(());
+ }
+ }
+ eprintln!("Receiver closed");
+ queue
+ });
+ let send_handle = spawn_blocking(move || {
+ for _ in 0..25000 {
+ // Write one pipe buffer's worth of message here. We try a few different sizes of potentially
+ // blocking writes.
+ worker.stderr.write_all(&[0; 4 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ worker.stderr.write_all(&[0; 16 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ worker.stderr.write_all(&[0; 64 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ worker.stderr.write_all(&[0; 128 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ }
+ eprintln!("Sent all messages");
+ });
+ send_handle.await.unwrap();
+ let messages = recv_handle.await.unwrap();
+ assert_eq!(messages.len(), 100000);
+ }
+
+ /// Test that flushing a large number of times doesn't hang.
+ #[tokio::test]
+ async fn test_flush_with_close() {
+ test_util::timeout!(240);
+ let (worker, mut receiver) = create_single_test_event_channel();
+ let TestEventWorkerSender {
+ mut sender,
+ stderr,
+ stdout,
+ } = worker;
+ let recv_handle = spawn(async move {
+ let mut queue = vec![];
+ while let Some((_, _)) = receiver.recv().await {
+ queue.push(());
+ }
+ eprintln!("Receiver closed");
+ queue
+ });
+ let send_handle = spawn_blocking(move || {
+ let mut stdout = Some(stdout);
+ let mut stderr = Some(stderr);
+ for i in 0..100000 {
+ if i == 20000 {
+ stdout.take();
+ }
+ if i == 40000 {
+ stderr.take();
+ }
+ if i % 2 == 0 {
+ if let Some(stdout) = &mut stdout {
+ stdout.write_all(b"message").unwrap();
+ }
+ } else if let Some(stderr) = &mut stderr {
+ stderr.write_all(b"message").unwrap();
+ }
+ sender.send(TestEvent::StepWait(1)).unwrap();
+ }
+ eprintln!("Sent all messages");
+ });
+ send_handle.await.unwrap();
+ let messages = recv_handle.await.unwrap();
+ assert_eq!(messages.len(), 130000);
+ }
+
/// Test that large numbers of interleaved steps are routed properly.
#[tokio::test]
async fn test_interleave() {