summaryrefslogtreecommitdiff
path: root/ext/node
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node')
-rw-r--r--ext/node/ops/ipc.rs84
-rw-r--r--ext/node/polyfills/internal/child_process.ts102
-rw-r--r--ext/node/polyfills/internal_binding/pipe_wrap.ts11
-rw-r--r--ext/node/polyfills/internal_binding/stream_wrap.ts11
4 files changed, 119 insertions, 89 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
index 7cfab65a4..59b6fece1 100644
--- a/ext/node/ops/ipc.rs
+++ b/ext/node/ops/ipc.rs
@@ -9,10 +9,6 @@ mod impl_ {
use std::future::Future;
use std::io;
use std::mem;
- #[cfg(unix)]
- use std::os::fd::FromRawFd;
- #[cfg(unix)]
- use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
@@ -43,15 +39,9 @@ mod impl_ {
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;
- #[cfg(unix)]
- use tokio::net::unix::OwnedReadHalf;
- #[cfg(unix)]
- use tokio::net::unix::OwnedWriteHalf;
- #[cfg(unix)]
- use tokio::net::UnixStream;
-
- #[cfg(windows)]
- type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
+ use deno_io::BiPipe;
+ use deno_io::BiPipeRead;
+ use deno_io::BiPipeWrite;
/// Wrapper around v8 value that implements Serialize.
struct SerializeWrapper<'a, 'b>(
@@ -349,10 +339,7 @@ mod impl_ {
pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
- #[cfg(unix)]
- write_half: AsyncRefCell<OwnedWriteHalf>,
- #[cfg(windows)]
- write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
+ write_half: AsyncRefCell<BiPipeWrite>,
cancel: Rc<CancelHandle>,
queued_bytes: AtomicUsize,
ref_tracker: IpcRefTracker,
@@ -364,38 +351,12 @@ mod impl_ {
}
}
- #[cfg(unix)]
- fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
- // Safety: The fd is part of a pair of connected sockets create by child process
- // implementation.
- let unix_stream = UnixStream::from_std(unsafe {
- std::os::unix::net::UnixStream::from_raw_fd(stream)
- })?;
- Ok(unix_stream.into_split())
- }
-
- #[cfg(windows)]
- fn pipe(
- handle: i64,
- ) -> Result<
- (
- tokio::io::ReadHalf<NamedPipeClient>,
- tokio::io::WriteHalf<NamedPipeClient>,
- ),
- io::Error,
- > {
- // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
- // fd handle map will be the same.
- let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
- Ok(tokio::io::split(pipe))
- }
-
impl IpcJsonStreamResource {
pub fn new(
stream: i64,
ref_tracker: IpcRefTracker,
) -> Result<Self, std::io::Error> {
- let (read_half, write_half) = pipe(stream as _)?;
+ let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split();
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
@@ -406,11 +367,14 @@ mod impl_ {
}
#[cfg(all(unix, test))]
- fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self {
+ fn from_stream(
+ stream: tokio::net::UnixStream,
+ ref_tracker: IpcRefTracker,
+ ) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
- read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
- write_half: AsyncRefCell::new(write_half),
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
+ write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@@ -418,11 +382,14 @@ mod impl_ {
}
#[cfg(all(windows, test))]
- fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self {
+ fn from_stream(
+ pipe: tokio::net::windows::named_pipe::NamedPipeClient,
+ ref_tracker: IpcRefTracker,
+ ) -> Self {
let (read_half, write_half) = tokio::io::split(pipe);
Self {
- read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
- write_half: AsyncRefCell::new(write_half),
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
+ write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@@ -492,26 +459,13 @@ mod impl_ {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
- #[cfg(unix)]
- pipe: OwnedReadHalf,
- #[cfg(windows)]
- pipe: tokio::io::ReadHalf<NamedPipeClient>,
+ pipe: BiPipeRead,
buffer: Vec<u8>,
read_buffer: ReadBuffer,
}
impl IpcJsonStream {
- #[cfg(unix)]
- fn new(pipe: OwnedReadHalf) -> Self {
- Self {
- pipe,
- buffer: Vec::with_capacity(INITIAL_CAPACITY),
- read_buffer: ReadBuffer::new(),
- }
- }
-
- #[cfg(windows)]
- fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
+ fn new(pipe: BiPipeRead) -> Self {
Self {
pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY),
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts
index 0996806d7..edc11df73 100644
--- a/ext/node/polyfills/internal/child_process.ts
+++ b/ext/node/polyfills/internal/child_process.ts
@@ -25,7 +25,6 @@ import {
StringPrototypeStartsWith,
StringPrototypeToUpperCase,
} from "ext:deno_node/internal/primordials.mjs";
-
import { assert } from "ext:deno_node/_util/asserts.ts";
import { EventEmitter } from "node:events";
import { os } from "ext:deno_node/internal_binding/constants.ts";
@@ -54,6 +53,10 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs";
+import { StreamBase } from "ext:deno_node/internal_binding/stream_wrap.ts";
+import { Pipe, socketType } from "ext:deno_node/internal_binding/pipe_wrap.ts";
+import console from "node:console";
+import { Socket } from "node:net";
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
@@ -118,6 +121,47 @@ function maybeClose(child: ChildProcess) {
}
}
+function flushStdio(subprocess: ChildProcess) {
+ const stdio = subprocess.stdio;
+
+ if (stdio == null) return;
+
+ for (let i = 0; i < stdio.length; i++) {
+ const stream = stdio[i];
+ if (!stream || !stream.readable) {
+ continue;
+ }
+ stream.resume();
+ }
+}
+
+// Wraps a resource in a class that implements
+// StreamBase, so it can be used with node streams
+class StreamResource implements StreamBase {
+ #rid: number;
+ constructor(rid: number) {
+ this.#rid = rid;
+ }
+ close(): void {
+ core.close(this.#rid);
+ }
+ async read(p: Uint8Array): Promise<number | null> {
+ const readPromise = core.read(this.#rid, p);
+ core.unrefOpPromise(readPromise);
+ const nread = await readPromise;
+ return nread > 0 ? nread : null;
+ }
+ ref(): void {
+ return;
+ }
+ unref(): void {
+ return;
+ }
+ write(p: Uint8Array): Promise<number> {
+ return core.write(this.#rid, p);
+ }
+}
+
export class ChildProcess extends EventEmitter {
/**
* The exit code of the child process. This property will be `null` until the child process exits.
@@ -201,7 +245,7 @@ export class ChildProcess extends EventEmitter {
stdin = "pipe",
stdout = "pipe",
stderr = "pipe",
- _channel, // TODO(kt3k): handle this correctly
+ ...extraStdio
] = normalizedStdio;
const [cmd, cmdArgs] = buildCommand(
command,
@@ -213,6 +257,15 @@ export class ChildProcess extends EventEmitter {
const ipc = normalizedStdio.indexOf("ipc");
+ const extraStdioOffset = 3; // stdin, stdout, stderr
+
+ const extraStdioNormalized: DenoStdio[] = [];
+ for (let i = 0; i < extraStdio.length; i++) {
+ const fd = i + extraStdioOffset;
+ if (fd === ipc) extraStdioNormalized.push("null");
+ extraStdioNormalized.push(toDenoStdio(extraStdio[i]));
+ }
+
const stringEnv = mapValues(env, (value) => value.toString());
try {
this.#process = new Deno.Command(cmd, {
@@ -224,6 +277,7 @@ export class ChildProcess extends EventEmitter {
stderr: toDenoStdio(stderr),
windowsRawArguments: windowsVerbatimArguments,
ipc, // internal
+ extraStdio: extraStdioNormalized,
}).spawn();
this.pid = this.#process.pid;
@@ -259,13 +313,36 @@ export class ChildProcess extends EventEmitter {
maybeClose(this);
});
}
- // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their
- // close events (like above)
this.stdio[0] = this.stdin;
this.stdio[1] = this.stdout;
this.stdio[2] = this.stderr;
+ if (ipc >= 0) {
+ this.stdio[ipc] = null;
+ }
+
+ const pipeRids = internals.getExtraPipeRids(this.#process);
+ for (let i = 0; i < pipeRids.length; i++) {
+ const rid: number | null = pipeRids[i];
+ const fd = i + extraStdioOffset;
+ if (rid) {
+ this[kClosesNeeded]++;
+ this.stdio[fd] = new Socket(
+ {
+ handle: new Pipe(
+ socketType.IPC,
+ new StreamResource(rid),
+ ),
+ // deno-lint-ignore no-explicit-any
+ } as any,
+ );
+ this.stdio[fd]?.on("close", () => {
+ maybeClose(this);
+ });
+ }
+ }
+
nextTick(() => {
this.emit("spawn");
this.#spawned.resolve();
@@ -292,9 +369,9 @@ export class ChildProcess extends EventEmitter {
}
}
- const pipeFd = internals.getPipeFd(this.#process);
- if (typeof pipeFd == "number") {
- setupChannel(this, pipeFd);
+ const pipeRid = internals.getIpcPipeRid(this.#process);
+ if (typeof pipeRid == "number") {
+ setupChannel(this, pipeRid);
this[kClosesNeeded]++;
this.on("disconnect", () => {
maybeClose(this);
@@ -312,6 +389,7 @@ export class ChildProcess extends EventEmitter {
await this.#_waitForChildStreamsToClose();
this.#closePipes();
maybeClose(this);
+ nextTick(flushStdio, this);
});
})();
} catch (err) {
@@ -395,16 +473,6 @@ export class ChildProcess extends EventEmitter {
assert(this.stdin);
this.stdin.destroy();
}
- /// TODO(nathanwhit): for some reason when the child process exits
- /// and the child end of the named pipe closes, reads still just return `Pending`
- /// instead of returning that 0 bytes were read (to signal the pipe died).
- /// For now, just forcibly disconnect, but in theory I think we could miss messages
- /// that haven't been read yet.
- if (Deno.build.os === "windows") {
- if (this.canDisconnect) {
- this.disconnect?.();
- }
- }
}
}
diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts
index a657f7018..f5c3c5439 100644
--- a/ext/node/polyfills/internal_binding/pipe_wrap.ts
+++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts
@@ -37,7 +37,10 @@ import {
import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
import { delay } from "ext:deno_node/_util/async.ts";
-import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
+import {
+ kStreamBaseField,
+ StreamBase,
+} from "ext:deno_node/internal_binding/stream_wrap.ts";
import {
ceilPowOf2,
INITIAL_ACCEPT_BACKOFF_DELAY,
@@ -68,7 +71,7 @@ export class Pipe extends ConnectionWrap {
#closed = false;
#acceptBackoffDelay?: number;
- constructor(type: number, conn?: Deno.UnixConn) {
+ constructor(type: number, conn?: Deno.UnixConn | StreamBase) {
let provider: providerType;
let ipc: boolean;
@@ -100,8 +103,8 @@ export class Pipe extends ConnectionWrap {
this.ipc = ipc;
- if (conn && provider === providerType.PIPEWRAP) {
- const localAddr = conn.localAddr as Deno.UnixAddr;
+ if (conn && provider === providerType.PIPEWRAP && "localAddr" in conn) {
+ const localAddr = conn.localAddr;
this.#address = localAddr.path;
}
}
diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts
index 4915a38ca..dc30bfdfe 100644
--- a/ext/node/polyfills/internal_binding/stream_wrap.ts
+++ b/ext/node/polyfills/internal_binding/stream_wrap.ts
@@ -44,11 +44,11 @@ import {
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
-interface Reader {
+export interface Reader {
read(p: Uint8Array): Promise<number | null>;
}
-interface Writer {
+export interface Writer {
write(p: Uint8Array): Promise<number>;
}
@@ -56,7 +56,12 @@ export interface Closer {
close(): void;
}
-type Ref = { ref(): void; unref(): void };
+export interface Ref {
+ ref(): void;
+ unref(): void;
+}
+
+export interface StreamBase extends Reader, Writer, Closer, Ref {}
const enum StreamBaseStateFields {
kReadBytesOrError,