diff options
Diffstat (limited to 'ext/io/pipe.rs')
-rw-r--r-- | ext/io/pipe.rs | 161 |
1 files changed, 125 insertions, 36 deletions
diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs index 0cad7b1f6..70788f752 100644 --- a/ext/io/pipe.rs +++ b/ext/io/pipe.rs @@ -1,6 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use std::io; use std::pin::Pin; +use std::process::Stdio; // The synchronous read end of a unidirectional pipe. pub struct PipeRead { @@ -35,32 +36,50 @@ pub struct AsyncPipeWrite { } impl PipeRead { + /// Converts this sync reader into an async reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_async(self) -> AsyncPipeRead { + pub fn into_async(self) -> io::Result<AsyncPipeRead> { let owned: std::os::windows::io::OwnedHandle = self.file.into(); let stdout = std::process::ChildStdout::from(owned); - AsyncPipeRead { - read: tokio::process::ChildStdout::from_std(stdout).unwrap(), - } + Ok(AsyncPipeRead { + read: tokio::process::ChildStdout::from_std(stdout)?, + }) } + + /// Converts this sync reader into an async reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_async(self) -> AsyncPipeRead { - AsyncPipeRead { - read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(), - } + pub fn into_async(self) -> io::Result<AsyncPipeRead> { + Ok(AsyncPipeRead { + read: tokio::net::unix::pipe::Receiver::from_file(self.file)?, + }) + } + + /// Creates a new [`PipeRead`] instance that shares the same underlying file handle + /// as the existing [`PipeRead`] instance. + pub fn try_clone(&self) -> io::Result<Self> { + Ok(Self { + file: self.file.try_clone()?, + }) } } impl AsyncPipeRead { + /// Converts this async reader into an sync reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_sync(self) -> PipeRead { - let owned = self.read.into_owned_handle().unwrap(); - PipeRead { file: owned.into() } + pub fn into_sync(self) -> io::Result<PipeRead> { + let owned = self.read.into_owned_handle()?; + Ok(PipeRead { file: owned.into() }) } + + /// Converts this async reader into an sync reader. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_sync(self) -> PipeRead { - let file = self.read.into_nonblocking_fd().unwrap().into(); - PipeRead { file } + pub fn into_sync(self) -> io::Result<PipeRead> { + let file = self.read.into_nonblocking_fd()?.into(); + Ok(PipeRead { file }) } } @@ -88,32 +107,50 @@ impl tokio::io::AsyncRead for AsyncPipeRead { } impl PipeWrite { + /// Converts this sync writer into an async writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_async(self) -> AsyncPipeWrite { + pub fn into_async(self) -> io::Result<AsyncPipeWrite> { let owned: std::os::windows::io::OwnedHandle = self.file.into(); let stdin = std::process::ChildStdin::from(owned); - AsyncPipeWrite { - write: tokio::process::ChildStdin::from_std(stdin).unwrap(), - } + Ok(AsyncPipeWrite { + write: tokio::process::ChildStdin::from_std(stdin)?, + }) } + + /// Converts this sync writer into an async writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_async(self) -> AsyncPipeWrite { - AsyncPipeWrite { - write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(), - } + pub fn into_async(self) -> io::Result<AsyncPipeWrite> { + Ok(AsyncPipeWrite { + write: tokio::net::unix::pipe::Sender::from_file(self.file)?, + }) + } + + /// Creates a new [`PipeWrite`] instance that shares the same underlying file handle + /// as the existing [`PipeWrite`] instance. + pub fn try_clone(&self) -> io::Result<Self> { + Ok(Self { + file: self.file.try_clone()?, + }) } } impl AsyncPipeWrite { + /// Converts this async writer into an sync writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(windows)] - pub fn into_sync(self) -> PipeWrite { - let owned = self.write.into_owned_handle().unwrap(); - PipeWrite { file: owned.into() } + pub fn into_sync(self) -> io::Result<PipeWrite> { + let owned = self.write.into_owned_handle()?; + Ok(PipeWrite { file: owned.into() }) } + + /// Converts this async writer into an sync writer. May fail if the Tokio runtime is + /// unavailable. #[cfg(not(windows))] - pub fn into_sync(self) -> PipeWrite { - let file = self.write.into_nonblocking_fd().unwrap().into(); - PipeWrite { file } + pub fn into_sync(self) -> io::Result<PipeWrite> { + let file = self.write.into_nonblocking_fd()?.into(); + Ok(PipeWrite { file }) } } @@ -172,6 +209,58 @@ impl tokio::io::AsyncWrite for AsyncPipeWrite { } } +impl From<PipeRead> for Stdio { + fn from(val: PipeRead) -> Self { + Stdio::from(val.file) + } +} + +impl From<PipeWrite> for Stdio { + fn from(val: PipeWrite) -> Self { + Stdio::from(val.file) + } +} + +impl From<PipeRead> for std::fs::File { + fn from(val: PipeRead) -> Self { + val.file + } +} + +impl From<PipeWrite> for std::fs::File { + fn from(val: PipeWrite) -> Self { + val.file + } +} + +#[cfg(not(windows))] +impl From<PipeRead> for std::os::unix::io::OwnedFd { + fn from(val: PipeRead) -> Self { + val.file.into() + } +} + +#[cfg(not(windows))] +impl From<PipeWrite> for std::os::unix::io::OwnedFd { + fn from(val: PipeWrite) -> Self { + val.file.into() + } +} + +#[cfg(windows)] +impl From<PipeRead> for std::os::windows::io::OwnedHandle { + fn from(val: PipeRead) -> Self { + val.file.into() + } +} + +#[cfg(windows)] +impl From<PipeWrite> for std::os::windows::io::OwnedHandle { + fn from(val: PipeWrite) -> Self { + val.file.into() + } +} + /// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles, /// but either side may be promoted to an async-capable reader/writer. /// @@ -228,8 +317,8 @@ mod tests { #[tokio::test] async fn test_async_pipe() { let (read, write) = pipe().unwrap(); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); write.write_all(b"hello").await.unwrap(); let mut buf: [u8; 5] = Default::default(); @@ -248,8 +337,8 @@ mod tests { read.read_exact(&mut buf).unwrap(); assert_eq!(&buf, b"hello"); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); // Async write.write_all(b"hello").await.unwrap(); @@ -257,8 +346,8 @@ mod tests { read.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf, b"hello"); - let mut read = read.into_sync(); - let mut write = write.into_sync(); + let mut read = read.into_sync().unwrap(); + let mut write = write.into_sync().unwrap(); // Sync write.write_all(b"hello").unwrap(); @@ -270,8 +359,8 @@ mod tests { #[tokio::test] async fn test_async_pipe_is_nonblocking() { let (read, write) = pipe().unwrap(); - let mut read = read.into_async(); - let mut write = write.into_async(); + let mut read = read.into_async().unwrap(); + let mut write = write.into_async().unwrap(); let a = tokio::spawn(async move { let mut buf: [u8; 5] = Default::default(); |