diff options
author | Matt Mastracci <matthew@mastracci.com> | 2024-04-16 12:54:50 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-16 12:54:50 -0600 |
commit | c4d0fceec37e3842fad4aeb5afb0dfac4353390d (patch) | |
tree | f535c56fd8c122566c091993c1099174fae318e8 /cli/tools/test | |
parent | 760d64bc6b200ae58e8ee948903bf1e42b6799b5 (diff) |
fix(cli): TestEventSender should be !Clone (#23405)
`TestEventSender` should not be Clone so we don't end up with multiple
copies of the same writer FD. This is probably not the cause of the test
channel lockups, but it's a lot easier to reason about.
Diffstat (limited to 'cli/tools/test')
-rw-r--r-- | cli/tools/test/channel.rs | 16 | ||||
-rw-r--r-- | cli/tools/test/mod.rs | 230 |
2 files changed, 150 insertions, 96 deletions
diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs index a024393bb..aad3f926e 100644 --- a/cli/tools/test/channel.rs +++ b/cli/tools/test/channel.rs @@ -16,7 +16,6 @@ use std::io::Write; use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::Arc; use std::task::ready; use std::task::Poll; use std::time::Duration; @@ -310,7 +309,6 @@ impl TestEventSenderFactory { let sender = TestEventSender { id, - ref_count: Default::default(), sender: self.sender.clone(), sync_sender, stdout_writer, @@ -373,26 +371,12 @@ pub struct TestEventWorkerSender { /// are not guaranteed to be sent on drop unless flush is explicitly called. pub struct TestEventSender { pub id: usize, - ref_count: Arc<()>, sender: UnboundedSender<(usize, TestEvent)>, sync_sender: UnboundedSender<(SendMutex, SendMutex)>, stdout_writer: PipeWrite, stderr_writer: PipeWrite, } -impl Clone for TestEventSender { - fn clone(&self) -> Self { - Self { - id: self.id, - ref_count: self.ref_count.clone(), - sender: self.sender.clone(), - sync_sender: self.sync_sender.clone(), - stdout_writer: self.stdout_writer.try_clone().unwrap(), - stderr_writer: self.stderr_writer.try_clone().unwrap(), - } - } -} - impl TestEventSender { pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> { // Certain messages require us to ensure that all output has been drained to ensure proper diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs index f48827117..390a60762 100644 --- a/cli/tools/test/mod.rs +++ b/cli/tools/test/mod.rs @@ -21,6 +21,7 @@ use crate::util::path::is_script_ext; use crate::util::path::mapped_specifier_for_tsc; use crate::util::path::matches_pattern_or_exact_path; use crate::worker::CliMainWorkerFactory; +use crate::worker::CoverageCollector; use deno_ast::swc::common::comments::CommentKind; use deno_ast::MediaType; @@ -49,6 +50,7 @@ use deno_core::unsync::spawn_blocking; use deno_core::url::Url; use deno_core::v8; use deno_core::ModuleSpecifier; +use deno_core::OpState; use deno_core::PollEventLoopOptions; use deno_runtime::deno_io::Stdio; use deno_runtime::deno_io::StdioPipe; @@ -66,6 +68,7 @@ use rand::SeedableRng; use regex::Regex; use serde::Deserialize; use std::borrow::Cow; +use std::cell::RefCell; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::collections::HashMap; @@ -571,23 +574,81 @@ fn get_test_reporter(options: &TestSpecifiersOptions) -> Box<dyn TestReporter> { reporter } +async fn configure_main_worker( + worker_factory: Arc<CliMainWorkerFactory>, + specifier: &Url, + permissions: Permissions, + worker_sender: TestEventWorkerSender, + options: &TestSpecifierOptions, +) -> Result<(Option<Box<dyn CoverageCollector>>, MainWorker), anyhow::Error> { + let mut worker = worker_factory + .create_custom_worker( + specifier.clone(), + PermissionsContainer::new(permissions), + vec![ops::testing::deno_test::init_ops(worker_sender.sender)], + Stdio { + stdin: StdioPipe::inherit(), + stdout: StdioPipe::file(worker_sender.stdout), + stderr: StdioPipe::file(worker_sender.stderr), + }, + ) + .await?; + let coverage_collector = worker.maybe_setup_coverage_collector().await?; + if options.trace_leaks { + worker.execute_script_static( + located_script_name!(), + "Deno[Deno.internal].core.setLeakTracingEnabled(true);", + )?; + } + let res = worker.execute_side_module_possibly_with_npm().await; + let mut worker = worker.into_main_worker(); + match res { + Ok(()) => Ok(()), + Err(error) => { + // TODO(mmastrac): It would be nice to avoid having this error pattern repeated + if error.is::<JsError>() { + send_test_event( + &worker.js_runtime.op_state(), + TestEvent::UncaughtError( + specifier.to_string(), + Box::new(error.downcast::<JsError>().unwrap()), + ), + )?; + Ok(()) + } else { + Err(error) + } + } + }?; + Ok((coverage_collector, worker)) +} + /// Test a single specifier as documentation containing test programs, an executable test module or /// both. pub async fn test_specifier( worker_factory: Arc<CliMainWorkerFactory>, permissions: Permissions, specifier: ModuleSpecifier, - mut worker_sender: TestEventWorkerSender, + worker_sender: TestEventWorkerSender, fail_fast_tracker: FailFastTracker, options: TestSpecifierOptions, ) -> Result<(), AnyError> { - match test_specifier_inner( + if fail_fast_tracker.should_stop() { + return Ok(()); + } + let (coverage_collector, mut worker) = configure_main_worker( worker_factory, + &specifier, permissions, + worker_sender, + &options, + ) + .await?; + + match test_specifier_inner( + &mut worker, + coverage_collector, specifier.clone(), - &mut worker_sender.sender, - StdioPipe::file(worker_sender.stdout), - StdioPipe::file(worker_sender.stderr), fail_fast_tracker, options, ) @@ -595,11 +656,15 @@ pub async fn test_specifier( { Ok(()) => Ok(()), Err(error) => { + // TODO(mmastrac): It would be nice to avoid having this error pattern repeated if error.is::<JsError>() { - worker_sender.sender.send(TestEvent::UncaughtError( - specifier.to_string(), - Box::new(error.downcast::<JsError>().unwrap()), - ))?; + send_test_event( + &worker.js_runtime.op_state(), + TestEvent::UncaughtError( + specifier.to_string(), + Box::new(error.downcast::<JsError>().unwrap()), + ), + )?; Ok(()) } else { Err(error) @@ -612,51 +677,18 @@ pub async fn test_specifier( /// both. #[allow(clippy::too_many_arguments)] async fn test_specifier_inner( - worker_factory: Arc<CliMainWorkerFactory>, - permissions: Permissions, + worker: &mut MainWorker, + mut coverage_collector: Option<Box<dyn CoverageCollector>>, specifier: ModuleSpecifier, - sender: &mut TestEventSender, - stdout: StdioPipe, - stderr: StdioPipe, fail_fast_tracker: FailFastTracker, options: TestSpecifierOptions, ) -> Result<(), AnyError> { - if fail_fast_tracker.should_stop() { - return Ok(()); - } - let mut worker = worker_factory - .create_custom_worker( - specifier.clone(), - PermissionsContainer::new(permissions), - vec![ops::testing::deno_test::init_ops(sender.clone())], - Stdio { - stdin: StdioPipe::inherit(), - stdout, - stderr, - }, - ) - .await?; - - let mut coverage_collector = worker.maybe_setup_coverage_collector().await?; - - if options.trace_leaks { - worker.execute_script_static( - located_script_name!(), - "Deno[Deno.internal].core.setLeakTracingEnabled(true);", - )?; - } - - // We execute the main module as a side module so that import.meta.main is not set. - worker.execute_side_module_possibly_with_npm().await?; - - let mut worker = worker.into_main_worker(); - // Ensure that there are no pending exceptions before we start running tests worker.run_up_to_duration(Duration::from_millis(0)).await?; worker.dispatch_load_event()?; - run_tests_for_worker(&mut worker, &specifier, &options, &fail_fast_tracker) + run_tests_for_worker(worker, &specifier, &options, &fail_fast_tracker) .await?; // Ignore `defaultPrevented` of the `beforeunload` event. We don't allow the @@ -665,13 +697,18 @@ async fn test_specifier_inner( worker.dispatch_unload_event()?; // Ensure all output has been flushed - _ = sender.flush(); + _ = worker + .js_runtime + .op_state() + .borrow_mut() + .borrow_mut::<TestEventSender>() + .flush(); // Ensure the worker has settled so we can catch any remaining unhandled rejections. We don't // want to wait forever here. worker.run_up_to_duration(Duration::from_millis(0)).await?; - if let Some(coverage_collector) = coverage_collector.as_mut() { + if let Some(coverage_collector) = &mut coverage_collector { worker .js_runtime .with_event_loop_future( @@ -708,33 +745,42 @@ pub async fn poll_event_loop(worker: &mut MainWorker) -> Result<(), AnyError> { .await } +pub fn send_test_event( + op_state: &RefCell<OpState>, + event: TestEvent, +) -> Result<(), AnyError> { + Ok( + op_state + .borrow_mut() + .borrow_mut::<TestEventSender>() + .send(event)?, + ) +} + pub async fn run_tests_for_worker( worker: &mut MainWorker, specifier: &ModuleSpecifier, options: &TestSpecifierOptions, fail_fast_tracker: &FailFastTracker, ) -> Result<(), AnyError> { - let (TestContainer(tests, test_functions), mut sender) = { - let state_rc = worker.js_runtime.op_state(); - let mut state = state_rc.borrow_mut(); - ( - std::mem::take(&mut *state.borrow_mut::<TestContainer>()), - state.borrow::<TestEventSender>().clone(), - ) - }; + let state_rc = worker.js_runtime.op_state(); + // Take whatever tests have been registered + let TestContainer(tests, test_functions) = + std::mem::take(&mut *state_rc.borrow_mut().borrow_mut::<TestContainer>()); + let tests: Arc<TestDescriptions> = tests.into(); - sender.send(TestEvent::Register(tests.clone()))?; + send_test_event(&state_rc, TestEvent::Register(tests.clone()))?; let res = run_tests_for_worker_inner( worker, specifier, tests, test_functions, - &mut sender, options, fail_fast_tracker, ) .await; - _ = sender.send(TestEvent::Completed); + + _ = send_test_event(&state_rc, TestEvent::Completed); res } @@ -743,11 +789,11 @@ async fn run_tests_for_worker_inner( specifier: &ModuleSpecifier, tests: Arc<TestDescriptions>, test_functions: Vec<v8::Global<v8::Function>>, - sender: &mut TestEventSender, options: &TestSpecifierOptions, fail_fast_tracker: &FailFastTracker, ) -> Result<(), AnyError> { let unfiltered = tests.len(); + let state_rc = worker.js_runtime.op_state(); // Build the test plan in a single pass let mut tests_to_run = Vec::with_capacity(tests.len()); @@ -775,12 +821,15 @@ async fn run_tests_for_worker_inner( tests_to_run.shuffle(&mut SmallRng::seed_from_u64(seed)); } - sender.send(TestEvent::Plan(TestPlan { - origin: specifier.to_string(), - total: tests_to_run.len(), - filtered_out: unfiltered - tests_to_run.len(), - used_only, - }))?; + send_test_event( + &state_rc, + TestEvent::Plan(TestPlan { + origin: specifier.to_string(), + total: tests_to_run.len(), + filtered_out: unfiltered - tests_to_run.len(), + used_only, + }), + )?; let mut had_uncaught_error = false; let stats = worker.js_runtime.runtime_activity_stats_factory(); @@ -835,14 +884,20 @@ async fn run_tests_for_worker_inner( .try_take::<deno_runtime::deno_fetch::reqwest::Client>(); if desc.ignore { - sender.send(TestEvent::Result(desc.id, TestResult::Ignored, 0))?; + send_test_event( + &state_rc, + TestEvent::Result(desc.id, TestResult::Ignored, 0), + )?; continue; } if had_uncaught_error { - sender.send(TestEvent::Result(desc.id, TestResult::Cancelled, 0))?; + send_test_event( + &state_rc, + TestEvent::Result(desc.id, TestResult::Cancelled, 0), + )?; continue; } - sender.send(TestEvent::Wait(desc.id))?; + send_test_event(&state_rc, TestEvent::Wait(desc.id))?; // Poll event loop once, to allow all ops that are already resolved, but haven't // responded to settle. @@ -863,12 +918,18 @@ async fn run_tests_for_worker_inner( Ok(r) => r, Err(error) => { if error.is::<JsError>() { - sender.send(TestEvent::UncaughtError( - specifier.to_string(), - Box::new(error.downcast::<JsError>().unwrap()), - ))?; + send_test_event( + &state_rc, + TestEvent::UncaughtError( + specifier.to_string(), + Box::new(error.downcast::<JsError>().unwrap()), + ), + )?; fail_fast_tracker.add_failure(); - sender.send(TestEvent::Result(desc.id, TestResult::Cancelled, 0))?; + send_test_event( + &state_rc, + TestEvent::Result(desc.id, TestResult::Cancelled, 0), + )?; had_uncaught_error = true; continue; } else { @@ -886,7 +947,10 @@ async fn run_tests_for_worker_inner( if matches!(result, TestResult::Failed(_)) { fail_fast_tracker.add_failure(); let elapsed = earlier.elapsed().as_millis(); - sender.send(TestEvent::Result(desc.id, result, elapsed as u64))?; + send_test_event( + &state_rc, + TestEvent::Result(desc.id, result, elapsed as u64), + )?; continue; } @@ -907,17 +971,23 @@ async fn run_tests_for_worker_inner( let failure = TestFailure::Leaked(formatted, trailer_notes); fail_fast_tracker.add_failure(); let elapsed = earlier.elapsed().as_millis(); - sender.send(TestEvent::Result( - desc.id, - TestResult::Failed(failure), - elapsed as u64, - ))?; + send_test_event( + &state_rc, + TestEvent::Result( + desc.id, + TestResult::Failed(failure), + elapsed as u64, + ), + )?; continue; } } let elapsed = earlier.elapsed().as_millis(); - sender.send(TestEvent::Result(desc.id, result, elapsed as u64))?; + send_test_event( + &state_rc, + TestEvent::Result(desc.id, result, elapsed as u64), + )?; } Ok(()) } |