diff options
author | Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> | 2024-08-15 09:38:46 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-15 09:38:46 -0700 |
commit | 8749d651fb5e0964cdb8e62be7a59a603cbc3c7c (patch) | |
tree | 1506d08504561a4013ad03ff1068bec23e572102 /ext/node | |
parent | 7ca95fc999f22cb0eb312e02f8c40d7589b35b7e (diff) |
fix(node): Create additional pipes for child processes (#25016)
Linux/macos only currently.
Part of https://github.com/denoland/deno/issues/23524 (fixes it on
platforms other than windows).
Part of #16899 (fixes it on platforms other than windows).
After this PR, playwright is functional on mac/linux.
Diffstat (limited to 'ext/node')
-rw-r--r-- | ext/node/ops/ipc.rs | 84 | ||||
-rw-r--r-- | ext/node/polyfills/internal/child_process.ts | 102 | ||||
-rw-r--r-- | ext/node/polyfills/internal_binding/pipe_wrap.ts | 11 | ||||
-rw-r--r-- | ext/node/polyfills/internal_binding/stream_wrap.ts | 11 |
4 files changed, 119 insertions, 89 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index 7cfab65a4..59b6fece1 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -9,10 +9,6 @@ mod impl_ { use std::future::Future; use std::io; use std::mem; - #[cfg(unix)] - use std::os::fd::FromRawFd; - #[cfg(unix)] - use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; @@ -43,15 +39,9 @@ mod impl_ { use tokio::io::AsyncWriteExt; use tokio::io::ReadBuf; - #[cfg(unix)] - use tokio::net::unix::OwnedReadHalf; - #[cfg(unix)] - use tokio::net::unix::OwnedWriteHalf; - #[cfg(unix)] - use tokio::net::UnixStream; - - #[cfg(windows)] - type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; + use deno_io::BiPipe; + use deno_io::BiPipeRead; + use deno_io::BiPipeWrite; /// Wrapper around v8 value that implements Serialize. struct SerializeWrapper<'a, 'b>( @@ -349,10 +339,7 @@ mod impl_ { pub struct IpcJsonStreamResource { read_half: AsyncRefCell<IpcJsonStream>, - #[cfg(unix)] - write_half: AsyncRefCell<OwnedWriteHalf>, - #[cfg(windows)] - write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, + write_half: AsyncRefCell<BiPipeWrite>, cancel: Rc<CancelHandle>, queued_bytes: AtomicUsize, ref_tracker: IpcRefTracker, @@ -364,38 +351,12 @@ mod impl_ { } } - #[cfg(unix)] - fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - Ok(unix_stream.into_split()) - } - - #[cfg(windows)] - fn pipe( - handle: i64, - ) -> Result< - ( - tokio::io::ReadHalf<NamedPipeClient>, - tokio::io::WriteHalf<NamedPipeClient>, - ), - io::Error, - > { - // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the - // fd handle map will be the same. - let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; - Ok(tokio::io::split(pipe)) - } - impl IpcJsonStreamResource { pub fn new( stream: i64, ref_tracker: IpcRefTracker, ) -> Result<Self, std::io::Error> { - let (read_half, write_half) = pipe(stream as _)?; + let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split(); Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), @@ -406,11 +367,14 @@ mod impl_ { } #[cfg(all(unix, test))] - fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + stream: tokio::net::UnixStream, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = stream.into_split(); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -418,11 +382,14 @@ mod impl_ { } #[cfg(all(windows, test))] - fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + pipe: tokio::net::windows::named_pipe::NamedPipeClient, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = tokio::io::split(pipe); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -492,26 +459,13 @@ mod impl_ { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { - #[cfg(unix)] - pipe: OwnedReadHalf, - #[cfg(windows)] - pipe: tokio::io::ReadHalf<NamedPipeClient>, + pipe: BiPipeRead, buffer: Vec<u8>, read_buffer: ReadBuffer, } impl IpcJsonStream { - #[cfg(unix)] - fn new(pipe: OwnedReadHalf) -> Self { - Self { - pipe, - buffer: Vec::with_capacity(INITIAL_CAPACITY), - read_buffer: ReadBuffer::new(), - } - } - - #[cfg(windows)] - fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { + fn new(pipe: BiPipeRead) -> Self { Self { pipe, buffer: Vec::with_capacity(INITIAL_CAPACITY), diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index 0996806d7..edc11df73 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -25,7 +25,6 @@ import { StringPrototypeStartsWith, StringPrototypeToUpperCase, } from "ext:deno_node/internal/primordials.mjs"; - import { assert } from "ext:deno_node/_util/asserts.ts"; import { EventEmitter } from "node:events"; import { os } from "ext:deno_node/internal_binding/constants.ts"; @@ -54,6 +53,10 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs"; +import { StreamBase } from "ext:deno_node/internal_binding/stream_wrap.ts"; +import { Pipe, socketType } from "ext:deno_node/internal_binding/pipe_wrap.ts"; +import console from "node:console"; +import { Socket } from "node:net"; export function mapValues<T, O>( record: Readonly<Record<string, T>>, @@ -118,6 +121,47 @@ function maybeClose(child: ChildProcess) { } } +function flushStdio(subprocess: ChildProcess) { + const stdio = subprocess.stdio; + + if (stdio == null) return; + + for (let i = 0; i < stdio.length; i++) { + const stream = stdio[i]; + if (!stream || !stream.readable) { + continue; + } + stream.resume(); + } +} + +// Wraps a resource in a class that implements +// StreamBase, so it can be used with node streams +class StreamResource implements StreamBase { + #rid: number; + constructor(rid: number) { + this.#rid = rid; + } + close(): void { + core.close(this.#rid); + } + async read(p: Uint8Array): Promise<number | null> { + const readPromise = core.read(this.#rid, p); + core.unrefOpPromise(readPromise); + const nread = await readPromise; + return nread > 0 ? nread : null; + } + ref(): void { + return; + } + unref(): void { + return; + } + write(p: Uint8Array): Promise<number> { + return core.write(this.#rid, p); + } +} + export class ChildProcess extends EventEmitter { /** * The exit code of the child process. This property will be `null` until the child process exits. @@ -201,7 +245,7 @@ export class ChildProcess extends EventEmitter { stdin = "pipe", stdout = "pipe", stderr = "pipe", - _channel, // TODO(kt3k): handle this correctly + ...extraStdio ] = normalizedStdio; const [cmd, cmdArgs] = buildCommand( command, @@ -213,6 +257,15 @@ export class ChildProcess extends EventEmitter { const ipc = normalizedStdio.indexOf("ipc"); + const extraStdioOffset = 3; // stdin, stdout, stderr + + const extraStdioNormalized: DenoStdio[] = []; + for (let i = 0; i < extraStdio.length; i++) { + const fd = i + extraStdioOffset; + if (fd === ipc) extraStdioNormalized.push("null"); + extraStdioNormalized.push(toDenoStdio(extraStdio[i])); + } + const stringEnv = mapValues(env, (value) => value.toString()); try { this.#process = new Deno.Command(cmd, { @@ -224,6 +277,7 @@ export class ChildProcess extends EventEmitter { stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, ipc, // internal + extraStdio: extraStdioNormalized, }).spawn(); this.pid = this.#process.pid; @@ -259,13 +313,36 @@ export class ChildProcess extends EventEmitter { maybeClose(this); }); } - // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their - // close events (like above) this.stdio[0] = this.stdin; this.stdio[1] = this.stdout; this.stdio[2] = this.stderr; + if (ipc >= 0) { + this.stdio[ipc] = null; + } + + const pipeRids = internals.getExtraPipeRids(this.#process); + for (let i = 0; i < pipeRids.length; i++) { + const rid: number | null = pipeRids[i]; + const fd = i + extraStdioOffset; + if (rid) { + this[kClosesNeeded]++; + this.stdio[fd] = new Socket( + { + handle: new Pipe( + socketType.IPC, + new StreamResource(rid), + ), + // deno-lint-ignore no-explicit-any + } as any, + ); + this.stdio[fd]?.on("close", () => { + maybeClose(this); + }); + } + } + nextTick(() => { this.emit("spawn"); this.#spawned.resolve(); @@ -292,9 +369,9 @@ export class ChildProcess extends EventEmitter { } } - const pipeFd = internals.getPipeFd(this.#process); - if (typeof pipeFd == "number") { - setupChannel(this, pipeFd); + const pipeRid = internals.getIpcPipeRid(this.#process); + if (typeof pipeRid == "number") { + setupChannel(this, pipeRid); this[kClosesNeeded]++; this.on("disconnect", () => { maybeClose(this); @@ -312,6 +389,7 @@ export class ChildProcess extends EventEmitter { await this.#_waitForChildStreamsToClose(); this.#closePipes(); maybeClose(this); + nextTick(flushStdio, this); }); })(); } catch (err) { @@ -395,16 +473,6 @@ export class ChildProcess extends EventEmitter { assert(this.stdin); this.stdin.destroy(); } - /// TODO(nathanwhit): for some reason when the child process exits - /// and the child end of the named pipe closes, reads still just return `Pending` - /// instead of returning that 0 bytes were read (to signal the pipe died). - /// For now, just forcibly disconnect, but in theory I think we could miss messages - /// that haven't been read yet. - if (Deno.build.os === "windows") { - if (this.canDisconnect) { - this.disconnect?.(); - } - } } } diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts index a657f7018..f5c3c5439 100644 --- a/ext/node/polyfills/internal_binding/pipe_wrap.ts +++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts @@ -37,7 +37,10 @@ import { import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; import { delay } from "ext:deno_node/_util/async.ts"; -import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; +import { + kStreamBaseField, + StreamBase, +} from "ext:deno_node/internal_binding/stream_wrap.ts"; import { ceilPowOf2, INITIAL_ACCEPT_BACKOFF_DELAY, @@ -68,7 +71,7 @@ export class Pipe extends ConnectionWrap { #closed = false; #acceptBackoffDelay?: number; - constructor(type: number, conn?: Deno.UnixConn) { + constructor(type: number, conn?: Deno.UnixConn | StreamBase) { let provider: providerType; let ipc: boolean; @@ -100,8 +103,8 @@ export class Pipe extends ConnectionWrap { this.ipc = ipc; - if (conn && provider === providerType.PIPEWRAP) { - const localAddr = conn.localAddr as Deno.UnixAddr; + if (conn && provider === providerType.PIPEWRAP && "localAddr" in conn) { + const localAddr = conn.localAddr; this.#address = localAddr.path; } } diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index 4915a38ca..dc30bfdfe 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -44,11 +44,11 @@ import { } from "ext:deno_node/internal_binding/async_wrap.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; -interface Reader { +export interface Reader { read(p: Uint8Array): Promise<number | null>; } -interface Writer { +export interface Writer { write(p: Uint8Array): Promise<number>; } @@ -56,7 +56,12 @@ export interface Closer { close(): void; } -type Ref = { ref(): void; unref(): void }; +export interface Ref { + ref(): void; + unref(): void; +} + +export interface StreamBase extends Reader, Writer, Closer, Ref {} const enum StreamBaseStateFields { kReadBytesOrError, |