summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/lsp/testing/execution.rs30
-rw-r--r--cli/tools/jupyter/mod.rs26
-rw-r--r--cli/tools/repl/mod.rs9
-rw-r--r--cli/tools/repl/session.rs5
-rw-r--r--cli/tools/test/channel.rs491
-rw-r--r--cli/tools/test/mod.rs222
-rw-r--r--ext/io/lib.rs55
-rw-r--r--ext/io/pipe.rs161
8 files changed, 728 insertions, 271 deletions
diff --git a/cli/lsp/testing/execution.rs b/cli/lsp/testing/execution.rs
index 11882e6af..5fd1feb5a 100644
--- a/cli/lsp/testing/execution.rs
+++ b/cli/lsp/testing/execution.rs
@@ -12,8 +12,8 @@ use crate::lsp::client::TestingNotification;
use crate::lsp::config;
use crate::lsp::logging::lsp_log;
use crate::tools::test;
+use crate::tools::test::create_test_event_channel;
use crate::tools::test::FailFastTracker;
-use crate::tools::test::TestEventSender;
use deno_core::anyhow::anyhow;
use deno_core::error::AnyError;
@@ -35,7 +35,6 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
-use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tower_lsp::lsp_types as lsp;
@@ -246,8 +245,7 @@ impl TestRun {
unreachable!("Should always be Test subcommand.");
};
- let (sender, mut receiver) = mpsc::unbounded_channel::<test::TestEvent>();
- let sender = TestEventSender::new(sender);
+ let (test_event_sender_factory, mut receiver) = create_test_event_channel();
let fail_fast_tracker = FailFastTracker::new(fail_fast);
let mut queue = self.queue.iter().collect::<Vec<&ModuleSpecifier>>();
@@ -263,7 +261,7 @@ impl TestRun {
let specifier = specifier.clone();
let worker_factory = worker_factory.clone();
let permissions = permissions.clone();
- let mut sender = sender.clone();
+ let worker_sender = test_event_sender_factory.worker();
let fail_fast_tracker = fail_fast_tracker.clone();
let lsp_filter = self.filters.get(&specifier);
let filter = test::TestFilter {
@@ -284,15 +282,16 @@ impl TestRun {
if fail_fast_tracker.should_stop() {
return Ok(());
}
- let origin = specifier.to_string();
- let file_result = if token.is_cancelled() {
+ if token.is_cancelled() {
Ok(())
} else {
+ // All JsErrors are handled by test_specifier and piped into the test
+ // channel.
create_and_run_current_thread(test::test_specifier(
worker_factory,
permissions,
specifier,
- sender.clone(),
+ worker_sender,
fail_fast_tracker,
test::TestSpecifierOptions {
filter,
@@ -300,18 +299,7 @@ impl TestRun {
trace_ops: false,
},
))
- };
- if let Err(error) = file_result {
- if error.is::<JsError>() {
- sender.send(test::TestEvent::UncaughtError(
- origin,
- Box::new(error.downcast::<JsError>().unwrap()),
- ))?;
- } else {
- return Err(error);
- }
}
- Ok(())
})
});
@@ -333,7 +321,7 @@ impl TestRun {
let mut tests_with_result = HashSet::new();
let mut used_only = false;
- while let Some(event) = receiver.recv().await {
+ while let Some((_, event)) = receiver.recv().await {
match event {
test::TestEvent::Register(description) => {
reporter.report_register(&description);
@@ -352,7 +340,7 @@ impl TestRun {
test::TestEvent::Wait(id) => {
reporter.report_wait(tests.read().get(&id).unwrap());
}
- test::TestEvent::Output(output) => {
+ test::TestEvent::Output(_, output) => {
reporter.report_output(&output);
}
test::TestEvent::Result(id, result, elapsed) => {
diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs
index ea58328bb..cf1a44ea5 100644
--- a/cli/tools/jupyter/mod.rs
+++ b/cli/tools/jupyter/mod.rs
@@ -5,6 +5,9 @@ use crate::args::JupyterFlags;
use crate::ops;
use crate::tools::jupyter::server::StdioMsg;
use crate::tools::repl;
+use crate::tools::test::create_single_test_event_channel;
+use crate::tools::test::reporters::PrettyTestReporter;
+use crate::tools::test::TestEventWorkerSender;
use crate::util::logger;
use crate::CliFactory;
use deno_core::anyhow::Context;
@@ -19,13 +22,8 @@ use deno_runtime::permissions::Permissions;
use deno_runtime::permissions::PermissionsContainer;
use deno_terminal::colors;
use tokio::sync::mpsc;
-use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedSender;
-use super::test::reporters::PrettyTestReporter;
-use super::test::TestEvent;
-use super::test::TestEventSender;
-
mod install;
pub(crate) mod jupyter_msg;
pub(crate) mod server;
@@ -79,11 +77,13 @@ pub async fn kernel(
connection_filepath
)
})?;
- let (test_event_sender, test_event_receiver) =
- unbounded_channel::<TestEvent>();
- let test_event_sender = TestEventSender::new(test_event_sender);
- let stdout = StdioPipe::File(test_event_sender.stdout());
- let stderr = StdioPipe::File(test_event_sender.stderr());
+ let (worker, test_event_receiver) = create_single_test_event_channel();
+ let TestEventWorkerSender {
+ sender: test_event_sender,
+ stdout,
+ stderr,
+ } = worker;
+
let mut worker = worker_factory
.create_custom_worker(
main_module.clone(),
@@ -94,9 +94,9 @@ pub async fn kernel(
],
// FIXME(nayeemrmn): Test output capturing currently doesn't work.
Stdio {
- stdin: StdioPipe::Inherit,
- stdout,
- stderr,
+ stdin: StdioPipe::inherit(),
+ stdout: StdioPipe::file(stdout),
+ stderr: StdioPipe::file(stderr),
},
)
.await?;
diff --git a/cli/tools/repl/mod.rs b/cli/tools/repl/mod.rs
index e40c6362a..98519b60d 100644
--- a/cli/tools/repl/mod.rs
+++ b/cli/tools/repl/mod.rs
@@ -16,7 +16,6 @@ use deno_core::unsync::spawn_blocking;
use deno_runtime::permissions::Permissions;
use deno_runtime::permissions::PermissionsContainer;
use rustyline::error::ReadlineError;
-use tokio::sync::mpsc::unbounded_channel;
mod channel;
mod editor;
@@ -32,8 +31,7 @@ pub use session::EvaluationOutput;
pub use session::ReplSession;
pub use session::REPL_INTERNALS_NAME;
-use super::test::TestEvent;
-use super::test::TestEventSender;
+use super::test::create_single_test_event_channel;
struct Repl {
session: ReplSession,
@@ -168,9 +166,8 @@ pub async fn run(flags: Flags, repl_flags: ReplFlags) -> Result<i32, AnyError> {
.deno_dir()
.ok()
.and_then(|dir| dir.repl_history_file_path());
- let (test_event_sender, test_event_receiver) =
- unbounded_channel::<TestEvent>();
- let test_event_sender = TestEventSender::new(test_event_sender);
+ let (worker, test_event_receiver) = create_single_test_event_channel();
+ let test_event_sender = worker.sender;
let mut worker = worker_factory
.create_custom_worker(
main_module.clone(),
diff --git a/cli/tools/repl/session.rs b/cli/tools/repl/session.rs
index a52eb095f..65e27136f 100644
--- a/cli/tools/repl/session.rs
+++ b/cli/tools/repl/session.rs
@@ -14,6 +14,7 @@ use crate::tools::test::reporters::TestReporter;
use crate::tools::test::run_tests_for_worker;
use crate::tools::test::worker_has_tests;
use crate::tools::test::TestEvent;
+use crate::tools::test::TestEventReceiver;
use crate::tools::test::TestEventSender;
use deno_ast::diagnostics::Diagnostic;
@@ -183,7 +184,7 @@ pub struct ReplSession {
test_reporter_factory: Box<dyn Fn() -> Box<dyn TestReporter>>,
test_event_sender: TestEventSender,
/// This is only optional because it's temporarily taken when evaluating.
- test_event_receiver: Option<tokio::sync::mpsc::UnboundedReceiver<TestEvent>>,
+ test_event_receiver: Option<TestEventReceiver>,
jsx: ReplJsxState,
experimental_decorators: bool,
}
@@ -196,7 +197,7 @@ impl ReplSession {
mut worker: MainWorker,
main_module: ModuleSpecifier,
test_event_sender: TestEventSender,
- test_event_receiver: tokio::sync::mpsc::UnboundedReceiver<TestEvent>,
+ test_event_receiver: TestEventReceiver,
) -> Result<Self, AnyError> {
let language_server = ReplLanguageServer::new_initialized().await?;
let mut session = worker.create_inspector_session().await;
diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs
new file mode 100644
index 000000000..07fee6521
--- /dev/null
+++ b/cli/tools/test/channel.rs
@@ -0,0 +1,491 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use super::TestEvent;
+use super::TestStdioStream;
+use deno_core::futures::future::poll_fn;
+use deno_core::parking_lot;
+use deno_core::parking_lot::lock_api::RawMutex;
+use deno_runtime::deno_io::pipe;
+use deno_runtime::deno_io::AsyncPipeRead;
+use deno_runtime::deno_io::PipeRead;
+use deno_runtime::deno_io::PipeWrite;
+use std::fmt::Display;
+use std::io::Write;
+use std::pin::Pin;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncReadExt;
+use tokio::io::ReadBuf;
+use tokio::sync::mpsc::error::SendError;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::mpsc::WeakUnboundedSender;
+
+/// 8-byte sync marker that is unlikely to appear in normal output. Equivalent
+/// to the string `"\u{200B}\0\u{200B}\0"`.
+const SYNC_MARKER: &[u8; 8] = &[226, 128, 139, 0, 226, 128, 139, 0];
+
+const BUFFER_SIZE: usize = 4096;
+
+/// The test channel has been closed and cannot be used to send further messages.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub struct ChannelClosedError;
+
+impl std::error::Error for ChannelClosedError {}
+
+impl Display for ChannelClosedError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str("Test channel closed")
+ }
+}
+
+impl<T> From<SendError<T>> for ChannelClosedError {
+ fn from(_: SendError<T>) -> Self {
+ Self
+ }
+}
+
+#[repr(transparent)]
+struct SendMutex(*const parking_lot::RawMutex);
+impl Drop for SendMutex {
+ fn drop(&mut self) {
+ // SAFETY: We know this was locked by the sender
+ unsafe {
+ (*self.0).unlock();
+ }
+ }
+}
+
+// SAFETY: This is a mutex, so it's safe to send a pointer to it
+unsafe impl Send for SendMutex {}
+
+/// Create a [`TestEventSenderFactory`] and [`TestEventReceiver`] pair. The [`TestEventSenderFactory`] may be
+/// used to create [`TestEventSender`]s and stdio streams for multiple workers in the system. The [`TestEventReceiver`]
+/// will be kept alive until the final [`TestEventSender`] is dropped.
+pub fn create_test_event_channel() -> (TestEventSenderFactory, TestEventReceiver)
+{
+ let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
+ (
+ TestEventSenderFactory {
+ sender,
+ worker_id: Default::default(),
+ },
+ TestEventReceiver { receiver },
+ )
+}
+
+/// Create a [`TestEventWorkerSender`] and [`TestEventReceiver`] pair.The [`TestEventReceiver`]
+/// will be kept alive until the [`TestEventSender`] is dropped.
+pub fn create_single_test_event_channel(
+) -> (TestEventWorkerSender, TestEventReceiver) {
+ let (factory, receiver) = create_test_event_channel();
+ (factory.worker(), receiver)
+}
+
+/// Polls for the next [`TestEvent`] from any worker. Events from multiple worker
+/// streams may be interleaved.
+pub struct TestEventReceiver {
+ receiver: UnboundedReceiver<(usize, TestEvent)>,
+}
+
+impl TestEventReceiver {
+ /// Receive a single test event, or `None` if no workers are alive.
+ pub async fn recv(&mut self) -> Option<(usize, TestEvent)> {
+ self.receiver.recv().await
+ }
+}
+
+struct TestStream {
+ id: usize,
+ which: TestStdioStream,
+ read_opt: Option<AsyncPipeRead>,
+ sender: UnboundedSender<(usize, TestEvent)>,
+}
+
+impl TestStream {
+ fn new(
+ id: usize,
+ which: TestStdioStream,
+ pipe_reader: PipeRead,
+ sender: UnboundedSender<(usize, TestEvent)>,
+ ) -> std::io::Result<Self> {
+ // This may fail if the tokio runtime is shutting down
+ let read_opt = Some(pipe_reader.into_async()?);
+ Ok(Self {
+ id,
+ which,
+ read_opt,
+ sender,
+ })
+ }
+
+ /// Send a buffer to the test event channel. If the channel no longer exists, shut down the stream
+ /// because we can't do anything.
+ #[must_use = "If this returns false, don't keep reading because we cannot send"]
+ fn send(&mut self, buffer: Vec<u8>) -> bool {
+ if buffer.is_empty() {
+ true
+ } else if self
+ .sender
+ .send((self.id, TestEvent::Output(self.which, buffer)))
+ .is_err()
+ {
+ self.read_opt.take();
+ false
+ } else {
+ true
+ }
+ }
+
+ fn is_alive(&self) -> bool {
+ self.read_opt.is_some()
+ }
+
+ /// Attempt to read from a given stream, pushing all of the data in it into the given
+ /// [`UnboundedSender`] before returning.
+ async fn pipe(&mut self) {
+ let mut buffer = [0_u8; BUFFER_SIZE];
+ let mut buf = ReadBuf::new(&mut buffer);
+ let res = {
+ // No more stream, so just return.
+ let Some(stream) = &mut self.read_opt else {
+ return;
+ };
+ poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await
+ };
+ match res {
+ Ok(_) => {
+ let buf = buf.filled().to_vec();
+ if buf.is_empty() {
+ // The buffer may return empty in EOF conditions and never return an error,
+ // so we need to treat this as EOF
+ self.read_opt.take();
+ } else {
+ // Attempt to send the buffer, marking as not alive if the channel is closed
+ _ = self.send(buf);
+ }
+ }
+ Err(_) => {
+ // Stream errored, so just return and mark this stream as not alive.
+ _ = self.send(buf.filled().to_vec());
+ self.read_opt.take();
+ }
+ }
+ }
+
+ /// Read and "block" until the sync markers have been read.
+ async fn read_until_sync_marker(&mut self) {
+ let Some(file) = &mut self.read_opt else {
+ return;
+ };
+ let mut flush = Vec::with_capacity(BUFFER_SIZE);
+ loop {
+ let mut buffer = [0_u8; BUFFER_SIZE];
+ match file.read(&mut buffer).await {
+ Err(_) | Ok(0) => {
+ // EOF or error, just return. We make no guarantees about unflushed data at shutdown.
+ self.read_opt.take();
+ return;
+ }
+ Ok(read) => {
+ flush.extend(&buffer[0..read]);
+ if flush.ends_with(SYNC_MARKER) {
+ flush.truncate(flush.len() - SYNC_MARKER.len());
+ // Try to send our flushed buffer. If the channel is closed, this stream will
+ // be marked as not alive.
+ _ = self.send(flush);
+ return;
+ }
+ }
+ }
+ }
+ }
+}
+
+/// A factory for creating [`TestEventSender`]s. This factory must be dropped
+/// before the [`TestEventReceiver`] will complete.
+pub struct TestEventSenderFactory {
+ sender: UnboundedSender<(usize, TestEvent)>,
+ worker_id: AtomicUsize,
+}
+
+impl TestEventSenderFactory {
+ /// Create a [`TestEventWorkerSender`], along with a stdout/stderr stream.
+ pub fn worker(&self) -> TestEventWorkerSender {
+ let id = self.worker_id.fetch_add(1, Ordering::AcqRel);
+ let (stdout_reader, mut stdout_writer) = pipe().unwrap();
+ let (stderr_reader, mut stderr_writer) = pipe().unwrap();
+ let (sync_sender, mut sync_receiver) =
+ tokio::sync::mpsc::unbounded_channel::<SendMutex>();
+ let stdout = stdout_writer.try_clone().unwrap();
+ let stderr = stderr_writer.try_clone().unwrap();
+ let sender = self.sender.clone();
+
+ // Each worker spawns its own output monitoring and serialization task. This task will
+ // poll the stdout/stderr streams and interleave that data with `TestEvents` generated
+ // by the test runner worker.
+ //
+ // Note that this _must_ be a separate thread! Flushing requires locking coördination
+ // on two threads and if we're blocking-locked on the mutex we've sent down the sync_receiver,
+ // there's no way for us to process the actual flush operation here.
+ //
+ // Creating a mini-runtime to flush the stdout/stderr is the easiest way to do this, but
+ // there's no reason we couldn't do it with non-blocking I/O, other than the difficulty
+ // of setting up an I/O reactor in Windows.
+ std::thread::spawn(move || {
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_io()
+ .build()
+ .unwrap();
+ runtime.block_on(tokio::task::unconstrained(async move {
+ let mut test_stdout = TestStream::new(
+ id,
+ TestStdioStream::Stdout,
+ stdout_reader,
+ sender.clone(),
+ )?;
+ let mut test_stderr =
+ TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;
+
+ // This function will be woken whenever a stream or the receiver is ready
+ loop {
+ tokio::select! {
+ _ = test_stdout.pipe(), if test_stdout.is_alive() => {},
+ _ = test_stderr.pipe(), if test_stdout.is_alive() => {},
+ recv = sync_receiver.recv() => {
+ match recv {
+ // If the channel closed, we assume that all important data from the streams was synced,
+ // so we just end this task immediately.
+ None => { break },
+ Some(mutex) => {
+ // If we fail to write the sync marker for flush (likely in the case where the runtime is shutting down),
+ // we instead just release the mutex and bail.
+ let success = stdout_writer.write_all(SYNC_MARKER).is_ok()
+ && stderr_writer.write_all(SYNC_MARKER).is_ok();
+ if success {
+ for stream in [&mut test_stdout, &mut test_stderr] {
+ stream.read_until_sync_marker().await;
+ }
+ }
+ drop(mutex);
+ }
+ }
+ }
+ }
+ }
+
+ Ok::<_, std::io::Error>(())
+ }))?;
+
+ Ok::<_, std::io::Error>(())
+ });
+
+ let sender = TestEventSender {
+ id,
+ ref_count: Default::default(),
+ sender: self.sender.clone(),
+ sync_sender,
+ };
+
+ TestEventWorkerSender {
+ sender,
+ stdout,
+ stderr,
+ }
+ }
+
+ /// A [`TestEventWeakSender`] has a unique ID, but will not keep the [`TestEventReceiver`] alive.
+ /// This may be useful to add a `SIGINT` or other break handler to tests that isn't part of a
+ /// specific test, but handles the overall orchestration of running tests:
+ ///
+ /// ```nocompile
+ /// let mut cancel_sender = test_event_sender_factory.weak_sender();
+ /// let sigint_handler_handle = spawn(async move {
+ /// signal::ctrl_c().await.unwrap();
+ /// cancel_sender.send(TestEvent::Sigint).ok();
+ /// });
+ /// ```
+ pub fn weak_sender(&self) -> TestEventWeakSender {
+ TestEventWeakSender {
+ id: self.worker_id.fetch_add(1, Ordering::AcqRel),
+ sender: self.sender.downgrade(),
+ }
+ }
+}
+
+pub struct TestEventWeakSender {
+ pub id: usize,
+ sender: WeakUnboundedSender<(usize, TestEvent)>,
+}
+
+impl TestEventWeakSender {
+ pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> {
+ Ok(
+ self
+ .sender
+ .upgrade()
+ .ok_or(ChannelClosedError)?
+ .send((self.id, message))?,
+ )
+ }
+}
+
+pub struct TestEventWorkerSender {
+ pub sender: TestEventSender,
+ pub stdout: PipeWrite,
+ pub stderr: PipeWrite,
+}
+
+/// Sends messages from a given worker into the test stream. If multiple clones of
+/// this sender are kept alive, the worker is kept alive.
+///
+/// Any unflushed bytes in the stdout or stderr stream associated with this sender
+/// are not guaranteed to be sent on drop unless flush is explicitly called.
+pub struct TestEventSender {
+ pub id: usize,
+ ref_count: Arc<()>,
+ sender: UnboundedSender<(usize, TestEvent)>,
+ sync_sender: UnboundedSender<SendMutex>,
+}
+
+impl Clone for TestEventSender {
+ fn clone(&self) -> Self {
+ Self {
+ id: self.id,
+ ref_count: self.ref_count.clone(),
+ sender: self.sender.clone(),
+ sync_sender: self.sync_sender.clone(),
+ }
+ }
+}
+
+impl TestEventSender {
+ pub fn send(&mut self, message: TestEvent) -> Result<(), ChannelClosedError> {
+ // Certain messages require us to ensure that all output has been drained to ensure proper
+ // interleaving of messages.
+ if message.requires_stdio_sync() {
+ self.flush()?;
+ }
+ Ok(self.sender.send((self.id, message))?)
+ }
+
+ /// Ensure that all output has been fully flushed by writing a sync marker into the
+ /// stdout and stderr streams and waiting for it on the other side.
+ pub fn flush(&mut self) -> Result<(), ChannelClosedError> {
+ let mutex = parking_lot::RawMutex::INIT;
+ mutex.lock();
+ self.sync_sender.send(SendMutex(&mutex as _))?;
+ mutex.lock();
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use deno_core::unsync::spawn;
+ use deno_core::unsync::spawn_blocking;
+
+ /// Test that output is correctly interleaved with messages.
+ #[tokio::test]
+ async fn spawn_worker() {
+ test_util::timeout!(60);
+ let (mut worker, mut receiver) = create_single_test_event_channel();
+
+ let recv_handle = spawn(async move {
+ let mut queue = vec![];
+ while let Some((_, message)) = receiver.recv().await {
+ let msg_str = format!("{message:?}");
+ if msg_str.len() > 50 {
+ eprintln!("message = {}...", &msg_str[..50]);
+ } else {
+ eprintln!("message = {}", msg_str);
+ }
+ queue.push(message);
+ }
+ eprintln!("done");
+ queue
+ });
+ let send_handle = spawn_blocking(move || {
+ worker.stdout.write_all(&[1; 100_000]).unwrap();
+ eprintln!("Wrote bytes");
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ eprintln!("Sent");
+ worker.stdout.write_all(&[2; 100_000]).unwrap();
+ eprintln!("Wrote bytes 2");
+ worker.sender.flush().unwrap();
+ eprintln!("Done");
+ });
+ send_handle.await.unwrap();
+ let messages = recv_handle.await.unwrap();
+
+ let mut expected = 1;
+ let mut count = 0;
+ for message in messages {
+ match message {
+ TestEvent::Output(_, vec) => {
+ assert_eq!(vec[0], expected);
+ count += vec.len();
+ }
+ TestEvent::StepWait(_) => {
+ assert_eq!(count, 100_000);
+ count = 0;
+ expected = 2;
+ }
+ _ => unreachable!(),
+ }
+ }
+ assert_eq!(expected, 2);
+ assert_eq!(count, 100_000);
+ }
+
+ /// Test that flushing a large number of times doesn't hang.
+ #[tokio::test]
+ async fn test_flush_lots() {
+ test_util::timeout!(60);
+ let (mut worker, mut receiver) = create_single_test_event_channel();
+ let recv_handle = spawn(async move {
+ let mut queue = vec![];
+ while let Some((_, message)) = receiver.recv().await {
+ assert!(!matches!(message, TestEvent::Output(..)));
+ queue.push(message);
+ }
+ eprintln!("Receiver closed");
+ queue
+ });
+ let send_handle = spawn_blocking(move || {
+ for _ in 0..100000 {
+ worker.sender.send(TestEvent::StepWait(1)).unwrap();
+ }
+ eprintln!("Sent all messages");
+ });
+ send_handle.await.unwrap();
+ let messages = recv_handle.await.unwrap();
+ assert_eq!(messages.len(), 100000);
+ }
+
+ /// Ensure nothing panics if we're racing the runtime shutdown.
+ #[test]
+ fn test_runtime_shutdown() {
+ test_util::timeout!(60);
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ runtime.block_on(async {
+ let (mut worker, mut receiver) = create_single_test_event_channel();
+ tokio::task::spawn(async move {
+ loop {
+ if receiver.recv().await.is_none() {
+ break;
+ }
+ }
+ });
+ tokio::task::spawn(async move {
+ _ = worker.sender.send(TestEvent::Sigint);
+ });
+ });
+ }
+}
diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs
index b088cf7a3..e98df4671 100644
--- a/cli/tools/test/mod.rs
+++ b/cli/tools/test/mod.rs
@@ -37,7 +37,6 @@ use deno_core::futures::stream;
use deno_core::futures::FutureExt;
use deno_core::futures::StreamExt;
use deno_core::located_script_name;
-use deno_core::parking_lot::Mutex;
use deno_core::serde_v8;
use deno_core::stats::RuntimeActivity;
use deno_core::stats::RuntimeActivityDiff;
@@ -71,7 +70,6 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Write as _;
use std::future::poll_fn;
-use std::io::Read;
use std::io::Write;
use std::num::NonZeroUsize;
use std::path::Path;
@@ -84,14 +82,16 @@ use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use tokio::signal;
-use tokio::sync::mpsc::unbounded_channel;
-use tokio::sync::mpsc::UnboundedReceiver;
-use tokio::sync::mpsc::UnboundedSender;
-use tokio::sync::mpsc::WeakUnboundedSender;
+mod channel;
pub mod fmt;
pub mod reporters;
+pub use channel::create_single_test_event_channel;
+pub use channel::create_test_event_channel;
+pub use channel::TestEventReceiver;
+pub use channel::TestEventSender;
+pub use channel::TestEventWorkerSender;
use fmt::format_sanitizer_diff;
pub use fmt::format_test_error;
use reporters::CompoundTestReporter;
@@ -329,13 +329,19 @@ pub struct TestPlan {
pub used_only: bool,
}
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize)]
+pub enum TestStdioStream {
+ Stdout,
+ Stderr,
+}
+
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum TestEvent {
Register(TestDescription),
Plan(TestPlan),
Wait(usize),
- Output(Vec<u8>),
+ Output(TestStdioStream, Vec<u8>),
Result(usize, TestResult, u64),
UncaughtError(String, Box<JsError>),
StepRegister(TestStepDescription),
@@ -345,6 +351,21 @@ pub enum TestEvent {
Sigint,
}
+impl TestEvent {
+ // Certain messages require us to ensure that all output has been drained to ensure proper
+ // interleaving of output messages.
+ pub fn requires_stdio_sync(&self) -> bool {
+ matches!(
+ self,
+ TestEvent::Result(..)
+ | TestEvent::StepWait(..)
+ | TestEvent::StepResult(..)
+ | TestEvent::UncaughtError(..)
+ | TestEvent::ForceEndReport
+ )
+ }
+}
+
#[derive(Debug, Clone, Deserialize)]
pub struct TestSummary {
pub total: usize,
@@ -432,7 +453,7 @@ pub async fn test_specifier(
worker_factory: Arc<CliMainWorkerFactory>,
permissions: Permissions,
specifier: ModuleSpecifier,
- mut sender: TestEventSender,
+ mut worker_sender: TestEventWorkerSender,
fail_fast_tracker: FailFastTracker,
options: TestSpecifierOptions,
) -> Result<(), AnyError> {
@@ -440,7 +461,9 @@ pub async fn test_specifier(
worker_factory,
permissions,
specifier.clone(),
- &sender,
+ &worker_sender.sender,
+ StdioPipe::file(worker_sender.stdout),
+ StdioPipe::file(worker_sender.stderr),
fail_fast_tracker,
options,
)
@@ -449,7 +472,7 @@ pub async fn test_specifier(
Ok(()) => Ok(()),
Err(error) => {
if error.is::<JsError>() {
- sender.send(TestEvent::UncaughtError(
+ worker_sender.sender.send(TestEvent::UncaughtError(
specifier.to_string(),
Box::new(error.downcast::<JsError>().unwrap()),
))?;
@@ -463,26 +486,27 @@ pub async fn test_specifier(
/// Test a single specifier as documentation containing test programs, an executable test module or
/// both.
+#[allow(clippy::too_many_arguments)]
async fn test_specifier_inner(
worker_factory: Arc<CliMainWorkerFactory>,
permissions: Permissions,
specifier: ModuleSpecifier,
sender: &TestEventSender,
+ stdout: StdioPipe,
+ stderr: StdioPipe,
fail_fast_tracker: FailFastTracker,
options: TestSpecifierOptions,
) -> Result<(), AnyError> {
if fail_fast_tracker.should_stop() {
return Ok(());
}
- let stdout = StdioPipe::File(sender.stdout());
- let stderr = StdioPipe::File(sender.stderr());
let mut worker = worker_factory
.create_custom_worker(
specifier.clone(),
PermissionsContainer::new(permissions),
vec![ops::testing::deno_test::init_ops(sender.clone())],
Stdio {
- stdin: StdioPipe::Inherit,
+ stdin: StdioPipe::inherit(),
stdout,
stderr,
},
@@ -1062,14 +1086,13 @@ async fn test_specifiers(
specifiers
};
- let (sender, receiver) = unbounded_channel::<TestEvent>();
- let sender = TestEventSender::new(sender);
+ let (test_event_sender_factory, receiver) = create_test_event_channel();
let concurrent_jobs = options.concurrent_jobs;
- let sender_ = sender.downgrade();
+ let mut cancel_sender = test_event_sender_factory.weak_sender();
let sigint_handler_handle = spawn(async move {
signal::ctrl_c().await.unwrap();
- sender_.upgrade().map(|s| s.send(TestEvent::Sigint).ok());
+ cancel_sender.send(TestEvent::Sigint).ok();
});
HAS_TEST_RUN_SIGINT_HANDLER.store(true, Ordering::Relaxed);
let reporter = get_test_reporter(&options);
@@ -1078,7 +1101,7 @@ async fn test_specifiers(
let join_handles = specifiers.into_iter().map(move |specifier| {
let worker_factory = worker_factory.clone();
let permissions = permissions.clone();
- let sender = sender.clone();
+ let worker_sender = test_event_sender_factory.worker();
let fail_fast_tracker = fail_fast_tracker.clone();
let specifier_options = options.specifier.clone();
spawn_blocking(move || {
@@ -1086,12 +1109,13 @@ async fn test_specifiers(
worker_factory,
permissions,
specifier,
- sender.clone(),
+ worker_sender,
fail_fast_tracker,
specifier_options,
))
})
});
+
let join_stream = stream::iter(join_handles)
.buffer_unordered(concurrent_jobs.get())
.collect::<Vec<Result<Result<(), AnyError>, tokio::task::JoinError>>>();
@@ -1111,9 +1135,9 @@ async fn test_specifiers(
/// Gives receiver back in case it was ended with `TestEvent::ForceEndReport`.
pub async fn report_tests(
- mut receiver: UnboundedReceiver<TestEvent>,
+ mut receiver: TestEventReceiver,
mut reporter: Box<dyn TestReporter>,
-) -> (Result<(), AnyError>, UnboundedReceiver<TestEvent>) {
+) -> (Result<(), AnyError>, TestEventReceiver) {
let mut tests = IndexMap::new();
let mut test_steps = IndexMap::new();
let mut tests_started = HashSet::new();
@@ -1123,7 +1147,7 @@ pub async fn report_tests(
let mut used_only = false;
let mut failed = false;
- while let Some(event) = receiver.recv().await {
+ while let Some((_, event)) = receiver.recv().await {
match event {
TestEvent::Register(description) => {
reporter.report_register(&description);
@@ -1144,7 +1168,7 @@ pub async fn report_tests(
reporter.report_wait(tests.get(&id).unwrap());
}
}
- TestEvent::Output(output) => {
+ TestEvent::Output(_, output) => {
reporter.report_output(&output);
}
TestEvent::Result(id, result, elapsed) => {
@@ -1629,158 +1653,6 @@ impl FailFastTracker {
}
}
-#[derive(Clone)]
-pub struct TestEventSender {
- sender: UnboundedSender<TestEvent>,
- stdout_writer: TestOutputPipe,
- stderr_writer: TestOutputPipe,
-}
-
-impl TestEventSender {
- pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
- Self {
- stdout_writer: TestOutputPipe::new(sender.clone()),
- stderr_writer: TestOutputPipe::new(sender.clone()),
- sender,
- }
- }
-
- pub fn stdout(&self) -> std::fs::File {
- self.stdout_writer.as_file()
- }
-
- pub fn stderr(&self) -> std::fs::File {
- self.stderr_writer.as_file()
- }
-
- pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> {
- // for any event that finishes collecting output, we need to
- // ensure that the collected stdout and stderr pipes are flushed
- if matches!(
- message,
- TestEvent::Result(_, _, _)
- | TestEvent::StepWait(_)
- | TestEvent::StepResult(_, _, _)
- | TestEvent::UncaughtError(_, _)
- ) {
- self.flush_stdout_and_stderr()?;
- }
-
- self.sender.send(message)?;
- Ok(())
- }
-
- fn downgrade(&self) -> WeakUnboundedSender<TestEvent> {
- self.sender.downgrade()
- }
-
- fn flush_stdout_and_stderr(&mut self) -> Result<(), AnyError> {
- self.stdout_writer.flush()?;
- self.stderr_writer.flush()?;
-
- Ok(())
- }
-}
-
-// use a string that if it ends up in the output won't affect how things are displayed
-const ZERO_WIDTH_SPACE: &str = "\u{200B}";
-
-struct TestOutputPipe {
- writer: os_pipe::PipeWriter,
- state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
-}
-
-impl Clone for TestOutputPipe {
- fn clone(&self) -> Self {
- Self {
- writer: self.writer.try_clone().unwrap(),
- state: self.state.clone(),
- }
- }
-}
-
-impl TestOutputPipe {
- pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
- let (reader, writer) = os_pipe::pipe().unwrap();
- let state = Arc::new(Mutex::new(None));
-
- start_output_redirect_thread(reader, sender, state.clone());
-
- Self { writer, state }
- }
-
- pub fn flush(&mut self) -> Result<(), AnyError> {
- // We want to wake up the other thread and have it respond back
- // that it's done clearing out its pipe before returning.
- let (sender, receiver) = std::sync::mpsc::channel();
- if let Some(sender) = self.state.lock().replace(sender) {
- let _ = sender.send(()); // just in case
- }
- // Bit of a hack to send a zero width space in order to wake
- // the thread up. It seems that sending zero bytes here does
- // not work on windows.
- self.writer.write_all(ZERO_WIDTH_SPACE.as_bytes())?;
- self.writer.flush()?;
- // ignore the error as it might have been picked up and closed
- let _ = receiver.recv();
-
- Ok(())
- }
-
- pub fn as_file(&self) -> std::fs::File {
- pipe_writer_to_file(self.writer.try_clone().unwrap())
- }
-}
-
-#[cfg(windows)]
-fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File {
- use std::os::windows::prelude::FromRawHandle;
- use std::os::windows::prelude::IntoRawHandle;
- // SAFETY: Requires consuming ownership of the provided handle
- unsafe { std::fs::File::from_raw_handle(writer.into_raw_handle()) }
-}
-
-#[cfg(unix)]
-fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File {
- use std::os::unix::io::FromRawFd;
- use std::os::unix::io::IntoRawFd;
- // SAFETY: Requires consuming ownership of the provided handle
- unsafe { std::fs::File::from_raw_fd(writer.into_raw_fd()) }
-}
-
-fn start_output_redirect_thread(
- mut pipe_reader: os_pipe::PipeReader,
- sender: UnboundedSender<TestEvent>,
- flush_state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
-) {
- spawn_blocking(move || loop {
- let mut buffer = [0; 512];
- let size = match pipe_reader.read(&mut buffer) {
- Ok(0) | Err(_) => break,
- Ok(size) => size,
- };
- let oneshot_sender = flush_state.lock().take();
- let mut data = &buffer[0..size];
- if data.ends_with(ZERO_WIDTH_SPACE.as_bytes()) {
- data = &data[0..data.len() - ZERO_WIDTH_SPACE.len()];
- }
-
- if !data.is_empty()
- && sender
- .send(TestEvent::Output(buffer[0..size].to_vec()))
- .is_err()
- {
- break;
- }
-
- // Always respond back if this was set. Ideally we would also check to
- // ensure the pipe reader is empty before sending back this response.
- if let Some(sender) = oneshot_sender {
- let _ignore = sender.send(());
- }
- });
-}
-
#[cfg(test)]
mod inner_test {
use std::path::Path;
diff --git a/ext/io/lib.rs b/ext/io/lib.rs
index e0d649e0a..8d80eec25 100644
--- a/ext/io/lib.rs
+++ b/ext/io/lib.rs
@@ -110,36 +110,36 @@ deno_core::extension!(deno_io,
let t = &mut state.resource_table;
let rid = t.add(fs::FileResource::new(
- Rc::new(match stdio.stdin {
- StdioPipe::Inherit => StdFileResourceInner::new(
+ Rc::new(match stdio.stdin.pipe {
+ StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stdin,
STDIN_HANDLE.try_clone().unwrap(),
),
- StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
+ StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
}),
"stdin".to_string(),
));
assert_eq!(rid, 0, "stdin must have ResourceId 0");
let rid = t.add(FileResource::new(
- Rc::new(match stdio.stdout {
- StdioPipe::Inherit => StdFileResourceInner::new(
+ Rc::new(match stdio.stdout.pipe {
+ StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stdout,
STDOUT_HANDLE.try_clone().unwrap(),
),
- StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
+ StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
}),
"stdout".to_string(),
));
assert_eq!(rid, 1, "stdout must have ResourceId 1");
let rid = t.add(FileResource::new(
- Rc::new(match stdio.stderr {
- StdioPipe::Inherit => StdFileResourceInner::new(
+ Rc::new(match stdio.stderr.pipe {
+ StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stderr,
STDERR_HANDLE.try_clone().unwrap(),
),
- StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
+ StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
}),
"stderr".to_string(),
));
@@ -148,22 +148,41 @@ deno_core::extension!(deno_io,
},
);
-pub enum StdioPipe {
- Inherit,
- File(StdFile),
+#[derive(Default)]
+pub struct StdioPipe {
+ pipe: StdioPipeInner,
}
-impl Default for StdioPipe {
- fn default() -> Self {
- Self::Inherit
+impl StdioPipe {
+ pub const fn inherit() -> Self {
+ StdioPipe {
+ pipe: StdioPipeInner::Inherit,
+ }
+ }
+
+ pub fn file(f: impl Into<StdFile>) -> Self {
+ StdioPipe {
+ pipe: StdioPipeInner::File(f.into()),
+ }
}
}
+#[derive(Default)]
+enum StdioPipeInner {
+ #[default]
+ Inherit,
+ File(StdFile),
+}
+
impl Clone for StdioPipe {
fn clone(&self) -> Self {
- match self {
- StdioPipe::Inherit => StdioPipe::Inherit,
- StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()),
+ match &self.pipe {
+ StdioPipeInner::Inherit => Self {
+ pipe: StdioPipeInner::Inherit,
+ },
+ StdioPipeInner::File(pipe) => Self {
+ pipe: StdioPipeInner::File(pipe.try_clone().unwrap()),
+ },
}
}
}
diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs
index 0cad7b1f6..70788f752 100644
--- a/ext/io/pipe.rs
+++ b/ext/io/pipe.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::io;
use std::pin::Pin;
+use std::process::Stdio;
// The synchronous read end of a unidirectional pipe.
pub struct PipeRead {
@@ -35,32 +36,50 @@ pub struct AsyncPipeWrite {
}
impl PipeRead {
+ /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_async(self) -> AsyncPipeRead {
+ pub fn into_async(self) -> io::Result<AsyncPipeRead> {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdout = std::process::ChildStdout::from(owned);
- AsyncPipeRead {
- read: tokio::process::ChildStdout::from_std(stdout).unwrap(),
- }
+ Ok(AsyncPipeRead {
+ read: tokio::process::ChildStdout::from_std(stdout)?,
+ })
}
+
+ /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_async(self) -> AsyncPipeRead {
- AsyncPipeRead {
- read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(),
- }
+ pub fn into_async(self) -> io::Result<AsyncPipeRead> {
+ Ok(AsyncPipeRead {
+ read: tokio::net::unix::pipe::Receiver::from_file(self.file)?,
+ })
+ }
+
+ /// Creates a new [`PipeRead`] instance that shares the same underlying file handle
+ /// as the existing [`PipeRead`] instance.
+ pub fn try_clone(&self) -> io::Result<Self> {
+ Ok(Self {
+ file: self.file.try_clone()?,
+ })
}
}
impl AsyncPipeRead {
+ /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_sync(self) -> PipeRead {
- let owned = self.read.into_owned_handle().unwrap();
- PipeRead { file: owned.into() }
+ pub fn into_sync(self) -> io::Result<PipeRead> {
+ let owned = self.read.into_owned_handle()?;
+ Ok(PipeRead { file: owned.into() })
}
+
+ /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_sync(self) -> PipeRead {
- let file = self.read.into_nonblocking_fd().unwrap().into();
- PipeRead { file }
+ pub fn into_sync(self) -> io::Result<PipeRead> {
+ let file = self.read.into_nonblocking_fd()?.into();
+ Ok(PipeRead { file })
}
}
@@ -88,32 +107,50 @@ impl tokio::io::AsyncRead for AsyncPipeRead {
}
impl PipeWrite {
+ /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_async(self) -> AsyncPipeWrite {
+ pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdin = std::process::ChildStdin::from(owned);
- AsyncPipeWrite {
- write: tokio::process::ChildStdin::from_std(stdin).unwrap(),
- }
+ Ok(AsyncPipeWrite {
+ write: tokio::process::ChildStdin::from_std(stdin)?,
+ })
}
+
+ /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_async(self) -> AsyncPipeWrite {
- AsyncPipeWrite {
- write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(),
- }
+ pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
+ Ok(AsyncPipeWrite {
+ write: tokio::net::unix::pipe::Sender::from_file(self.file)?,
+ })
+ }
+
+ /// Creates a new [`PipeWrite`] instance that shares the same underlying file handle
+ /// as the existing [`PipeWrite`] instance.
+ pub fn try_clone(&self) -> io::Result<Self> {
+ Ok(Self {
+ file: self.file.try_clone()?,
+ })
}
}
impl AsyncPipeWrite {
+ /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_sync(self) -> PipeWrite {
- let owned = self.write.into_owned_handle().unwrap();
- PipeWrite { file: owned.into() }
+ pub fn into_sync(self) -> io::Result<PipeWrite> {
+ let owned = self.write.into_owned_handle()?;
+ Ok(PipeWrite { file: owned.into() })
}
+
+ /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_sync(self) -> PipeWrite {
- let file = self.write.into_nonblocking_fd().unwrap().into();
- PipeWrite { file }
+ pub fn into_sync(self) -> io::Result<PipeWrite> {
+ let file = self.write.into_nonblocking_fd()?.into();
+ Ok(PipeWrite { file })
}
}
@@ -172,6 +209,58 @@ impl tokio::io::AsyncWrite for AsyncPipeWrite {
}
}
+impl From<PipeRead> for Stdio {
+ fn from(val: PipeRead) -> Self {
+ Stdio::from(val.file)
+ }
+}
+
+impl From<PipeWrite> for Stdio {
+ fn from(val: PipeWrite) -> Self {
+ Stdio::from(val.file)
+ }
+}
+
+impl From<PipeRead> for std::fs::File {
+ fn from(val: PipeRead) -> Self {
+ val.file
+ }
+}
+
+impl From<PipeWrite> for std::fs::File {
+ fn from(val: PipeWrite) -> Self {
+ val.file
+ }
+}
+
+#[cfg(not(windows))]
+impl From<PipeRead> for std::os::unix::io::OwnedFd {
+ fn from(val: PipeRead) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(not(windows))]
+impl From<PipeWrite> for std::os::unix::io::OwnedFd {
+ fn from(val: PipeWrite) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(windows)]
+impl From<PipeRead> for std::os::windows::io::OwnedHandle {
+ fn from(val: PipeRead) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(windows)]
+impl From<PipeWrite> for std::os::windows::io::OwnedHandle {
+ fn from(val: PipeWrite) -> Self {
+ val.file.into()
+ }
+}
+
/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles,
/// but either side may be promoted to an async-capable reader/writer.
///
@@ -228,8 +317,8 @@ mod tests {
#[tokio::test]
async fn test_async_pipe() {
let (read, write) = pipe().unwrap();
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
write.write_all(b"hello").await.unwrap();
let mut buf: [u8; 5] = Default::default();
@@ -248,8 +337,8 @@ mod tests {
read.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
// Async
write.write_all(b"hello").await.unwrap();
@@ -257,8 +346,8 @@ mod tests {
read.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
- let mut read = read.into_sync();
- let mut write = write.into_sync();
+ let mut read = read.into_sync().unwrap();
+ let mut write = write.into_sync().unwrap();
// Sync
write.write_all(b"hello").unwrap();
@@ -270,8 +359,8 @@ mod tests {
#[tokio::test]
async fn test_async_pipe_is_nonblocking() {
let (read, write) = pipe().unwrap();
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
let a = tokio::spawn(async move {
let mut buf: [u8; 5] = Default::default();