summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/internal/child_process.ts
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-12-13 15:44:16 +0530
committerGitHub <noreply@github.com>2023-12-13 11:14:16 +0100
commit5a91a065b882215dde209baf626247e54c21a392 (patch)
tree192cb8b3b0a4037453b5fd5b2a60e4d52d4543a8 /ext/node/polyfills/internal/child_process.ts
parentbbf8f69cb979be0f36c38ae52b1588e648b3252e (diff)
fix: implement child_process IPC (#21490)
This PR implements the Node child_process IPC functionality in Deno on Unix systems. For `fd > 2` a duplex unix pipe is set up between the parent and child processes. Currently implements data passing via the channel in the JSON serialization format.
Diffstat (limited to 'ext/node/polyfills/internal/child_process.ts')
-rw-r--r--ext/node/polyfills/internal/child_process.ts95
1 files changed, 94 insertions, 1 deletions
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts
index 04773a8b7..b9bf13396 100644
--- a/ext/node/polyfills/internal/child_process.ts
+++ b/ext/node/polyfills/internal/child_process.ts
@@ -44,6 +44,9 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
+const core = globalThis.__bootstrap.core;
+const ops = core.ops;
+
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
transformer: (value: T) => O,
@@ -167,12 +170,13 @@ export class ChildProcess extends EventEmitter {
signal,
windowsVerbatimArguments = false,
} = options || {};
+ const normalizedStdio = normalizeStdioOption(stdio);
const [
stdin = "pipe",
stdout = "pipe",
stderr = "pipe",
_channel, // TODO(kt3k): handle this correctly
- ] = normalizeStdioOption(stdio);
+ ] = normalizedStdio;
const [cmd, cmdArgs] = buildCommand(
command,
args || [],
@@ -181,6 +185,8 @@ export class ChildProcess extends EventEmitter {
this.spawnfile = cmd;
this.spawnargs = [cmd, ...cmdArgs];
+ const ipc = normalizedStdio.indexOf("ipc");
+
const stringEnv = mapValues(env, (value) => value.toString());
try {
this.#process = new Deno.Command(cmd, {
@@ -191,6 +197,7 @@ export class ChildProcess extends EventEmitter {
stdout: toDenoStdio(stdout),
stderr: toDenoStdio(stderr),
windowsRawArguments: windowsVerbatimArguments,
+ ipc, // internal
}).spawn();
this.pid = this.#process.pid;
@@ -249,6 +256,10 @@ export class ChildProcess extends EventEmitter {
}
}
+ if (typeof this.#process._pipeFd == "number") {
+ setupChannel(this, this.#process._pipeFd);
+ }
+
(async () => {
const status = await this.#process.status;
this.exitCode = status.code;
@@ -1058,9 +1069,91 @@ function toDenoArgs(args: string[]): string[] {
return denoArgs;
}
+export function setupChannel(target, channel) {
+ const ipc = ops.op_node_ipc_pipe(channel);
+
+ async function readLoop() {
+ try {
+ while (true) {
+ if (!target.connected || target.killed) {
+ return;
+ }
+ const msg = await core.opAsync("op_node_ipc_read", ipc);
+ if (msg == null) {
+ // Channel closed.
+ target.disconnect();
+ return;
+ }
+
+ process.nextTick(handleMessage, msg);
+ }
+ } catch (err) {
+ if (
+ err instanceof Deno.errors.Interrupted ||
+ err instanceof Deno.errors.BadResource
+ ) {
+ return;
+ }
+ }
+ }
+
+ function handleMessage(msg) {
+ target.emit("message", msg);
+ }
+
+ target.send = function (message, handle, options, callback) {
+ if (typeof handle === "function") {
+ callback = handle;
+ handle = undefined;
+ options = undefined;
+ } else if (typeof options === "function") {
+ callback = options;
+ options = undefined;
+ } else if (options !== undefined) {
+ validateObject(options, "options");
+ }
+
+ options = { swallowErrors: false, ...options };
+
+ if (message === undefined) {
+ throw new TypeError("ERR_MISSING_ARGS", "message");
+ }
+
+ if (handle !== undefined) {
+ notImplemented("ChildProcess.send with handle");
+ }
+
+ core.opAsync("op_node_ipc_write", ipc, message)
+ .then(() => {
+ if (callback) {
+ process.nextTick(callback, null);
+ }
+ });
+ };
+
+ target.connected = true;
+
+ target.disconnect = function () {
+ if (!this.connected) {
+ this.emit("error", new Error("IPC channel is already disconnected"));
+ return;
+ }
+
+ this.connected = false;
+ process.nextTick(() => {
+ core.close(ipc);
+ target.emit("disconnect");
+ });
+ };
+
+ // Start reading messages from the channel.
+ readLoop();
+}
+
export default {
ChildProcess,
normalizeSpawnArguments,
stdioStringToArray,
spawnSync,
+ setupChannel,
};