summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tools/test/channel.rs137
1 files changed, 118 insertions, 19 deletions
diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs
index 611310538..a024393bb 100644
--- a/cli/tools/test/channel.rs
+++ b/cli/tools/test/channel.rs
@@ -227,10 +227,10 @@ impl TestEventSenderFactory {
/// Create a [`TestEventWorkerSender`], along with a stdout/stderr stream.
pub fn worker(&self) -> TestEventWorkerSender {
let id = self.worker_id.fetch_add(1, Ordering::AcqRel);
- let (stdout_reader, mut stdout_writer) = pipe().unwrap();
- let (stderr_reader, mut stderr_writer) = pipe().unwrap();
+ let (stdout_reader, stdout_writer) = pipe().unwrap();
+ let (stderr_reader, stderr_writer) = pipe().unwrap();
let (sync_sender, mut sync_receiver) =
- tokio::sync::mpsc::unbounded_channel::<SendMutex>();
+ tokio::sync::mpsc::unbounded_channel::<(SendMutex, SendMutex)>();
let stdout = stdout_writer.try_clone().unwrap();
let stderr = stderr_writer.try_clone().unwrap();
let sender = self.sender.clone();
@@ -281,17 +281,17 @@ impl TestEventSenderFactory {
// If the channel closed, we assume that all important data from the streams was synced,
// so we just end this task immediately.
None => { break },
- Some(mutex) => {
- // If we fail to write the sync marker for flush (likely in the case where the runtime is shutting down),
- // we instead just release the mutex and bail.
- let success = stdout_writer.write_all(SYNC_MARKER).is_ok()
- && stderr_writer.write_all(SYNC_MARKER).is_ok();
- if success {
- for stream in [&mut test_stdout, &mut test_stderr] {
+ Some((mutex1, mutex2)) => {
+ // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
+ // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
+ // when the pipe is too full to accept the sync marker.
+ drop(mutex1);
+ for stream in [&mut test_stdout, &mut test_stderr] {
+ if stream.is_alive() {
stream.read_until_sync_marker().await;
}
}
- drop(mutex);
+ drop(mutex2);
}
}
}
@@ -313,6 +313,8 @@ impl TestEventSenderFactory {
ref_count: Default::default(),
sender: self.sender.clone(),
sync_sender,
+ stdout_writer,
+ stderr_writer,
};
TestEventWorkerSender {
@@ -373,7 +375,9 @@ pub struct TestEventSender {
pub id: usize,
ref_count: Arc<()>,
sender: UnboundedSender<(usize, TestEvent)>,
- sync_sender: UnboundedSender<SendMutex>,
+ sync_sender: UnboundedSender<(SendMutex, SendMutex)>,
+ stdout_writer: PipeWrite,
+ stderr_writer: PipeWrite,
}
impl Clone for TestEventSender {
@@ -383,6 +387,8 @@ impl Clone for TestEventSender {
ref_count: self.ref_count.clone(),
sender: self.sender.clone(),
sync_sender: self.sync_sender.clone(),
+ stdout_writer: self.stdout_writer.try_clone().unwrap(),
+ stderr_writer: self.stderr_writer.try_clone().unwrap(),
}
}
}
@@ -400,12 +406,27 @@ impl TestEventSender {
/// Ensure that all output has been fully flushed by writing a sync marker into the
/// stdout and stderr streams and waiting for it on the other side.
pub fn flush(&mut self) -> Result<(), ChannelClosedError> {
- let mutex = parking_lot::RawMutex::INIT;
- mutex.lock();
- self.sync_sender.send(SendMutex(&mutex as _))?;
- if !mutex.try_lock_for(Duration::from_secs(30)) {
+ // Two phase lock: mutex1 indicates that we are done our general read phase and are ready for
+ // the sync phase. mutex2 indicates that we have completed the sync phase. This prevents deadlock
+ // when the pipe is too full to accept the sync marker.
+ let mutex1 = parking_lot::RawMutex::INIT;
+ mutex1.lock();
+ let mutex2 = parking_lot::RawMutex::INIT;
+ mutex2.lock();
+ self
+ .sync_sender
+ .send((SendMutex(&mutex1 as _), SendMutex(&mutex2 as _)))?;
+ if !mutex1.try_lock_for(Duration::from_secs(30)) {
panic!(
- "Test flush deadlock, sender closed = {}",
+ "Test flush deadlock 1, sender closed = {}",
+ self.sync_sender.is_closed()
+ );
+ }
+ _ = self.stdout_writer.write_all(SYNC_MARKER);
+ _ = self.stderr_writer.write_all(SYNC_MARKER);
+ if !mutex2.try_lock_for(Duration::from_secs(30)) {
+ panic!(
+ "Test flush deadlock 2, sender closed = {}",
self.sync_sender.is_closed()
);
}
@@ -415,9 +436,8 @@ impl TestEventSender {
#[cfg(test)]
mod tests {
- use crate::tools::test::TestResult;
-
use super::*;
+ use crate::tools::test::TestResult;
use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
@@ -499,6 +519,85 @@ mod tests {
assert_eq!(messages.len(), 100000);
}
+ /// Test that flushing a large number of times doesn't hang.
+ #[tokio::test]
+ async fn test_flush_large() {
+ test_util::timeout!(240);
+ let (mut worker, mut receiver) = create_single_test_event_channel();
+ let recv_handle = spawn(async move {
+ let mut queue = vec![];
+ while let Some((_, message)) = receiver.recv().await {
+ if let TestEvent::StepWait(..) = message {
+ queue.push(());
+ }
+ }
+ eprintln!("Receiver closed");
+ queue
+ });
+ let send_handle = spawn_blocking(move || {
+ for _ in 0..25000 {
+ // Write one pipe buffer's worth of message here. We try a few different sizes of potentially
+ // blocking writes.
+ worker.stderr.write_all(&[0; 4 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ worker.stderr.write_all(&[0; 16 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ worker.stderr.write_all(&[0; 64 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ worker.stderr.write_all(&[0; 128 * 1024]).unwrap();
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ }
+ eprintln!("Sent all messages");
+ });
+ send_handle.await.unwrap();
+ let messages = recv_handle.await.unwrap();
+ assert_eq!(messages.len(), 100000);
+ }
+
+ /// Test that flushing a large number of times doesn't hang.
+ #[tokio::test]
+ async fn test_flush_with_close() {
+ test_util::timeout!(240);
+ let (worker, mut receiver) = create_single_test_event_channel();
+ let TestEventWorkerSender {
+ mut sender,
+ stderr,
+ stdout,
+ } = worker;
+ let recv_handle = spawn(async move {
+ let mut queue = vec![];
+ while let Some((_, _)) = receiver.recv().await {
+ queue.push(());
+ }
+ eprintln!("Receiver closed");
+ queue
+ });
+ let send_handle = spawn_blocking(move || {
+ let mut stdout = Some(stdout);
+ let mut stderr = Some(stderr);
+ for i in 0..100000 {
+ if i == 20000 {
+ stdout.take();
+ }
+ if i == 40000 {
+ stderr.take();
+ }
+ if i % 2 == 0 {
+ if let Some(stdout) = &mut stdout {
+ stdout.write_all(b"message").unwrap();
+ }
+ } else if let Some(stderr) = &mut stderr {
+ stderr.write_all(b"message").unwrap();
+ }
+ sender.send(TestEvent::StepWait(1)).unwrap();
+ }
+ eprintln!("Sent all messages");
+ });
+ send_handle.await.unwrap();
+ let messages = recv_handle.await.unwrap();
+ assert_eq!(messages.len(), 130000);
+ }
+
/// Test that large numbers of interleaved steps are routed properly.
#[tokio::test]
async fn test_interleave() {