summaryrefslogtreecommitdiff
path: root/ext/io/pipe.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2024-02-23 11:11:15 -0700
committerGitHub <noreply@github.com>2024-02-23 11:11:15 -0700
commit5193834cf23e3521b3afd3f5f54eb00daa23c88d (patch)
treed67bcef4d96f615f345ad1679d27d73bdc2ccd39 /ext/io/pipe.rs
parent619acce305ac77f98327718bc1e6a1ae13d8bcf6 (diff)
refactor(cli): clean up test runner channels (#22422)
Gets us closer to solving #20707. Rewrites the `TestEventSender`: - Allow for explicit creation of multiple streams. This will allow for one-std{out,err}-per-worker - All test events are received along with a worker ID, allowing for eventual, proper parallel threading of test events. In theory this should open up proper interleaving of test output, however that is left for a future PR. I had some plans for a better performing synchronization primitive, but the inter-thread communication is tricky. This does, however, speed up the processing of large numbers of tests 15-25% (possibly even more on 100,000+). Before ``` ok | 1000 passed | 0 failed (32ms) ok | 10000 passed | 0 failed (276ms) ``` After ``` ok | 1000 passed | 0 failed (25ms) ok | 10000 passed | 0 failed (230ms) ```
Diffstat (limited to 'ext/io/pipe.rs')
-rw-r--r--ext/io/pipe.rs161
1 files changed, 125 insertions, 36 deletions
diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs
index 0cad7b1f6..70788f752 100644
--- a/ext/io/pipe.rs
+++ b/ext/io/pipe.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::io;
use std::pin::Pin;
+use std::process::Stdio;
// The synchronous read end of a unidirectional pipe.
pub struct PipeRead {
@@ -35,32 +36,50 @@ pub struct AsyncPipeWrite {
}
impl PipeRead {
+ /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_async(self) -> AsyncPipeRead {
+ pub fn into_async(self) -> io::Result<AsyncPipeRead> {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdout = std::process::ChildStdout::from(owned);
- AsyncPipeRead {
- read: tokio::process::ChildStdout::from_std(stdout).unwrap(),
- }
+ Ok(AsyncPipeRead {
+ read: tokio::process::ChildStdout::from_std(stdout)?,
+ })
}
+
+ /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_async(self) -> AsyncPipeRead {
- AsyncPipeRead {
- read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(),
- }
+ pub fn into_async(self) -> io::Result<AsyncPipeRead> {
+ Ok(AsyncPipeRead {
+ read: tokio::net::unix::pipe::Receiver::from_file(self.file)?,
+ })
+ }
+
+ /// Creates a new [`PipeRead`] instance that shares the same underlying file handle
+ /// as the existing [`PipeRead`] instance.
+ pub fn try_clone(&self) -> io::Result<Self> {
+ Ok(Self {
+ file: self.file.try_clone()?,
+ })
}
}
impl AsyncPipeRead {
+ /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_sync(self) -> PipeRead {
- let owned = self.read.into_owned_handle().unwrap();
- PipeRead { file: owned.into() }
+ pub fn into_sync(self) -> io::Result<PipeRead> {
+ let owned = self.read.into_owned_handle()?;
+ Ok(PipeRead { file: owned.into() })
}
+
+ /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_sync(self) -> PipeRead {
- let file = self.read.into_nonblocking_fd().unwrap().into();
- PipeRead { file }
+ pub fn into_sync(self) -> io::Result<PipeRead> {
+ let file = self.read.into_nonblocking_fd()?.into();
+ Ok(PipeRead { file })
}
}
@@ -88,32 +107,50 @@ impl tokio::io::AsyncRead for AsyncPipeRead {
}
impl PipeWrite {
+ /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_async(self) -> AsyncPipeWrite {
+ pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdin = std::process::ChildStdin::from(owned);
- AsyncPipeWrite {
- write: tokio::process::ChildStdin::from_std(stdin).unwrap(),
- }
+ Ok(AsyncPipeWrite {
+ write: tokio::process::ChildStdin::from_std(stdin)?,
+ })
}
+
+ /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_async(self) -> AsyncPipeWrite {
- AsyncPipeWrite {
- write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(),
- }
+ pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
+ Ok(AsyncPipeWrite {
+ write: tokio::net::unix::pipe::Sender::from_file(self.file)?,
+ })
+ }
+
+ /// Creates a new [`PipeWrite`] instance that shares the same underlying file handle
+ /// as the existing [`PipeWrite`] instance.
+ pub fn try_clone(&self) -> io::Result<Self> {
+ Ok(Self {
+ file: self.file.try_clone()?,
+ })
}
}
impl AsyncPipeWrite {
+ /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_sync(self) -> PipeWrite {
- let owned = self.write.into_owned_handle().unwrap();
- PipeWrite { file: owned.into() }
+ pub fn into_sync(self) -> io::Result<PipeWrite> {
+ let owned = self.write.into_owned_handle()?;
+ Ok(PipeWrite { file: owned.into() })
}
+
+ /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_sync(self) -> PipeWrite {
- let file = self.write.into_nonblocking_fd().unwrap().into();
- PipeWrite { file }
+ pub fn into_sync(self) -> io::Result<PipeWrite> {
+ let file = self.write.into_nonblocking_fd()?.into();
+ Ok(PipeWrite { file })
}
}
@@ -172,6 +209,58 @@ impl tokio::io::AsyncWrite for AsyncPipeWrite {
}
}
+impl From<PipeRead> for Stdio {
+ fn from(val: PipeRead) -> Self {
+ Stdio::from(val.file)
+ }
+}
+
+impl From<PipeWrite> for Stdio {
+ fn from(val: PipeWrite) -> Self {
+ Stdio::from(val.file)
+ }
+}
+
+impl From<PipeRead> for std::fs::File {
+ fn from(val: PipeRead) -> Self {
+ val.file
+ }
+}
+
+impl From<PipeWrite> for std::fs::File {
+ fn from(val: PipeWrite) -> Self {
+ val.file
+ }
+}
+
+#[cfg(not(windows))]
+impl From<PipeRead> for std::os::unix::io::OwnedFd {
+ fn from(val: PipeRead) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(not(windows))]
+impl From<PipeWrite> for std::os::unix::io::OwnedFd {
+ fn from(val: PipeWrite) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(windows)]
+impl From<PipeRead> for std::os::windows::io::OwnedHandle {
+ fn from(val: PipeRead) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(windows)]
+impl From<PipeWrite> for std::os::windows::io::OwnedHandle {
+ fn from(val: PipeWrite) -> Self {
+ val.file.into()
+ }
+}
+
/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles,
/// but either side may be promoted to an async-capable reader/writer.
///
@@ -228,8 +317,8 @@ mod tests {
#[tokio::test]
async fn test_async_pipe() {
let (read, write) = pipe().unwrap();
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
write.write_all(b"hello").await.unwrap();
let mut buf: [u8; 5] = Default::default();
@@ -248,8 +337,8 @@ mod tests {
read.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
// Async
write.write_all(b"hello").await.unwrap();
@@ -257,8 +346,8 @@ mod tests {
read.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
- let mut read = read.into_sync();
- let mut write = write.into_sync();
+ let mut read = read.into_sync().unwrap();
+ let mut write = write.into_sync().unwrap();
// Sync
write.write_all(b"hello").unwrap();
@@ -270,8 +359,8 @@ mod tests {
#[tokio::test]
async fn test_async_pipe_is_nonblocking() {
let (read, write) = pipe().unwrap();
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
let a = tokio::spawn(async move {
let mut buf: [u8; 5] = Default::default();