diff options
author | Matt Mastracci <matthew@mastracci.com> | 2024-02-23 11:11:15 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-23 11:11:15 -0700 |
commit | 5193834cf23e3521b3afd3f5f54eb00daa23c88d (patch) | |
tree | d67bcef4d96f615f345ad1679d27d73bdc2ccd39 /ext/io/pipe.rs | |
parent | 619acce305ac77f98327718bc1e6a1ae13d8bcf6 (diff) |
refactor(cli): clean up test runner channels (#22422)
Gets us closer to solving #20707.
Rewrites the `TestEventSender`:
- Allow for explicit creation of multiple streams. This will allow for
one-std{out,err}-per-worker
- All test events are received along with a worker ID, allowing for
eventual, proper parallel threading of test events.
In theory this should open up proper interleaving of test output,
however that is left for a future PR.
I had some plans for a better performing synchronization primitive, but
the inter-thread communication is tricky. This does, however, speed up
the processing of large numbers of tests 15-25% (possibly even more on
100,000+).
Before
```
ok | 1000 passed | 0 failed (32ms)
ok | 10000 passed | 0 failed (276ms)
```
After
```
ok | 1000 passed | 0 failed (25ms)
ok | 10000 passed | 0 failed (230ms)
```
Diffstat (limited to 'ext/io/pipe.rs')
-rw-r--r-- | ext/io/pipe.rs | 161 |
1 files changed, 125 insertions, 36 deletions
diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs index 0cad7b1f6..70788f752 100644 --- a/ext/io/pipe.rs +++ b/ext/io/pipe.rs @@ -1,6 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use std::io; use std::pin::Pin; +use std::process::Stdio; // The synchronous read end of a unidirectional pipe. pub struct PipeRead { @@ -35,32 +36,50 @@ pub struct AsyncPipeWrite { } impl PipeRead { + /// Converts this sync reader into an async reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_async(self) -> AsyncPipeRead { + pub fn into_async(self) -> io::Result<AsyncPipeRead> { let owned: std::os::windows::io::OwnedHandle = self.file.into(); let stdout = std::process::ChildStdout::from(owned); - AsyncPipeRead { - read: tokio::process::ChildStdout::from_std(stdout).unwrap(), - } + Ok(AsyncPipeRead { + read: tokio::process::ChildStdout::from_std(stdout)?, + }) } + + /// Converts this sync reader into an async reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_async(self) -> AsyncPipeRead { - AsyncPipeRead { - read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(), - } + pub fn into_async(self) -> io::Result<AsyncPipeRead> { + Ok(AsyncPipeRead { + read: tokio::net::unix::pipe::Receiver::from_file(self.file)?, + }) + } + + /// Creates a new [`PipeRead`] instance that shares the same underlying file handle + /// as the existing [`PipeRead`] instance. + pub fn try_clone(&self) -> io::Result<Self> { + Ok(Self { + file: self.file.try_clone()?, + }) } } impl AsyncPipeRead { + /// Converts this async reader into an sync reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_sync(self) -> PipeRead { - let owned = self.read.into_owned_handle().unwrap(); - PipeRead { file: owned.into() } + pub fn into_sync(self) -> io::Result<PipeRead> { + let owned = self.read.into_owned_handle()?; + Ok(PipeRead { file: owned.into() }) } + + /// Converts this async reader into an sync reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_sync(self) -> PipeRead { - let file = self.read.into_nonblocking_fd().unwrap().into(); - PipeRead { file } + pub fn into_sync(self) -> io::Result<PipeRead> { + let file = self.read.into_nonblocking_fd()?.into(); + Ok(PipeRead { file }) } } @@ -88,32 +107,50 @@ impl tokio::io::AsyncRead for AsyncPipeRead { } impl PipeWrite { + /// Converts this sync writer into an async writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_async(self) -> AsyncPipeWrite { + pub fn into_async(self) -> io::Result<AsyncPipeWrite> { let owned: std::os::windows::io::OwnedHandle = self.file.into(); let stdin = std::process::ChildStdin::from(owned); - AsyncPipeWrite { - write: tokio::process::ChildStdin::from_std(stdin).unwrap(), - } + Ok(AsyncPipeWrite { + write: tokio::process::ChildStdin::from_std(stdin)?, + }) } + + /// Converts this sync writer into an async writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_async(self) -> AsyncPipeWrite { - AsyncPipeWrite { - write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(), - } + pub fn into_async(self) -> io::Result<AsyncPipeWrite> { + Ok(AsyncPipeWrite { + write: tokio::net::unix::pipe::Sender::from_file(self.file)?, + }) + } + + /// Creates a new [`PipeWrite`] instance that shares the same underlying file handle + /// as the existing [`PipeWrite`] instance. + pub fn try_clone(&self) -> io::Result<Self> { + Ok(Self { + file: self.file.try_clone()?, + }) } } impl AsyncPipeWrite { + /// Converts this async writer into an sync writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_sync(self) -> PipeWrite { - let owned = self.write.into_owned_handle().unwrap(); - PipeWrite { file: owned.into() } + pub fn into_sync(self) -> io::Result<PipeWrite> { + let owned = self.write.into_owned_handle()?; + Ok(PipeWrite { file: owned.into() }) } + + /// Converts this async writer into an sync writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_sync(self) -> PipeWrite { - let file = self.write.into_nonblocking_fd().unwrap().into(); - PipeWrite { file } + pub fn into_sync(self) -> io::Result<PipeWrite> { + let file = self.write.into_nonblocking_fd()?.into(); + Ok(PipeWrite { file }) } } @@ -172,6 +209,58 @@ impl tokio::io::AsyncWrite for AsyncPipeWrite { } } +impl From<PipeRead> for Stdio { + fn from(val: PipeRead) -> Self { + Stdio::from(val.file) + } +} + +impl From<PipeWrite> for Stdio { + fn from(val: PipeWrite) -> Self { + Stdio::from(val.file) + } +} + +impl From<PipeRead> for std::fs::File { + fn from(val: PipeRead) -> Self { + val.file + } +} + +impl From<PipeWrite> for std::fs::File { + fn from(val: PipeWrite) -> Self { + val.file + } +} + +#[cfg(not(windows))] +impl From<PipeRead> for std::os::unix::io::OwnedFd { + fn from(val: PipeRead) -> Self { + val.file.into() + } +} + +#[cfg(not(windows))] +impl From<PipeWrite> for std::os::unix::io::OwnedFd { + fn from(val: PipeWrite) -> Self { + val.file.into() + } +} + +#[cfg(windows)] +impl From<PipeRead> for std::os::windows::io::OwnedHandle { + fn from(val: PipeRead) -> Self { + val.file.into() + } +} + +#[cfg(windows)] +impl From<PipeWrite> for std::os::windows::io::OwnedHandle { + fn from(val: PipeWrite) -> Self { + val.file.into() + } +} + /// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles, /// but either side may be promoted to an async-capable reader/writer. /// @@ -228,8 +317,8 @@ mod tests { #[tokio::test] async fn test_async_pipe() { let (read, write) = pipe().unwrap(); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); write.write_all(b"hello").await.unwrap(); let mut buf: [u8; 5] = Default::default(); @@ -248,8 +337,8 @@ mod tests { read.read_exact(&mut buf).unwrap(); assert_eq!(&buf, b"hello"); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); // Async write.write_all(b"hello").await.unwrap(); @@ -257,8 +346,8 @@ mod tests { read.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, b"hello"); - let mut read = read.into_sync(); - let mut write = write.into_sync(); + let mut read = read.into_sync().unwrap(); + let mut write = write.into_sync().unwrap(); // Sync write.write_all(b"hello").unwrap(); @@ -270,8 +359,8 @@ mod tests { #[tokio::test] async fn test_async_pipe_is_nonblocking() { let (read, write) = pipe().unwrap(); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); let a = tokio::spawn(async move { let mut buf: [u8; 5] = Default::default(); |