diff options
author | Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> | 2024-08-15 09:38:46 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-15 09:38:46 -0700 |
commit | 8749d651fb5e0964cdb8e62be7a59a603cbc3c7c (patch) | |
tree | 1506d08504561a4013ad03ff1068bec23e572102 /ext | |
parent | 7ca95fc999f22cb0eb312e02f8c40d7589b35b7e (diff) |
fix(node): Create additional pipes for child processes (#25016)
Linux/macos only currently.
Part of https://github.com/denoland/deno/issues/23524 (fixes it on
platforms other than windows).
Part of #16899 (fixes it on platforms other than windows).
After this PR, playwright is functional on mac/linux.
Diffstat (limited to 'ext')
-rw-r--r-- | ext/io/Cargo.toml | 4 | ||||
-rw-r--r-- | ext/io/bi_pipe.rs | 433 | ||||
-rw-r--r-- | ext/io/lib.rs | 9 | ||||
-rw-r--r-- | ext/node/ops/ipc.rs | 84 | ||||
-rw-r--r-- | ext/node/polyfills/internal/child_process.ts | 102 | ||||
-rw-r--r-- | ext/node/polyfills/internal_binding/pipe_wrap.ts | 11 | ||||
-rw-r--r-- | ext/node/polyfills/internal_binding/stream_wrap.ts | 11 |
7 files changed, 565 insertions, 89 deletions
diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml index 9d02f88c9..414bf0739 100644 --- a/ext/io/Cargo.toml +++ b/ext/io/Cargo.toml @@ -20,12 +20,16 @@ filetime.workspace = true fs3.workspace = true log.workspace = true once_cell.workspace = true +pin-project.workspace = true tokio.workspace = true +uuid.workspace = true [target.'cfg(not(windows))'.dependencies] os_pipe.workspace = true +libc.workspace = true [target.'cfg(windows)'.dependencies] winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] } rand.workspace = true parking_lot.workspace = true +windows-sys.workspace = true diff --git a/ext/io/bi_pipe.rs b/ext/io/bi_pipe.rs new file mode 100644 index 000000000..04fff7b00 --- /dev/null +++ b/ext/io/bi_pipe.rs @@ -0,0 +1,433 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use std::rc::Rc; + +use deno_core::error::AnyError; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::RcRef; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; + +#[cfg(unix)] +pub type RawBiPipeHandle = std::os::fd::RawFd; + +#[cfg(windows)] +pub type RawBiPipeHandle = std::os::windows::io::RawHandle; + +/// One end of a bidirectional pipe. This implements the +/// `Resource` trait. +pub struct BiPipeResource { + read_half: AsyncRefCell<BiPipeRead>, + write_half: AsyncRefCell<BiPipeWrite>, + cancel: CancelHandle, + raw_handle: RawBiPipeHandle, +} + +#[cfg(windows)] +// workaround because `RawHandle` doesn't impl `AsRawHandle` +mod as_raw_handle { + use super::RawBiPipeHandle; + pub(super) struct RawHandleWrap(pub(super) RawBiPipeHandle); + impl std::os::windows::io::AsRawHandle for RawHandleWrap { + fn as_raw_handle(&self) -> std::os::windows::prelude::RawHandle { + self.0 + } + } +} + +impl deno_core::Resource for BiPipeResource { + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } + + fn backing_handle(self: Rc<Self>) -> Option<deno_core::ResourceHandle> { + #[cfg(unix)] + { + Some(deno_core::ResourceHandle::from_fd_like(&self.raw_handle)) + } + #[cfg(windows)] + { + Some(deno_core::ResourceHandle::from_fd_like( + &as_raw_handle::RawHandleWrap(self.raw_handle), + )) + } + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); +} + +impl BiPipeResource { + pub fn from_raw_handle(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> { + let pipe = BiPipe::from_raw(raw)?; + let (read, write) = pipe.split(); + Ok(Self { + raw_handle: raw, + read_half: AsyncRefCell::new(read), + write_half: AsyncRefCell::new(write), + cancel: Default::default(), + }) + } + + pub async fn read( + self: Rc<Self>, + data: &mut [u8], + ) -> Result<usize, AnyError> { + let mut rd = RcRef::map(&self, |r| &r.read_half).borrow_mut().await; + let cancel_handle = RcRef::map(&self, |r| &r.cancel); + Ok(rd.read(data).try_or_cancel(cancel_handle).await?) + } + + pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> { + let mut wr = RcRef::map(self, |r| &r.write_half).borrow_mut().await; + let nwritten = wr.write(data).await?; + wr.flush().await?; + Ok(nwritten) + } +} + +/// One end of a bidirectional pipe +#[pin_project::pin_project] +pub struct BiPipe { + #[pin] + read_end: BiPipeRead, + #[pin] + write_end: BiPipeWrite, +} + +impl BiPipe { + pub fn from_raw(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> { + let (read_end, write_end) = from_raw(raw)?; + Ok(Self { + read_end, + write_end, + }) + } + + pub fn split(self) -> (BiPipeRead, BiPipeWrite) { + (self.read_end, self.write_end) + } + + pub fn unsplit(read_end: BiPipeRead, write_end: BiPipeWrite) -> Self { + Self { + read_end, + write_end, + } + } +} + +#[pin_project::pin_project] +pub struct BiPipeRead { + #[cfg(unix)] + #[pin] + inner: tokio::net::unix::OwnedReadHalf, + #[cfg(windows)] + #[pin] + inner: tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>, +} + +#[cfg(unix)] +impl From<tokio::net::unix::OwnedReadHalf> for BiPipeRead { + fn from(value: tokio::net::unix::OwnedReadHalf) -> Self { + Self { inner: value } + } +} +#[cfg(windows)] +impl From<tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>> + for BiPipeRead +{ + fn from( + value: tokio::io::ReadHalf< + tokio::net::windows::named_pipe::NamedPipeClient, + >, + ) -> Self { + Self { inner: value } + } +} + +#[pin_project::pin_project] +pub struct BiPipeWrite { + #[cfg(unix)] + #[pin] + inner: tokio::net::unix::OwnedWriteHalf, + #[cfg(windows)] + #[pin] + inner: tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>, +} + +#[cfg(unix)] +impl From<tokio::net::unix::OwnedWriteHalf> for BiPipeWrite { + fn from(value: tokio::net::unix::OwnedWriteHalf) -> Self { + Self { inner: value } + } +} + +#[cfg(windows)] +impl + From<tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>> + for BiPipeWrite +{ + fn from( + value: tokio::io::WriteHalf< + tokio::net::windows::named_pipe::NamedPipeClient, + >, + ) -> Self { + Self { inner: value } + } +} + +#[cfg(unix)] +fn from_raw( + stream: RawBiPipeHandle, +) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> { + use std::os::fd::FromRawFd; + // Safety: The fd is part of a pair of connected sockets + let unix_stream = tokio::net::UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + let (read, write) = unix_stream.into_split(); + Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write })) +} + +#[cfg(windows)] +fn from_raw( + handle: RawBiPipeHandle, +) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> { + // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the + // fd handle map will be the same. + let pipe = unsafe { + tokio::net::windows::named_pipe::NamedPipeClient::from_raw_handle( + handle as _, + )? + }; + let (read, write) = tokio::io::split(pipe); + Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write })) +} + +impl tokio::io::AsyncRead for BiPipeRead { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + self.project().inner.poll_read(cx, buf) + } +} +impl tokio::io::AsyncRead for BiPipe { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + self.project().read_end.poll_read(cx, buf) + } +} + +// implement `AsyncWrite` for `$name`, delegating +// the impl to `$field`. `$name` must have a `project` method +// with a projected `$field` (e.g. with `pin_project::pin_project`) +macro_rules! impl_async_write { + (for $name: ident -> self.$field: ident) => { + impl tokio::io::AsyncWrite for $name { + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + self.project().$field.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.$field.is_write_vectored() + } + + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll<Result<usize, std::io::Error>> { + self.project().$field.poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + self.project().$field.poll_flush(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Result<(), std::io::Error>> { + self.project().$field.poll_shutdown(cx) + } + } + }; +} + +impl_async_write!(for BiPipeWrite -> self.inner); +impl_async_write!(for BiPipe -> self.write_end); + +/// Creates both sides of a bidirectional pipe, returning the raw +/// handles to the underlying OS resources. +pub fn bi_pipe_pair_raw() -> Result<(RawBiPipeHandle, RawBiPipeHandle), AnyError> +{ + #[cfg(unix)] + { + // SockFlag is broken on macOS + // https://github.com/nix-rust/nix/issues/861 + let mut fds = [-1, -1]; + #[cfg(not(target_os = "macos"))] + let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK; + + #[cfg(target_os = "macos")] + let flags = 0; + + // SAFETY: libc call, fds are correct size+align + let ret = unsafe { + libc::socketpair( + libc::AF_UNIX, + libc::SOCK_STREAM | flags, + 0, + fds.as_mut_ptr(), + ) + }; + if ret != 0 { + return Err(std::io::Error::last_os_error().into()); + } + + if cfg!(target_os = "macos") { + let fcntl = |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> { + // SAFETY: libc call, fd is valid + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + + if flags == -1 { + return Err(fail(fds)); + } + // SAFETY: libc call, fd is valid + let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | flag) }; + if ret == -1 { + return Err(fail(fds)); + } + Ok(()) + }; + + fn fail(fds: [i32; 2]) -> std::io::Error { + // SAFETY: libc call, fds are valid + unsafe { + libc::close(fds[0]); + libc::close(fds[1]); + } + std::io::Error::last_os_error() + } + + // SOCK_NONBLOCK is not supported on macOS. + (fcntl)(fds[0], libc::O_NONBLOCK)?; + (fcntl)(fds[1], libc::O_NONBLOCK)?; + + // SOCK_CLOEXEC is not supported on macOS. + (fcntl)(fds[0], libc::FD_CLOEXEC)?; + (fcntl)(fds[1], libc::FD_CLOEXEC)?; + } + + let fd1 = fds[0]; + let fd2 = fds[1]; + Ok((fd1, fd2)) + } + #[cfg(windows)] + { + // TODO(nathanwhit): more granular unsafe blocks + // SAFETY: win32 calls + unsafe { + use windows_sys::Win32::Foundation::CloseHandle; + use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; + use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED; + use windows_sys::Win32::Foundation::GENERIC_READ; + use windows_sys::Win32::Foundation::GENERIC_WRITE; + use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE; + use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; + use windows_sys::Win32::Storage::FileSystem::CreateFileW; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING; + use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; + use windows_sys::Win32::System::Pipes::ConnectNamedPipe; + use windows_sys::Win32::System::Pipes::CreateNamedPipeW; + use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE; + use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE; + + use std::io; + use std::os::windows::ffi::OsStrExt; + use std::path::Path; + use std::ptr; + + let (path, hd1) = loop { + let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4()); + let mut path = Path::new(&name) + .as_os_str() + .encode_wide() + .collect::<Vec<_>>(); + path.push(0); + + let hd1 = CreateNamedPipeW( + path.as_ptr(), + PIPE_ACCESS_DUPLEX + | FILE_FLAG_FIRST_PIPE_INSTANCE + | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ); + + if hd1 == INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + /* If the pipe name is already in use, try again. */ + if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { + continue; + } + + return Err(err.into()); + } + + break (path, hd1); + }; + + /* Create child pipe handle. */ + let s = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32, + lpSecurityDescriptor: ptr::null_mut(), + bInheritHandle: 1, + }; + let hd2 = CreateFileW( + path.as_ptr(), + GENERIC_READ | GENERIC_WRITE, + 0, + &s, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + 0, + ); + if hd2 == INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error().into()); + } + + // Will not block because we have create the pair. + if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 { + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) { + CloseHandle(hd2); + return Err(err.into()); + } + } + + Ok((hd1 as _, hd2 as _)) + } + } +} diff --git a/ext/io/lib.rs b/ext/io/lib.rs index a2f14e0db..47921bcee 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -60,12 +60,21 @@ mod pipe; #[cfg(windows)] mod winpipe; +mod bi_pipe; + pub use pipe::pipe; pub use pipe::AsyncPipeRead; pub use pipe::AsyncPipeWrite; pub use pipe::PipeRead; pub use pipe::PipeWrite; +pub use bi_pipe::bi_pipe_pair_raw; +pub use bi_pipe::BiPipe; +pub use bi_pipe::BiPipeRead; +pub use bi_pipe::BiPipeResource; +pub use bi_pipe::BiPipeWrite; +pub use bi_pipe::RawBiPipeHandle; + // Store the stdio fd/handles in global statics in order to keep them // alive for the duration of the application since the last handle/fd // being dropped will close the corresponding pipe. diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index 7cfab65a4..59b6fece1 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -9,10 +9,6 @@ mod impl_ { use std::future::Future; use std::io; use std::mem; - #[cfg(unix)] - use std::os::fd::FromRawFd; - #[cfg(unix)] - use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; @@ -43,15 +39,9 @@ mod impl_ { use tokio::io::AsyncWriteExt; use tokio::io::ReadBuf; - #[cfg(unix)] - use tokio::net::unix::OwnedReadHalf; - #[cfg(unix)] - use tokio::net::unix::OwnedWriteHalf; - #[cfg(unix)] - use tokio::net::UnixStream; - - #[cfg(windows)] - type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; + use deno_io::BiPipe; + use deno_io::BiPipeRead; + use deno_io::BiPipeWrite; /// Wrapper around v8 value that implements Serialize. struct SerializeWrapper<'a, 'b>( @@ -349,10 +339,7 @@ mod impl_ { pub struct IpcJsonStreamResource { read_half: AsyncRefCell<IpcJsonStream>, - #[cfg(unix)] - write_half: AsyncRefCell<OwnedWriteHalf>, - #[cfg(windows)] - write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, + write_half: AsyncRefCell<BiPipeWrite>, cancel: Rc<CancelHandle>, queued_bytes: AtomicUsize, ref_tracker: IpcRefTracker, @@ -364,38 +351,12 @@ mod impl_ { } } - #[cfg(unix)] - fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - Ok(unix_stream.into_split()) - } - - #[cfg(windows)] - fn pipe( - handle: i64, - ) -> Result< - ( - tokio::io::ReadHalf<NamedPipeClient>, - tokio::io::WriteHalf<NamedPipeClient>, - ), - io::Error, - > { - // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the - // fd handle map will be the same. - let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; - Ok(tokio::io::split(pipe)) - } - impl IpcJsonStreamResource { pub fn new( stream: i64, ref_tracker: IpcRefTracker, ) -> Result<Self, std::io::Error> { - let (read_half, write_half) = pipe(stream as _)?; + let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split(); Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), @@ -406,11 +367,14 @@ mod impl_ { } #[cfg(all(unix, test))] - fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + stream: tokio::net::UnixStream, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = stream.into_split(); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -418,11 +382,14 @@ mod impl_ { } #[cfg(all(windows, test))] - fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + pipe: tokio::net::windows::named_pipe::NamedPipeClient, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = tokio::io::split(pipe); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -492,26 +459,13 @@ mod impl_ { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { - #[cfg(unix)] - pipe: OwnedReadHalf, - #[cfg(windows)] - pipe: tokio::io::ReadHalf<NamedPipeClient>, + pipe: BiPipeRead, buffer: Vec<u8>, read_buffer: ReadBuffer, } impl IpcJsonStream { - #[cfg(unix)] - fn new(pipe: OwnedReadHalf) -> Self { - Self { - pipe, - buffer: Vec::with_capacity(INITIAL_CAPACITY), - read_buffer: ReadBuffer::new(), - } - } - - #[cfg(windows)] - fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { + fn new(pipe: BiPipeRead) -> Self { Self { pipe, buffer: Vec::with_capacity(INITIAL_CAPACITY), diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index 0996806d7..edc11df73 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -25,7 +25,6 @@ import { StringPrototypeStartsWith, StringPrototypeToUpperCase, } from "ext:deno_node/internal/primordials.mjs"; - import { assert } from "ext:deno_node/_util/asserts.ts"; import { EventEmitter } from "node:events"; import { os } from "ext:deno_node/internal_binding/constants.ts"; @@ -54,6 +53,10 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs"; +import { StreamBase } from "ext:deno_node/internal_binding/stream_wrap.ts"; +import { Pipe, socketType } from "ext:deno_node/internal_binding/pipe_wrap.ts"; +import console from "node:console"; +import { Socket } from "node:net"; export function mapValues<T, O>( record: Readonly<Record<string, T>>, @@ -118,6 +121,47 @@ function maybeClose(child: ChildProcess) { } } +function flushStdio(subprocess: ChildProcess) { + const stdio = subprocess.stdio; + + if (stdio == null) return; + + for (let i = 0; i < stdio.length; i++) { + const stream = stdio[i]; + if (!stream || !stream.readable) { + continue; + } + stream.resume(); + } +} + +// Wraps a resource in a class that implements +// StreamBase, so it can be used with node streams +class StreamResource implements StreamBase { + #rid: number; + constructor(rid: number) { + this.#rid = rid; + } + close(): void { + core.close(this.#rid); + } + async read(p: Uint8Array): Promise<number | null> { + const readPromise = core.read(this.#rid, p); + core.unrefOpPromise(readPromise); + const nread = await readPromise; + return nread > 0 ? nread : null; + } + ref(): void { + return; + } + unref(): void { + return; + } + write(p: Uint8Array): Promise<number> { + return core.write(this.#rid, p); + } +} + export class ChildProcess extends EventEmitter { /** * The exit code of the child process. This property will be `null` until the child process exits. @@ -201,7 +245,7 @@ export class ChildProcess extends EventEmitter { stdin = "pipe", stdout = "pipe", stderr = "pipe", - _channel, // TODO(kt3k): handle this correctly + ...extraStdio ] = normalizedStdio; const [cmd, cmdArgs] = buildCommand( command, @@ -213,6 +257,15 @@ export class ChildProcess extends EventEmitter { const ipc = normalizedStdio.indexOf("ipc"); + const extraStdioOffset = 3; // stdin, stdout, stderr + + const extraStdioNormalized: DenoStdio[] = []; + for (let i = 0; i < extraStdio.length; i++) { + const fd = i + extraStdioOffset; + if (fd === ipc) extraStdioNormalized.push("null"); + extraStdioNormalized.push(toDenoStdio(extraStdio[i])); + } + const stringEnv = mapValues(env, (value) => value.toString()); try { this.#process = new Deno.Command(cmd, { @@ -224,6 +277,7 @@ export class ChildProcess extends EventEmitter { stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, ipc, // internal + extraStdio: extraStdioNormalized, }).spawn(); this.pid = this.#process.pid; @@ -259,13 +313,36 @@ export class ChildProcess extends EventEmitter { maybeClose(this); }); } - // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their - // close events (like above) this.stdio[0] = this.stdin; this.stdio[1] = this.stdout; this.stdio[2] = this.stderr; + if (ipc >= 0) { + this.stdio[ipc] = null; + } + + const pipeRids = internals.getExtraPipeRids(this.#process); + for (let i = 0; i < pipeRids.length; i++) { + const rid: number | null = pipeRids[i]; + const fd = i + extraStdioOffset; + if (rid) { + this[kClosesNeeded]++; + this.stdio[fd] = new Socket( + { + handle: new Pipe( + socketType.IPC, + new StreamResource(rid), + ), + // deno-lint-ignore no-explicit-any + } as any, + ); + this.stdio[fd]?.on("close", () => { + maybeClose(this); + }); + } + } + nextTick(() => { this.emit("spawn"); this.#spawned.resolve(); @@ -292,9 +369,9 @@ export class ChildProcess extends EventEmitter { } } - const pipeFd = internals.getPipeFd(this.#process); - if (typeof pipeFd == "number") { - setupChannel(this, pipeFd); + const pipeRid = internals.getIpcPipeRid(this.#process); + if (typeof pipeRid == "number") { + setupChannel(this, pipeRid); this[kClosesNeeded]++; this.on("disconnect", () => { maybeClose(this); @@ -312,6 +389,7 @@ export class ChildProcess extends EventEmitter { await this.#_waitForChildStreamsToClose(); this.#closePipes(); maybeClose(this); + nextTick(flushStdio, this); }); })(); } catch (err) { @@ -395,16 +473,6 @@ export class ChildProcess extends EventEmitter { assert(this.stdin); this.stdin.destroy(); } - /// TODO(nathanwhit): for some reason when the child process exits - /// and the child end of the named pipe closes, reads still just return `Pending` - /// instead of returning that 0 bytes were read (to signal the pipe died). - /// For now, just forcibly disconnect, but in theory I think we could miss messages - /// that haven't been read yet. - if (Deno.build.os === "windows") { - if (this.canDisconnect) { - this.disconnect?.(); - } - } } } diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts index a657f7018..f5c3c5439 100644 --- a/ext/node/polyfills/internal_binding/pipe_wrap.ts +++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts @@ -37,7 +37,10 @@ import { import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; import { delay } from "ext:deno_node/_util/async.ts"; -import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; +import { + kStreamBaseField, + StreamBase, +} from "ext:deno_node/internal_binding/stream_wrap.ts"; import { ceilPowOf2, INITIAL_ACCEPT_BACKOFF_DELAY, @@ -68,7 +71,7 @@ export class Pipe extends ConnectionWrap { #closed = false; #acceptBackoffDelay?: number; - constructor(type: number, conn?: Deno.UnixConn) { + constructor(type: number, conn?: Deno.UnixConn | StreamBase) { let provider: providerType; let ipc: boolean; @@ -100,8 +103,8 @@ export class Pipe extends ConnectionWrap { this.ipc = ipc; - if (conn && provider === providerType.PIPEWRAP) { - const localAddr = conn.localAddr as Deno.UnixAddr; + if (conn && provider === providerType.PIPEWRAP && "localAddr" in conn) { + const localAddr = conn.localAddr; this.#address = localAddr.path; } } diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index 4915a38ca..dc30bfdfe 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -44,11 +44,11 @@ import { } from "ext:deno_node/internal_binding/async_wrap.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; -interface Reader { +export interface Reader { read(p: Uint8Array): Promise<number | null>; } -interface Writer { +export interface Writer { write(p: Uint8Array): Promise<number>; } @@ -56,7 +56,12 @@ export interface Closer { close(): void; } -type Ref = { ref(): void; unref(): void }; +export interface Ref { + ref(): void; + unref(): void; +} + +export interface StreamBase extends Reader, Writer, Closer, Ref {} const enum StreamBaseStateFields { kReadBytesOrError, |