diff options
author | Matt Mastracci <matthew@mastracci.com> | 2024-02-23 11:11:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-23 11:11:15 -0700 |
commit | 5193834cf23e3521b3afd3f5f54eb00daa23c88d (patch) | |
tree | d67bcef4d96f615f345ad1679d27d73bdc2ccd39 /cli/tools/test/mod.rs | |
parent | 619acce305ac77f98327718bc1e6a1ae13d8bcf6 (diff) |
refactor(cli): clean up test runner channels (#22422)
Gets us closer to solving #20707.
Rewrites the `TestEventSender`:
- Allow for explicit creation of multiple streams. This will allow for
one-std{out,err}-per-worker
- All test events are received along with a worker ID, allowing for
eventual, proper parallel threading of test events.
In theory this should open up proper interleaving of test output,
however that is left for a future PR.
I had some plans for a better performing synchronization primitive, but
the inter-thread communication is tricky. This does, however, speed up
the processing of large numbers of tests 15-25% (possibly even more on
100,000+).
Before
```
ok | 1000 passed | 0 failed (32ms)
ok | 10000 passed | 0 failed (276ms)
```
After
```
ok | 1000 passed | 0 failed (25ms)
ok | 10000 passed | 0 failed (230ms)
```
Diffstat (limited to 'cli/tools/test/mod.rs')
-rw-r--r-- | cli/tools/test/mod.rs | 222 |
1 files changed, 47 insertions, 175 deletions
diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs index b088cf7a3..e98df4671 100644 --- a/cli/tools/test/mod.rs +++ b/cli/tools/test/mod.rs @@ -37,7 +37,6 @@ use deno_core::futures::stream; use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::located_script_name; -use deno_core::parking_lot::Mutex; use deno_core::serde_v8; use deno_core::stats::RuntimeActivity; use deno_core::stats::RuntimeActivityDiff; @@ -71,7 +70,6 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Write as _; use std::future::poll_fn; -use std::io::Read; use std::io::Write; use std::num::NonZeroUsize; use std::path::Path; @@ -84,14 +82,16 @@ use std::time::Duration; use std::time::Instant; use std::time::SystemTime; use tokio::signal; -use tokio::sync::mpsc::unbounded_channel; -use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::mpsc::WeakUnboundedSender; +mod channel; pub mod fmt; pub mod reporters; +pub use channel::create_single_test_event_channel; +pub use channel::create_test_event_channel; +pub use channel::TestEventReceiver; +pub use channel::TestEventSender; +pub use channel::TestEventWorkerSender; use fmt::format_sanitizer_diff; pub use fmt::format_test_error; use reporters::CompoundTestReporter; @@ -329,13 +329,19 @@ pub struct TestPlan { pub used_only: bool, } +#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)] +pub enum TestStdioStream { + Stdout, + Stderr, +} + #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub enum TestEvent { Register(TestDescription), Plan(TestPlan), Wait(usize), - Output(Vec<u8>), + Output(TestStdioStream, Vec<u8>), Result(usize, TestResult, u64), UncaughtError(String, Box<JsError>), StepRegister(TestStepDescription), @@ -345,6 +351,21 @@ pub enum TestEvent { Sigint, } +impl TestEvent { + // Certain messages require us to ensure that all output has been drained to ensure proper + // interleaving of output messages. + pub fn requires_stdio_sync(&self) -> bool { + matches!( + self, + TestEvent::Result(..) + | TestEvent::StepWait(..) + | TestEvent::StepResult(..) + | TestEvent::UncaughtError(..) + | TestEvent::ForceEndReport + ) + } +} + #[derive(Debug, Clone, Deserialize)] pub struct TestSummary { pub total: usize, @@ -432,7 +453,7 @@ pub async fn test_specifier( worker_factory: Arc<CliMainWorkerFactory>, permissions: Permissions, specifier: ModuleSpecifier, - mut sender: TestEventSender, + mut worker_sender: TestEventWorkerSender, fail_fast_tracker: FailFastTracker, options: TestSpecifierOptions, ) -> Result<(), AnyError> { @@ -440,7 +461,9 @@ pub async fn test_specifier( worker_factory, permissions, specifier.clone(), - &sender, + &worker_sender.sender, + StdioPipe::file(worker_sender.stdout), + StdioPipe::file(worker_sender.stderr), fail_fast_tracker, options, ) @@ -449,7 +472,7 @@ pub async fn test_specifier( Ok(()) => Ok(()), Err(error) => { if error.is::<JsError>() { - sender.send(TestEvent::UncaughtError( + worker_sender.sender.send(TestEvent::UncaughtError( specifier.to_string(), Box::new(error.downcast::<JsError>().unwrap()), ))?; @@ -463,26 +486,27 @@ pub async fn test_specifier( /// Test a single specifier as documentation containing test programs, an executable test module or /// both. +#[allow(clippy::too_many_arguments)] async fn test_specifier_inner( worker_factory: Arc<CliMainWorkerFactory>, permissions: Permissions, specifier: ModuleSpecifier, sender: &TestEventSender, + stdout: StdioPipe, + stderr: StdioPipe, fail_fast_tracker: FailFastTracker, options: TestSpecifierOptions, ) -> Result<(), AnyError> { if fail_fast_tracker.should_stop() { return Ok(()); } - let stdout = StdioPipe::File(sender.stdout()); - let stderr = StdioPipe::File(sender.stderr()); let mut worker = worker_factory .create_custom_worker( specifier.clone(), PermissionsContainer::new(permissions), vec![ops::testing::deno_test::init_ops(sender.clone())], Stdio { - stdin: StdioPipe::Inherit, + stdin: StdioPipe::inherit(), stdout, stderr, }, @@ -1062,14 +1086,13 @@ async fn test_specifiers( specifiers }; - let (sender, receiver) = unbounded_channel::<TestEvent>(); - let sender = TestEventSender::new(sender); + let (test_event_sender_factory, receiver) = create_test_event_channel(); let concurrent_jobs = options.concurrent_jobs; - let sender_ = sender.downgrade(); + let mut cancel_sender = test_event_sender_factory.weak_sender(); let sigint_handler_handle = spawn(async move { signal::ctrl_c().await.unwrap(); - sender_.upgrade().map(|s| s.send(TestEvent::Sigint).ok()); + cancel_sender.send(TestEvent::Sigint).ok(); }); HAS_TEST_RUN_SIGINT_HANDLER.store(true, Ordering::Relaxed); let reporter = get_test_reporter(&options); @@ -1078,7 +1101,7 @@ async fn test_specifiers( let join_handles = specifiers.into_iter().map(move |specifier| { let worker_factory = worker_factory.clone(); let permissions = permissions.clone(); - let sender = sender.clone(); + let worker_sender = test_event_sender_factory.worker(); let fail_fast_tracker = fail_fast_tracker.clone(); let specifier_options = options.specifier.clone(); spawn_blocking(move || { @@ -1086,12 +1109,13 @@ async fn test_specifiers( worker_factory, permissions, specifier, - sender.clone(), + worker_sender, fail_fast_tracker, specifier_options, )) }) }); + let join_stream = stream::iter(join_handles) .buffer_unordered(concurrent_jobs.get()) .collect::<Vec<Result<Result<(), AnyError>, tokio::task::JoinError>>>(); @@ -1111,9 +1135,9 @@ async fn test_specifiers( /// Gives receiver back in case it was ended with `TestEvent::ForceEndReport`. pub async fn report_tests( - mut receiver: UnboundedReceiver<TestEvent>, + mut receiver: TestEventReceiver, mut reporter: Box<dyn TestReporter>, -) -> (Result<(), AnyError>, UnboundedReceiver<TestEvent>) { +) -> (Result<(), AnyError>, TestEventReceiver) { let mut tests = IndexMap::new(); let mut test_steps = IndexMap::new(); let mut tests_started = HashSet::new(); @@ -1123,7 +1147,7 @@ pub async fn report_tests( let mut used_only = false; let mut failed = false; - while let Some(event) = receiver.recv().await { + while let Some((_, event)) = receiver.recv().await { match event { TestEvent::Register(description) => { reporter.report_register(&description); @@ -1144,7 +1168,7 @@ pub async fn report_tests( reporter.report_wait(tests.get(&id).unwrap()); } } - TestEvent::Output(output) => { + TestEvent::Output(_, output) => { reporter.report_output(&output); } TestEvent::Result(id, result, elapsed) => { @@ -1629,158 +1653,6 @@ impl FailFastTracker { } } -#[derive(Clone)] -pub struct TestEventSender { - sender: UnboundedSender<TestEvent>, - stdout_writer: TestOutputPipe, - stderr_writer: TestOutputPipe, -} - -impl TestEventSender { - pub fn new(sender: UnboundedSender<TestEvent>) -> Self { - Self { - stdout_writer: TestOutputPipe::new(sender.clone()), - stderr_writer: TestOutputPipe::new(sender.clone()), - sender, - } - } - - pub fn stdout(&self) -> std::fs::File { - self.stdout_writer.as_file() - } - - pub fn stderr(&self) -> std::fs::File { - self.stderr_writer.as_file() - } - - pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> { - // for any event that finishes collecting output, we need to - // ensure that the collected stdout and stderr pipes are flushed - if matches!( - message, - TestEvent::Result(_, _, _) - | TestEvent::StepWait(_) - | TestEvent::StepResult(_, _, _) - | TestEvent::UncaughtError(_, _) - ) { - self.flush_stdout_and_stderr()?; - } - - self.sender.send(message)?; - Ok(()) - } - - fn downgrade(&self) -> WeakUnboundedSender<TestEvent> { - self.sender.downgrade() - } - - fn flush_stdout_and_stderr(&mut self) -> Result<(), AnyError> { - self.stdout_writer.flush()?; - self.stderr_writer.flush()?; - - Ok(()) - } -} - -// use a string that if it ends up in the output won't affect how things are displayed -const ZERO_WIDTH_SPACE: &str = "\u{200B}"; - -struct TestOutputPipe { - writer: os_pipe::PipeWriter, - state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>, -} - -impl Clone for TestOutputPipe { - fn clone(&self) -> Self { - Self { - writer: self.writer.try_clone().unwrap(), - state: self.state.clone(), - } - } -} - -impl TestOutputPipe { - pub fn new(sender: UnboundedSender<TestEvent>) -> Self { - let (reader, writer) = os_pipe::pipe().unwrap(); - let state = Arc::new(Mutex::new(None)); - - start_output_redirect_thread(reader, sender, state.clone()); - - Self { writer, state } - } - - pub fn flush(&mut self) -> Result<(), AnyError> { - // We want to wake up the other thread and have it respond back - // that it's done clearing out its pipe before returning. - let (sender, receiver) = std::sync::mpsc::channel(); - if let Some(sender) = self.state.lock().replace(sender) { - let _ = sender.send(()); // just in case - } - // Bit of a hack to send a zero width space in order to wake - // the thread up. It seems that sending zero bytes here does - // not work on windows. - self.writer.write_all(ZERO_WIDTH_SPACE.as_bytes())?; - self.writer.flush()?; - // ignore the error as it might have been picked up and closed - let _ = receiver.recv(); - - Ok(()) - } - - pub fn as_file(&self) -> std::fs::File { - pipe_writer_to_file(self.writer.try_clone().unwrap()) - } -} - -#[cfg(windows)] -fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File { - use std::os::windows::prelude::FromRawHandle; - use std::os::windows::prelude::IntoRawHandle; - // SAFETY: Requires consuming ownership of the provided handle - unsafe { std::fs::File::from_raw_handle(writer.into_raw_handle()) } -} - -#[cfg(unix)] -fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File { - use std::os::unix::io::FromRawFd; - use std::os::unix::io::IntoRawFd; - // SAFETY: Requires consuming ownership of the provided handle - unsafe { std::fs::File::from_raw_fd(writer.into_raw_fd()) } -} - -fn start_output_redirect_thread( - mut pipe_reader: os_pipe::PipeReader, - sender: UnboundedSender<TestEvent>, - flush_state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>, -) { - spawn_blocking(move || loop { - let mut buffer = [0; 512]; - let size = match pipe_reader.read(&mut buffer) { - Ok(0) | Err(_) => break, - Ok(size) => size, - }; - let oneshot_sender = flush_state.lock().take(); - let mut data = &buffer[0..size]; - if data.ends_with(ZERO_WIDTH_SPACE.as_bytes()) { - data = &data[0..data.len() - ZERO_WIDTH_SPACE.len()]; - } - - if !data.is_empty() - && sender - .send(TestEvent::Output(buffer[0..size].to_vec())) - .is_err() - { - break; - } - - // Always respond back if this was set. Ideally we would also check to - // ensure the pipe reader is empty before sending back this response. - if let Some(sender) = oneshot_sender { - let _ignore = sender.send(()); - } - }); -} - #[cfg(test)] mod inner_test { use std::path::Path; |