summaryrefslogtreecommitdiff
path: root/ext/io
diff options
context:
space:
mode:
authorNathan Whitaker <17734409+nathanwhit@users.noreply.github.com>2024-08-15 09:38:46 -0700
committerGitHub <noreply@github.com>2024-08-15 09:38:46 -0700
commit8749d651fb5e0964cdb8e62be7a59a603cbc3c7c (patch)
tree1506d08504561a4013ad03ff1068bec23e572102 /ext/io
parent7ca95fc999f22cb0eb312e02f8c40d7589b35b7e (diff)
fix(node): Create additional pipes for child processes (#25016)
Linux/macos only currently. Part of https://github.com/denoland/deno/issues/23524 (fixes it on platforms other than windows). Part of #16899 (fixes it on platforms other than windows). After this PR, playwright is functional on mac/linux.
Diffstat (limited to 'ext/io')
-rw-r--r--ext/io/Cargo.toml4
-rw-r--r--ext/io/bi_pipe.rs433
-rw-r--r--ext/io/lib.rs9
3 files changed, 446 insertions, 0 deletions
diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml
index 9d02f88c9..414bf0739 100644
--- a/ext/io/Cargo.toml
+++ b/ext/io/Cargo.toml
@@ -20,12 +20,16 @@ filetime.workspace = true
fs3.workspace = true
log.workspace = true
once_cell.workspace = true
+pin-project.workspace = true
tokio.workspace = true
+uuid.workspace = true
[target.'cfg(not(windows))'.dependencies]
os_pipe.workspace = true
+libc.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] }
rand.workspace = true
parking_lot.workspace = true
+windows-sys.workspace = true
diff --git a/ext/io/bi_pipe.rs b/ext/io/bi_pipe.rs
new file mode 100644
index 000000000..04fff7b00
--- /dev/null
+++ b/ext/io/bi_pipe.rs
@@ -0,0 +1,433 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::rc::Rc;
+
+use deno_core::error::AnyError;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::RcRef;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+
+#[cfg(unix)]
+pub type RawBiPipeHandle = std::os::fd::RawFd;
+
+#[cfg(windows)]
+pub type RawBiPipeHandle = std::os::windows::io::RawHandle;
+
+/// One end of a bidirectional pipe. This implements the
+/// `Resource` trait.
+pub struct BiPipeResource {
+ read_half: AsyncRefCell<BiPipeRead>,
+ write_half: AsyncRefCell<BiPipeWrite>,
+ cancel: CancelHandle,
+ raw_handle: RawBiPipeHandle,
+}
+
+#[cfg(windows)]
+// workaround because `RawHandle` doesn't impl `AsRawHandle`
+mod as_raw_handle {
+ use super::RawBiPipeHandle;
+ pub(super) struct RawHandleWrap(pub(super) RawBiPipeHandle);
+ impl std::os::windows::io::AsRawHandle for RawHandleWrap {
+ fn as_raw_handle(&self) -> std::os::windows::prelude::RawHandle {
+ self.0
+ }
+ }
+}
+
+impl deno_core::Resource for BiPipeResource {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+
+ fn backing_handle(self: Rc<Self>) -> Option<deno_core::ResourceHandle> {
+ #[cfg(unix)]
+ {
+ Some(deno_core::ResourceHandle::from_fd_like(&self.raw_handle))
+ }
+ #[cfg(windows)]
+ {
+ Some(deno_core::ResourceHandle::from_fd_like(
+ &as_raw_handle::RawHandleWrap(self.raw_handle),
+ ))
+ }
+ }
+
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+}
+
+impl BiPipeResource {
+ pub fn from_raw_handle(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> {
+ let pipe = BiPipe::from_raw(raw)?;
+ let (read, write) = pipe.split();
+ Ok(Self {
+ raw_handle: raw,
+ read_half: AsyncRefCell::new(read),
+ write_half: AsyncRefCell::new(write),
+ cancel: Default::default(),
+ })
+ }
+
+ pub async fn read(
+ self: Rc<Self>,
+ data: &mut [u8],
+ ) -> Result<usize, AnyError> {
+ let mut rd = RcRef::map(&self, |r| &r.read_half).borrow_mut().await;
+ let cancel_handle = RcRef::map(&self, |r| &r.cancel);
+ Ok(rd.read(data).try_or_cancel(cancel_handle).await?)
+ }
+
+ pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
+ let mut wr = RcRef::map(self, |r| &r.write_half).borrow_mut().await;
+ let nwritten = wr.write(data).await?;
+ wr.flush().await?;
+ Ok(nwritten)
+ }
+}
+
+/// One end of a bidirectional pipe
+#[pin_project::pin_project]
+pub struct BiPipe {
+ #[pin]
+ read_end: BiPipeRead,
+ #[pin]
+ write_end: BiPipeWrite,
+}
+
+impl BiPipe {
+ pub fn from_raw(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> {
+ let (read_end, write_end) = from_raw(raw)?;
+ Ok(Self {
+ read_end,
+ write_end,
+ })
+ }
+
+ pub fn split(self) -> (BiPipeRead, BiPipeWrite) {
+ (self.read_end, self.write_end)
+ }
+
+ pub fn unsplit(read_end: BiPipeRead, write_end: BiPipeWrite) -> Self {
+ Self {
+ read_end,
+ write_end,
+ }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct BiPipeRead {
+ #[cfg(unix)]
+ #[pin]
+ inner: tokio::net::unix::OwnedReadHalf,
+ #[cfg(windows)]
+ #[pin]
+ inner: tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
+}
+
+#[cfg(unix)]
+impl From<tokio::net::unix::OwnedReadHalf> for BiPipeRead {
+ fn from(value: tokio::net::unix::OwnedReadHalf) -> Self {
+ Self { inner: value }
+ }
+}
+#[cfg(windows)]
+impl From<tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>>
+ for BiPipeRead
+{
+ fn from(
+ value: tokio::io::ReadHalf<
+ tokio::net::windows::named_pipe::NamedPipeClient,
+ >,
+ ) -> Self {
+ Self { inner: value }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct BiPipeWrite {
+ #[cfg(unix)]
+ #[pin]
+ inner: tokio::net::unix::OwnedWriteHalf,
+ #[cfg(windows)]
+ #[pin]
+ inner: tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
+}
+
+#[cfg(unix)]
+impl From<tokio::net::unix::OwnedWriteHalf> for BiPipeWrite {
+ fn from(value: tokio::net::unix::OwnedWriteHalf) -> Self {
+ Self { inner: value }
+ }
+}
+
+#[cfg(windows)]
+impl
+ From<tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>>
+ for BiPipeWrite
+{
+ fn from(
+ value: tokio::io::WriteHalf<
+ tokio::net::windows::named_pipe::NamedPipeClient,
+ >,
+ ) -> Self {
+ Self { inner: value }
+ }
+}
+
+#[cfg(unix)]
+fn from_raw(
+ stream: RawBiPipeHandle,
+) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> {
+ use std::os::fd::FromRawFd;
+ // Safety: The fd is part of a pair of connected sockets
+ let unix_stream = tokio::net::UnixStream::from_std(unsafe {
+ std::os::unix::net::UnixStream::from_raw_fd(stream)
+ })?;
+ let (read, write) = unix_stream.into_split();
+ Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write }))
+}
+
+#[cfg(windows)]
+fn from_raw(
+ handle: RawBiPipeHandle,
+) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> {
+ // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
+ // fd handle map will be the same.
+ let pipe = unsafe {
+ tokio::net::windows::named_pipe::NamedPipeClient::from_raw_handle(
+ handle as _,
+ )?
+ };
+ let (read, write) = tokio::io::split(pipe);
+ Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write }))
+}
+
+impl tokio::io::AsyncRead for BiPipeRead {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ self.project().inner.poll_read(cx, buf)
+ }
+}
+impl tokio::io::AsyncRead for BiPipe {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ self.project().read_end.poll_read(cx, buf)
+ }
+}
+
+// implement `AsyncWrite` for `$name`, delegating
+// the impl to `$field`. `$name` must have a `project` method
+// with a projected `$field` (e.g. with `pin_project::pin_project`)
+macro_rules! impl_async_write {
+ (for $name: ident -> self.$field: ident) => {
+ impl tokio::io::AsyncWrite for $name {
+ fn poll_write_vectored(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ self.project().$field.poll_write_vectored(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.$field.is_write_vectored()
+ }
+
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ self.project().$field.poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ self.project().$field.poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ self.project().$field.poll_shutdown(cx)
+ }
+ }
+ };
+}
+
+impl_async_write!(for BiPipeWrite -> self.inner);
+impl_async_write!(for BiPipe -> self.write_end);
+
+/// Creates both sides of a bidirectional pipe, returning the raw
+/// handles to the underlying OS resources.
+pub fn bi_pipe_pair_raw() -> Result<(RawBiPipeHandle, RawBiPipeHandle), AnyError>
+{
+ #[cfg(unix)]
+ {
+ // SockFlag is broken on macOS
+ // https://github.com/nix-rust/nix/issues/861
+ let mut fds = [-1, -1];
+ #[cfg(not(target_os = "macos"))]
+ let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
+
+ #[cfg(target_os = "macos")]
+ let flags = 0;
+
+ // SAFETY: libc call, fds are correct size+align
+ let ret = unsafe {
+ libc::socketpair(
+ libc::AF_UNIX,
+ libc::SOCK_STREAM | flags,
+ 0,
+ fds.as_mut_ptr(),
+ )
+ };
+ if ret != 0 {
+ return Err(std::io::Error::last_os_error().into());
+ }
+
+ if cfg!(target_os = "macos") {
+ let fcntl = |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> {
+ // SAFETY: libc call, fd is valid
+ let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
+
+ if flags == -1 {
+ return Err(fail(fds));
+ }
+ // SAFETY: libc call, fd is valid
+ let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | flag) };
+ if ret == -1 {
+ return Err(fail(fds));
+ }
+ Ok(())
+ };
+
+ fn fail(fds: [i32; 2]) -> std::io::Error {
+ // SAFETY: libc call, fds are valid
+ unsafe {
+ libc::close(fds[0]);
+ libc::close(fds[1]);
+ }
+ std::io::Error::last_os_error()
+ }
+
+ // SOCK_NONBLOCK is not supported on macOS.
+ (fcntl)(fds[0], libc::O_NONBLOCK)?;
+ (fcntl)(fds[1], libc::O_NONBLOCK)?;
+
+ // SOCK_CLOEXEC is not supported on macOS.
+ (fcntl)(fds[0], libc::FD_CLOEXEC)?;
+ (fcntl)(fds[1], libc::FD_CLOEXEC)?;
+ }
+
+ let fd1 = fds[0];
+ let fd2 = fds[1];
+ Ok((fd1, fd2))
+ }
+ #[cfg(windows)]
+ {
+ // TODO(nathanwhit): more granular unsafe blocks
+ // SAFETY: win32 calls
+ unsafe {
+ use windows_sys::Win32::Foundation::CloseHandle;
+ use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED;
+ use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED;
+ use windows_sys::Win32::Foundation::GENERIC_READ;
+ use windows_sys::Win32::Foundation::GENERIC_WRITE;
+ use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
+ use windows_sys::Win32::Security::SECURITY_ATTRIBUTES;
+ use windows_sys::Win32::Storage::FileSystem::CreateFileW;
+ use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE;
+ use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
+ use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING;
+ use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
+ use windows_sys::Win32::System::Pipes::ConnectNamedPipe;
+ use windows_sys::Win32::System::Pipes::CreateNamedPipeW;
+ use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE;
+ use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE;
+
+ use std::io;
+ use std::os::windows::ffi::OsStrExt;
+ use std::path::Path;
+ use std::ptr;
+
+ let (path, hd1) = loop {
+ let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4());
+ let mut path = Path::new(&name)
+ .as_os_str()
+ .encode_wide()
+ .collect::<Vec<_>>();
+ path.push(0);
+
+ let hd1 = CreateNamedPipeW(
+ path.as_ptr(),
+ PIPE_ACCESS_DUPLEX
+ | FILE_FLAG_FIRST_PIPE_INSTANCE
+ | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
+ 1,
+ 65536,
+ 65536,
+ 0,
+ std::ptr::null_mut(),
+ );
+
+ if hd1 == INVALID_HANDLE_VALUE {
+ let err = io::Error::last_os_error();
+ /* If the pipe name is already in use, try again. */
+ if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) {
+ continue;
+ }
+
+ return Err(err.into());
+ }
+
+ break (path, hd1);
+ };
+
+ /* Create child pipe handle. */
+ let s = SECURITY_ATTRIBUTES {
+ nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
+ lpSecurityDescriptor: ptr::null_mut(),
+ bInheritHandle: 1,
+ };
+ let hd2 = CreateFileW(
+ path.as_ptr(),
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ &s,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ 0,
+ );
+ if hd2 == INVALID_HANDLE_VALUE {
+ return Err(io::Error::last_os_error().into());
+ }
+
+ // Will not block because we have create the pair.
+ if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 {
+ let err = std::io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) {
+ CloseHandle(hd2);
+ return Err(err.into());
+ }
+ }
+
+ Ok((hd1 as _, hd2 as _))
+ }
+ }
+}
diff --git a/ext/io/lib.rs b/ext/io/lib.rs
index a2f14e0db..47921bcee 100644
--- a/ext/io/lib.rs
+++ b/ext/io/lib.rs
@@ -60,12 +60,21 @@ mod pipe;
#[cfg(windows)]
mod winpipe;
+mod bi_pipe;
+
pub use pipe::pipe;
pub use pipe::AsyncPipeRead;
pub use pipe::AsyncPipeWrite;
pub use pipe::PipeRead;
pub use pipe::PipeWrite;
+pub use bi_pipe::bi_pipe_pair_raw;
+pub use bi_pipe::BiPipe;
+pub use bi_pipe::BiPipeRead;
+pub use bi_pipe::BiPipeResource;
+pub use bi_pipe::BiPipeWrite;
+pub use bi_pipe::RawBiPipeHandle;
+
// Store the stdio fd/handles in global statics in order to keep them
// alive for the duration of the application since the last handle/fd
// being dropped will close the corresponding pipe.