summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-12-13 15:44:16 +0530
committerGitHub <noreply@github.com>2023-12-13 11:14:16 +0100
commit5a91a065b882215dde209baf626247e54c21a392 (patch)
tree192cb8b3b0a4037453b5fd5b2a60e4d52d4543a8
parentbbf8f69cb979be0f36c38ae52b1588e648b3252e (diff)
fix: implement child_process IPC (#21490)
This PR implements the Node child_process IPC functionality in Deno on Unix systems. For `fd > 2` a duplex unix pipe is set up between the parent and child processes. Currently implements data passing via the channel in the JSON serialization format.
-rw-r--r--Cargo.lock145
-rw-r--r--cli/args/mod.rs11
-rw-r--r--cli/factory.rs1
-rw-r--r--cli/standalone/mod.rs1
-rw-r--r--cli/tests/node_compat/config.jsonc18
-rw-r--r--cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js72
-rw-r--r--cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js63
-rw-r--r--cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js52
-rw-r--r--cli/worker.rs5
-rw-r--r--ext/node/Cargo.toml3
-rw-r--r--ext/node/benchmarks/child_process_ipc.mjs64
-rw-r--r--ext/node/lib.rs3
-rw-r--r--ext/node/ops/ipc.rs504
-rw-r--r--ext/node/ops/mod.rs1
-rw-r--r--ext/node/polyfills/02_init.js8
-rw-r--r--ext/node/polyfills/child_process.ts9
-rw-r--r--ext/node/polyfills/internal/child_process.ts95
-rw-r--r--ext/node/polyfills/process.ts1
-rw-r--r--runtime/js/40_process.js10
-rw-r--r--runtime/js/99_main.js3
-rw-r--r--runtime/ops/process.rs116
-rw-r--r--runtime/worker_bootstrap.rs5
22 files changed, 1158 insertions, 32 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f2044731a..953b8bce5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1508,6 +1508,7 @@ dependencies = [
"libz-sys",
"md-5",
"md4",
+ "nix 0.26.2",
"num-bigint",
"num-bigint-dig",
"num-integer",
@@ -1518,6 +1519,7 @@ dependencies = [
"p384",
"path-clean",
"pbkdf2",
+ "pin-project-lite",
"rand",
"regex",
"reqwest",
@@ -1529,6 +1531,7 @@ dependencies = [
"sha-1",
"sha2",
"signature",
+ "simd-json",
"tokio",
"typenum",
"url",
@@ -2406,6 +2409,15 @@ dependencies = [
]
[[package]]
+name = "float-cmp"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4"
+dependencies = [
+ "num-traits",
+]
+
+[[package]]
name = "fly-accept-encoding"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2617,8 +2629,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
dependencies = [
"cfg-if",
+ "js-sys",
"libc",
"wasi",
+ "wasm-bindgen",
]
[[package]]
@@ -2768,6 +2782,16 @@ dependencies = [
]
[[package]]
+name = "halfbrown"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5681137554ddff44396e5f149892c769d45301dd9aa19c51602a89ee214cb0ec"
+dependencies = [
+ "hashbrown 0.13.2",
+ "serde",
+]
+
+[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2775,6 +2799,15 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
+version = "0.13.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
+dependencies = [
+ "ahash",
+]
+
+[[package]]
+name = "hashbrown"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604"
@@ -3293,6 +3326,70 @@ dependencies = [
]
[[package]]
+name = "lexical-core"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46"
+dependencies = [
+ "lexical-parse-float",
+ "lexical-parse-integer",
+ "lexical-util",
+ "lexical-write-float",
+ "lexical-write-integer",
+]
+
+[[package]]
+name = "lexical-parse-float"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f"
+dependencies = [
+ "lexical-parse-integer",
+ "lexical-util",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-parse-integer"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9"
+dependencies = [
+ "lexical-util",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-util"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc"
+dependencies = [
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-write-float"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862"
+dependencies = [
+ "lexical-util",
+ "lexical-write-integer",
+ "static_assertions",
+]
+
+[[package]]
+name = "lexical-write-integer"
+version = "0.8.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446"
+dependencies = [
+ "lexical-util",
+ "static_assertions",
+]
+
+[[package]]
name = "libc"
version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4438,6 +4535,26 @@ dependencies = [
]
[[package]]
+name = "ref-cast"
+version = "1.0.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "acde58d073e9c79da00f2b5b84eed919c8326832648a5b109b3fce1bb1175280"
+dependencies = [
+ "ref-cast-impl",
+]
+
+[[package]]
+name = "ref-cast-impl"
+version = "1.0.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f7473c2cfcf90008193dd0e3e16599455cb601a9fce322b5bb55de799664925"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.39",
+]
+
+[[package]]
name = "regex"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5037,6 +5154,22 @@ dependencies = [
]
[[package]]
+name = "simd-json"
+version = "0.13.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5a3720326b20bf5b95b72dbbd133caae7e0dcf71eae8f6e6656e71a7e5c9aaa"
+dependencies = [
+ "getrandom",
+ "halfbrown",
+ "lexical-core",
+ "ref-cast",
+ "serde",
+ "serde_json",
+ "simdutf8",
+ "value-trait",
+]
+
+[[package]]
name = "simdutf8"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6389,6 +6522,18 @@ dependencies = [
]
[[package]]
+name = "value-trait"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ea87257cfcbedcb9444eda79c59fdfea71217e6305afee8ee33f500375c2ac97"
+dependencies = [
+ "float-cmp",
+ "halfbrown",
+ "itoa",
+ "ryu",
+]
+
+[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/cli/args/mod.rs b/cli/args/mod.rs
index c65745e29..17711371a 100644
--- a/cli/args/mod.rs
+++ b/cli/args/mod.rs
@@ -939,6 +939,17 @@ impl CliOptions {
.map(Some)
}
+ pub fn node_ipc_fd(&self) -> Option<i32> {
+ let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok();
+ if let Some(node_channel_fd) = maybe_node_channel_fd {
+ // Remove so that child processes don't inherit this environment variable.
+ std::env::remove_var("DENO_CHANNEL_FD");
+ node_channel_fd.parse::<i32>().ok()
+ } else {
+ None
+ }
+ }
+
pub fn resolve_main_module(&self) -> Result<ModuleSpecifier, AnyError> {
match &self.flags.subcommand {
DenoSubcommand::Bundle(bundle_flags) => {
diff --git a/cli/factory.rs b/cli/factory.rs
index 5db09767c..0b21f6eca 100644
--- a/cli/factory.rs
+++ b/cli/factory.rs
@@ -672,6 +672,7 @@ impl CliFactory {
self.maybe_lockfile().clone(),
self.feature_checker().clone(),
self.create_cli_main_worker_options()?,
+ self.options.node_ipc_fd(),
))
}
diff --git a/cli/standalone/mod.rs b/cli/standalone/mod.rs
index a6bf12862..63f014118 100644
--- a/cli/standalone/mod.rs
+++ b/cli/standalone/mod.rs
@@ -530,6 +530,7 @@ pub async fn run(
unstable: metadata.unstable,
maybe_root_package_json_deps: package_json_deps_provider.deps().cloned(),
},
+ None,
);
v8_set_flags(construct_v8_flags(&[], &metadata.v8_flags, vec![]));
diff --git a/cli/tests/node_compat/config.jsonc b/cli/tests/node_compat/config.jsonc
index 943be9759..a29d75a19 100644
--- a/cli/tests/node_compat/config.jsonc
+++ b/cli/tests/node_compat/config.jsonc
@@ -38,6 +38,10 @@
"test-child-process-execfile.js",
"test-child-process-execsync-maxbuf.js",
"test-child-process-exit-code.js",
+ // TODO(littledivy): windows ipc streams not yet implemented
+ "test-child-process-fork-ref.js",
+ "test-child-process-fork-ref2.js",
+ "test-child-process-ipc-next-tick.js",
"test-child-process-ipc.js",
"test-child-process-spawnsync-env.js",
"test-child-process-stdio-inherit.js",
@@ -109,9 +113,7 @@
"test-zlib-zero-windowBits.js"
],
"pummel": [],
- "sequential": [
- "test-child-process-exit.js"
- ]
+ "sequential": ["test-child-process-exit.js"]
},
"tests": {
"common": [
@@ -138,11 +140,7 @@
"print-chars.js",
"x.txt"
],
- "fixtures/keys": [
- "agent1-cert.pem",
- "agent1-key.pem",
- "ca1-cert.pem"
- ],
+ "fixtures/keys": ["agent1-cert.pem", "agent1-key.pem", "ca1-cert.pem"],
"internet": [
"test-dns-any.js",
"test-dns-idna2008.js",
@@ -695,9 +693,7 @@
"test-tty-stdout-end.js"
],
"pummel": [],
- "sequential": [
- "test-child-process-exit.js"
- ]
+ "sequential": ["test-child-process-exit.js"]
},
"windowsIgnore": {
"parallel": [
diff --git a/cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js
new file mode 100644
index 000000000..37c186af8
--- /dev/null
+++ b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref.js
@@ -0,0 +1,72 @@
+// deno-fmt-ignore-file
+// deno-lint-ignore-file
+
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// Taken from Node 18.12.1
+// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually.
+
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+'use strict';
+
+// Ignore on Windows.
+if (process.platform === 'win32') {
+ process.exit(0);
+}
+
+require('../common');
+const assert = require('assert');
+const fork = require('child_process').fork;
+
+if (process.argv[2] === 'child') {
+ process.send('1');
+
+ // Check that child don't instantly die
+ setTimeout(function() {
+ process.send('2');
+ }, 200);
+
+ process.on('disconnect', function() {
+ process.stdout.write('3');
+ });
+
+} else {
+ const child = fork(__filename, ['child'], { silent: true });
+
+ const ipc = [];
+ let stdout = '';
+
+ child.on('message', function(msg) {
+ ipc.push(msg);
+
+ if (msg === '2') child.disconnect();
+ });
+
+ child.stdout.on('data', function(chunk) {
+ stdout += chunk;
+ });
+
+ child.once('exit', function() {
+ assert.deepStrictEqual(ipc, ['1', '2']);
+ assert.strictEqual(stdout, '3');
+ });
+}
diff --git a/cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js
new file mode 100644
index 000000000..da59d9378
--- /dev/null
+++ b/cli/tests/node_compat/test/parallel/test-child-process-fork-ref2.js
@@ -0,0 +1,63 @@
+// deno-fmt-ignore-file
+// deno-lint-ignore-file
+
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// Taken from Node 18.12.1
+// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually.
+
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+'use strict';
+
+// Ignore on Windows.
+if (process.platform === 'win32') {
+ process.exit(0);
+}
+
+const {
+ mustCall,
+ mustNotCall,
+ platformTimeout,
+} = require('../common');
+const fork = require('child_process').fork;
+const debug = require('util').debuglog('test');
+
+if (process.argv[2] === 'child') {
+ debug('child -> call disconnect');
+ process.disconnect();
+
+ setTimeout(() => {
+ debug('child -> will this keep it alive?');
+ process.on('message', mustNotCall());
+ }, platformTimeout(400));
+
+} else {
+ const child = fork(__filename, ['child']);
+
+ child.on('disconnect', mustCall(() => {
+ debug('parent -> disconnect');
+ }));
+
+ child.once('exit', mustCall(() => {
+ debug('parent -> exit');
+ }));
+}
diff --git a/cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js b/cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js
new file mode 100644
index 000000000..d255a0a64
--- /dev/null
+++ b/cli/tests/node_compat/test/parallel/test-child-process-ipc-next-tick.js
@@ -0,0 +1,52 @@
+// deno-fmt-ignore-file
+// deno-lint-ignore-file
+
+// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+// Taken from Node 18.12.1
+// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually.
+
+'use strict';
+
+// Ignore on Windows.
+if (process.platform === 'win32') {
+ process.exit(0);
+}
+
+const common = require('../common');
+const assert = require('assert');
+const cp = require('child_process');
+const NUM_MESSAGES = 10;
+const values = [];
+
+for (let i = 0; i < NUM_MESSAGES; ++i) {
+ values[i] = i;
+}
+
+if (process.argv[2] === 'child') {
+ const received = values.map(() => { return false; });
+
+ process.on('uncaughtException', common.mustCall((err) => {
+ received[err] = true;
+ const done = received.every((element) => { return element === true; });
+
+ if (done)
+ process.disconnect();
+ }, NUM_MESSAGES));
+
+ process.on('message', (msg) => {
+ // If messages are handled synchronously, throwing should break the IPC
+ // message processing.
+ throw msg;
+ });
+
+ process.send('ready');
+} else {
+ const child = cp.fork(__filename, ['child']);
+
+ child.on('message', common.mustCall((msg) => {
+ assert.strictEqual(msg, 'ready');
+ values.forEach((value) => {
+ child.send(value);
+ });
+ }));
+}
diff --git a/cli/worker.rs b/cli/worker.rs
index ce9c05701..22e534e1d 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -124,6 +124,7 @@ struct SharedWorkerState {
maybe_inspector_server: Option<Arc<InspectorServer>>,
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
+ node_ipc: Option<i32>,
}
impl SharedWorkerState {
@@ -415,6 +416,7 @@ impl CliMainWorkerFactory {
maybe_lockfile: Option<Arc<Mutex<Lockfile>>>,
feature_checker: Arc<FeatureChecker>,
options: CliMainWorkerOptions,
+ node_ipc: Option<i32>,
) -> Self {
Self {
shared: Arc::new(SharedWorkerState {
@@ -435,6 +437,7 @@ impl CliMainWorkerFactory {
maybe_inspector_server,
maybe_lockfile,
feature_checker,
+ node_ipc,
}),
}
}
@@ -596,6 +599,7 @@ impl CliMainWorkerFactory {
.options
.maybe_binary_npm_command_name
.clone(),
+ node_ipc_fd: shared.node_ipc,
},
extensions: custom_extensions,
startup_snapshot: crate::js::deno_isolate_init(),
@@ -793,6 +797,7 @@ fn create_web_worker_callback(
.options
.maybe_binary_npm_command_name
.clone(),
+ node_ipc_fd: None,
},
extensions: vec![],
startup_snapshot: crate::js::deno_isolate_init(),
diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml
index 1393c82c0..e5f984145 100644
--- a/ext/node/Cargo.toml
+++ b/ext/node/Cargo.toml
@@ -44,6 +44,7 @@ libc.workspace = true
libz-sys.workspace = true
md-5 = "0.10.5"
md4 = "0.10.2"
+nix.workspace = true
num-bigint.workspace = true
num-bigint-dig = "0.8.2"
num-integer = "0.1.45"
@@ -54,6 +55,7 @@ p256.workspace = true
p384.workspace = true
path-clean = "=0.1.0"
pbkdf2 = "0.12.1"
+pin-project-lite = "0.2.13"
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
@@ -65,6 +67,7 @@ serde = "1.0.149"
sha-1 = "0.10.0"
sha2.workspace = true
signature.workspace = true
+simd-json = "0.13.4"
tokio.workspace = true
typenum = "1.15.0"
url.workspace = true
diff --git a/ext/node/benchmarks/child_process_ipc.mjs b/ext/node/benchmarks/child_process_ipc.mjs
new file mode 100644
index 000000000..0486972dc
--- /dev/null
+++ b/ext/node/benchmarks/child_process_ipc.mjs
@@ -0,0 +1,64 @@
+import { fork } from "node:child_process";
+import process from "node:process";
+import { setImmediate } from "node:timers";
+
+if (process.env.CHILD) {
+ const len = +process.env.CHILD;
+ const msg = ".".repeat(len);
+ const send = () => {
+ while (process.send(msg));
+ // Wait: backlog of unsent messages exceeds threshold
+ setImmediate(send);
+ };
+ send();
+} else {
+ function main(dur, len) {
+ const p = new Promise((resolve) => {
+ const start = performance.now();
+
+ const options = {
+ "stdio": ["inherit", "inherit", "inherit", "ipc"],
+ "env": { "CHILD": len.toString() },
+ };
+ const path = new URL("child_process_ipc.mjs", import.meta.url).pathname;
+ const child = fork(
+ path,
+ options,
+ );
+
+ let bytes = 0;
+ let total = 0;
+ child.on("message", (msg) => {
+ bytes += msg.length;
+ total += 1;
+ });
+
+ setTimeout(() => {
+ child.kill();
+ const end = performance.now();
+ const mb = bytes / 1024 / 1024;
+ const sec = (end - start) / 1000;
+ const mbps = mb / sec;
+ console.log(`${len} bytes: ${mbps.toFixed(2)} MB/s`);
+ console.log(`${total} messages`);
+ resolve();
+ }, dur * 1000);
+ });
+ return p;
+ }
+
+ const len = [
+ 64,
+ 256,
+ 1024,
+ 4096,
+ 16384,
+ 65536,
+ 65536 << 4,
+ 65536 << 6 - 1,
+ ];
+
+ for (const l of len) {
+ await main(5, l);
+ }
+}
diff --git a/ext/node/lib.rs b/ext/node/lib.rs
index 56f4b0ee0..77f01b3d3 100644
--- a/ext/node/lib.rs
+++ b/ext/node/lib.rs
@@ -312,6 +312,9 @@ deno_core::extension!(deno_node,
ops::require::op_require_break_on_next_statement,
ops::util::op_node_guess_handle_type,
ops::crypto::op_node_create_private_key,
+ ops::ipc::op_node_ipc_pipe,
+ ops::ipc::op_node_ipc_write,
+ ops::ipc::op_node_ipc_read,
],
esm_entry_point = "ext:deno_node/02_init.js",
esm = [
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
new file mode 100644
index 000000000..d1aeeb40c
--- /dev/null
+++ b/ext/node/ops/ipc.rs
@@ -0,0 +1,504 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+#[cfg(unix)]
+pub use unix::*;
+
+#[cfg(windows)]
+pub use windows::*;
+
+#[cfg(unix)]
+mod unix {
+ use std::cell::RefCell;
+ use std::future::Future;
+ use std::io;
+ use std::mem;
+ use std::os::fd::FromRawFd;
+ use std::os::fd::RawFd;
+ use std::pin::Pin;
+ use std::rc::Rc;
+ use std::task::Context;
+ use std::task::Poll;
+
+ use deno_core::error::bad_resource_id;
+ use deno_core::error::AnyError;
+ use deno_core::op2;
+ use deno_core::serde_json;
+ use deno_core::AsyncRefCell;
+ use deno_core::CancelFuture;
+ use deno_core::CancelHandle;
+ use deno_core::OpState;
+ use deno_core::RcRef;
+ use deno_core::ResourceId;
+ use pin_project_lite::pin_project;
+ use tokio::io::AsyncBufRead;
+ use tokio::io::AsyncWriteExt;
+ use tokio::io::BufReader;
+ use tokio::net::unix::OwnedReadHalf;
+ use tokio::net::unix::OwnedWriteHalf;
+ use tokio::net::UnixStream;
+
+ #[op2(fast)]
+ #[smi]
+ pub fn op_node_ipc_pipe(
+ state: &mut OpState,
+ #[smi] fd: i32,
+ ) -> Result<ResourceId, AnyError> {
+ Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?))
+ }
+
+ #[op2(async)]
+ pub async fn op_node_ipc_write(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ #[serde] value: serde_json::Value,
+ ) -> Result<(), AnyError> {
+ let stream = state
+ .borrow()
+ .resource_table
+ .get::<IpcJsonStreamResource>(rid)
+ .map_err(|_| bad_resource_id())?;
+ stream.write_msg(value).await?;
+ Ok(())
+ }
+
+ #[op2(async)]
+ #[serde]
+ pub async fn op_node_ipc_read(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+ ) -> Result<serde_json::Value, AnyError> {
+ let stream = state
+ .borrow()
+ .resource_table
+ .get::<IpcJsonStreamResource>(rid)
+ .map_err(|_| bad_resource_id())?;
+
+ let cancel = stream.cancel.clone();
+ let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
+ let msgs = stream.read_msg().or_cancel(cancel).await??;
+ Ok(msgs)
+ }
+
+ struct IpcJsonStreamResource {
+ read_half: AsyncRefCell<IpcJsonStream>,
+ write_half: AsyncRefCell<OwnedWriteHalf>,
+ cancel: Rc<CancelHandle>,
+ }
+
+ impl deno_core::Resource for IpcJsonStreamResource {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
+ }
+ }
+
+ impl IpcJsonStreamResource {
+ fn new(stream: RawFd) -> Result<Self, std::io::Error> {
+ // Safety: The fd is part of a pair of connected sockets create by child process
+ // implementation.
+ let unix_stream = UnixStream::from_std(unsafe {
+ std::os::unix::net::UnixStream::from_raw_fd(stream)
+ })?;
+ let (read_half, write_half) = unix_stream.into_split();
+ Ok(Self {
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
+ write_half: AsyncRefCell::new(write_half),
+ cancel: Default::default(),
+ })
+ }
+
+ #[cfg(test)]
+ fn from_unix_stream(stream: UnixStream) -> Self {
+ let (read_half, write_half) = stream.into_split();
+ Self {
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
+ write_half: AsyncRefCell::new(write_half),
+ cancel: Default::default(),
+ }
+ }
+
+ async fn write_msg(
+ self: Rc<Self>,
+ msg: serde_json::Value,
+ ) -> Result<(), AnyError> {
+ let mut write_half =
+ RcRef::map(self, |r| &r.write_half).borrow_mut().await;
+ // Perf note: We do not benefit from writev here because
+ // we are always allocating a buffer for serialization anyways.
+ let mut buf = Vec::new();
+ serde_json::to_writer(&mut buf, &msg)?;
+ buf.push(b'\n');
+ write_half.write_all(&buf).await?;
+ Ok(())
+ }
+ }
+
+ #[inline]
+ fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
+ #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
+ // Safety: haystack of valid length. neon_memchr can handle unaligned
+ // data.
+ return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) };
+
+ #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
+ return haystack.iter().position(|&b| b == needle);
+ }
+
+ // Initial capacity of the buffered reader and the JSON backing buffer.
+ //
+ // This is a tradeoff between memory usage and performance on large messages.
+ //
+ // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message.
+ const INITIAL_CAPACITY: usize = 1024 * 64;
+
+ // JSON serialization stream over IPC pipe.
+ //
+ // `\n` is used as a delimiter between messages.
+ struct IpcJsonStream {
+ pipe: BufReader<OwnedReadHalf>,
+ buffer: Vec<u8>,
+ }
+
+ impl IpcJsonStream {
+ fn new(pipe: OwnedReadHalf) -> Self {
+ Self {
+ pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
+ buffer: Vec::with_capacity(INITIAL_CAPACITY),
+ }
+ }
+
+ async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
+ let mut json = None;
+ let nread =
+ read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?;
+ if nread == 0 {
+ // EOF.
+ return Ok(serde_json::Value::Null);
+ }
+
+ let json = match json {
+ Some(v) => v,
+ None => {
+ // Took more than a single read and some buffering.
+ simd_json::from_slice(&mut self.buffer[..nread])?
+ }
+ };
+
+ // Safety: Same as `Vec::clear` but without the `drop_in_place` for
+ // each element (nop for u8). Capacity remains the same.
+ unsafe {
+ self.buffer.set_len(0);
+ }
+
+ Ok(json)
+ }
+ }
+
+ pin_project! {
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ struct ReadMsgInner<'a, R: ?Sized> {
+ reader: &'a mut R,
+ buf: &'a mut Vec<u8>,
+ json: &'a mut Option<serde_json::Value>,
+ // The number of bytes appended to buf. This can be less than buf.len() if
+ // the buffer was not empty when the operation was started.
+ read: usize,
+ }
+ }
+
+ fn read_msg_inner<'a, R>(
+ reader: &'a mut R,
+ buf: &'a mut Vec<u8>,
+ json: &'a mut Option<serde_json::Value>,
+ ) -> ReadMsgInner<'a, R>
+ where
+ R: AsyncBufRead + ?Sized + Unpin,
+ {
+ ReadMsgInner {
+ reader,
+ buf,
+ json,
+ read: 0,
+ }
+ }
+
+ fn read_msg_internal<R: AsyncBufRead + ?Sized>(
+ mut reader: Pin<&mut R>,
+ cx: &mut Context<'_>,
+ buf: &mut Vec<u8>,
+ json: &mut Option<serde_json::Value>,
+ read: &mut usize,
+ ) -> Poll<io::Result<usize>> {
+ loop {
+ let (done, used) = {
+ let available = match reader.as_mut().poll_fill_buf(cx) {
+ std::task::Poll::Ready(t) => t?,
+ std::task::Poll::Pending => return std::task::Poll::Pending,
+ };
+
+ if let Some(i) = memchr(b'\n', available) {
+ if *read == 0 {
+ // Fast path: parse and put into the json slot directly.
+ //
+ // Safety: It is ok to overwrite the contents because
+ // we don't need to copy it into the buffer and the length will be reset.
+ let available = unsafe {
+ std::slice::from_raw_parts_mut(
+ available.as_ptr() as *mut u8,
+ available.len(),
+ )
+ };
+ json.replace(
+ simd_json::from_slice(&mut available[..i + 1])
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
+ );
+ } else {
+ // This is not the first read, so we have to copy the data
+ // to make it contiguous.
+ buf.extend_from_slice(&available[..=i]);
+ }
+ (true, i + 1)
+ } else {
+ buf.extend_from_slice(available);
+ (false, available.len())
+ }
+ };
+
+ reader.as_mut().consume(used);
+ *read += used;
+ if done || used == 0 {
+ return Poll::Ready(Ok(mem::replace(read, 0)));
+ }
+ }
+ }
+
+ impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
+ type Output = io::Result<usize>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let me = self.project();
+ read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read)
+ }
+ }
+
+ #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
+ mod neon {
+ use std::arch::aarch64::*;
+
+ pub unsafe fn neon_memchr(
+ str: &[u8],
+ c: u8,
+ length: usize,
+ ) -> Option<usize> {
+ let end = str.as_ptr().wrapping_add(length);
+
+ // Alignment handling
+ let mut ptr = str.as_ptr();
+ while ptr < end && (ptr as usize) & 0xF != 0 {
+ if *ptr == c {
+ return Some(ptr as usize - str.as_ptr() as usize);
+ }
+ ptr = ptr.wrapping_add(1);
+ }
+
+ let search_char = vdupq_n_u8(c);
+
+ while ptr.wrapping_add(16) <= end {
+ let chunk = vld1q_u8(ptr);
+ let comparison = vceqq_u8(chunk, search_char);
+
+ // Check first 64 bits
+ let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0);
+ if result0 != 0 {
+ return Some(
+ (ptr as usize - str.as_ptr() as usize)
+ + result0.trailing_zeros() as usize / 8,
+ );
+ }
+
+ // Check second 64 bits
+ let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1);
+ if result1 != 0 {
+ return Some(
+ (ptr as usize - str.as_ptr() as usize)
+ + 8
+ + result1.trailing_zeros() as usize / 8,
+ );
+ }
+
+ ptr = ptr.wrapping_add(16);
+ }
+
+ // Handle remaining unaligned characters
+ while ptr < end {
+ if *ptr == c {
+ return Some(ptr as usize - str.as_ptr() as usize);
+ }
+ ptr = ptr.wrapping_add(1);
+ }
+
+ None
+ }
+ }
+
+ #[cfg(test)]
+ mod tests {
+ use super::IpcJsonStreamResource;
+ use deno_core::serde_json;
+ use deno_core::serde_json::json;
+ use deno_core::RcRef;
+ use std::rc::Rc;
+
+ #[tokio::test]
+ async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> {
+ // A simple round trip benchmark for quick dev feedback.
+ //
+ // Only ran when the env var is set.
+ if std::env::var_os("BENCH_IPC_DENO").is_none() {
+ return Ok(());
+ }
+
+ let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let child = tokio::spawn(async move {
+ use tokio::io::AsyncWriteExt;
+
+ let size = 1024 * 1024;
+
+ let stri = "x".repeat(size);
+ let data = format!("\"{}\"\n", stri);
+ for _ in 0..100 {
+ fd2.write_all(data.as_bytes()).await?;
+ }
+ Ok::<_, std::io::Error>(())
+ });
+
+ let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
+
+ let start = std::time::Instant::now();
+ let mut bytes = 0;
+
+ let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
+ loop {
+ let msgs = ipc.read_msg().await?;
+ if msgs == serde_json::Value::Null {
+ break;
+ }
+ bytes += msgs.as_str().unwrap().len();
+ if start.elapsed().as_secs() > 5 {
+ break;
+ }
+ }
+ let elapsed = start.elapsed();
+ let mb = bytes as f64 / 1024.0 / 1024.0;
+ println!("{} mb/s", mb / elapsed.as_secs_f64());
+
+ child.await??;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> {
+ let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let child = tokio::spawn(async move {
+ use tokio::io::AsyncReadExt;
+ use tokio::io::AsyncWriteExt;
+
+ let mut buf = [0u8; 1024];
+ let n = fd2.read(&mut buf).await?;
+ assert_eq!(&buf[..n], b"\"hello\"\n");
+ fd2.write_all(b"\"world\"\n").await?;
+ Ok::<_, std::io::Error>(())
+ });
+
+ /* Similar to how ops would use the resource */
+ let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
+
+ ipc.clone().write_msg(json!("hello")).await?;
+
+ let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
+ let msgs = ipc.read_msg().await?;
+ assert_eq!(msgs, json!("world"));
+
+ child.await??;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
+ let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let child = tokio::spawn(async move {
+ use tokio::io::AsyncReadExt;
+ use tokio::io::AsyncWriteExt;
+
+ let mut buf = [0u8; 1024];
+ let n = fd2.read(&mut buf).await?;
+ assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n");
+ fd2.write_all(b"\"foo\"\n\"bar\"\n").await?;
+ Ok::<_, std::io::Error>(())
+ });
+
+ let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
+ ipc.clone().write_msg(json!("hello")).await?;
+ ipc.clone().write_msg(json!("world")).await?;
+
+ let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
+ let msgs = ipc.read_msg().await?;
+ assert_eq!(msgs, json!("foo"));
+
+ child.await??;
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> {
+ let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let child = tokio::spawn(async move {
+ tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?;
+ Ok::<_, std::io::Error>(())
+ });
+
+ let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
+ let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
+ let _err = ipc.read_msg().await.unwrap_err();
+
+ child.await??;
+
+ Ok(())
+ }
+
+ #[test]
+ fn memchr() {
+ let str = b"hello world";
+ assert_eq!(super::memchr(b'h', str), Some(0));
+ assert_eq!(super::memchr(b'w', str), Some(6));
+ assert_eq!(super::memchr(b'd', str), Some(10));
+ assert_eq!(super::memchr(b'x', str), None);
+
+ let empty = b"";
+ assert_eq!(super::memchr(b'\n', empty), None);
+ }
+ }
+}
+
+#[cfg(windows)]
+mod windows {
+ use deno_core::error::AnyError;
+ use deno_core::op2;
+
+ #[op2(fast)]
+ pub fn op_node_ipc_pipe() -> Result<(), AnyError> {
+ Err(deno_core::error::not_supported())
+ }
+
+ #[op2(async)]
+ pub async fn op_node_ipc_write() -> Result<(), AnyError> {
+ Err(deno_core::error::not_supported())
+ }
+
+ #[op2(async)]
+ pub async fn op_node_ipc_read() -> Result<(), AnyError> {
+ Err(deno_core::error::not_supported())
+ }
+}
diff --git a/ext/node/ops/mod.rs b/ext/node/ops/mod.rs
index ec4324da3..277e340df 100644
--- a/ext/node/ops/mod.rs
+++ b/ext/node/ops/mod.rs
@@ -5,6 +5,7 @@ pub mod fs;
pub mod http;
pub mod http2;
pub mod idna;
+pub mod ipc;
pub mod os;
pub mod require;
pub mod util;
diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js
index e3061c95d..e5a0279a5 100644
--- a/ext/node/polyfills/02_init.js
+++ b/ext/node/polyfills/02_init.js
@@ -7,15 +7,12 @@ const requireImpl = internals.requireImpl;
import { nodeGlobals } from "ext:deno_node/00_globals.js";
import "node:module";
-globalThis.nodeBootstrap = function (usesLocalNodeModulesDir, argv0) {
- initialize(usesLocalNodeModulesDir, argv0);
-};
-
let initialized = false;
function initialize(
usesLocalNodeModulesDir,
argv0,
+ ipcFd,
) {
if (initialized) {
throw Error("Node runtime already initialized");
@@ -41,6 +38,7 @@ function initialize(
// but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
internals.__initWorkerThreads();
+ internals.__setupChildProcessIpcChannel(ipcFd);
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl;
}
@@ -52,6 +50,8 @@ function loadCjsModule(moduleName, isMain, inspectBrk) {
requireImpl.Module._load(moduleName, null, { main: isMain });
}
+globalThis.nodeBootstrap = initialize;
+
internals.node = {
initialize,
loadCjsModule,
diff --git a/ext/node/polyfills/child_process.ts b/ext/node/polyfills/child_process.ts
index 94a108447..c7d007f46 100644
--- a/ext/node/polyfills/child_process.ts
+++ b/ext/node/polyfills/child_process.ts
@@ -10,6 +10,7 @@ import {
ChildProcess,
ChildProcessOptions,
normalizeSpawnArguments,
+ setupChannel,
type SpawnOptions,
spawnSync as _spawnSync,
type SpawnSyncOptions,
@@ -821,6 +822,14 @@ export function execFileSync(
return ret.stdout as string | Buffer;
}
+function setupChildProcessIpcChannel(fd: number) {
+ if (typeof fd != "number" || fd < 0) return;
+ setupChannel(process, fd);
+}
+
+globalThis.__bootstrap.internals.__setupChildProcessIpcChannel =
+ setupChildProcessIpcChannel;
+
export default {
fork,
spawn,
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts
index 04773a8b7..b9bf13396 100644
--- a/ext/node/polyfills/internal/child_process.ts
+++ b/ext/node/polyfills/internal/child_process.ts
@@ -44,6 +44,9 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
+const core = globalThis.__bootstrap.core;
+const ops = core.ops;
+
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
transformer: (value: T) => O,
@@ -167,12 +170,13 @@ export class ChildProcess extends EventEmitter {
signal,
windowsVerbatimArguments = false,
} = options || {};
+ const normalizedStdio = normalizeStdioOption(stdio);
const [
stdin = "pipe",
stdout = "pipe",
stderr = "pipe",
_channel, // TODO(kt3k): handle this correctly
- ] = normalizeStdioOption(stdio);
+ ] = normalizedStdio;
const [cmd, cmdArgs] = buildCommand(
command,
args || [],
@@ -181,6 +185,8 @@ export class ChildProcess extends EventEmitter {
this.spawnfile = cmd;
this.spawnargs = [cmd, ...cmdArgs];
+ const ipc = normalizedStdio.indexOf("ipc");
+
const stringEnv = mapValues(env, (value) => value.toString());
try {
this.#process = new Deno.Command(cmd, {
@@ -191,6 +197,7 @@ export class ChildProcess extends EventEmitter {
stdout: toDenoStdio(stdout),
stderr: toDenoStdio(stderr),
windowsRawArguments: windowsVerbatimArguments,
+ ipc, // internal
}).spawn();
this.pid = this.#process.pid;
@@ -249,6 +256,10 @@ export class ChildProcess extends EventEmitter {
}
}
+ if (typeof this.#process._pipeFd == "number") {
+ setupChannel(this, this.#process._pipeFd);
+ }
+
(async () => {
const status = await this.#process.status;
this.exitCode = status.code;
@@ -1058,9 +1069,91 @@ function toDenoArgs(args: string[]): string[] {
return denoArgs;
}
+export function setupChannel(target, channel) {
+ const ipc = ops.op_node_ipc_pipe(channel);
+
+ async function readLoop() {
+ try {
+ while (true) {
+ if (!target.connected || target.killed) {
+ return;
+ }
+ const msg = await core.opAsync("op_node_ipc_read", ipc);
+ if (msg == null) {
+ // Channel closed.
+ target.disconnect();
+ return;
+ }
+
+ process.nextTick(handleMessage, msg);
+ }
+ } catch (err) {
+ if (
+ err instanceof Deno.errors.Interrupted ||
+ err instanceof Deno.errors.BadResource
+ ) {
+ return;
+ }
+ }
+ }
+
+ function handleMessage(msg) {
+ target.emit("message", msg);
+ }
+
+ target.send = function (message, handle, options, callback) {
+ if (typeof handle === "function") {
+ callback = handle;
+ handle = undefined;
+ options = undefined;
+ } else if (typeof options === "function") {
+ callback = options;
+ options = undefined;
+ } else if (options !== undefined) {
+ validateObject(options, "options");
+ }
+
+ options = { swallowErrors: false, ...options };
+
+ if (message === undefined) {
+ throw new TypeError("ERR_MISSING_ARGS", "message");
+ }
+
+ if (handle !== undefined) {
+ notImplemented("ChildProcess.send with handle");
+ }
+
+ core.opAsync("op_node_ipc_write", ipc, message)
+ .then(() => {
+ if (callback) {
+ process.nextTick(callback, null);
+ }
+ });
+ };
+
+ target.connected = true;
+
+ target.disconnect = function () {
+ if (!this.connected) {
+ this.emit("error", new Error("IPC channel is already disconnected"));
+ return;
+ }
+
+ this.connected = false;
+ process.nextTick(() => {
+ core.close(ipc);
+ target.emit("disconnect");
+ });
+ };
+
+ // Start reading messages from the channel.
+ readLoop();
+}
+
export default {
ChildProcess,
normalizeSpawnArguments,
stdioStringToArray,
spawnSync,
+ setupChannel,
};
diff --git a/ext/node/polyfills/process.ts b/ext/node/polyfills/process.ts
index 575d8dfb1..352d46f42 100644
--- a/ext/node/polyfills/process.ts
+++ b/ext/node/polyfills/process.ts
@@ -69,7 +69,6 @@ import { buildAllowedFlags } from "ext:deno_node/internal/process/per_thread.mjs
const notImplementedEvents = [
"disconnect",
- "message",
"multipleResolves",
"rejectionHandled",
"worker",
diff --git a/runtime/js/40_process.js b/runtime/js/40_process.js
index b8e05ce5a..e628aeb4a 100644
--- a/runtime/js/40_process.js
+++ b/runtime/js/40_process.js
@@ -159,6 +159,7 @@ function spawnChildInner(opFn, command, apiName, {
stderr = "piped",
signal = undefined,
windowsRawArguments = false,
+ ipc = -1,
} = {}) {
const child = opFn({
cmd: pathFromURL(command),
@@ -172,6 +173,7 @@ function spawnChildInner(opFn, command, apiName, {
stdout,
stderr,
windowsRawArguments,
+ ipc,
}, apiName);
return new ChildProcess(illegalConstructorKey, {
...child,
@@ -203,6 +205,12 @@ class ChildProcess {
#waitPromise;
#waitComplete = false;
+ #pipeFd;
+ // internal, used by ext/node
+ get _pipeFd() {
+ return this.#pipeFd;
+ }
+
#pid;
get pid() {
return this.#pid;
@@ -239,6 +247,7 @@ class ChildProcess {
stdinRid,
stdoutRid,
stderrRid,
+ pipeFd, // internal
} = null) {
if (key !== illegalConstructorKey) {
throw new TypeError("Illegal constructor.");
@@ -246,6 +255,7 @@ class ChildProcess {
this.#rid = rid;
this.#pid = pid;
+ this.#pipeFd = pipeFd;
if (stdinRid !== null) {
this.#stdin = writableStreamForRid(stdinRid);
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index 0469b38bf..5b4b164a2 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -440,6 +440,7 @@ function bootstrapMainRuntime(runtimeOptions) {
3: inspectFlag,
5: hasNodeModulesDir,
6: maybeBinaryNpmCommandName,
+ 7: nodeIpcFd,
} = runtimeOptions;
performance.setTimeOrigin(DateNow());
@@ -545,7 +546,7 @@ function bootstrapMainRuntime(runtimeOptions) {
ObjectDefineProperty(globalThis, "Deno", util.readOnly(finalDenoNs));
if (nodeBootstrap) {
- nodeBootstrap(hasNodeModulesDir, maybeBinaryNpmCommandName);
+ nodeBootstrap(hasNodeModulesDir, maybeBinaryNpmCommandName, nodeIpcFd);
}
}
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index 1fdd4bf4d..6f89e5529 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -141,6 +141,8 @@ pub struct SpawnArgs {
uid: Option<u32>,
#[cfg(windows)]
windows_raw_arguments: bool,
+ #[cfg(unix)]
+ ipc: Option<i32>,
#[serde(flatten)]
stdio: ChildStdio,
@@ -205,11 +207,18 @@ pub struct SpawnOutput {
stderr: Option<ToJsBuffer>,
}
+type CreateCommand = (
+ std::process::Command,
+ // TODO(@littledivy): Ideally this would return Option<ResourceId> but we are dealing with file descriptors
+ // all the way until setupChannel which makes it easier to share code between parent and child fork.
+ Option<i32>,
+);
+
fn create_command(
state: &mut OpState,
args: SpawnArgs,
api_name: &str,
-) -> Result<std::process::Command, AnyError> {
+) -> Result<CreateCommand, AnyError> {
state
.borrow_mut::<PermissionsContainer>()
.check_run(&args.cmd, api_name)?;
@@ -245,15 +254,6 @@ fn create_command(
if let Some(uid) = args.uid {
command.uid(uid);
}
- #[cfg(unix)]
- // TODO(bartlomieju):
- #[allow(clippy::undocumented_unsafe_blocks)]
- unsafe {
- command.pre_exec(|| {
- libc::setgroups(0, std::ptr::null());
- Ok(())
- });
- }
command.stdin(args.stdio.stdin.as_stdio());
command.stdout(match args.stdio.stdout {
@@ -265,7 +265,91 @@ fn create_command(
value => value.as_stdio(),
});
- Ok(command)
+ #[cfg(unix)]
+ // TODO(bartlomieju):
+ #[allow(clippy::undocumented_unsafe_blocks)]
+ unsafe {
+ if let Some(ipc) = args.ipc {
+ if ipc < 0 {
+ return Ok((command, None));
+ }
+ // SockFlag is broken on macOS
+ // https://github.com/nix-rust/nix/issues/861
+ let mut fds = [-1, -1];
+ #[cfg(not(target_os = "macos"))]
+ let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
+
+ #[cfg(target_os = "macos")]
+ let flags = 0;
+
+ let ret = libc::socketpair(
+ libc::AF_UNIX,
+ libc::SOCK_STREAM | flags,
+ 0,
+ fds.as_mut_ptr(),
+ );
+ if ret != 0 {
+ return Err(std::io::Error::last_os_error().into());
+ }
+
+ if cfg!(target_os = "macos") {
+ let fcntl =
+ |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> {
+ let flags = libc::fcntl(fd, libc::F_GETFL, 0);
+
+ if flags == -1 {
+ return Err(fail(fds));
+ }
+ let ret = libc::fcntl(fd, libc::F_SETFL, flags | flag);
+ if ret == -1 {
+ return Err(fail(fds));
+ }
+ Ok(())
+ };
+
+ fn fail(fds: [i32; 2]) -> std::io::Error {
+ unsafe {
+ libc::close(fds[0]);
+ libc::close(fds[1]);
+ }
+ std::io::Error::last_os_error()
+ }
+
+ // SOCK_NONBLOCK is not supported on macOS.
+ (fcntl)(fds[0], libc::O_NONBLOCK)?;
+ (fcntl)(fds[1], libc::O_NONBLOCK)?;
+
+ // SOCK_CLOEXEC is not supported on macOS.
+ (fcntl)(fds[0], libc::FD_CLOEXEC)?;
+ (fcntl)(fds[1], libc::FD_CLOEXEC)?;
+ }
+
+ let fd1 = fds[0];
+ let fd2 = fds[1];
+
+ command.pre_exec(move || {
+ if ipc >= 0 {
+ let _fd = libc::dup2(fd2, ipc);
+ libc::close(fd2);
+ }
+ libc::setgroups(0, std::ptr::null());
+ Ok(())
+ });
+
+ /* One end returned to parent process (this) */
+ let pipe_fd = Some(fd1);
+
+ /* The other end passed to child process via DENO_CHANNEL_FD */
+ command.env("DENO_CHANNEL_FD", format!("{}", ipc));
+
+ return Ok((command, pipe_fd));
+ }
+
+ Ok((command, None))
+ }
+
+ #[cfg(not(unix))]
+ return Ok((command, None));
}
#[derive(Serialize)]
@@ -276,11 +360,13 @@ struct Child {
stdin_rid: Option<ResourceId>,
stdout_rid: Option<ResourceId>,
stderr_rid: Option<ResourceId>,
+ pipe_fd: Option<i32>,
}
fn spawn_child(
state: &mut OpState,
command: std::process::Command,
+ pipe_fd: Option<i32>,
) -> Result<Child, AnyError> {
let mut command = tokio::process::Command::from(command);
// TODO(@crowlkats): allow detaching processes.
@@ -362,6 +448,7 @@ fn spawn_child(
stdin_rid,
stdout_rid,
stderr_rid,
+ pipe_fd,
})
}
@@ -372,8 +459,8 @@ fn op_spawn_child(
#[serde] args: SpawnArgs,
#[string] api_name: String,
) -> Result<Child, AnyError> {
- let command = create_command(state, args, &api_name)?;
- spawn_child(state, command)
+ let (command, pipe_fd) = create_command(state, args, &api_name)?;
+ spawn_child(state, command, pipe_fd)
}
#[op2(async)]
@@ -402,7 +489,8 @@ fn op_spawn_sync(
) -> Result<SpawnOutput, AnyError> {
let stdout = matches!(args.stdio.stdout, Stdio::Piped);
let stderr = matches!(args.stdio.stderr, Stdio::Piped);
- let mut command = create_command(state, args, "Deno.Command().outputSync()")?;
+ let (mut command, _) =
+ create_command(state, args, "Deno.Command().outputSync()")?;
let output = command.output().with_context(|| {
format!(
"Failed to spawn '{}'",
diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs
index 828bb3766..8674190f3 100644
--- a/runtime/worker_bootstrap.rs
+++ b/runtime/worker_bootstrap.rs
@@ -59,6 +59,7 @@ pub struct BootstrapOptions {
pub inspect: bool,
pub has_node_modules_dir: bool,
pub maybe_binary_npm_command_name: Option<String>,
+ pub node_ipc_fd: Option<i32>,
}
impl Default for BootstrapOptions {
@@ -86,6 +87,7 @@ impl Default for BootstrapOptions {
args: Default::default(),
has_node_modules_dir: Default::default(),
maybe_binary_npm_command_name: None,
+ node_ipc_fd: None,
}
}
}
@@ -115,6 +117,8 @@ struct BootstrapV8<'a>(
bool,
// maybe_binary_npm_command_name
Option<&'a str>,
+ // node_ipc_fd
+ i32,
);
impl BootstrapOptions {
@@ -134,6 +138,7 @@ impl BootstrapOptions {
self.enable_testing_features,
self.has_node_modules_dir,
self.maybe_binary_npm_command_name.as_deref(),
+ self.node_ipc_fd.unwrap_or(-1),
);
bootstrap.serialize(ser).unwrap()