diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2023-12-13 15:44:16 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-13 11:14:16 +0100 |
commit | 5a91a065b882215dde209baf626247e54c21a392 (patch) | |
tree | 192cb8b3b0a4037453b5fd5b2a60e4d52d4543a8 /ext/node/polyfills/internal/child_process.ts | |
parent | bbf8f69cb979be0f36c38ae52b1588e648b3252e (diff) |
fix: implement child_process IPC (#21490)
This PR implements the Node child_process IPC functionality in Deno on
Unix systems.
For `fd > 2` a duplex unix pipe is set up between the parent and child
processes. Currently implements data passing via the channel in the JSON
serialization format.
Diffstat (limited to 'ext/node/polyfills/internal/child_process.ts')
-rw-r--r-- | ext/node/polyfills/internal/child_process.ts | 95 |
1 files changed, 94 insertions, 1 deletions
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index 04773a8b7..b9bf13396 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -44,6 +44,9 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; +const core = globalThis.__bootstrap.core; +const ops = core.ops; + export function mapValues<T, O>( record: Readonly<Record<string, T>>, transformer: (value: T) => O, @@ -167,12 +170,13 @@ export class ChildProcess extends EventEmitter { signal, windowsVerbatimArguments = false, } = options || {}; + const normalizedStdio = normalizeStdioOption(stdio); const [ stdin = "pipe", stdout = "pipe", stderr = "pipe", _channel, // TODO(kt3k): handle this correctly - ] = normalizeStdioOption(stdio); + ] = normalizedStdio; const [cmd, cmdArgs] = buildCommand( command, args || [], @@ -181,6 +185,8 @@ export class ChildProcess extends EventEmitter { this.spawnfile = cmd; this.spawnargs = [cmd, ...cmdArgs]; + const ipc = normalizedStdio.indexOf("ipc"); + const stringEnv = mapValues(env, (value) => value.toString()); try { this.#process = new Deno.Command(cmd, { @@ -191,6 +197,7 @@ export class ChildProcess extends EventEmitter { stdout: toDenoStdio(stdout), stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, + ipc, // internal }).spawn(); this.pid = this.#process.pid; @@ -249,6 +256,10 @@ export class ChildProcess extends EventEmitter { } } + if (typeof this.#process._pipeFd == "number") { + setupChannel(this, this.#process._pipeFd); + } + (async () => { const status = await this.#process.status; this.exitCode = status.code; @@ -1058,9 +1069,91 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } +export function setupChannel(target, channel) { + const ipc = ops.op_node_ipc_pipe(channel); + + async function readLoop() { + try { + while (true) { + if (!target.connected || target.killed) { + return; + } + const msg = await core.opAsync("op_node_ipc_read", ipc); + if (msg == null) { + // Channel closed. + target.disconnect(); + return; + } + + process.nextTick(handleMessage, msg); + } + } catch (err) { + if ( + err instanceof Deno.errors.Interrupted || + err instanceof Deno.errors.BadResource + ) { + return; + } + } + } + + function handleMessage(msg) { + target.emit("message", msg); + } + + target.send = function (message, handle, options, callback) { + if (typeof handle === "function") { + callback = handle; + handle = undefined; + options = undefined; + } else if (typeof options === "function") { + callback = options; + options = undefined; + } else if (options !== undefined) { + validateObject(options, "options"); + } + + options = { swallowErrors: false, ...options }; + + if (message === undefined) { + throw new TypeError("ERR_MISSING_ARGS", "message"); + } + + if (handle !== undefined) { + notImplemented("ChildProcess.send with handle"); + } + + core.opAsync("op_node_ipc_write", ipc, message) + .then(() => { + if (callback) { + process.nextTick(callback, null); + } + }); + }; + + target.connected = true; + + target.disconnect = function () { + if (!this.connected) { + this.emit("error", new Error("IPC channel is already disconnected")); + return; + } + + this.connected = false; + process.nextTick(() => { + core.close(ipc); + target.emit("disconnect"); + }); + }; + + // Start reading messages from the channel. + readLoop(); +} + export default { ChildProcess, normalizeSpawnArguments, stdioStringToArray, spawnSync, + setupChannel, }; |