summaryrefslogtreecommitdiff
path: root/cli/tools/test.rs
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2022-05-04 17:01:51 -0400
committerGitHub <noreply@github.com>2022-05-04 17:01:51 -0400
commitd79a869ee6aae578ffcde1a12fd55a4c8c01c0bf (patch)
treed4942844803916c02db32178f312c09d9f549632 /cli/tools/test.rs
parentfb390c57013a51338a58e33ed059c5604c14ab55 (diff)
chore: fix flaky `steps_output_within` test (#14479)
Diffstat (limited to 'cli/tools/test.rs')
-rw-r--r--cli/tools/test.rs102
1 files changed, 75 insertions, 27 deletions
diff --git a/cli/tools/test.rs b/cli/tools/test.rs
index d3ab9dbb0..096be383d 100644
--- a/cli/tools/test.rs
+++ b/cli/tools/test.rs
@@ -36,6 +36,7 @@ use deno_core::futures::future;
use deno_core::futures::stream;
use deno_core::futures::FutureExt;
use deno_core::futures::StreamExt;
+use deno_core::parking_lot::Mutex;
use deno_core::serde_json::json;
use deno_core::url::Url;
use deno_core::ModuleSpecifier;
@@ -1449,43 +1450,28 @@ pub async fn run_tests_with_watch(
Ok(())
}
+#[derive(Clone)]
pub struct TestEventSender {
sender: UnboundedSender<TestEvent>,
- stdout_writer: os_pipe::PipeWriter,
- stderr_writer: os_pipe::PipeWriter,
-}
-
-impl Clone for TestEventSender {
- fn clone(&self) -> Self {
- Self {
- sender: self.sender.clone(),
- stdout_writer: self.stdout_writer.try_clone().unwrap(),
- stderr_writer: self.stderr_writer.try_clone().unwrap(),
- }
- }
+ stdout_writer: TestOutputPipe,
+ stderr_writer: TestOutputPipe,
}
impl TestEventSender {
pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
- let (stdout_reader, stdout_writer) = os_pipe::pipe().unwrap();
- let (stderr_reader, stderr_writer) = os_pipe::pipe().unwrap();
-
- start_output_redirect_thread(stdout_reader, sender.clone());
- start_output_redirect_thread(stderr_reader, sender.clone());
-
Self {
+ stdout_writer: TestOutputPipe::new(sender.clone()),
+ stderr_writer: TestOutputPipe::new(sender.clone()),
sender,
- stdout_writer,
- stderr_writer,
}
}
pub fn stdout(&self) -> std::fs::File {
- pipe_writer_to_file(self.stdout_writer.try_clone().unwrap())
+ self.stdout_writer.as_file()
}
pub fn stderr(&self) -> std::fs::File {
- pipe_writer_to_file(self.stderr_writer.try_clone().unwrap())
+ self.stderr_writer.as_file()
}
pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> {
@@ -1497,13 +1483,62 @@ impl TestEventSender {
| TestEvent::StepWait(_)
| TestEvent::StepResult(_, _, _)
) {
- self.stdout_writer.flush().unwrap();
- self.stderr_writer.flush().unwrap();
+ self.flush_stdout_and_stderr();
}
self.sender.send(message)?;
Ok(())
}
+
+ fn flush_stdout_and_stderr(&mut self) {
+ self.stdout_writer.flush();
+ self.stderr_writer.flush();
+ }
+}
+
+// use a string that if it ends up in the output won't affect how things are displayed
+const ZERO_WIDTH_SPACE: &str = "\u{200B}";
+
+struct TestOutputPipe {
+ writer: os_pipe::PipeWriter,
+ state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
+}
+
+impl Clone for TestOutputPipe {
+ fn clone(&self) -> Self {
+ Self {
+ writer: self.writer.try_clone().unwrap(),
+ state: self.state.clone(),
+ }
+ }
+}
+
+impl TestOutputPipe {
+ pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
+ let (reader, writer) = os_pipe::pipe().unwrap();
+ let state = Arc::new(Mutex::new(None));
+
+ start_output_redirect_thread(reader, sender, state.clone());
+
+ Self { writer, state }
+ }
+
+ pub fn flush(&mut self) {
+ // We want to wake up the other thread and have it respond back
+ // that it's done clearing out its pipe before returning.
+ let (sender, receiver) = std::sync::mpsc::channel();
+ self.state.lock().replace(sender);
+ // Bit of a hack in order to send a zero width space in order
+ // to wake the thread up. It seems that sending zero bytes
+ // does not work here on windows.
+ self.writer.write_all(ZERO_WIDTH_SPACE.as_bytes()).unwrap();
+ self.writer.flush().unwrap();
+ receiver.recv().unwrap();
+ }
+
+ pub fn as_file(&self) -> std::fs::File {
+ pipe_writer_to_file(self.writer.try_clone().unwrap())
+ }
}
#[cfg(windows)]
@@ -1525,6 +1560,7 @@ fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File {
fn start_output_redirect_thread(
mut pipe_reader: os_pipe::PipeReader,
sender: UnboundedSender<TestEvent>,
+ flush_state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
) {
tokio::task::spawn_blocking(move || loop {
let mut buffer = [0; 512];
@@ -1532,12 +1568,24 @@ fn start_output_redirect_thread(
Ok(0) | Err(_) => break,
Ok(size) => size,
};
+ let oneshot_sender = flush_state.lock().take();
+ let mut data = &buffer[0..size];
+ if data.ends_with(ZERO_WIDTH_SPACE.as_bytes()) {
+ data = &data[0..data.len() - ZERO_WIDTH_SPACE.len()];
+ }
- if sender
- .send(TestEvent::Output(buffer[0..size].to_vec()))
- .is_err()
+ if !data.is_empty()
+ && sender
+ .send(TestEvent::Output(buffer[0..size].to_vec()))
+ .is_err()
{
break;
}
+
+ // Always respond back if this was set. Ideally we would also check to
+ // ensure the pipe reader is empty before sending back this response.
+ if let Some(sender) = oneshot_sender {
+ let _ignore = sender.send(());
+ }
});
}