summaryrefslogtreecommitdiff
path: root/ext/io
diff options
context:
space:
mode:
Diffstat (limited to 'ext/io')
-rw-r--r--ext/io/lib.rs55
-rw-r--r--ext/io/pipe.rs161
2 files changed, 162 insertions, 54 deletions
diff --git a/ext/io/lib.rs b/ext/io/lib.rs
index e0d649e0a..8d80eec25 100644
--- a/ext/io/lib.rs
+++ b/ext/io/lib.rs
@@ -110,36 +110,36 @@ deno_core::extension!(deno_io,
let t = &mut state.resource_table;
let rid = t.add(fs::FileResource::new(
- Rc::new(match stdio.stdin {
- StdioPipe::Inherit => StdFileResourceInner::new(
+ Rc::new(match stdio.stdin.pipe {
+ StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stdin,
STDIN_HANDLE.try_clone().unwrap(),
),
- StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
+ StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
}),
"stdin".to_string(),
));
assert_eq!(rid, 0, "stdin must have ResourceId 0");
let rid = t.add(FileResource::new(
- Rc::new(match stdio.stdout {
- StdioPipe::Inherit => StdFileResourceInner::new(
+ Rc::new(match stdio.stdout.pipe {
+ StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stdout,
STDOUT_HANDLE.try_clone().unwrap(),
),
- StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
+ StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
}),
"stdout".to_string(),
));
assert_eq!(rid, 1, "stdout must have ResourceId 1");
let rid = t.add(FileResource::new(
- Rc::new(match stdio.stderr {
- StdioPipe::Inherit => StdFileResourceInner::new(
+ Rc::new(match stdio.stderr.pipe {
+ StdioPipeInner::Inherit => StdFileResourceInner::new(
StdFileResourceKind::Stderr,
STDERR_HANDLE.try_clone().unwrap(),
),
- StdioPipe::File(pipe) => StdFileResourceInner::file(pipe),
+ StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
}),
"stderr".to_string(),
));
@@ -148,22 +148,41 @@ deno_core::extension!(deno_io,
},
);
-pub enum StdioPipe {
- Inherit,
- File(StdFile),
+#[derive(Default)]
+pub struct StdioPipe {
+ pipe: StdioPipeInner,
}
-impl Default for StdioPipe {
- fn default() -> Self {
- Self::Inherit
+impl StdioPipe {
+ pub const fn inherit() -> Self {
+ StdioPipe {
+ pipe: StdioPipeInner::Inherit,
+ }
+ }
+
+ pub fn file(f: impl Into<StdFile>) -> Self {
+ StdioPipe {
+ pipe: StdioPipeInner::File(f.into()),
+ }
}
}
+#[derive(Default)]
+enum StdioPipeInner {
+ #[default]
+ Inherit,
+ File(StdFile),
+}
+
impl Clone for StdioPipe {
fn clone(&self) -> Self {
- match self {
- StdioPipe::Inherit => StdioPipe::Inherit,
- StdioPipe::File(pipe) => StdioPipe::File(pipe.try_clone().unwrap()),
+ match &self.pipe {
+ StdioPipeInner::Inherit => Self {
+ pipe: StdioPipeInner::Inherit,
+ },
+ StdioPipeInner::File(pipe) => Self {
+ pipe: StdioPipeInner::File(pipe.try_clone().unwrap()),
+ },
}
}
}
diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs
index 0cad7b1f6..70788f752 100644
--- a/ext/io/pipe.rs
+++ b/ext/io/pipe.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::io;
use std::pin::Pin;
+use std::process::Stdio;
// The synchronous read end of a unidirectional pipe.
pub struct PipeRead {
@@ -35,32 +36,50 @@ pub struct AsyncPipeWrite {
}
impl PipeRead {
+ /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_async(self) -> AsyncPipeRead {
+ pub fn into_async(self) -> io::Result<AsyncPipeRead> {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdout = std::process::ChildStdout::from(owned);
- AsyncPipeRead {
- read: tokio::process::ChildStdout::from_std(stdout).unwrap(),
- }
+ Ok(AsyncPipeRead {
+ read: tokio::process::ChildStdout::from_std(stdout)?,
+ })
}
+
+ /// Converts this sync reader into an async reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_async(self) -> AsyncPipeRead {
- AsyncPipeRead {
- read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(),
- }
+ pub fn into_async(self) -> io::Result<AsyncPipeRead> {
+ Ok(AsyncPipeRead {
+ read: tokio::net::unix::pipe::Receiver::from_file(self.file)?,
+ })
+ }
+
+ /// Creates a new [`PipeRead`] instance that shares the same underlying file handle
+ /// as the existing [`PipeRead`] instance.
+ pub fn try_clone(&self) -> io::Result<Self> {
+ Ok(Self {
+ file: self.file.try_clone()?,
+ })
}
}
impl AsyncPipeRead {
+ /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_sync(self) -> PipeRead {
- let owned = self.read.into_owned_handle().unwrap();
- PipeRead { file: owned.into() }
+ pub fn into_sync(self) -> io::Result<PipeRead> {
+ let owned = self.read.into_owned_handle()?;
+ Ok(PipeRead { file: owned.into() })
}
+
+ /// Converts this async reader into an sync reader. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_sync(self) -> PipeRead {
- let file = self.read.into_nonblocking_fd().unwrap().into();
- PipeRead { file }
+ pub fn into_sync(self) -> io::Result<PipeRead> {
+ let file = self.read.into_nonblocking_fd()?.into();
+ Ok(PipeRead { file })
}
}
@@ -88,32 +107,50 @@ impl tokio::io::AsyncRead for AsyncPipeRead {
}
impl PipeWrite {
+ /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_async(self) -> AsyncPipeWrite {
+ pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
let owned: std::os::windows::io::OwnedHandle = self.file.into();
let stdin = std::process::ChildStdin::from(owned);
- AsyncPipeWrite {
- write: tokio::process::ChildStdin::from_std(stdin).unwrap(),
- }
+ Ok(AsyncPipeWrite {
+ write: tokio::process::ChildStdin::from_std(stdin)?,
+ })
}
+
+ /// Converts this sync writer into an async writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_async(self) -> AsyncPipeWrite {
- AsyncPipeWrite {
- write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(),
- }
+ pub fn into_async(self) -> io::Result<AsyncPipeWrite> {
+ Ok(AsyncPipeWrite {
+ write: tokio::net::unix::pipe::Sender::from_file(self.file)?,
+ })
+ }
+
+ /// Creates a new [`PipeWrite`] instance that shares the same underlying file handle
+ /// as the existing [`PipeWrite`] instance.
+ pub fn try_clone(&self) -> io::Result<Self> {
+ Ok(Self {
+ file: self.file.try_clone()?,
+ })
}
}
impl AsyncPipeWrite {
+ /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(windows)]
- pub fn into_sync(self) -> PipeWrite {
- let owned = self.write.into_owned_handle().unwrap();
- PipeWrite { file: owned.into() }
+ pub fn into_sync(self) -> io::Result<PipeWrite> {
+ let owned = self.write.into_owned_handle()?;
+ Ok(PipeWrite { file: owned.into() })
}
+
+ /// Converts this async writer into an sync writer. May fail if the Tokio runtime is
+ /// unavailable.
#[cfg(not(windows))]
- pub fn into_sync(self) -> PipeWrite {
- let file = self.write.into_nonblocking_fd().unwrap().into();
- PipeWrite { file }
+ pub fn into_sync(self) -> io::Result<PipeWrite> {
+ let file = self.write.into_nonblocking_fd()?.into();
+ Ok(PipeWrite { file })
}
}
@@ -172,6 +209,58 @@ impl tokio::io::AsyncWrite for AsyncPipeWrite {
}
}
+impl From<PipeRead> for Stdio {
+ fn from(val: PipeRead) -> Self {
+ Stdio::from(val.file)
+ }
+}
+
+impl From<PipeWrite> for Stdio {
+ fn from(val: PipeWrite) -> Self {
+ Stdio::from(val.file)
+ }
+}
+
+impl From<PipeRead> for std::fs::File {
+ fn from(val: PipeRead) -> Self {
+ val.file
+ }
+}
+
+impl From<PipeWrite> for std::fs::File {
+ fn from(val: PipeWrite) -> Self {
+ val.file
+ }
+}
+
+#[cfg(not(windows))]
+impl From<PipeRead> for std::os::unix::io::OwnedFd {
+ fn from(val: PipeRead) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(not(windows))]
+impl From<PipeWrite> for std::os::unix::io::OwnedFd {
+ fn from(val: PipeWrite) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(windows)]
+impl From<PipeRead> for std::os::windows::io::OwnedHandle {
+ fn from(val: PipeRead) -> Self {
+ val.file.into()
+ }
+}
+
+#[cfg(windows)]
+impl From<PipeWrite> for std::os::windows::io::OwnedHandle {
+ fn from(val: PipeWrite) -> Self {
+ val.file.into()
+ }
+}
+
/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles,
/// but either side may be promoted to an async-capable reader/writer.
///
@@ -228,8 +317,8 @@ mod tests {
#[tokio::test]
async fn test_async_pipe() {
let (read, write) = pipe().unwrap();
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
write.write_all(b"hello").await.unwrap();
let mut buf: [u8; 5] = Default::default();
@@ -248,8 +337,8 @@ mod tests {
read.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
// Async
write.write_all(b"hello").await.unwrap();
@@ -257,8 +346,8 @@ mod tests {
read.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
- let mut read = read.into_sync();
- let mut write = write.into_sync();
+ let mut read = read.into_sync().unwrap();
+ let mut write = write.into_sync().unwrap();
// Sync
write.write_all(b"hello").unwrap();
@@ -270,8 +359,8 @@ mod tests {
#[tokio::test]
async fn test_async_pipe_is_nonblocking() {
let (read, write) = pipe().unwrap();
- let mut read = read.into_async();
- let mut write = write.into_async();
+ let mut read = read.into_async().unwrap();
+ let mut write = write.into_async().unwrap();
let a = tokio::spawn(async move {
let mut buf: [u8; 5] = Default::default();