summaryrefslogtreecommitdiff
path: root/cli/tools/test/channel.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tools/test/channel.rs')
-rw-r--r--cli/tools/test/channel.rs71
1 files changed, 71 insertions, 0 deletions
diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs
index 07fee6521..3784d58f9 100644
--- a/cli/tools/test/channel.rs
+++ b/cli/tools/test/channel.rs
@@ -384,6 +384,8 @@ impl TestEventSender {
#[cfg(test)]
mod tests {
+ use crate::tools::test::TestResult;
+
use super::*;
use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
@@ -466,6 +468,75 @@ mod tests {
assert_eq!(messages.len(), 100000);
}
+ /// Test that large numbers of interleaved steps are routed properly.
+ #[tokio::test]
+ async fn test_interleave() {
+ test_util::timeout!(60);
+ const MESSAGE_COUNT: usize = 10_000;
+ let (mut worker, mut receiver) = create_single_test_event_channel();
+ let recv_handle = spawn(async move {
+ let mut i = 0;
+ while let Some((_, message)) = receiver.recv().await {
+ if i % 2 == 0 {
+ let expected_text = format!("{:08x}", i / 2).into_bytes();
+ let TestEvent::Output(TestStdioStream::Stderr, text) = message else {
+ panic!("Incorrect message: {message:?}");
+ };
+ assert_eq!(text, expected_text);
+ } else {
+ let TestEvent::Result(index, TestResult::Ok, 0) = message else {
+ panic!("Incorrect message: {message:?}");
+ };
+ assert_eq!(index, i / 2);
+ }
+ i += 1;
+ }
+ eprintln!("Receiver closed");
+ i
+ });
+ let send_handle: deno_core::unsync::JoinHandle<()> =
+ spawn_blocking(move || {
+ for i in 0..MESSAGE_COUNT {
+ worker
+ .stderr
+ .write_all(format!("{i:08x}").as_str().as_bytes())
+ .unwrap();
+ worker
+ .sender
+ .send(TestEvent::Result(i, TestResult::Ok, 0))
+ .unwrap();
+ }
+ eprintln!("Sent all messages");
+ });
+ send_handle.await.unwrap();
+ let messages = recv_handle.await.unwrap();
+ assert_eq!(messages, MESSAGE_COUNT * 2);
+ }
+
+ #[tokio::test]
+ async fn test_sender_shutdown_before_receive() {
+ test_util::timeout!(60);
+ for _ in 0..10 {
+ let (mut worker, mut receiver) = create_single_test_event_channel();
+ worker.stderr.write_all(b"hello").unwrap();
+ worker
+ .sender
+ .send(TestEvent::Result(0, TestResult::Ok, 0))
+ .unwrap();
+ drop(worker);
+ let (_, message) = receiver.recv().await.unwrap();
+ let TestEvent::Output(TestStdioStream::Stderr, text) = message else {
+ panic!("Incorrect message: {message:?}");
+ };
+ assert_eq!(text.as_slice(), b"hello");
+ let (_, message) = receiver.recv().await.unwrap();
+ let TestEvent::Result(..) = message else {
+ panic!("Incorrect message: {message:?}");
+ };
+ assert!(receiver.recv().await.is_none());
+ }
+ }
+
/// Ensure nothing panics if we're racing the runtime shutdown.
#[test]
fn test_runtime_shutdown() {