summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock12
-rw-r--r--Cargo.toml4
-rw-r--r--ext/io/Cargo.toml4
-rw-r--r--ext/io/lib.rs9
-rw-r--r--ext/io/pipe.rs288
-rw-r--r--ext/io/winpipe.rs115
6 files changed, 425 insertions, 7 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 6a2eb0804..fa6866eaa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1497,6 +1497,8 @@ dependencies = [
"filetime",
"fs3",
"once_cell",
+ "os_pipe",
+ "rand",
"tokio",
"winapi",
]
@@ -4253,12 +4255,12 @@ dependencies = [
[[package]]
name = "os_pipe"
-version = "1.1.4"
+version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ae859aa07428ca9a929b936690f8b12dc5f11dd8c6992a18ca93919f28bc177"
+checksum = "57119c3b893986491ec9aa85056780d3a0f3cf4da7cc09dd3650dbd6c6738fb9"
dependencies = [
"libc",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
]
[[package]]
@@ -6526,9 +6528,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.34.0"
+version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9"
+checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [
"backtrace",
"bytes",
diff --git a/Cargo.toml b/Cargo.toml
index 7f158a776..b30f1bcdd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -129,7 +129,7 @@ monch = "=0.5.0"
notify = "=5.0.0"
num-bigint = { version = "0.4", features = ["rand"] }
once_cell = "1.17.1"
-os_pipe = "=1.1.4"
+os_pipe = { version = "=1.1.5", features = ["io_safety"] }
p224 = { version = "0.13.0", features = ["ecdh"] }
p256 = { version = "0.13.2", features = ["ecdh"] }
p384 = { version = "0.13.0", features = ["ecdh"] }
@@ -165,7 +165,7 @@ tar = "=0.4.40"
tempfile = "3.4.0"
termcolor = "1.1.3"
thiserror = "1.0.40"
-tokio = { version = "1.28.1", features = ["full"] }
+tokio = { version = "1.36.0", features = ["full"] }
tokio-metrics = { version = "0.3.0", features = ["rt"] }
tokio-util = "0.7.4"
tower-lsp = { version = "=0.20.0", features = ["proposed"] }
diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml
index d87ba701b..5e664549a 100644
--- a/ext/io/Cargo.toml
+++ b/ext/io/Cargo.toml
@@ -21,5 +21,9 @@ fs3.workspace = true
once_cell.workspace = true
tokio.workspace = true
+[target.'cfg(not(windows))'.dependencies]
+os_pipe.workspace = true
+
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["winbase", "processenv"] }
+rand.workspace = true
diff --git a/ext/io/lib.rs b/ext/io/lib.rs
index c85696f64..e0d649e0a 100644
--- a/ext/io/lib.rs
+++ b/ext/io/lib.rs
@@ -50,6 +50,15 @@ use winapi::um::processenv::GetStdHandle;
use winapi::um::winbase;
pub mod fs;
+mod pipe;
+#[cfg(windows)]
+mod winpipe;
+
+pub use pipe::pipe;
+pub use pipe::AsyncPipeRead;
+pub use pipe::AsyncPipeWrite;
+pub use pipe::PipeRead;
+pub use pipe::PipeWrite;
// Store the stdio fd/handles in global statics in order to keep them
// alive for the duration of the application since the last handle/fd
diff --git a/ext/io/pipe.rs b/ext/io/pipe.rs
new file mode 100644
index 000000000..0cad7b1f6
--- /dev/null
+++ b/ext/io/pipe.rs
@@ -0,0 +1,288 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+use std::io;
+use std::pin::Pin;
+
+// The synchronous read end of a unidirectional pipe.
+pub struct PipeRead {
+ file: std::fs::File,
+}
+
+// The asynchronous read end of a unidirectional pipe.
+pub struct AsyncPipeRead {
+ #[cfg(windows)]
+ /// We use a `ChildStdout` here as it's a much better fit for a Windows named pipe on Windows. We
+ /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
+ /// if those can be created from raw handles down the road.
+ read: tokio::process::ChildStdout,
+ #[cfg(not(windows))]
+ read: tokio::net::unix::pipe::Receiver,
+}
+
+// The synchronous write end of a unidirectional pipe.
+pub struct PipeWrite {
+ file: std::fs::File,
+}
+
+// The asynchronous write end of a unidirectional pipe.
+pub struct AsyncPipeWrite {
+ #[cfg(windows)]
+ /// We use a `ChildStdin` here as it's a much better fit for a Windows named pipe on Windows. We
+ /// might also be able to use `tokio::net::windows::named_pipe::NamedPipeClient` in the future
+ /// if those can be created from raw handles down the road.
+ write: tokio::process::ChildStdin,
+ #[cfg(not(windows))]
+ write: tokio::net::unix::pipe::Sender,
+}
+
+impl PipeRead {
+ #[cfg(windows)]
+ pub fn into_async(self) -> AsyncPipeRead {
+ let owned: std::os::windows::io::OwnedHandle = self.file.into();
+ let stdout = std::process::ChildStdout::from(owned);
+ AsyncPipeRead {
+ read: tokio::process::ChildStdout::from_std(stdout).unwrap(),
+ }
+ }
+ #[cfg(not(windows))]
+ pub fn into_async(self) -> AsyncPipeRead {
+ AsyncPipeRead {
+ read: tokio::net::unix::pipe::Receiver::from_file(self.file).unwrap(),
+ }
+ }
+}
+
+impl AsyncPipeRead {
+ #[cfg(windows)]
+ pub fn into_sync(self) -> PipeRead {
+ let owned = self.read.into_owned_handle().unwrap();
+ PipeRead { file: owned.into() }
+ }
+ #[cfg(not(windows))]
+ pub fn into_sync(self) -> PipeRead {
+ let file = self.read.into_nonblocking_fd().unwrap().into();
+ PipeRead { file }
+ }
+}
+
+impl std::io::Read for PipeRead {
+ fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+ self.file.read(buf)
+ }
+
+ fn read_vectored(
+ &mut self,
+ bufs: &mut [io::IoSliceMut<'_>],
+ ) -> io::Result<usize> {
+ self.file.read_vectored(bufs)
+ }
+}
+
+impl tokio::io::AsyncRead for AsyncPipeRead {
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<io::Result<()>> {
+ Pin::new(&mut self.get_mut().read).poll_read(cx, buf)
+ }
+}
+
+impl PipeWrite {
+ #[cfg(windows)]
+ pub fn into_async(self) -> AsyncPipeWrite {
+ let owned: std::os::windows::io::OwnedHandle = self.file.into();
+ let stdin = std::process::ChildStdin::from(owned);
+ AsyncPipeWrite {
+ write: tokio::process::ChildStdin::from_std(stdin).unwrap(),
+ }
+ }
+ #[cfg(not(windows))]
+ pub fn into_async(self) -> AsyncPipeWrite {
+ AsyncPipeWrite {
+ write: tokio::net::unix::pipe::Sender::from_file(self.file).unwrap(),
+ }
+ }
+}
+
+impl AsyncPipeWrite {
+ #[cfg(windows)]
+ pub fn into_sync(self) -> PipeWrite {
+ let owned = self.write.into_owned_handle().unwrap();
+ PipeWrite { file: owned.into() }
+ }
+ #[cfg(not(windows))]
+ pub fn into_sync(self) -> PipeWrite {
+ let file = self.write.into_nonblocking_fd().unwrap().into();
+ PipeWrite { file }
+ }
+}
+
+impl std::io::Write for PipeWrite {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.file.write(buf)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.file.flush()
+ }
+
+ fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
+ self.file.write_vectored(bufs)
+ }
+}
+
+impl tokio::io::AsyncWrite for AsyncPipeWrite {
+ #[inline(always)]
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_write(cx, buf)
+ }
+
+ #[inline(always)]
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_flush(cx)
+ }
+
+ #[inline(always)]
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_shutdown(cx)
+ }
+
+ #[inline(always)]
+ fn is_write_vectored(&self) -> bool {
+ self.write.is_write_vectored()
+ }
+
+ #[inline(always)]
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ bufs: &[io::IoSlice<'_>],
+ ) -> std::task::Poll<Result<usize, io::Error>> {
+ Pin::new(&mut self.get_mut().write).poll_write_vectored(cx, bufs)
+ }
+}
+
+/// Create a unidirectional pipe pair that starts off as a pair of synchronous file handles,
+/// but either side may be promoted to an async-capable reader/writer.
+///
+/// On Windows, we use a named pipe because that's the only way to get reliable async I/O
+/// support. On Unix platforms, we use the `os_pipe` library, which uses `pipe2` under the hood
+/// (or `pipe` on OSX).
+pub fn pipe() -> io::Result<(PipeRead, PipeWrite)> {
+ pipe_impl()
+}
+
+/// Creates a unidirectional pipe on top of a named pipe (which is technically bidirectional).
+#[cfg(windows)]
+pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
+ // SAFETY: We're careful with handles here
+ unsafe {
+ use std::os::windows::io::FromRawHandle;
+ use std::os::windows::io::OwnedHandle;
+ let (server, client) = crate::winpipe::create_named_pipe()?;
+ let read = std::fs::File::from(OwnedHandle::from_raw_handle(client));
+ let write = std::fs::File::from(OwnedHandle::from_raw_handle(server));
+ Ok((PipeRead { file: read }, PipeWrite { file: write }))
+ }
+}
+
+/// Creates a unidirectional pipe for unix platforms.
+#[cfg(not(windows))]
+pub fn pipe_impl() -> io::Result<(PipeRead, PipeWrite)> {
+ use std::os::unix::io::OwnedFd;
+ let (read, write) = os_pipe::pipe()?;
+ let read = std::fs::File::from(Into::<OwnedFd>::into(read));
+ let write = std::fs::File::from(Into::<OwnedFd>::into(write));
+ Ok((PipeRead { file: read }, PipeWrite { file: write }))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ use std::io::Read;
+ use std::io::Write;
+ use tokio::io::AsyncReadExt;
+ use tokio::io::AsyncWriteExt;
+
+ #[test]
+ fn test_pipe() {
+ let (mut read, mut write) = pipe().unwrap();
+ // Write to the server and read from the client
+ write.write_all(b"hello").unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).unwrap();
+ assert_eq!(&buf, b"hello");
+ }
+
+ #[tokio::test]
+ async fn test_async_pipe() {
+ let (read, write) = pipe().unwrap();
+ let mut read = read.into_async();
+ let mut write = write.into_async();
+
+ write.write_all(b"hello").await.unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).await.unwrap();
+ assert_eq!(&buf, b"hello");
+ }
+
+ /// Test a round-trip through async mode and back.
+ #[tokio::test]
+ async fn test_pipe_transmute() {
+ let (mut read, mut write) = pipe().unwrap();
+
+ // Sync
+ write.write_all(b"hello").unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).unwrap();
+ assert_eq!(&buf, b"hello");
+
+ let mut read = read.into_async();
+ let mut write = write.into_async();
+
+ // Async
+ write.write_all(b"hello").await.unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).await.unwrap();
+ assert_eq!(&buf, b"hello");
+
+ let mut read = read.into_sync();
+ let mut write = write.into_sync();
+
+ // Sync
+ write.write_all(b"hello").unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).unwrap();
+ assert_eq!(&buf, b"hello");
+ }
+
+ #[tokio::test]
+ async fn test_async_pipe_is_nonblocking() {
+ let (read, write) = pipe().unwrap();
+ let mut read = read.into_async();
+ let mut write = write.into_async();
+
+ let a = tokio::spawn(async move {
+ let mut buf: [u8; 5] = Default::default();
+ read.read_exact(&mut buf).await.unwrap();
+ assert_eq!(&buf, b"hello");
+ });
+ let b = tokio::spawn(async move {
+ write.write_all(b"hello").await.unwrap();
+ });
+
+ a.await.unwrap();
+ b.await.unwrap();
+ }
+}
diff --git a/ext/io/winpipe.rs b/ext/io/winpipe.rs
new file mode 100644
index 000000000..01272300d
--- /dev/null
+++ b/ext/io/winpipe.rs
@@ -0,0 +1,115 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+use rand::thread_rng;
+use rand::RngCore;
+use std::io;
+use std::os::windows::io::RawHandle;
+use winapi::shared::minwindef::DWORD;
+use winapi::um::fileapi::CreateFileA;
+use winapi::um::fileapi::OPEN_EXISTING;
+use winapi::um::handleapi::CloseHandle;
+use winapi::um::handleapi::INVALID_HANDLE_VALUE;
+use winapi::um::minwinbase::SECURITY_ATTRIBUTES;
+use winapi::um::winbase::CreateNamedPipeA;
+use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
+use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
+use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
+use winapi::um::winbase::PIPE_READMODE_BYTE;
+use winapi::um::winbase::PIPE_TYPE_BYTE;
+use winapi::um::winnt::FILE_ATTRIBUTE_NORMAL;
+use winapi::um::winnt::GENERIC_READ;
+use winapi::um::winnt::GENERIC_WRITE;
+
+/// Create a pair of file descriptors for a named pipe with non-inheritable handles. We cannot use
+/// the anonymous pipe from `os_pipe` because that does not support OVERLAPPED (aka async) I/O.
+///
+/// This is the same way that Rust and pretty much everyone else does it.
+///
+/// For more information, there is an interesting S.O. question that explains the history, as
+/// well as offering a complex NTAPI solution if we decide to try to make these pipes truely
+/// anonymous: https://stackoverflow.com/questions/60645/overlapped-i-o-on-anonymous-pipe
+pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> {
+ let pipe_name = format!(
+ r#"\\.\pipe\deno_pipe_{:x}_{:x}\0"#,
+ std::process::id(),
+ thread_rng().next_u64()
+ );
+
+ // Create security attributes to make the pipe handles non-inheritable
+ let mut security_attributes = SECURITY_ATTRIBUTES {
+ nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as DWORD,
+ lpSecurityDescriptor: std::ptr::null_mut(),
+ bInheritHandle: 0,
+ };
+
+ // SAFETY: Create the pipe server with non-inheritable handle
+ let server_handle = unsafe {
+ CreateNamedPipeA(
+ pipe_name.as_ptr() as *const i8,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
+ // Read and write bytes, not messages
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
+ // The maximum number of instances that can be created for this pipe.
+ 1,
+ // 4kB buffer sizes
+ 4096,
+ 4096,
+ // "The default time-out value, in milliseconds, if the WaitNamedPipe function specifies NMPWAIT_USE_DEFAULT_WAIT.
+ // Each instance of a named pipe must specify the same value. A value of zero will result in a default time-out of
+ // 50 milliseconds."
+ 0,
+ &mut security_attributes,
+ )
+ };
+
+ if server_handle == INVALID_HANDLE_VALUE {
+ return Err(io::Error::last_os_error());
+ }
+
+ // SAFETY: Create the pipe client with non-inheritable handle
+ let client_handle = unsafe {
+ CreateFileA(
+ pipe_name.as_ptr() as *const i8,
+ GENERIC_READ | GENERIC_WRITE | FILE_FLAG_OVERLAPPED,
+ 0,
+ &mut security_attributes,
+ OPEN_EXISTING,
+ FILE_ATTRIBUTE_NORMAL,
+ std::ptr::null_mut(),
+ )
+ };
+
+ if client_handle == INVALID_HANDLE_VALUE {
+ let err = io::Error::last_os_error();
+ // SAFETY: Close the handles if we failed
+ unsafe {
+ CloseHandle(server_handle);
+ }
+ return Err(err);
+ }
+
+ Ok((server_handle, client_handle))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::fs::File;
+ use std::io::Read;
+ use std::io::Write;
+ use std::os::windows::io::FromRawHandle;
+
+ #[test]
+ fn make_named_pipe() {
+ let (server, client) = create_named_pipe().unwrap();
+ // SAFETY: For testing
+ let mut server = unsafe { File::from_raw_handle(server) };
+ // SAFETY: For testing
+ let mut client = unsafe { File::from_raw_handle(client) };
+
+ // Write to the server and read from the client
+ server.write_all(b"hello").unwrap();
+ let mut buf: [u8; 5] = Default::default();
+ client.read_exact(&mut buf).unwrap();
+ assert_eq!(&buf, b"hello");
+ }
+}