summaryrefslogtreecommitdiff
path: root/ext/io/pipe.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2024-02-21 18:00:57 -0700
committerGitHub <noreply@github.com>2024-02-22 01:00:57 +0000
commit27579f6fcb51661524ded70145c7f2dd67000bc2 (patch)
treedeb0eea0e470ebf698a36b5eb5103328de3c2d2b /ext/io/pipe.rs
parentd29fb911f6eee81d1ae40e9c34166f0e64d69744 (diff)
chore(io): Add a cross-platform unidirectional pipe implementation (#22522)
Currently useful for `deno test` and internal tests, but could potentially be exposed at a later time as a `Deno` API.
Diffstat (limited to 'ext/io/pipe.rs')
-rw-r--r--ext/io/pipe.rs288
1 files changed, 288 insertions, 0 deletions
diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs
new file mode 100644
index 000000000..0cad7b1f6
--- /dev/null
+++ b/ext/io/pipe.rs
@@ -0,0 +1,288 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+use std::io;
+use std::pin::Pin;
+
+// The synchronous read end of a unidirectional pipe.
+pub struct PipeRead {
+ file: std::fs::File,
+}
+
+// The asynchronous read end of a unidirectional pipe.
+pub struct AsyncPipeRead {
+ #[cfg(windows)]
+ /// We use a `ChildStdout` here as it's a much better fit for a Windows named pipe on Windows. We
+ /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
+ /// if those can be created from raw handles down the road.
+ read: tokio::process::ChildStdout,
+ #[cfg(not(windows))]
+ read: tokio::net::unix::pipe::Receiver,
+}
+
+// The synchronous write end of a unidirectional pipe.
+pub struct PipeWrite {
+ file: std::fs::File,
+}
+
+// The asynchronous write end of a unidirectional pipe.
+pub struct AsyncPipeWrite {
+ #[cfg(windows)]
+ /// We use a `ChildStdin` here as it's a much better fit for a Windows named pipe on Windows. We
+ /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
+ /// if those can be created from raw handles down the road.
+ write: tokio::process::ChildStdin,
+ #[cfg(not(windows))]
+ write: tokio::net::unix::pipe::Sender,
+}
+
+impl PipeRead {
+ #[cfg(windows)]
+ pub fn into_async(self) -> AsyncPipeRead {
+ let owned: std::os::windows::io::OwnedHandle = self.file.into();
+ let stdout = std::process::ChildStdout::from(owned);
+ AsyncPipeRead {
+ read: tokio::process::ChildStdout::from_std(stdout).unwrap(),
+ }
+ }
+ #[cfg(not(windows))]
+ pub fn into_async(self) -> AsyncPipeRead {
+ AsyncPipeRead {
+ read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(),
+ }
+ }
+}
+
+impl AsyncPipeRead {
+ #[cfg(windows)]
+ pub fn into_sync(self) -> PipeRead {
+ let owned = self.read.into_owned_handle().unwrap();
+ PipeRead { file: owned.into() }
+ }
+ #[cfg(not(windows))]
+ pub fn into_sync(self) -> PipeRead {
+ let file = self.read.into_nonblocking_fd().unwrap().into();
+ PipeRead { file }
+ }
+}
+
+impl std::io::Read for PipeRead {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.file.read(buf)
+ }
+
+ fn read_vectored(
+ &mut self,
+ bufs: &mut [io::IoSliceMut<'_>],
+ ) -> io::Result<usize> {
+ self.file.read_vectored(bufs)
+ }
+}
+
+impl tokio::io::AsyncRead for AsyncPipeRead {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<io::Result<()>> {
+ Pin::new(&mut self.get_mut().read).poll_read(cx, buf)
+ }
+}
+
+impl PipeWrite {
+ #[cfg(windows)]
+ pub fn into_async(self) -> AsyncPipeWrite {
+ let owned: std::os::windows::io::OwnedHandle = self.file.into();
+ let stdin = std::process::ChildStdin::from(owned);
+ AsyncPipeWrite {
+ write: tokio::process::ChildStdin::from_std(stdin).unwrap(),
+ }
+ }
+ #[cfg(not(windows))]
+ pub fn into_async(self) -> AsyncPipeWrite {
+ AsyncPipeWrite {
+ write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(),
+ }
+ }
+}
+
+impl AsyncPipeWrite {
+ #[cfg(windows)]
+ pub fn into_sync(self) -> PipeWrite {
+ let owned = self.write.into_owned_handle().unwrap();
+ PipeWrite { file: owned.into() }
+ }
+ #[cfg(not(windows))]
+ pub fn into_sync(self) -> PipeWrite {
+ let file = self.write.into_nonblocking_fd().unwrap().into();
+ PipeWrite { file }
+ }
+}
+
+impl std::io::Write for PipeWrite {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.file.write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.file.flush()
+ }
+
+ fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.file.write_vectored(bufs)
+ }
+}
+
+impl tokio::io::AsyncWrite for AsyncPipeWrite {
+ #[inline(always)]
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_write(cx, buf)
+ }
+
+ #[inline(always)]
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_flush(cx)
+ }
+
+ #[inline(always)]
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_shutdown(cx)
+ }
+
+ #[inline(always)]
+ fn is_write_vectored(&self) -> bool {
+ self.write.is_write_vectored()
+ }
+
+ #[inline(always)]
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> std::task::Poll<Result<usize, io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_write_vectored(cx, bufs)
+ }
+}
+
+/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles,
+/// but either side may be promoted to an async-capable reader/writer.
+///
+/// On Windows, we use a named pipe because that's the only way to get reliable async I/O
+/// support. On Unix platforms, we use the `os_pipe` library, which uses `pipe2` under the hood
+/// (or `pipe` on OSX).
+pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> {
+ pipe_impl()
+}
+
+/// Creates a unidirectional pipe on top of a named pipe (which is technically bidirectional).
+#[cfg(windows)]
+pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
+ // SAFETY: We're careful with handles here
+ unsafe {
+ use std::os::windows::io::FromRawHandle;
+ use std::os::windows::io::OwnedHandle;
+ let (server, client) = crate::winpipe::create_named_pipe()?;
+ let read = std::fs::File::from(OwnedHandle::from_raw_handle(client));
+ let write = std::fs::File::from(OwnedHandle::from_raw_handle(server));
+ Ok((PipeRead { file: read }, PipeWrite { file: write }))
+ }
+}
+
+/// Creates a unidirectional pipe for unix platforms.
+#[cfg(not(windows))]
+pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
+ use std::os::unix::io::OwnedFd;
+ let (read, write) = os_pipe::pipe()?;
+ let read = std::fs::File::from(Into::<OwnedFd>::into(read));
+ let write = std::fs::File::from(Into::<OwnedFd>::into(write));
+ Ok((PipeRead { file: read }, PipeWrite { file: write }))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::io::Read;
+ use std::io::Write;
+ use tokio::io::AsyncReadExt;
+ use tokio::io::AsyncWriteExt;
+
+ #[test]
+ fn test_pipe() {
+ let (mut read, mut write) = pipe().unwrap();
+ // Write to the server and read from the client
+ write.write_all(b"hello").unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).unwrap();
+ assert_eq!(&buf, b"hello");
+ }
+
+ #[tokio::test]
+ async fn test_async_pipe() {
+ let (read, write) = pipe().unwrap();
+ let mut read = read.into_async();
+ let mut write = write.into_async();
+
+ write.write_all(b"hello").await.unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).await.unwrap();
+ assert_eq!(&buf, b"hello");
+ }
+
+ /// Test a round-trip through async mode and back.
+ #[tokio::test]
+ async fn test_pipe_transmute() {
+ let (mut read, mut write) = pipe().unwrap();
+
+ // Sync
+ write.write_all(b"hello").unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).unwrap();
+ assert_eq!(&buf, b"hello");
+
+ let mut read = read.into_async();
+ let mut write = write.into_async();
+
+ // Async
+ write.write_all(b"hello").await.unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).await.unwrap();
+ assert_eq!(&buf, b"hello");
+
+ let mut read = read.into_sync();
+ let mut write = write.into_sync();
+
+ // Sync
+ write.write_all(b"hello").unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).unwrap();
+ assert_eq!(&buf, b"hello");
+ }
+
+ #[tokio::test]
+ async fn test_async_pipe_is_nonblocking() {
+ let (read, write) = pipe().unwrap();
+ let mut read = read.into_async();
+ let mut write = write.into_async();
+
+ let a = tokio::spawn(async move {
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).await.unwrap();
+ assert_eq!(&buf, b"hello");
+ });
+ let b = tokio::spawn(async move {
+ write.write_all(b"hello").await.unwrap();
+ });
+
+ a.await.unwrap();
+ b.await.unwrap();
+ }
+}