summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathan Whitaker <17734409+nathanwhit@users.noreply.github.com>2024-08-15 09:38:46 -0700
committerGitHub <noreply@github.com>2024-08-15 09:38:46 -0700
commit8749d651fb5e0964cdb8e62be7a59a603cbc3c7c (patch)
tree1506d08504561a4013ad03ff1068bec23e572102
parent7ca95fc999f22cb0eb312e02f8c40d7589b35b7e (diff)
fix(node): Create additional pipes for child processes (#25016)
Linux/macos only currently. Part of https://github.com/denoland/deno/issues/23524 (fixes it on platforms other than windows). Part of #16899 (fixes it on platforms other than windows). After this PR, playwright is functional on mac/linux.
-rw-r--r--Cargo.lock4
-rw-r--r--ext/io/Cargo.toml4
-rw-r--r--ext/io/bi_pipe.rs433
-rw-r--r--ext/io/lib.rs9
-rw-r--r--ext/node/ops/ipc.rs84
-rw-r--r--ext/node/polyfills/internal/child_process.ts102
-rw-r--r--ext/node/polyfills/internal_binding/pipe_wrap.ts11
-rw-r--r--ext/node/polyfills/internal_binding/stream_wrap.ts11
-rw-r--r--runtime/js/40_process.js24
-rw-r--r--runtime/ops/process.rs313
-rw-r--r--tests/specs/node/child_process_extra_pipes/__test__.jsonc17
-rw-r--r--tests/specs/node/child_process_extra_pipes/main.out5
-rw-r--r--tests/specs/node/child_process_extra_pipes/main.ts26
-rw-r--r--tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock7
-rw-r--r--tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml8
-rw-r--r--tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs12
-rw-r--r--tools/core_import_map.json1
17 files changed, 776 insertions, 295 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a22fa2938..e3cdc49d6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1658,13 +1658,17 @@ dependencies = [
"deno_core",
"filetime",
"fs3",
+ "libc",
"log",
"once_cell",
"os_pipe",
"parking_lot 0.12.3",
+ "pin-project",
"rand",
"tokio",
+ "uuid",
"winapi",
+ "windows-sys 0.52.0",
]
[[package]]
diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml
index 9d02f88c9..414bf0739 100644
--- a/ext/io/Cargo.toml
+++ b/ext/io/Cargo.toml
@@ -20,12 +20,16 @@ filetime.workspace = true
fs3.workspace = true
log.workspace = true
once_cell.workspace = true
+pin-project.workspace = true
tokio.workspace = true
+uuid.workspace = true
[target.'cfg(not(windows))'.dependencies]
os_pipe.workspace = true
+libc.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] }
rand.workspace = true
parking_lot.workspace = true
+windows-sys.workspace = true
diff --git a/ext/io/bi_pipe.rs b/ext/io/bi_pipe.rs
new file mode 100644
index 000000000..04fff7b00
--- /dev/null
+++ b/ext/io/bi_pipe.rs
@@ -0,0 +1,433 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::rc::Rc;
+
+use deno_core::error::AnyError;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
+use deno_core::RcRef;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+
+#[cfg(unix)]
+pub type RawBiPipeHandle = std::os::fd::RawFd;
+
+#[cfg(windows)]
+pub type RawBiPipeHandle = std::os::windows::io::RawHandle;
+
+/// One end of a bidirectional pipe. This implements the
+/// `Resource` trait.
+pub struct BiPipeResource {
+ read_half: AsyncRefCell<BiPipeRead>,
+ write_half: AsyncRefCell<BiPipeWrite>,
+ cancel: CancelHandle,
+ raw_handle: RawBiPipeHandle,
+}
+
+#[cfg(windows)]
+// workaround because `RawHandle` doesn't impl `AsRawHandle`
+mod as_raw_handle {
+ use super::RawBiPipeHandle;
+ pub(super) struct RawHandleWrap(pub(super) RawBiPipeHandle);
+ impl std::os::windows::io::AsRawHandle for RawHandleWrap {
+ fn as_raw_handle(&self) -> std::os::windows::prelude::RawHandle {
+ self.0
+ }
+ }
+}
+
+impl deno_core::Resource for BiPipeResource {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+
+ fn backing_handle(self: Rc<Self>) -> Option<deno_core::ResourceHandle> {
+ #[cfg(unix)]
+ {
+ Some(deno_core::ResourceHandle::from_fd_like(&self.raw_handle))
+ }
+ #[cfg(windows)]
+ {
+ Some(deno_core::ResourceHandle::from_fd_like(
+ &as_raw_handle::RawHandleWrap(self.raw_handle),
+ ))
+ }
+ }
+
+ deno_core::impl_readable_byob!();
+ deno_core::impl_writable!();
+}
+
+impl BiPipeResource {
+ pub fn from_raw_handle(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> {
+ let pipe = BiPipe::from_raw(raw)?;
+ let (read, write) = pipe.split();
+ Ok(Self {
+ raw_handle: raw,
+ read_half: AsyncRefCell::new(read),
+ write_half: AsyncRefCell::new(write),
+ cancel: Default::default(),
+ })
+ }
+
+ pub async fn read(
+ self: Rc<Self>,
+ data: &mut [u8],
+ ) -> Result<usize, AnyError> {
+ let mut rd = RcRef::map(&self, |r| &r.read_half).borrow_mut().await;
+ let cancel_handle = RcRef::map(&self, |r| &r.cancel);
+ Ok(rd.read(data).try_or_cancel(cancel_handle).await?)
+ }
+
+ pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
+ let mut wr = RcRef::map(self, |r| &r.write_half).borrow_mut().await;
+ let nwritten = wr.write(data).await?;
+ wr.flush().await?;
+ Ok(nwritten)
+ }
+}
+
+/// One end of a bidirectional pipe
+#[pin_project::pin_project]
+pub struct BiPipe {
+ #[pin]
+ read_end: BiPipeRead,
+ #[pin]
+ write_end: BiPipeWrite,
+}
+
+impl BiPipe {
+ pub fn from_raw(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> {
+ let (read_end, write_end) = from_raw(raw)?;
+ Ok(Self {
+ read_end,
+ write_end,
+ })
+ }
+
+ pub fn split(self) -> (BiPipeRead, BiPipeWrite) {
+ (self.read_end, self.write_end)
+ }
+
+ pub fn unsplit(read_end: BiPipeRead, write_end: BiPipeWrite) -> Self {
+ Self {
+ read_end,
+ write_end,
+ }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct BiPipeRead {
+ #[cfg(unix)]
+ #[pin]
+ inner: tokio::net::unix::OwnedReadHalf,
+ #[cfg(windows)]
+ #[pin]
+ inner: tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
+}
+
+#[cfg(unix)]
+impl From<tokio::net::unix::OwnedReadHalf> for BiPipeRead {
+ fn from(value: tokio::net::unix::OwnedReadHalf) -> Self {
+ Self { inner: value }
+ }
+}
+#[cfg(windows)]
+impl From<tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>>
+ for BiPipeRead
+{
+ fn from(
+ value: tokio::io::ReadHalf<
+ tokio::net::windows::named_pipe::NamedPipeClient,
+ >,
+ ) -> Self {
+ Self { inner: value }
+ }
+}
+
+#[pin_project::pin_project]
+pub struct BiPipeWrite {
+ #[cfg(unix)]
+ #[pin]
+ inner: tokio::net::unix::OwnedWriteHalf,
+ #[cfg(windows)]
+ #[pin]
+ inner: tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
+}
+
+#[cfg(unix)]
+impl From<tokio::net::unix::OwnedWriteHalf> for BiPipeWrite {
+ fn from(value: tokio::net::unix::OwnedWriteHalf) -> Self {
+ Self { inner: value }
+ }
+}
+
+#[cfg(windows)]
+impl
+ From<tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>>
+ for BiPipeWrite
+{
+ fn from(
+ value: tokio::io::WriteHalf<
+ tokio::net::windows::named_pipe::NamedPipeClient,
+ >,
+ ) -> Self {
+ Self { inner: value }
+ }
+}
+
+#[cfg(unix)]
+fn from_raw(
+ stream: RawBiPipeHandle,
+) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> {
+ use std::os::fd::FromRawFd;
+ // Safety: The fd is part of a pair of connected sockets
+ let unix_stream = tokio::net::UnixStream::from_std(unsafe {
+ std::os::unix::net::UnixStream::from_raw_fd(stream)
+ })?;
+ let (read, write) = unix_stream.into_split();
+ Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write }))
+}
+
+#[cfg(windows)]
+fn from_raw(
+ handle: RawBiPipeHandle,
+) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> {
+ // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
+ // fd handle map will be the same.
+ let pipe = unsafe {
+ tokio::net::windows::named_pipe::NamedPipeClient::from_raw_handle(
+ handle as _,
+ )?
+ };
+ let (read, write) = tokio::io::split(pipe);
+ Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write }))
+}
+
+impl tokio::io::AsyncRead for BiPipeRead {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ self.project().inner.poll_read(cx, buf)
+ }
+}
+impl tokio::io::AsyncRead for BiPipe {
+ fn poll_read(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &mut tokio::io::ReadBuf<'_>,
+ ) -> std::task::Poll<std::io::Result<()>> {
+ self.project().read_end.poll_read(cx, buf)
+ }
+}
+
+// implement `AsyncWrite` for `$name`, delegating
+// the impl to `$field`. `$name` must have a `project` method
+// with a projected `$field` (e.g. with `pin_project::pin_project`)
+macro_rules! impl_async_write {
+ (for $name: ident -> self.$field: ident) => {
+ impl tokio::io::AsyncWrite for $name {
+ fn poll_write_vectored(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ self.project().$field.poll_write_vectored(cx, bufs)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.$field.is_write_vectored()
+ }
+
+ fn poll_write(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, std::io::Error>> {
+ self.project().$field.poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ self.project().$field.poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), std::io::Error>> {
+ self.project().$field.poll_shutdown(cx)
+ }
+ }
+ };
+}
+
+impl_async_write!(for BiPipeWrite -> self.inner);
+impl_async_write!(for BiPipe -> self.write_end);
+
+/// Creates both sides of a bidirectional pipe, returning the raw
+/// handles to the underlying OS resources.
+pub fn bi_pipe_pair_raw() -> Result<(RawBiPipeHandle, RawBiPipeHandle), AnyError>
+{
+ #[cfg(unix)]
+ {
+ // SockFlag is broken on macOS
+ // https://github.com/nix-rust/nix/issues/861
+ let mut fds = [-1, -1];
+ #[cfg(not(target_os = "macos"))]
+ let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
+
+ #[cfg(target_os = "macos")]
+ let flags = 0;
+
+ // SAFETY: libc call, fds are correct size+align
+ let ret = unsafe {
+ libc::socketpair(
+ libc::AF_UNIX,
+ libc::SOCK_STREAM | flags,
+ 0,
+ fds.as_mut_ptr(),
+ )
+ };
+ if ret != 0 {
+ return Err(std::io::Error::last_os_error().into());
+ }
+
+ if cfg!(target_os = "macos") {
+ let fcntl = |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> {
+ // SAFETY: libc call, fd is valid
+ let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
+
+ if flags == -1 {
+ return Err(fail(fds));
+ }
+ // SAFETY: libc call, fd is valid
+ let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | flag) };
+ if ret == -1 {
+ return Err(fail(fds));
+ }
+ Ok(())
+ };
+
+ fn fail(fds: [i32; 2]) -> std::io::Error {
+ // SAFETY: libc call, fds are valid
+ unsafe {
+ libc::close(fds[0]);
+ libc::close(fds[1]);
+ }
+ std::io::Error::last_os_error()
+ }
+
+ // SOCK_NONBLOCK is not supported on macOS.
+ (fcntl)(fds[0], libc::O_NONBLOCK)?;
+ (fcntl)(fds[1], libc::O_NONBLOCK)?;
+
+ // SOCK_CLOEXEC is not supported on macOS.
+ (fcntl)(fds[0], libc::FD_CLOEXEC)?;
+ (fcntl)(fds[1], libc::FD_CLOEXEC)?;
+ }
+
+ let fd1 = fds[0];
+ let fd2 = fds[1];
+ Ok((fd1, fd2))
+ }
+ #[cfg(windows)]
+ {
+ // TODO(nathanwhit): more granular unsafe blocks
+ // SAFETY: win32 calls
+ unsafe {
+ use windows_sys::Win32::Foundation::CloseHandle;
+ use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED;
+ use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED;
+ use windows_sys::Win32::Foundation::GENERIC_READ;
+ use windows_sys::Win32::Foundation::GENERIC_WRITE;
+ use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
+ use windows_sys::Win32::Security::SECURITY_ATTRIBUTES;
+ use windows_sys::Win32::Storage::FileSystem::CreateFileW;
+ use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE;
+ use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
+ use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING;
+ use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
+ use windows_sys::Win32::System::Pipes::ConnectNamedPipe;
+ use windows_sys::Win32::System::Pipes::CreateNamedPipeW;
+ use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE;
+ use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE;
+
+ use std::io;
+ use std::os::windows::ffi::OsStrExt;
+ use std::path::Path;
+ use std::ptr;
+
+ let (path, hd1) = loop {
+ let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4());
+ let mut path = Path::new(&name)
+ .as_os_str()
+ .encode_wide()
+ .collect::<Vec<_>>();
+ path.push(0);
+
+ let hd1 = CreateNamedPipeW(
+ path.as_ptr(),
+ PIPE_ACCESS_DUPLEX
+ | FILE_FLAG_FIRST_PIPE_INSTANCE
+ | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
+ 1,
+ 65536,
+ 65536,
+ 0,
+ std::ptr::null_mut(),
+ );
+
+ if hd1 == INVALID_HANDLE_VALUE {
+ let err = io::Error::last_os_error();
+ /* If the pipe name is already in use, try again. */
+ if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) {
+ continue;
+ }
+
+ return Err(err.into());
+ }
+
+ break (path, hd1);
+ };
+
+ /* Create child pipe handle. */
+ let s = SECURITY_ATTRIBUTES {
+ nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
+ lpSecurityDescriptor: ptr::null_mut(),
+ bInheritHandle: 1,
+ };
+ let hd2 = CreateFileW(
+ path.as_ptr(),
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ &s,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ 0,
+ );
+ if hd2 == INVALID_HANDLE_VALUE {
+ return Err(io::Error::last_os_error().into());
+ }
+
+ // Will not block because we have create the pair.
+ if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 {
+ let err = std::io::Error::last_os_error();
+ if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) {
+ CloseHandle(hd2);
+ return Err(err.into());
+ }
+ }
+
+ Ok((hd1 as _, hd2 as _))
+ }
+ }
+}
diff --git a/ext/io/lib.rs b/ext/io/lib.rs
index a2f14e0db..47921bcee 100644
--- a/ext/io/lib.rs
+++ b/ext/io/lib.rs
@@ -60,12 +60,21 @@ mod pipe;
#[cfg(windows)]
mod winpipe;
+mod bi_pipe;
+
pub use pipe::pipe;
pub use pipe::AsyncPipeRead;
pub use pipe::AsyncPipeWrite;
pub use pipe::PipeRead;
pub use pipe::PipeWrite;
+pub use bi_pipe::bi_pipe_pair_raw;
+pub use bi_pipe::BiPipe;
+pub use bi_pipe::BiPipeRead;
+pub use bi_pipe::BiPipeResource;
+pub use bi_pipe::BiPipeWrite;
+pub use bi_pipe::RawBiPipeHandle;
+
// Store the stdio fd/handles in global statics in order to keep them
// alive for the duration of the application since the last handle/fd
// being dropped will close the corresponding pipe.
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
index 7cfab65a4..59b6fece1 100644
--- a/ext/node/ops/ipc.rs
+++ b/ext/node/ops/ipc.rs
@@ -9,10 +9,6 @@ mod impl_ {
use std::future::Future;
use std::io;
use std::mem;
- #[cfg(unix)]
- use std::os::fd::FromRawFd;
- #[cfg(unix)]
- use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
@@ -43,15 +39,9 @@ mod impl_ {
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;
- #[cfg(unix)]
- use tokio::net::unix::OwnedReadHalf;
- #[cfg(unix)]
- use tokio::net::unix::OwnedWriteHalf;
- #[cfg(unix)]
- use tokio::net::UnixStream;
-
- #[cfg(windows)]
- type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
+ use deno_io::BiPipe;
+ use deno_io::BiPipeRead;
+ use deno_io::BiPipeWrite;
/// Wrapper around v8 value that implements Serialize.
struct SerializeWrapper<'a, 'b>(
@@ -349,10 +339,7 @@ mod impl_ {
pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
- #[cfg(unix)]
- write_half: AsyncRefCell<OwnedWriteHalf>,
- #[cfg(windows)]
- write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
+ write_half: AsyncRefCell<BiPipeWrite>,
cancel: Rc<CancelHandle>,
queued_bytes: AtomicUsize,
ref_tracker: IpcRefTracker,
@@ -364,38 +351,12 @@ mod impl_ {
}
}
- #[cfg(unix)]
- fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
- // Safety: The fd is part of a pair of connected sockets create by child process
- // implementation.
- let unix_stream = UnixStream::from_std(unsafe {
- std::os::unix::net::UnixStream::from_raw_fd(stream)
- })?;
- Ok(unix_stream.into_split())
- }
-
- #[cfg(windows)]
- fn pipe(
- handle: i64,
- ) -> Result<
- (
- tokio::io::ReadHalf<NamedPipeClient>,
- tokio::io::WriteHalf<NamedPipeClient>,
- ),
- io::Error,
- > {
- // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
- // fd handle map will be the same.
- let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
- Ok(tokio::io::split(pipe))
- }
-
impl IpcJsonStreamResource {
pub fn new(
stream: i64,
ref_tracker: IpcRefTracker,
) -> Result<Self, std::io::Error> {
- let (read_half, write_half) = pipe(stream as _)?;
+ let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split();
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
@@ -406,11 +367,14 @@ mod impl_ {
}
#[cfg(all(unix, test))]
- fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self {
+ fn from_stream(
+ stream: tokio::net::UnixStream,
+ ref_tracker: IpcRefTracker,
+ ) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
- read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
- write_half: AsyncRefCell::new(write_half),
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
+ write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@@ -418,11 +382,14 @@ mod impl_ {
}
#[cfg(all(windows, test))]
- fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self {
+ fn from_stream(
+ pipe: tokio::net::windows::named_pipe::NamedPipeClient,
+ ref_tracker: IpcRefTracker,
+ ) -> Self {
let (read_half, write_half) = tokio::io::split(pipe);
Self {
- read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
- write_half: AsyncRefCell::new(write_half),
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
+ write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@@ -492,26 +459,13 @@ mod impl_ {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
- #[cfg(unix)]
- pipe: OwnedReadHalf,
- #[cfg(windows)]
- pipe: tokio::io::ReadHalf<NamedPipeClient>,
+ pipe: BiPipeRead,
buffer: Vec<u8>,
read_buffer: ReadBuffer,
}
impl IpcJsonStream {
- #[cfg(unix)]
- fn new(pipe: OwnedReadHalf) -> Self {
- Self {
- pipe,
- buffer: Vec::with_capacity(INITIAL_CAPACITY),
- read_buffer: ReadBuffer::new(),
- }
- }
-
- #[cfg(windows)]
- fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
+ fn new(pipe: BiPipeRead) -> Self {
Self {
pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY),
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts
index 0996806d7..edc11df73 100644
--- a/ext/node/polyfills/internal/child_process.ts
+++ b/ext/node/polyfills/internal/child_process.ts
@@ -25,7 +25,6 @@ import {
StringPrototypeStartsWith,
StringPrototypeToUpperCase,
} from "ext:deno_node/internal/primordials.mjs";
-
import { assert } from "ext:deno_node/_util/asserts.ts";
import { EventEmitter } from "node:events";
import { os } from "ext:deno_node/internal_binding/constants.ts";
@@ -54,6 +53,10 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs";
+import { StreamBase } from "ext:deno_node/internal_binding/stream_wrap.ts";
+import { Pipe, socketType } from "ext:deno_node/internal_binding/pipe_wrap.ts";
+import console from "node:console";
+import { Socket } from "node:net";
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
@@ -118,6 +121,47 @@ function maybeClose(child: ChildProcess) {
}
}
+function flushStdio(subprocess: ChildProcess) {
+ const stdio = subprocess.stdio;
+
+ if (stdio == null) return;
+
+ for (let i = 0; i < stdio.length; i++) {
+ const stream = stdio[i];
+ if (!stream || !stream.readable) {
+ continue;
+ }
+ stream.resume();
+ }
+}
+
+// Wraps a resource in a class that implements
+// StreamBase, so it can be used with node streams
+class StreamResource implements StreamBase {
+ #rid: number;
+ constructor(rid: number) {
+ this.#rid = rid;
+ }
+ close(): void {
+ core.close(this.#rid);
+ }
+ async read(p: Uint8Array): Promise<number | null> {
+ const readPromise = core.read(this.#rid, p);
+ core.unrefOpPromise(readPromise);
+ const nread = await readPromise;
+ return nread > 0 ? nread : null;
+ }
+ ref(): void {
+ return;
+ }
+ unref(): void {
+ return;
+ }
+ write(p: Uint8Array): Promise<number> {
+ return core.write(this.#rid, p);
+ }
+}
+
export class ChildProcess extends EventEmitter {
/**
* The exit code of the child process. This property will be `null` until the child process exits.
@@ -201,7 +245,7 @@ export class ChildProcess extends EventEmitter {
stdin = "pipe",
stdout = "pipe",
stderr = "pipe",
- _channel, // TODO(kt3k): handle this correctly
+ ...extraStdio
] = normalizedStdio;
const [cmd, cmdArgs] = buildCommand(
command,
@@ -213,6 +257,15 @@ export class ChildProcess extends EventEmitter {
const ipc = normalizedStdio.indexOf("ipc");
+ const extraStdioOffset = 3; // stdin, stdout, stderr
+
+ const extraStdioNormalized: DenoStdio[] = [];
+ for (let i = 0; i < extraStdio.length; i++) {
+ const fd = i + extraStdioOffset;
+ if (fd === ipc) extraStdioNormalized.push("null");
+ extraStdioNormalized.push(toDenoStdio(extraStdio[i]));
+ }
+
const stringEnv = mapValues(env, (value) => value.toString());
try {
this.#process = new Deno.Command(cmd, {
@@ -224,6 +277,7 @@ export class ChildProcess extends EventEmitter {
stderr: toDenoStdio(stderr),
windowsRawArguments: windowsVerbatimArguments,
ipc, // internal
+ extraStdio: extraStdioNormalized,
}).spawn();
this.pid = this.#process.pid;
@@ -259,13 +313,36 @@ export class ChildProcess extends EventEmitter {
maybeClose(this);
});
}
- // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their
- // close events (like above)
this.stdio[0] = this.stdin;
this.stdio[1] = this.stdout;
this.stdio[2] = this.stderr;
+ if (ipc >= 0) {
+ this.stdio[ipc] = null;
+ }
+
+ const pipeRids = internals.getExtraPipeRids(this.#process);
+ for (let i = 0; i < pipeRids.length; i++) {
+ const rid: number | null = pipeRids[i];
+ const fd = i + extraStdioOffset;
+ if (rid) {
+ this[kClosesNeeded]++;
+ this.stdio[fd] = new Socket(
+ {
+ handle: new Pipe(
+ socketType.IPC,
+ new StreamResource(rid),
+ ),
+ // deno-lint-ignore no-explicit-any
+ } as any,
+ );
+ this.stdio[fd]?.on("close", () => {
+ maybeClose(this);
+ });
+ }
+ }
+
nextTick(() => {
this.emit("spawn");
this.#spawned.resolve();
@@ -292,9 +369,9 @@ export class ChildProcess extends EventEmitter {
}
}
- const pipeFd = internals.getPipeFd(this.#process);
- if (typeof pipeFd == "number") {
- setupChannel(this, pipeFd);
+ const pipeRid = internals.getIpcPipeRid(this.#process);
+ if (typeof pipeRid == "number") {
+ setupChannel(this, pipeRid);
this[kClosesNeeded]++;
this.on("disconnect", () => {
maybeClose(this);
@@ -312,6 +389,7 @@ export class ChildProcess extends EventEmitter {
await this.#_waitForChildStreamsToClose();
this.#closePipes();
maybeClose(this);
+ nextTick(flushStdio, this);
});
})();
} catch (err) {
@@ -395,16 +473,6 @@ export class ChildProcess extends EventEmitter {
assert(this.stdin);
this.stdin.destroy();
}
- /// TODO(nathanwhit): for some reason when the child process exits
- /// and the child end of the named pipe closes, reads still just return `Pending`
- /// instead of returning that 0 bytes were read (to signal the pipe died).
- /// For now, just forcibly disconnect, but in theory I think we could miss messages
- /// that haven't been read yet.
- if (Deno.build.os === "windows") {
- if (this.canDisconnect) {
- this.disconnect?.();
- }
- }
}
}
diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts
index a657f7018..f5c3c5439 100644
--- a/ext/node/polyfills/internal_binding/pipe_wrap.ts
+++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts
@@ -37,7 +37,10 @@ import {
import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
import { delay } from "ext:deno_node/_util/async.ts";
-import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
+import {
+ kStreamBaseField,
+ StreamBase,
+} from "ext:deno_node/internal_binding/stream_wrap.ts";
import {
ceilPowOf2,
INITIAL_ACCEPT_BACKOFF_DELAY,
@@ -68,7 +71,7 @@ export class Pipe extends ConnectionWrap {
#closed = false;
#acceptBackoffDelay?: number;
- constructor(type: number, conn?: Deno.UnixConn) {
+ constructor(type: number, conn?: Deno.UnixConn | StreamBase) {
let provider: providerType;
let ipc: boolean;
@@ -100,8 +103,8 @@ export class Pipe extends ConnectionWrap {
this.ipc = ipc;
- if (conn && provider === providerType.PIPEWRAP) {
- const localAddr = conn.localAddr as Deno.UnixAddr;
+ if (conn && provider === providerType.PIPEWRAP && "localAddr" in conn) {
+ const localAddr = conn.localAddr;
this.#address = localAddr.path;
}
}
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts
index 4915a38ca..dc30bfdfe 100644
--- a/ext/node/polyfills/internal_binding/stream_wrap.ts
+++ b/ext/node/polyfills/internal_binding/stream_wrap.ts
@@ -44,11 +44,11 @@ import {
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
-interface Reader {
+export interface Reader {
read(p: Uint8Array): Promise<number | null>;
}
-interface Writer {
+export interface Writer {
write(p: Uint8Array): Promise<number>;
}
@@ -56,7 +56,12 @@ export interface Closer {
close(): void;
}
-type Ref = { ref(): void; unref(): void };
+export interface Ref {
+ ref(): void;
+ unref(): void;
+}
+
+export interface StreamBase extends Reader, Writer, Closer, Ref {}
const enum StreamBaseStateFields {
kReadBytesOrError,
diff --git a/runtime/js/40_process.js b/runtime/js/40_process.js
index 6db04468f..0f28b9d5c 100644
--- a/runtime/js/40_process.js
+++ b/runtime/js/40_process.js
@@ -168,7 +168,7 @@ function run({
const illegalConstructorKey = Symbol("illegalConstructorKey");
-function spawnChildInner(opFn, command, apiName, {
+function spawnChildInner(command, apiName, {
args = [],
cwd = undefined,
clearEnv = false,
@@ -181,8 +181,9 @@ function spawnChildInner(opFn, command, apiName, {
signal = undefined,
windowsRawArguments = false,
ipc = -1,
+ extraStdio = [],
} = { __proto__: null }) {
- const child = opFn({
+ const child = op_spawn_child({
cmd: pathFromURL(command),
args: ArrayPrototypeMap(args, String),
cwd: pathFromURL(cwd),
@@ -195,6 +196,7 @@ function spawnChildInner(opFn, command, apiName, {
stderr,
windowsRawArguments,
ipc,
+ extraStdio,
}, apiName);
return new ChildProcess(illegalConstructorKey, {
...child,
@@ -204,7 +206,6 @@ function spawnChildInner(opFn, command, apiName, {
function spawnChild(command, options = { __proto__: null }) {
return spawnChildInner(
- op_spawn_child,
command,
"Deno.Command().spawn()",
options,
@@ -221,16 +222,19 @@ function collectOutput(readableStream) {
return readableStreamCollectIntoUint8Array(readableStream);
}
-const _pipeFd = Symbol("[[pipeFd]]");
+const _ipcPipeRid = Symbol("[[ipcPipeRid]]");
+const _extraPipeRids = Symbol("[[_extraPipeRids]]");
-internals.getPipeFd = (process) => process[_pipeFd];
+internals.getIpcPipeRid = (process) => process[_ipcPipeRid];
+internals.getExtraPipeRids = (process) => process[_extraPipeRids];
class ChildProcess {
#rid;
#waitPromise;
#waitComplete = false;
- [_pipeFd];
+ [_ipcPipeRid];
+ [_extraPipeRids];
#pid;
get pid() {
@@ -268,7 +272,8 @@ class ChildProcess {
stdinRid,
stdoutRid,
stderrRid,
- pipeFd, // internal
+ ipcPipeRid, // internal
+ extraPipeRids,
} = null) {
if (key !== illegalConstructorKey) {
throw new TypeError("Illegal constructor.");
@@ -276,7 +281,8 @@ class ChildProcess {
this.#rid = rid;
this.#pid = pid;
- this[_pipeFd] = pipeFd;
+ this[_ipcPipeRid] = ipcPipeRid;
+ this[_extraPipeRids] = extraPipeRids;
if (stdinRid !== null) {
this.#stdin = writableStreamForRid(stdinRid);
@@ -380,7 +386,6 @@ function spawn(command, options) {
);
}
return spawnChildInner(
- op_spawn_child,
command,
"Deno.Command().output()",
options,
@@ -417,6 +422,7 @@ function spawnSync(command, {
stdout,
stderr,
windowsRawArguments,
+ extraStdio: [],
});
return {
success: result.status.success,
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index 69fb5cf29..9d166a801 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -154,6 +154,8 @@ pub struct SpawnArgs {
#[serde(flatten)]
stdio: ChildStdio,
+
+ extra_stdio: Vec<Stdio>,
}
#[derive(Deserialize)]
@@ -215,7 +217,12 @@ pub struct SpawnOutput {
stderr: Option<ToJsBuffer>,
}
-type CreateCommand = (std::process::Command, Option<ResourceId>);
+type CreateCommand = (
+ std::process::Command,
+ Option<ResourceId>,
+ Vec<Option<ResourceId>>,
+ Vec<deno_io::RawBiPipeHandle>,
+);
fn create_command(
state: &mut OpState,
@@ -277,216 +284,103 @@ fn create_command(
// TODO(bartlomieju):
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe {
+ let mut extra_pipe_rids = Vec::new();
+ let mut fds_to_dup = Vec::new();
+ let mut fds_to_close = Vec::new();
+ let mut ipc_rid = None;
if let Some(ipc) = args.ipc {
- if ipc < 0 {
- return Ok((command, None));
- }
- // SockFlag is broken on macOS
- // https://github.com/nix-rust/nix/issues/861
- let mut fds = [-1, -1];
- #[cfg(not(target_os = "macos"))]
- let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
-
- #[cfg(target_os = "macos")]
- let flags = 0;
-
- let ret = libc::socketpair(
- libc::AF_UNIX,
- libc::SOCK_STREAM | flags,
- 0,
- fds.as_mut_ptr(),
- );
- if ret != 0 {
- return Err(std::io::Error::last_os_error().into());
+ if ipc >= 0 {
+ let (ipc_fd1, ipc_fd2) = deno_io::bi_pipe_pair_raw()?;
+ fds_to_dup.push((ipc_fd2, ipc));
+ fds_to_close.push(ipc_fd2);
+ /* One end returned to parent process (this) */
+ let pipe_rid =
+ state
+ .resource_table
+ .add(deno_node::IpcJsonStreamResource::new(
+ ipc_fd1 as _,
+ deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
+ )?);
+ /* The other end passed to child process via NODE_CHANNEL_FD */
+ command.env("NODE_CHANNEL_FD", format!("{}", ipc));
+ ipc_rid = Some(pipe_rid);
}
+ }
- if cfg!(target_os = "macos") {
- let fcntl =
- |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> {
- let flags = libc::fcntl(fd, libc::F_GETFL, 0);
-
- if flags == -1 {
- return Err(fail(fds));
+ for (i, stdio) in args.extra_stdio.into_iter().enumerate() {
+ // index 0 in `extra_stdio` actually refers to fd 3
+ // because we handle stdin,stdout,stderr specially
+ let fd = (i + 3) as i32;
+ // TODO(nathanwhit): handle inherited, but this relies on the parent process having
+ // fds open already. since we don't generally support dealing with raw fds,
+ // we can't properly support this
+ if matches!(stdio, Stdio::Piped) {
+ let (fd1, fd2) = deno_io::bi_pipe_pair_raw()?;
+ fds_to_dup.push((fd2, fd));
+ fds_to_close.push(fd2);
+ let rid = state.resource_table.add(
+ match deno_io::BiPipeResource::from_raw_handle(fd1) {
+ Ok(v) => v,
+ Err(e) => {
+ log::warn!("Failed to open bidirectional pipe for fd {fd}: {e}");
+ extra_pipe_rids.push(None);
+ continue;
}
- let ret = libc::fcntl(fd, libc::F_SETFL, flags | flag);
- if ret == -1 {
- return Err(fail(fds));
- }
- Ok(())
- };
-
- fn fail(fds: [i32; 2]) -> std::io::Error {
- unsafe {
- libc::close(fds[0]);
- libc::close(fds[1]);
- }
- std::io::Error::last_os_error()
- }
-
- // SOCK_NONBLOCK is not supported on macOS.
- (fcntl)(fds[0], libc::O_NONBLOCK)?;
- (fcntl)(fds[1], libc::O_NONBLOCK)?;
-
- // SOCK_CLOEXEC is not supported on macOS.
- (fcntl)(fds[0], libc::FD_CLOEXEC)?;
- (fcntl)(fds[1], libc::FD_CLOEXEC)?;
+ },
+ );
+ extra_pipe_rids.push(Some(rid));
+ } else {
+ extra_pipe_rids.push(None);
}
+ }
- let fd1 = fds[0];
- let fd2 = fds[1];
-
- command.pre_exec(move || {
- if ipc >= 0 {
- let _fd = libc::dup2(fd2, ipc);
- libc::close(fd2);
+ command.pre_exec(move || {
+ for &(src, dst) in &fds_to_dup {
+ if src >= 0 && dst >= 0 {
+ let _fd = libc::dup2(src, dst);
+ libc::close(src);
}
- libc::setgroups(0, std::ptr::null());
- Ok(())
- });
-
- /* One end returned to parent process (this) */
- let pipe_rid = Some(state.resource_table.add(
- deno_node::IpcJsonStreamResource::new(
- fd1 as _,
- deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
- )?,
- ));
-
- /* The other end passed to child process via NODE_CHANNEL_FD */
- command.env("NODE_CHANNEL_FD", format!("{}", ipc));
-
- return Ok((command, pipe_rid));
- }
+ }
+ libc::setgroups(0, std::ptr::null());
+ Ok(())
+ });
- Ok((command, None))
+ Ok((command, ipc_rid, extra_pipe_rids, fds_to_close))
}
#[cfg(windows)]
- // Safety: We setup a windows named pipe and pass one end to the child process.
- unsafe {
- use windows_sys::Win32::Foundation::CloseHandle;
- use windows_sys::Win32::Foundation::DuplicateHandle;
- use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS;
- use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED;
- use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED;
- use windows_sys::Win32::Foundation::GENERIC_READ;
- use windows_sys::Win32::Foundation::GENERIC_WRITE;
- use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
- use windows_sys::Win32::Security::SECURITY_ATTRIBUTES;
- use windows_sys::Win32::Storage::FileSystem::CreateFileW;
- use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE;
- use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
- use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING;
- use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
- use windows_sys::Win32::System::Pipes::ConnectNamedPipe;
- use windows_sys::Win32::System::Pipes::CreateNamedPipeW;
- use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE;
- use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE;
- use windows_sys::Win32::System::Threading::GetCurrentProcess;
-
- use std::io;
- use std::os::windows::ffi::OsStrExt;
- use std::path::Path;
- use std::ptr;
-
+ {
+ let mut ipc_rid = None;
+ let mut handles_to_close = Vec::with_capacity(1);
if let Some(ipc) = args.ipc {
- if ipc < 0 {
- return Ok((command, None));
- }
+ if ipc >= 0 {
+ let (hd1, hd2) = deno_io::bi_pipe_pair_raw()?;
- let (path, hd1) = loop {
- let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4());
- let mut path = Path::new(&name)
- .as_os_str()
- .encode_wide()
- .collect::<Vec<_>>();
- path.push(0);
-
- let hd1 = CreateNamedPipeW(
- path.as_ptr(),
- PIPE_ACCESS_DUPLEX
- | FILE_FLAG_FIRST_PIPE_INSTANCE
- | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
- 1,
- 65536,
- 65536,
- 0,
- std::ptr::null_mut(),
- );
-
- if hd1 == INVALID_HANDLE_VALUE {
- let err = io::Error::last_os_error();
- /* If the pipe name is already in use, try again. */
- if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) {
- continue;
- }
+ /* One end returned to parent process (this) */
+ let pipe_rid = Some(state.resource_table.add(
+ deno_node::IpcJsonStreamResource::new(
+ hd1 as i64,
+ deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
+ )?,
+ ));
- return Err(err.into());
- }
+ /* The other end passed to child process via NODE_CHANNEL_FD */
+ command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64));
- break (path, hd1);
- };
-
- /* Create child pipe handle. */
- let s = SECURITY_ATTRIBUTES {
- nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
- lpSecurityDescriptor: ptr::null_mut(),
- bInheritHandle: 1,
- };
- let mut hd2 = CreateFileW(
- path.as_ptr(),
- GENERIC_READ | GENERIC_WRITE,
- 0,
- &s,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- 0,
- );
- if hd2 == INVALID_HANDLE_VALUE {
- return Err(io::Error::last_os_error().into());
- }
+ handles_to_close.push(hd2);
- // Will not block because we have create the pair.
- if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 {
- let err = std::io::Error::last_os_error();
- if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) {
- CloseHandle(hd2);
- return Err(err.into());
- }
+ ipc_rid = pipe_rid;
}
+ }
- // Duplicating the handle to allow the child process to use it.
- if DuplicateHandle(
- GetCurrentProcess(),
- hd2,
- GetCurrentProcess(),
- &mut hd2,
- 0,
- 1,
- DUPLICATE_SAME_ACCESS,
- ) == 0
- {
- return Err(std::io::Error::last_os_error().into());
- }
-
- /* One end returned to parent process (this) */
- let pipe_fd = Some(state.resource_table.add(
- deno_node::IpcJsonStreamResource::new(
- hd1 as i64,
- deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
- )?,
- ));
-
- /* The other end passed to child process via NODE_CHANNEL_FD */
- command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64));
-
- return Ok((command, pipe_fd));
+ if args.extra_stdio.iter().any(|s| matches!(s, Stdio::Piped)) {
+ log::warn!(
+ "Additional stdio pipes beyond stdin/stdout/stderr are not currently supported on windows"
+ );
}
- }
- #[cfg(not(unix))]
- return Ok((command, None));
+ Ok((command, ipc_rid, vec![], handles_to_close))
+ }
}
#[derive(Serialize)]
@@ -497,13 +391,15 @@ struct Child {
stdin_rid: Option<ResourceId>,
stdout_rid: Option<ResourceId>,
stderr_rid: Option<ResourceId>,
- pipe_fd: Option<ResourceId>,
+ ipc_pipe_rid: Option<ResourceId>,
+ extra_pipe_rids: Vec<Option<ResourceId>>,
}
fn spawn_child(
state: &mut OpState,
command: std::process::Command,
- pipe_fd: Option<ResourceId>,
+ ipc_pipe_rid: Option<ResourceId>,
+ extra_pipe_rids: Vec<Option<ResourceId>>,
) -> Result<Child, AnyError> {
let mut command = tokio::process::Command::from(command);
// TODO(@crowlkats): allow detaching processes.
@@ -585,10 +481,28 @@ fn spawn_child(
stdin_rid,
stdout_rid,
stderr_rid,
- pipe_fd,
+ ipc_pipe_rid,
+ extra_pipe_rids,
})
}
+fn close_raw_handle(handle: deno_io::RawBiPipeHandle) {
+ #[cfg(unix)]
+ {
+ // SAFETY: libc call
+ unsafe {
+ libc::close(handle);
+ }
+ }
+ #[cfg(windows)]
+ {
+ // SAFETY: win32 call
+ unsafe {
+ windows_sys::Win32::Foundation::CloseHandle(handle as _);
+ }
+ }
+}
+
#[op2]
#[serde]
fn op_spawn_child(
@@ -596,8 +510,13 @@ fn op_spawn_child(
#[serde] args: SpawnArgs,
#[string] api_name: String,
) -> Result<Child, AnyError> {
- let (command, pipe_rid) = create_command(state, args, &api_name)?;
- spawn_child(state, command, pipe_rid)
+ let (command, pipe_rid, extra_pipe_rids, handles_to_close) =
+ create_command(state, args, &api_name)?;
+ let child = spawn_child(state, command, pipe_rid, extra_pipe_rids);
+ for handle in handles_to_close {
+ close_raw_handle(handle);
+ }
+ child
}
#[op2(async)]
@@ -626,7 +545,7 @@ fn op_spawn_sync(
) -> Result<SpawnOutput, AnyError> {
let stdout = matches!(args.stdio.stdout, StdioOrRid::Stdio(Stdio::Piped));
let stderr = matches!(args.stdio.stderr, StdioOrRid::Stdio(Stdio::Piped));
- let (mut command, _) =
+ let (mut command, _, _, _) =
create_command(state, args, "Deno.Command().outputSync()")?;
let output = command.output().with_context(|| {
format!(
diff --git a/tests/specs/node/child_process_extra_pipes/__test__.jsonc b/tests/specs/node/child_process_extra_pipes/__test__.jsonc
new file mode 100644
index 000000000..f8073f3df
--- /dev/null
+++ b/tests/specs/node/child_process_extra_pipes/__test__.jsonc
@@ -0,0 +1,17 @@
+{
+ "tempDir": true,
+ "steps": [
+ {
+ "if": "unix",
+ "cwd": "./test-pipe",
+ "commandName": "cargo",
+ "args": "build",
+ "output": "[WILDCARD]"
+ },
+ {
+ "if": "unix",
+ "args": "run -A main.ts",
+ "output": "main.out"
+ }
+ ]
+}
diff --git a/tests/specs/node/child_process_extra_pipes/main.out b/tests/specs/node/child_process_extra_pipes/main.out
new file mode 100644
index 000000000..694126b92
--- /dev/null
+++ b/tests/specs/node/child_process_extra_pipes/main.out
@@ -0,0 +1,5 @@
+data: hello world
+[UNORDERED_START]
+child closed
+pipe closed
+[UNORDERED_END]
diff --git a/tests/specs/node/child_process_extra_pipes/main.ts b/tests/specs/node/child_process_extra_pipes/main.ts
new file mode 100644
index 000000000..a3683fe9e
--- /dev/null
+++ b/tests/specs/node/child_process_extra_pipes/main.ts
@@ -0,0 +1,26 @@
+import child_process from "node:child_process";
+import { Buffer } from "node:buffer";
+import console from "node:console";
+
+const child = child_process.spawn("./test-pipe/target/debug/test-pipe", [], {
+ stdio: ["inherit", "inherit", "inherit", "ignore", "pipe"],
+});
+
+const extra = child.stdio[4];
+
+const p = Promise.withResolvers();
+
+child.on("close", () => {
+ console.log("child closed");
+ p.resolve();
+});
+
+extra.on("data", (d) => {
+ console.log("data:", d.toString().trim());
+});
+
+extra.on("close", () => {
+ console.log("pipe closed");
+});
+
+await p.promise;
diff --git a/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock
new file mode 100644
index 000000000..51bfabdad
--- /dev/null
+++ b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock
@@ -0,0 +1,7 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "test-pipe"
+version = "0.1.0"
diff --git a/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml
new file mode 100644
index 000000000..26c552299
--- /dev/null
+++ b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml
@@ -0,0 +1,8 @@
+[package]
+name = "test-pipe"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+
+[workspace]
diff --git a/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs b/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs
new file mode 100644
index 000000000..192f82731
--- /dev/null
+++ b/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs
@@ -0,0 +1,12 @@
+use std::io::prelude::*;
+use std::os::fd::FromRawFd;
+use std::os::unix::net::UnixStream;
+
+fn main() {
+ #[cfg(unix)]
+ {
+ let mut stream = unsafe { UnixStream::from_raw_fd(4) };
+
+ stream.write_all(b"hello world\n").unwrap();
+ }
+}
diff --git a/tools/core_import_map.json b/tools/core_import_map.json
index e7703a40c..ba4cd105d 100644
--- a/tools/core_import_map.json
+++ b/tools/core_import_map.json
@@ -166,6 +166,7 @@
"ext:deno_node/internal/test/binding.ts": "../ext/node/polyfills/internal/test/binding.ts",
"ext:deno_node/internal/timers.mjs": "../ext/node/polyfills/internal/timers.mjs",
"ext:deno_node/internal/url.ts": "../ext/node/polyfills/internal/url.ts",
+ "ext:deno_node/internal/stream_base_commons.ts": "../ext/node/polyfills/internal/stream_base_commons.ts",
"ext:deno_node/internal/util.mjs": "../ext/node/polyfills/internal/util.mjs",
"ext:deno_node/internal/util/debuglog.ts": "../ext/node/polyfills/internal/util/debuglog.ts",
"ext:deno_node/internal/util/inspect.mjs": "../ext/node/polyfills/internal/util/inspect.mjs",