summaryrefslogtreecommitdiff
path: root/cli/tools/test/mod.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2024-02-23 11:11:15 -0700
committerGitHub <noreply@github.com>2024-02-23 11:11:15 -0700
commit5193834cf23e3521b3afd3f5f54eb00daa23c88d (patch)
treed67bcef4d96f615f345ad1679d27d73bdc2ccd39 /cli/tools/test/mod.rs
parent619acce305ac77f98327718bc1e6a1ae13d8bcf6 (diff)
refactor(cli): clean up test runner channels (#22422)
Gets us closer to solving #20707. Rewrites the `TestEventSender`: - Allow for explicit creation of multiple streams. This will allow for one-std{out,err}-per-worker - All test events are received along with a worker ID, allowing for eventual, proper parallel threading of test events. In theory this should open up proper interleaving of test output, however that is left for a future PR. I had some plans for a better performing synchronization primitive, but the inter-thread communication is tricky. This does, however, speed up the processing of large numbers of tests 15-25% (possibly even more on 100,000+). Before ``` ok | 1000 passed | 0 failed (32ms) ok | 10000 passed | 0 failed (276ms) ``` After ``` ok | 1000 passed | 0 failed (25ms) ok | 10000 passed | 0 failed (230ms) ```
Diffstat (limited to 'cli/tools/test/mod.rs')
-rw-r--r--cli/tools/test/mod.rs222
1 files changed, 47 insertions, 175 deletions
diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs
index b088cf7a3..e98df4671 100644
--- a/cli/tools/test/mod.rs
+++ b/cli/tools/test/mod.rs
@@ -37,7 +37,6 @@ use deno_core::futures::stream;
use deno_core::futures::FutureExt;
use deno_core::futures::StreamExt;
use deno_core::located_script_name;
-use deno_core::parking_lot::Mutex;
use deno_core::serde_v8;
use deno_core::stats::RuntimeActivity;
use deno_core::stats::RuntimeActivityDiff;
@@ -71,7 +70,6 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Write as _;
use std::future::poll_fn;
-use std::io::Read;
use std::io::Write;
use std::num::NonZeroUsize;
use std::path::Path;
@@ -84,14 +82,16 @@ use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use tokio::signal;
-use tokio::sync::mpsc::unbounded_channel;
-use tokio::sync::mpsc::UnboundedReceiver;
-use tokio::sync::mpsc::UnboundedSender;
-use tokio::sync::mpsc::WeakUnboundedSender;
+mod channel;
pub mod fmt;
pub mod reporters;
+pub use channel::create_single_test_event_channel;
+pub use channel::create_test_event_channel;
+pub use channel::TestEventReceiver;
+pub use channel::TestEventSender;
+pub use channel::TestEventWorkerSender;
use fmt::format_sanitizer_diff;
pub use fmt::format_test_error;
use reporters::CompoundTestReporter;
@@ -329,13 +329,19 @@ pub struct TestPlan {
pub used_only: bool,
}
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)]
+pub enum TestStdioStream {
+ Stdout,
+ Stderr,
+}
+
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TestEvent {
Register(TestDescription),
Plan(TestPlan),
Wait(usize),
- Output(Vec<u8>),
+ Output(TestStdioStream, Vec<u8>),
Result(usize, TestResult, u64),
UncaughtError(String, Box<JsError>),
StepRegister(TestStepDescription),
@@ -345,6 +351,21 @@ pub enum TestEvent {
Sigint,
}
+impl TestEvent {
+ // Certain messages require us to ensure that all output has been drained to ensure proper
+ // interleaving of output messages.
+ pub fn requires_stdio_sync(&self) -> bool {
+ matches!(
+ self,
+ TestEvent::Result(..)
+ | TestEvent::StepWait(..)
+ | TestEvent::StepResult(..)
+ | TestEvent::UncaughtError(..)
+ | TestEvent::ForceEndReport
+ )
+ }
+}
+
#[derive(Debug, Clone, Deserialize)]
pub struct TestSummary {
pub total: usize,
@@ -432,7 +453,7 @@ pub async fn test_specifier(
worker_factory: Arc<CliMainWorkerFactory>,
permissions: Permissions,
specifier: ModuleSpecifier,
- mut sender: TestEventSender,
+ mut worker_sender: TestEventWorkerSender,
fail_fast_tracker: FailFastTracker,
options: TestSpecifierOptions,
) -> Result<(), AnyError> {
@@ -440,7 +461,9 @@ pub async fn test_specifier(
worker_factory,
permissions,
specifier.clone(),
- &sender,
+ &worker_sender.sender,
+ StdioPipe::file(worker_sender.stdout),
+ StdioPipe::file(worker_sender.stderr),
fail_fast_tracker,
options,
)
@@ -449,7 +472,7 @@ pub async fn test_specifier(
Ok(()) => Ok(()),
Err(error) => {
if error.is::<JsError>() {
- sender.send(TestEvent::UncaughtError(
+ worker_sender.sender.send(TestEvent::UncaughtError(
specifier.to_string(),
Box::new(error.downcast::<JsError>().unwrap()),
))?;
@@ -463,26 +486,27 @@ pub async fn test_specifier(
/// Test a single specifier as documentation containing test programs, an executable test module or
/// both.
+#[allow(clippy::too_many_arguments)]
async fn test_specifier_inner(
worker_factory: Arc<CliMainWorkerFactory>,
permissions: Permissions,
specifier: ModuleSpecifier,
sender: &TestEventSender,
+ stdout: StdioPipe,
+ stderr: StdioPipe,
fail_fast_tracker: FailFastTracker,
options: TestSpecifierOptions,
) -> Result<(), AnyError> {
if fail_fast_tracker.should_stop() {
return Ok(());
}
- let stdout = StdioPipe::File(sender.stdout());
- let stderr = StdioPipe::File(sender.stderr());
let mut worker = worker_factory
.create_custom_worker(
specifier.clone(),
PermissionsContainer::new(permissions),
vec![ops::testing::deno_test::init_ops(sender.clone())],
Stdio {
- stdin: StdioPipe::Inherit,
+ stdin: StdioPipe::inherit(),
stdout,
stderr,
},
@@ -1062,14 +1086,13 @@ async fn test_specifiers(
specifiers
};
- let (sender, receiver) = unbounded_channel::<TestEvent>();
- let sender = TestEventSender::new(sender);
+ let (test_event_sender_factory, receiver) = create_test_event_channel();
let concurrent_jobs = options.concurrent_jobs;
- let sender_ = sender.downgrade();
+ let mut cancel_sender = test_event_sender_factory.weak_sender();
let sigint_handler_handle = spawn(async move {
signal::ctrl_c().await.unwrap();
- sender_.upgrade().map(|s| s.send(TestEvent::Sigint).ok());
+ cancel_sender.send(TestEvent::Sigint).ok();
});
HAS_TEST_RUN_SIGINT_HANDLER.store(true, Ordering::Relaxed);
let reporter = get_test_reporter(&options);
@@ -1078,7 +1101,7 @@ async fn test_specifiers(
let join_handles = specifiers.into_iter().map(move |specifier| {
let worker_factory = worker_factory.clone();
let permissions = permissions.clone();
- let sender = sender.clone();
+ let worker_sender = test_event_sender_factory.worker();
let fail_fast_tracker = fail_fast_tracker.clone();
let specifier_options = options.specifier.clone();
spawn_blocking(move || {
@@ -1086,12 +1109,13 @@ async fn test_specifiers(
worker_factory,
permissions,
specifier,
- sender.clone(),
+ worker_sender,
fail_fast_tracker,
specifier_options,
))
})
});
+
let join_stream = stream::iter(join_handles)
.buffer_unordered(concurrent_jobs.get())
.collect::<Vec<Result<Result<(), AnyError>, tokio::task::JoinError>>>();
@@ -1111,9 +1135,9 @@ async fn test_specifiers(
/// Gives receiver back in case it was ended with `TestEvent::ForceEndReport`.
pub async fn report_tests(
- mut receiver: UnboundedReceiver<TestEvent>,
+ mut receiver: TestEventReceiver,
mut reporter: Box<dyn TestReporter>,
-) -> (Result<(), AnyError>, UnboundedReceiver<TestEvent>) {
+) -> (Result<(), AnyError>, TestEventReceiver) {
let mut tests = IndexMap::new();
let mut test_steps = IndexMap::new();
let mut tests_started = HashSet::new();
@@ -1123,7 +1147,7 @@ pub async fn report_tests(
let mut used_only = false;
let mut failed = false;
- while let Some(event) = receiver.recv().await {
+ while let Some((_, event)) = receiver.recv().await {
match event {
TestEvent::Register(description) => {
reporter.report_register(&description);
@@ -1144,7 +1168,7 @@ pub async fn report_tests(
reporter.report_wait(tests.get(&id).unwrap());
}
}
- TestEvent::Output(output) => {
+ TestEvent::Output(_, output) => {
reporter.report_output(&output);
}
TestEvent::Result(id, result, elapsed) => {
@@ -1629,158 +1653,6 @@ impl FailFastTracker {
}
}
-#[derive(Clone)]
-pub struct TestEventSender {
- sender: UnboundedSender<TestEvent>,
- stdout_writer: TestOutputPipe,
- stderr_writer: TestOutputPipe,
-}
-
-impl TestEventSender {
- pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
- Self {
- stdout_writer: TestOutputPipe::new(sender.clone()),
- stderr_writer: TestOutputPipe::new(sender.clone()),
- sender,
- }
- }
-
- pub fn stdout(&self) -> std::fs::File {
- self.stdout_writer.as_file()
- }
-
- pub fn stderr(&self) -> std::fs::File {
- self.stderr_writer.as_file()
- }
-
- pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> {
- // for any event that finishes collecting output, we need to
- // ensure that the collected stdout and stderr pipes are flushed
- if matches!(
- message,
- TestEvent::Result(_, _, _)
- | TestEvent::StepWait(_)
- | TestEvent::StepResult(_, _, _)
- | TestEvent::UncaughtError(_, _)
- ) {
- self.flush_stdout_and_stderr()?;
- }
-
- self.sender.send(message)?;
- Ok(())
- }
-
- fn downgrade(&self) -> WeakUnboundedSender<TestEvent> {
- self.sender.downgrade()
- }
-
- fn flush_stdout_and_stderr(&mut self) -> Result<(), AnyError> {
- self.stdout_writer.flush()?;
- self.stderr_writer.flush()?;
-
- Ok(())
- }
-}
-
-// use a string that if it ends up in the output won't affect how things are displayed
-const ZERO_WIDTH_SPACE: &str = "\u{200B}";
-
-struct TestOutputPipe {
- writer: os_pipe::PipeWriter,
- state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
-}
-
-impl Clone for TestOutputPipe {
- fn clone(&self) -> Self {
- Self {
- writer: self.writer.try_clone().unwrap(),
- state: self.state.clone(),
- }
- }
-}
-
-impl TestOutputPipe {
- pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
- let (reader, writer) = os_pipe::pipe().unwrap();
- let state = Arc::new(Mutex::new(None));
-
- start_output_redirect_thread(reader, sender, state.clone());
-
- Self { writer, state }
- }
-
- pub fn flush(&mut self) -> Result<(), AnyError> {
- // We want to wake up the other thread and have it respond back
- // that it's done clearing out its pipe before returning.
- let (sender, receiver) = std::sync::mpsc::channel();
- if let Some(sender) = self.state.lock().replace(sender) {
- let _ = sender.send(()); // just in case
- }
- // Bit of a hack to send a zero width space in order to wake
- // the thread up. It seems that sending zero bytes here does
- // not work on windows.
- self.writer.write_all(ZERO_WIDTH_SPACE.as_bytes())?;
- self.writer.flush()?;
- // ignore the error as it might have been picked up and closed
- let _ = receiver.recv();
-
- Ok(())
- }
-
- pub fn as_file(&self) -> std::fs::File {
- pipe_writer_to_file(self.writer.try_clone().unwrap())
- }
-}
-
-#[cfg(windows)]
-fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File {
- use std::os::windows::prelude::FromRawHandle;
- use std::os::windows::prelude::IntoRawHandle;
- // SAFETY: Requires consuming ownership of the provided handle
- unsafe { std::fs::File::from_raw_handle(writer.into_raw_handle()) }
-}
-
-#[cfg(unix)]
-fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File {
- use std::os::unix::io::FromRawFd;
- use std::os::unix::io::IntoRawFd;
- // SAFETY: Requires consuming ownership of the provided handle
- unsafe { std::fs::File::from_raw_fd(writer.into_raw_fd()) }
-}
-
-fn start_output_redirect_thread(
- mut pipe_reader: os_pipe::PipeReader,
- sender: UnboundedSender<TestEvent>,
- flush_state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
-) {
- spawn_blocking(move || loop {
- let mut buffer = [0; 512];
- let size = match pipe_reader.read(&mut buffer) {
- Ok(0) | Err(_) => break,
- Ok(size) => size,
- };
- let oneshot_sender = flush_state.lock().take();
- let mut data = &buffer[0..size];
- if data.ends_with(ZERO_WIDTH_SPACE.as_bytes()) {
- data = &data[0..data.len() - ZERO_WIDTH_SPACE.len()];
- }
-
- if !data.is_empty()
- && sender
- .send(TestEvent::Output(buffer[0..size].to_vec()))
- .is_err()
- {
- break;
- }
-
- // Always respond back if this was set. Ideally we would also check to
- // ensure the pipe reader is empty before sending back this response.
- if let Some(sender) = oneshot_sender {
- let _ignore = sender.send(());
- }
- });
-}
-
#[cfg(test)]
mod inner_test {
use std::path::Path;