diff options
author | Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> | 2024-07-30 16:13:24 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-30 16:13:24 -0700 |
commit | cd59fc53a528603112addfe8b10fe4e30d04e7f0 (patch) | |
tree | 1abe3976361b39ad3969aabdd2b40380ae79c85d /ext/node/polyfills/internal/child_process.ts | |
parent | 3659781f88236a369aa9ca5142c0fb7d690fc898 (diff) |
fix(node): Rework node:child_process IPC (#24763)
Fixes https://github.com/denoland/deno/issues/24756. Fixes
https://github.com/denoland/deno/issues/24796.
This also gets vitest working when using
[`--pool=forks`](https://vitest.dev/guide/improving-performance#pool)
(which is the default as of vitest 2.0). Ref
https://github.com/denoland/deno/issues/23882.
---
This PR resolves a handful of issues with child_process IPC. In
particular:
- We didn't support sending typed array views over IPC
- Opening an IPC channel resulted in the event loop never exiting
- Sending a `null` over IPC would terminate the channel
- There was some UB in the read implementation (transmuting an `&[u8]`
to `&mut [u8]`)
- The `send` method wasn't returning anything, so there was no way to
signal backpressure (this also resulted in the benchmark
`child_process_ipc.mjs` being misleading, as it tried to respect
backpressure. That gave node much worse results at larger message sizes,
and gave us much worse results at smaller message sizes).
- We weren't setting up the `channel` property on the `process` global
(or on the `ChildProcess` object), and also didn't have a way to
ref/unref the channel
- Calling `kill` multiple times (or disconnecting the channel, then
calling kill) would throw an error
- Node couldn't spawn a deno subprocess and communicate with it over IPC
Diffstat (limited to 'ext/node/polyfills/internal/child_process.ts')
-rw-r--r-- | ext/node/polyfills/internal/child_process.ts | 220 |
1 files changed, 200 insertions, 20 deletions
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index cabae63ee..2dcf0e782 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -7,7 +7,12 @@ // deno-lint-ignore-file prefer-primordials import { core, internals } from "ext:core/mod.js"; -import { op_node_ipc_read, op_node_ipc_write } from "ext:core/ops"; +import { + op_node_ipc_read, + op_node_ipc_ref, + op_node_ipc_unref, + op_node_ipc_write, +} from "ext:core/ops"; import { ArrayIsArray, ArrayPrototypeFilter, @@ -17,13 +22,14 @@ import { ArrayPrototypeSort, ArrayPrototypeUnshift, ObjectHasOwn, + StringPrototypeStartsWith, StringPrototypeToUpperCase, } from "ext:deno_node/internal/primordials.mjs"; import { assert } from "ext:deno_node/_util/asserts.ts"; import { EventEmitter } from "node:events"; import { os } from "ext:deno_node/internal_binding/constants.ts"; -import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; +import { notImplemented } from "ext:deno_node/_utils.ts"; import { Readable, Stream, Writable } from "node:stream"; import { isWindows } from "ext:deno_node/_util/os.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts"; @@ -31,6 +37,7 @@ import { AbortError, ERR_INVALID_ARG_TYPE, ERR_INVALID_ARG_VALUE, + ERR_IPC_CHANNEL_CLOSED, ERR_UNKNOWN_SIGNAL, } from "ext:deno_node/internal/errors.ts"; import { Buffer } from "node:buffer"; @@ -46,6 +53,7 @@ import { import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; +import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs"; export function mapValues<T, O>( record: Readonly<Record<string, T>>, @@ -97,6 +105,19 @@ export function stdioStringToArray( return options; } +const kClosesNeeded = Symbol("_closesNeeded"); +const kClosesReceived = Symbol("_closesReceived"); + +// We only want to emit a close event for the child process when all of +// the writable streams have closed. The value of `child[kClosesNeeded]` should be 1 + +// the number of opened writable streams (note this excludes `stdin`). +function maybeClose(child: ChildProcess) { + child[kClosesReceived]++; + if (child[kClosesNeeded] === child[kClosesReceived]) { + child.emit("close", child.exitCode, child.signalCode); + } +} + export class ChildProcess extends EventEmitter { /** * The exit code of the child process. This property will be `null` until the child process exits. @@ -152,8 +173,13 @@ export class ChildProcess extends EventEmitter { null, ]; + disconnect?: () => void; + #process!: Deno.ChildProcess; #spawned = Promise.withResolvers<void>(); + [kClosesNeeded] = 1; + [kClosesReceived] = 0; + canDisconnect = false; constructor( command: string, @@ -218,13 +244,23 @@ export class ChildProcess extends EventEmitter { if (stdout === "pipe") { assert(this.#process.stdout); + this[kClosesNeeded]++; this.stdout = Readable.fromWeb(this.#process.stdout); + this.stdout.on("close", () => { + maybeClose(this); + }); } if (stderr === "pipe") { assert(this.#process.stderr); + this[kClosesNeeded]++; this.stderr = Readable.fromWeb(this.#process.stderr); + this.stderr.on("close", () => { + maybeClose(this); + }); } + // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their + // close events (like above) this.stdio[0] = this.stdin; this.stdio[1] = this.stdout; @@ -259,6 +295,10 @@ export class ChildProcess extends EventEmitter { const pipeFd = internals.getPipeFd(this.#process); if (typeof pipeFd == "number") { setupChannel(this, pipeFd); + this[kClosesNeeded]++; + this.on("disconnect", () => { + maybeClose(this); + }); } (async () => { @@ -271,7 +311,7 @@ export class ChildProcess extends EventEmitter { this.emit("exit", exitCode, signalCode); await this.#_waitForChildStreamsToClose(); this.#closePipes(); - this.emit("close", exitCode, signalCode); + maybeClose(this); }); })(); } catch (err) { @@ -304,7 +344,7 @@ export class ChildProcess extends EventEmitter { } /* Cancel any pending IPC I/O */ - if (this.implementsDisconnect) { + if (this.canDisconnect) { this.disconnect?.(); } @@ -321,10 +361,6 @@ export class ChildProcess extends EventEmitter { this.#process.unref(); } - disconnect() { - warnNotImplemented("ChildProcess.prototype.disconnect"); - } - async #_waitForChildStreamsToClose() { const promises = [] as Array<Promise<void>>; // Don't close parent process stdin if that's passed through @@ -359,6 +395,16 @@ export class ChildProcess extends EventEmitter { assert(this.stdin); this.stdin.destroy(); } + /// TODO(nathanwhit): for some reason when the child process exits + /// and the child end of the named pipe closes, reads still just return `Pending` + /// instead of returning that 0 bytes were read (to signal the pipe died). + /// For now, just forcibly disconnect, but in theory I think we could miss messages + /// that haven't been read yet. + if (Deno.build.os === "windows") { + if (this.canDisconnect) { + this.disconnect?.(); + } + } } } @@ -1099,18 +1145,109 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } -export function setupChannel(target, ipc) { +const kControlDisconnect = Symbol("kControlDisconnect"); +const kPendingMessages = Symbol("kPendingMessages"); + +// controls refcounting for the IPC channel +class Control extends EventEmitter { + #channel: number; + #refs: number = 0; + #refExplicitlySet = false; + #connected = true; + [kPendingMessages] = []; + constructor(channel: number) { + super(); + this.#channel = channel; + } + + #ref() { + if (this.#connected) { + op_node_ipc_ref(this.#channel); + } + } + + #unref() { + if (this.#connected) { + op_node_ipc_unref(this.#channel); + } + } + + [kControlDisconnect]() { + this.#unref(); + this.#connected = false; + } + + refCounted() { + if (++this.#refs === 1 && !this.#refExplicitlySet) { + this.#ref(); + } + } + + unrefCounted() { + if (--this.#refs === 0 && !this.#refExplicitlySet) { + this.#unref(); + this.emit("unref"); + } + } + + ref() { + this.#refExplicitlySet = true; + this.#ref(); + } + + unref() { + this.#refExplicitlySet = false; + this.#unref(); + } +} + +type InternalMessage = { + cmd: `NODE_${string}`; +}; + +// deno-lint-ignore no-explicit-any +function isInternal(msg: any): msg is InternalMessage { + if (msg && typeof msg === "object") { + const cmd = msg["cmd"]; + if (typeof cmd === "string") { + return StringPrototypeStartsWith(cmd, "NODE_"); + } + } + return false; +} + +function internalCmdName(msg: InternalMessage): string { + return StringPrototypeSlice(msg.cmd, 5); +} + +// deno-lint-ignore no-explicit-any +export function setupChannel(target: any, ipc: number) { + const control = new Control(ipc); + target.channel = control; + async function readLoop() { try { while (true) { if (!target.connected || target.killed) { return; } - const msg = await op_node_ipc_read(ipc); - if (msg == null) { - // Channel closed. - target.disconnect(); - return; + const prom = op_node_ipc_read(ipc); + // there will always be a pending read promise, + // but it shouldn't keep the event loop from exiting + core.unrefOpPromise(prom); + const msg = await prom; + if (isInternal(msg)) { + const cmd = internalCmdName(msg); + if (cmd === "CLOSE") { + // Channel closed. + target.disconnect(); + return; + } else { + // TODO(nathanwhit): once we add support for sending + // handles, if we want to support deno-node IPC interop, + // we'll need to handle the NODE_HANDLE_* messages here. + continue; + } } process.nextTick(handleMessage, msg); @@ -1126,9 +1263,29 @@ export function setupChannel(target, ipc) { } function handleMessage(msg) { - target.emit("message", msg); + if (!target.channel) { + return; + } + if (target.listenerCount("message") !== 0) { + target.emit("message", msg); + return; + } + + ArrayPrototypePush(target.channel[kPendingMessages], msg); } + target.on("newListener", () => { + nextTick(() => { + if (!target.channel || !target.listenerCount("message")) { + return; + } + for (const msg of target.channel[kPendingMessages]) { + target.emit("message", msg); + } + target.channel[kPendingMessages] = []; + }); + }); + target.send = function (message, handle, options, callback) { if (typeof handle === "function") { callback = handle; @@ -1151,32 +1308,55 @@ export function setupChannel(target, ipc) { notImplemented("ChildProcess.send with handle"); } - op_node_ipc_write(ipc, message) + if (!target.connected) { + const err = new ERR_IPC_CHANNEL_CLOSED(); + if (typeof callback === "function") { + console.error("ChildProcess.send with callback"); + process.nextTick(callback, err); + } else { + nextTick(() => target.emit("error", err)); + } + return false; + } + + // signals whether the queue is within the limit. + // if false, the sender should slow down. + // this acts as a backpressure mechanism. + const queueOk = [true]; + control.refCounted(); + op_node_ipc_write(ipc, message, queueOk) .then(() => { + control.unrefCounted(); if (callback) { process.nextTick(callback, null); } }); + return queueOk[0]; }; target.connected = true; target.disconnect = function () { - if (!this.connected) { - this.emit("error", new Error("IPC channel is already disconnected")); + if (!target.connected) { + target.emit("error", new Error("IPC channel is already disconnected")); return; } - this.connected = false; + target.connected = false; + target.canDisconnect = false; + control[kControlDisconnect](); process.nextTick(() => { + target.channel = null; core.close(ipc); target.emit("disconnect"); }); }; - target.implementsDisconnect = true; + target.canDisconnect = true; // Start reading messages from the channel. readLoop(); + + return control; } export default { |