summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/internal/child_process.ts
diff options
context:
space:
mode:
authorNathan Whitaker <17734409+nathanwhit@users.noreply.github.com>2024-07-30 16:13:24 -0700
committerGitHub <noreply@github.com>2024-07-30 16:13:24 -0700
commitcd59fc53a528603112addfe8b10fe4e30d04e7f0 (patch)
tree1abe3976361b39ad3969aabdd2b40380ae79c85d /ext/node/polyfills/internal/child_process.ts
parent3659781f88236a369aa9ca5142c0fb7d690fc898 (diff)
fix(node): Rework node:child_process IPC (#24763)
Fixes https://github.com/denoland/deno/issues/24756. Fixes https://github.com/denoland/deno/issues/24796. This also gets vitest working when using [`--pool=forks`](https://vitest.dev/guide/improving-performance#pool) (which is the default as of vitest 2.0). Ref https://github.com/denoland/deno/issues/23882. --- This PR resolves a handful of issues with child_process IPC. In particular: - We didn't support sending typed array views over IPC - Opening an IPC channel resulted in the event loop never exiting - Sending a `null` over IPC would terminate the channel - There was some UB in the read implementation (transmuting an `&[u8]` to `&mut [u8]`) - The `send` method wasn't returning anything, so there was no way to signal backpressure (this also resulted in the benchmark `child_process_ipc.mjs` being misleading, as it tried to respect backpressure. That gave node much worse results at larger message sizes, and gave us much worse results at smaller message sizes). - We weren't setting up the `channel` property on the `process` global (or on the `ChildProcess` object), and also didn't have a way to ref/unref the channel - Calling `kill` multiple times (or disconnecting the channel, then calling kill) would throw an error - Node couldn't spawn a deno subprocess and communicate with it over IPC
Diffstat (limited to 'ext/node/polyfills/internal/child_process.ts')
-rw-r--r--ext/node/polyfills/internal/child_process.ts220
1 files changed, 200 insertions, 20 deletions
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts
index cabae63ee..2dcf0e782 100644
--- a/ext/node/polyfills/internal/child_process.ts
+++ b/ext/node/polyfills/internal/child_process.ts
@@ -7,7 +7,12 @@
// deno-lint-ignore-file prefer-primordials
import { core, internals } from "ext:core/mod.js";
-import { op_node_ipc_read, op_node_ipc_write } from "ext:core/ops";
+import {
+ op_node_ipc_read,
+ op_node_ipc_ref,
+ op_node_ipc_unref,
+ op_node_ipc_write,
+} from "ext:core/ops";
import {
ArrayIsArray,
ArrayPrototypeFilter,
@@ -17,13 +22,14 @@ import {
ArrayPrototypeSort,
ArrayPrototypeUnshift,
ObjectHasOwn,
+ StringPrototypeStartsWith,
StringPrototypeToUpperCase,
} from "ext:deno_node/internal/primordials.mjs";
import { assert } from "ext:deno_node/_util/asserts.ts";
import { EventEmitter } from "node:events";
import { os } from "ext:deno_node/internal_binding/constants.ts";
-import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
+import { notImplemented } from "ext:deno_node/_utils.ts";
import { Readable, Stream, Writable } from "node:stream";
import { isWindows } from "ext:deno_node/_util/os.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts";
@@ -31,6 +37,7 @@ import {
AbortError,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
+ ERR_IPC_CHANNEL_CLOSED,
ERR_UNKNOWN_SIGNAL,
} from "ext:deno_node/internal/errors.ts";
import { Buffer } from "node:buffer";
@@ -46,6 +53,7 @@ import {
import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
+import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs";
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
@@ -97,6 +105,19 @@ export function stdioStringToArray(
return options;
}
+const kClosesNeeded = Symbol("_closesNeeded");
+const kClosesReceived = Symbol("_closesReceived");
+
+// We only want to emit a close event for the child process when all of
+// the writable streams have closed. The value of `child[kClosesNeeded]` should be 1 +
+// the number of opened writable streams (note this excludes `stdin`).
+function maybeClose(child: ChildProcess) {
+ child[kClosesReceived]++;
+ if (child[kClosesNeeded] === child[kClosesReceived]) {
+ child.emit("close", child.exitCode, child.signalCode);
+ }
+}
+
export class ChildProcess extends EventEmitter {
/**
* The exit code of the child process. This property will be `null` until the child process exits.
@@ -152,8 +173,13 @@ export class ChildProcess extends EventEmitter {
null,
];
+ disconnect?: () => void;
+
#process!: Deno.ChildProcess;
#spawned = Promise.withResolvers<void>();
+ [kClosesNeeded] = 1;
+ [kClosesReceived] = 0;
+ canDisconnect = false;
constructor(
command: string,
@@ -218,13 +244,23 @@ export class ChildProcess extends EventEmitter {
if (stdout === "pipe") {
assert(this.#process.stdout);
+ this[kClosesNeeded]++;
this.stdout = Readable.fromWeb(this.#process.stdout);
+ this.stdout.on("close", () => {
+ maybeClose(this);
+ });
}
if (stderr === "pipe") {
assert(this.#process.stderr);
+ this[kClosesNeeded]++;
this.stderr = Readable.fromWeb(this.#process.stderr);
+ this.stderr.on("close", () => {
+ maybeClose(this);
+ });
}
+ // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their
+ // close events (like above)
this.stdio[0] = this.stdin;
this.stdio[1] = this.stdout;
@@ -259,6 +295,10 @@ export class ChildProcess extends EventEmitter {
const pipeFd = internals.getPipeFd(this.#process);
if (typeof pipeFd == "number") {
setupChannel(this, pipeFd);
+ this[kClosesNeeded]++;
+ this.on("disconnect", () => {
+ maybeClose(this);
+ });
}
(async () => {
@@ -271,7 +311,7 @@ export class ChildProcess extends EventEmitter {
this.emit("exit", exitCode, signalCode);
await this.#_waitForChildStreamsToClose();
this.#closePipes();
- this.emit("close", exitCode, signalCode);
+ maybeClose(this);
});
})();
} catch (err) {
@@ -304,7 +344,7 @@ export class ChildProcess extends EventEmitter {
}
/* Cancel any pending IPC I/O */
- if (this.implementsDisconnect) {
+ if (this.canDisconnect) {
this.disconnect?.();
}
@@ -321,10 +361,6 @@ export class ChildProcess extends EventEmitter {
this.#process.unref();
}
- disconnect() {
- warnNotImplemented("ChildProcess.prototype.disconnect");
- }
-
async #_waitForChildStreamsToClose() {
const promises = [] as Array<Promise<void>>;
// Don't close parent process stdin if that's passed through
@@ -359,6 +395,16 @@ export class ChildProcess extends EventEmitter {
assert(this.stdin);
this.stdin.destroy();
}
+ /// TODO(nathanwhit): for some reason when the child process exits
+ /// and the child end of the named pipe closes, reads still just return `Pending`
+ /// instead of returning that 0 bytes were read (to signal the pipe died).
+ /// For now, just forcibly disconnect, but in theory I think we could miss messages
+ /// that haven't been read yet.
+ if (Deno.build.os === "windows") {
+ if (this.canDisconnect) {
+ this.disconnect?.();
+ }
+ }
}
}
@@ -1099,18 +1145,109 @@ function toDenoArgs(args: string[]): string[] {
return denoArgs;
}
-export function setupChannel(target, ipc) {
+const kControlDisconnect = Symbol("kControlDisconnect");
+const kPendingMessages = Symbol("kPendingMessages");
+
+// controls refcounting for the IPC channel
+class Control extends EventEmitter {
+ #channel: number;
+ #refs: number = 0;
+ #refExplicitlySet = false;
+ #connected = true;
+ [kPendingMessages] = [];
+ constructor(channel: number) {
+ super();
+ this.#channel = channel;
+ }
+
+ #ref() {
+ if (this.#connected) {
+ op_node_ipc_ref(this.#channel);
+ }
+ }
+
+ #unref() {
+ if (this.#connected) {
+ op_node_ipc_unref(this.#channel);
+ }
+ }
+
+ [kControlDisconnect]() {
+ this.#unref();
+ this.#connected = false;
+ }
+
+ refCounted() {
+ if (++this.#refs === 1 && !this.#refExplicitlySet) {
+ this.#ref();
+ }
+ }
+
+ unrefCounted() {
+ if (--this.#refs === 0 && !this.#refExplicitlySet) {
+ this.#unref();
+ this.emit("unref");
+ }
+ }
+
+ ref() {
+ this.#refExplicitlySet = true;
+ this.#ref();
+ }
+
+ unref() {
+ this.#refExplicitlySet = false;
+ this.#unref();
+ }
+}
+
+type InternalMessage = {
+ cmd: `NODE_${string}`;
+};
+
+// deno-lint-ignore no-explicit-any
+function isInternal(msg: any): msg is InternalMessage {
+ if (msg && typeof msg === "object") {
+ const cmd = msg["cmd"];
+ if (typeof cmd === "string") {
+ return StringPrototypeStartsWith(cmd, "NODE_");
+ }
+ }
+ return false;
+}
+
+function internalCmdName(msg: InternalMessage): string {
+ return StringPrototypeSlice(msg.cmd, 5);
+}
+
+// deno-lint-ignore no-explicit-any
+export function setupChannel(target: any, ipc: number) {
+ const control = new Control(ipc);
+ target.channel = control;
+
async function readLoop() {
try {
while (true) {
if (!target.connected || target.killed) {
return;
}
- const msg = await op_node_ipc_read(ipc);
- if (msg == null) {
- // Channel closed.
- target.disconnect();
- return;
+ const prom = op_node_ipc_read(ipc);
+ // there will always be a pending read promise,
+ // but it shouldn't keep the event loop from exiting
+ core.unrefOpPromise(prom);
+ const msg = await prom;
+ if (isInternal(msg)) {
+ const cmd = internalCmdName(msg);
+ if (cmd === "CLOSE") {
+ // Channel closed.
+ target.disconnect();
+ return;
+ } else {
+ // TODO(nathanwhit): once we add support for sending
+ // handles, if we want to support deno-node IPC interop,
+ // we'll need to handle the NODE_HANDLE_* messages here.
+ continue;
+ }
}
process.nextTick(handleMessage, msg);
@@ -1126,9 +1263,29 @@ export function setupChannel(target, ipc) {
}
function handleMessage(msg) {
- target.emit("message", msg);
+ if (!target.channel) {
+ return;
+ }
+ if (target.listenerCount("message") !== 0) {
+ target.emit("message", msg);
+ return;
+ }
+
+ ArrayPrototypePush(target.channel[kPendingMessages], msg);
}
+ target.on("newListener", () => {
+ nextTick(() => {
+ if (!target.channel || !target.listenerCount("message")) {
+ return;
+ }
+ for (const msg of target.channel[kPendingMessages]) {
+ target.emit("message", msg);
+ }
+ target.channel[kPendingMessages] = [];
+ });
+ });
+
target.send = function (message, handle, options, callback) {
if (typeof handle === "function") {
callback = handle;
@@ -1151,32 +1308,55 @@ export function setupChannel(target, ipc) {
notImplemented("ChildProcess.send with handle");
}
- op_node_ipc_write(ipc, message)
+ if (!target.connected) {
+ const err = new ERR_IPC_CHANNEL_CLOSED();
+ if (typeof callback === "function") {
+ console.error("ChildProcess.send with callback");
+ process.nextTick(callback, err);
+ } else {
+ nextTick(() => target.emit("error", err));
+ }
+ return false;
+ }
+
+ // signals whether the queue is within the limit.
+ // if false, the sender should slow down.
+ // this acts as a backpressure mechanism.
+ const queueOk = [true];
+ control.refCounted();
+ op_node_ipc_write(ipc, message, queueOk)
.then(() => {
+ control.unrefCounted();
if (callback) {
process.nextTick(callback, null);
}
});
+ return queueOk[0];
};
target.connected = true;
target.disconnect = function () {
- if (!this.connected) {
- this.emit("error", new Error("IPC channel is already disconnected"));
+ if (!target.connected) {
+ target.emit("error", new Error("IPC channel is already disconnected"));
return;
}
- this.connected = false;
+ target.connected = false;
+ target.canDisconnect = false;
+ control[kControlDisconnect]();
process.nextTick(() => {
+ target.channel = null;
core.close(ipc);
target.emit("disconnect");
});
};
- target.implementsDisconnect = true;
+ target.canDisconnect = true;
// Start reading messages from the channel.
readLoop();
+
+ return control;
}
export default {