diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2023-12-13 15:44:16 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-13 11:14:16 +0100 |
commit | 5a91a065b882215dde209baf626247e54c21a392 (patch) | |
tree | 192cb8b3b0a4037453b5fd5b2a60e4d52d4543a8 /ext | |
parent | bbf8f69cb979be0f36c38ae52b1588e648b3252e (diff) |
fix: implement child_process IPC (#21490)
This PR implements the Node child_process IPC functionality in Deno on
Unix systems.
For `fd > 2` a duplex unix pipe is set up between the parent and child
processes. Currently implements data passing via the channel in the JSON
serialization format.
Diffstat (limited to 'ext')
-rw-r--r-- | ext/node/Cargo.toml | 3 | ||||
-rw-r--r-- | ext/node/benchmarks/child_process_ipc.mjs | 64 | ||||
-rw-r--r-- | ext/node/lib.rs | 3 | ||||
-rw-r--r-- | ext/node/ops/ipc.rs | 504 | ||||
-rw-r--r-- | ext/node/ops/mod.rs | 1 | ||||
-rw-r--r-- | ext/node/polyfills/02_init.js | 8 | ||||
-rw-r--r-- | ext/node/polyfills/child_process.ts | 9 | ||||
-rw-r--r-- | ext/node/polyfills/internal/child_process.ts | 95 | ||||
-rw-r--r-- | ext/node/polyfills/process.ts | 1 |
9 files changed, 682 insertions, 6 deletions
diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index 1393c82c0..e5f984145 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -44,6 +44,7 @@ libc.workspace = true libz-sys.workspace = true md-5 = "0.10.5" md4 = "0.10.2" +nix.workspace = true num-bigint.workspace = true num-bigint-dig = "0.8.2" num-integer = "0.1.45" @@ -54,6 +55,7 @@ p256.workspace = true p384.workspace = true path-clean = "=0.1.0" pbkdf2 = "0.12.1" +pin-project-lite = "0.2.13" rand.workspace = true regex.workspace = true reqwest.workspace = true @@ -65,6 +67,7 @@ serde = "1.0.149" sha-1 = "0.10.0" sha2.workspace = true signature.workspace = true +simd-json = "0.13.4" tokio.workspace = true typenum = "1.15.0" url.workspace = true diff --git a/ext/node/benchmarks/child_process_ipc.mjs b/ext/node/benchmarks/child_process_ipc.mjs new file mode 100644 index 000000000..0486972dc --- /dev/null +++ b/ext/node/benchmarks/child_process_ipc.mjs @@ -0,0 +1,64 @@ +import { fork } from "node:child_process"; +import process from "node:process"; +import { setImmediate } from "node:timers"; + +if (process.env.CHILD) { + const len = +process.env.CHILD; + const msg = ".".repeat(len); + const send = () => { + while (process.send(msg)); + // Wait: backlog of unsent messages exceeds threshold + setImmediate(send); + }; + send(); +} else { + function main(dur, len) { + const p = new Promise((resolve) => { + const start = performance.now(); + + const options = { + "stdio": ["inherit", "inherit", "inherit", "ipc"], + "env": { "CHILD": len.toString() }, + }; + const path = new URL("child_process_ipc.mjs", import.meta.url).pathname; + const child = fork( + path, + options, + ); + + let bytes = 0; + let total = 0; + child.on("message", (msg) => { + bytes += msg.length; + total += 1; + }); + + setTimeout(() => { + child.kill(); + const end = performance.now(); + const mb = bytes / 1024 / 1024; + const sec = (end - start) / 1000; + const mbps = mb / sec; + console.log(`${len} bytes: ${mbps.toFixed(2)} MB/s`); + console.log(`${total} messages`); + resolve(); + }, dur * 1000); + }); + return p; + } + + const len = [ + 64, + 256, + 1024, + 4096, + 16384, + 65536, + 65536 << 4, + 65536 << 6 - 1, + ]; + + for (const l of len) { + await main(5, l); + } +} diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 56f4b0ee0..77f01b3d3 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -312,6 +312,9 @@ deno_core::extension!(deno_node, ops::require::op_require_break_on_next_statement, ops::util::op_node_guess_handle_type, ops::crypto::op_node_create_private_key, + ops::ipc::op_node_ipc_pipe, + ops::ipc::op_node_ipc_write, + ops::ipc::op_node_ipc_read, ], esm_entry_point = "ext:deno_node/02_init.js", esm = [ diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs new file mode 100644 index 000000000..d1aeeb40c --- /dev/null +++ b/ext/node/ops/ipc.rs @@ -0,0 +1,504 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +#[cfg(unix)] +pub use unix::*; + +#[cfg(windows)] +pub use windows::*; + +#[cfg(unix)] +mod unix { + use std::cell::RefCell; + use std::future::Future; + use std::io; + use std::mem; + use std::os::fd::FromRawFd; + use std::os::fd::RawFd; + use std::pin::Pin; + use std::rc::Rc; + use std::task::Context; + use std::task::Poll; + + use deno_core::error::bad_resource_id; + use deno_core::error::AnyError; + use deno_core::op2; + use deno_core::serde_json; + use deno_core::AsyncRefCell; + use deno_core::CancelFuture; + use deno_core::CancelHandle; + use deno_core::OpState; + use deno_core::RcRef; + use deno_core::ResourceId; + use pin_project_lite::pin_project; + use tokio::io::AsyncBufRead; + use tokio::io::AsyncWriteExt; + use tokio::io::BufReader; + use tokio::net::unix::OwnedReadHalf; + use tokio::net::unix::OwnedWriteHalf; + use tokio::net::UnixStream; + + #[op2(fast)] + #[smi] + pub fn op_node_ipc_pipe( + state: &mut OpState, + #[smi] fd: i32, + ) -> Result<ResourceId, AnyError> { + Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) + } + + #[op2(async)] + pub async fn op_node_ipc_write( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, + #[serde] value: serde_json::Value, + ) -> Result<(), AnyError> { + let stream = state + .borrow() + .resource_table + .get::<IpcJsonStreamResource>(rid) + .map_err(|_| bad_resource_id())?; + stream.write_msg(value).await?; + Ok(()) + } + + #[op2(async)] + #[serde] + pub async fn op_node_ipc_read( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, + ) -> Result<serde_json::Value, AnyError> { + let stream = state + .borrow() + .resource_table + .get::<IpcJsonStreamResource>(rid) + .map_err(|_| bad_resource_id())?; + + let cancel = stream.cancel.clone(); + let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; + let msgs = stream.read_msg().or_cancel(cancel).await??; + Ok(msgs) + } + + struct IpcJsonStreamResource { + read_half: AsyncRefCell<IpcJsonStream>, + write_half: AsyncRefCell<OwnedWriteHalf>, + cancel: Rc<CancelHandle>, + } + + impl deno_core::Resource for IpcJsonStreamResource { + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } + } + + impl IpcJsonStreamResource { + fn new(stream: RawFd) -> Result<Self, std::io::Error> { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + let (read_half, write_half) = unix_stream.into_split(); + Ok(Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + }) + } + + #[cfg(test)] + fn from_unix_stream(stream: UnixStream) -> Self { + let (read_half, write_half) = stream.into_split(); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + + async fn write_msg( + self: Rc<Self>, + msg: serde_json::Value, + ) -> Result<(), AnyError> { + let mut write_half = + RcRef::map(self, |r| &r.write_half).borrow_mut().await; + // Perf note: We do not benefit from writev here because + // we are always allocating a buffer for serialization anyways. + let mut buf = Vec::new(); + serde_json::to_writer(&mut buf, &msg)?; + buf.push(b'\n'); + write_half.write_all(&buf).await?; + Ok(()) + } + } + + #[inline] + fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> { + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + // Safety: haystack of valid length. neon_memchr can handle unaligned + // data. + return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) }; + + #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))] + return haystack.iter().position(|&b| b == needle); + } + + // Initial capacity of the buffered reader and the JSON backing buffer. + // + // This is a tradeoff between memory usage and performance on large messages. + // + // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. + const INITIAL_CAPACITY: usize = 1024 * 64; + + // JSON serialization stream over IPC pipe. + // + // `\n` is used as a delimiter between messages. + struct IpcJsonStream { + pipe: BufReader<OwnedReadHalf>, + buffer: Vec<u8>, + } + + impl IpcJsonStream { + fn new(pipe: OwnedReadHalf) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + + async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { + let mut json = None; + let nread = + read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?; + if nread == 0 { + // EOF. + return Ok(serde_json::Value::Null); + } + + let json = match json { + Some(v) => v, + None => { + // Took more than a single read and some buffering. + simd_json::from_slice(&mut self.buffer[..nread])? + } + }; + + // Safety: Same as `Vec::clear` but without the `drop_in_place` for + // each element (nop for u8). Capacity remains the same. + unsafe { + self.buffer.set_len(0); + } + + Ok(json) + } + } + + pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct ReadMsgInner<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec<u8>, + json: &'a mut Option<serde_json::Value>, + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. + read: usize, + } + } + + fn read_msg_inner<'a, R>( + reader: &'a mut R, + buf: &'a mut Vec<u8>, + json: &'a mut Option<serde_json::Value>, + ) -> ReadMsgInner<'a, R> + where + R: AsyncBufRead + ?Sized + Unpin, + { + ReadMsgInner { + reader, + buf, + json, + read: 0, + } + } + + fn read_msg_internal<R: AsyncBufRead + ?Sized>( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec<u8>, + json: &mut Option<serde_json::Value>, + read: &mut usize, + ) -> Poll<io::Result<usize>> { + loop { + let (done, used) = { + let available = match reader.as_mut().poll_fill_buf(cx) { + std::task::Poll::Ready(t) => t?, + std::task::Poll::Pending => return std::task::Poll::Pending, + }; + + if let Some(i) = memchr(b'\n', available) { + if *read == 0 { + // Fast path: parse and put into the json slot directly. + // + // Safety: It is ok to overwrite the contents because + // we don't need to copy it into the buffer and the length will be reset. + let available = unsafe { + std::slice::from_raw_parts_mut( + available.as_ptr() as *mut u8, + available.len(), + ) + }; + json.replace( + simd_json::from_slice(&mut available[..i + 1]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, + ); + } else { + // This is not the first read, so we have to copy the data + // to make it contiguous. + buf.extend_from_slice(&available[..=i]); + } + (true, i + 1) + } else { + buf.extend_from_slice(available); + (false, available.len()) + } + }; + + reader.as_mut().consume(used); + *read += used; + if done || used == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } + } + + impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> { + type Output = io::Result<usize>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read) + } + } + + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + mod neon { + use std::arch::aarch64::*; + + pub unsafe fn neon_memchr( + str: &[u8], + c: u8, + length: usize, + ) -> Option<usize> { + let end = str.as_ptr().wrapping_add(length); + + // Alignment handling + let mut ptr = str.as_ptr(); + while ptr < end && (ptr as usize) & 0xF != 0 { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + let search_char = vdupq_n_u8(c); + + while ptr.wrapping_add(16) <= end { + let chunk = vld1q_u8(ptr); + let comparison = vceqq_u8(chunk, search_char); + + // Check first 64 bits + let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0); + if result0 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + result0.trailing_zeros() as usize / 8, + ); + } + + // Check second 64 bits + let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1); + if result1 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + 8 + + result1.trailing_zeros() as usize / 8, + ); + } + + ptr = ptr.wrapping_add(16); + } + + // Handle remaining unaligned characters + while ptr < end { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + None + } + } + + #[cfg(test)] + mod tests { + use super::IpcJsonStreamResource; + use deno_core::serde_json; + use deno_core::serde_json::json; + use deno_core::RcRef; + use std::rc::Rc; + + #[tokio::test] + async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> { + // A simple round trip benchmark for quick dev feedback. + // + // Only ran when the env var is set. + if std::env::var_os("BENCH_IPC_DENO").is_none() { + return Ok(()); + } + + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + + let size = 1024 * 1024; + + let stri = "x".repeat(size); + let data = format!("\"{}\"\n", stri); + for _ in 0..100 { + fd2.write_all(data.as_bytes()).await?; + } + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + let start = std::time::Instant::now(); + let mut bytes = 0; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + loop { + let msgs = ipc.read_msg().await?; + if msgs == serde_json::Value::Null { + break; + } + bytes += msgs.as_str().unwrap().len(); + if start.elapsed().as_secs() > 5 { + break; + } + } + let elapsed = start.elapsed(); + let mb = bytes as f64 / 1024.0 / 1024.0; + println!("{} mb/s", mb / elapsed.as_secs_f64()); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n"); + fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + /* Similar to how ops would use the resource */ + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + ipc.clone().write_msg(json!("hello")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("world")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + ipc.clone().write_msg(json!("hello")).await?; + ipc.clone().write_msg(json!("world")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("foo")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let _err = ipc.read_msg().await.unwrap_err(); + + child.await??; + + Ok(()) + } + + #[test] + fn memchr() { + let str = b"hello world"; + assert_eq!(super::memchr(b'h', str), Some(0)); + assert_eq!(super::memchr(b'w', str), Some(6)); + assert_eq!(super::memchr(b'd', str), Some(10)); + assert_eq!(super::memchr(b'x', str), None); + + let empty = b""; + assert_eq!(super::memchr(b'\n', empty), None); + } + } +} + +#[cfg(windows)] +mod windows { + use deno_core::error::AnyError; + use deno_core::op2; + + #[op2(fast)] + pub fn op_node_ipc_pipe() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_write() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_read() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } +} diff --git a/ext/node/ops/mod.rs b/ext/node/ops/mod.rs index ec4324da3..277e340df 100644 --- a/ext/node/ops/mod.rs +++ b/ext/node/ops/mod.rs @@ -5,6 +5,7 @@ pub mod fs; pub mod http; pub mod http2; pub mod idna; +pub mod ipc; pub mod os; pub mod require; pub mod util; diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js index e3061c95d..e5a0279a5 100644 --- a/ext/node/polyfills/02_init.js +++ b/ext/node/polyfills/02_init.js @@ -7,15 +7,12 @@ const requireImpl = internals.requireImpl; import { nodeGlobals } from "ext:deno_node/00_globals.js"; import "node:module"; -globalThis.nodeBootstrap = function (usesLocalNodeModulesDir, argv0) { - initialize(usesLocalNodeModulesDir, argv0); -}; - let initialized = false; function initialize( usesLocalNodeModulesDir, argv0, + ipcFd, ) { if (initialized) { throw Error("Node runtime already initialized"); @@ -41,6 +38,7 @@ function initialize( // but it's the only way to get `args` and `version` and this point. internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); internals.__initWorkerThreads(); + internals.__setupChildProcessIpcChannel(ipcFd); // `Deno[Deno.internal].requireImpl` will be unreachable after this line. delete internals.requireImpl; } @@ -52,6 +50,8 @@ function loadCjsModule(moduleName, isMain, inspectBrk) { requireImpl.Module._load(moduleName, null, { main: isMain }); } +globalThis.nodeBootstrap = initialize; + internals.node = { initialize, loadCjsModule, diff --git a/ext/node/polyfills/child_process.ts b/ext/node/polyfills/child_process.ts index 94a108447..c7d007f46 100644 --- a/ext/node/polyfills/child_process.ts +++ b/ext/node/polyfills/child_process.ts @@ -10,6 +10,7 @@ import { ChildProcess, ChildProcessOptions, normalizeSpawnArguments, + setupChannel, type SpawnOptions, spawnSync as _spawnSync, type SpawnSyncOptions, @@ -821,6 +822,14 @@ export function execFileSync( return ret.stdout as string | Buffer; } +function setupChildProcessIpcChannel(fd: number) { + if (typeof fd != "number" || fd < 0) return; + setupChannel(process, fd); +} + +globalThis.__bootstrap.internals.__setupChildProcessIpcChannel = + setupChildProcessIpcChannel; + export default { fork, spawn, diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index 04773a8b7..b9bf13396 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -44,6 +44,9 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; +const core = globalThis.__bootstrap.core; +const ops = core.ops; + export function mapValues<T, O>( record: Readonly<Record<string, T>>, transformer: (value: T) => O, @@ -167,12 +170,13 @@ export class ChildProcess extends EventEmitter { signal, windowsVerbatimArguments = false, } = options || {}; + const normalizedStdio = normalizeStdioOption(stdio); const [ stdin = "pipe", stdout = "pipe", stderr = "pipe", _channel, // TODO(kt3k): handle this correctly - ] = normalizeStdioOption(stdio); + ] = normalizedStdio; const [cmd, cmdArgs] = buildCommand( command, args || [], @@ -181,6 +185,8 @@ export class ChildProcess extends EventEmitter { this.spawnfile = cmd; this.spawnargs = [cmd, ...cmdArgs]; + const ipc = normalizedStdio.indexOf("ipc"); + const stringEnv = mapValues(env, (value) => value.toString()); try { this.#process = new Deno.Command(cmd, { @@ -191,6 +197,7 @@ export class ChildProcess extends EventEmitter { stdout: toDenoStdio(stdout), stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, + ipc, // internal }).spawn(); this.pid = this.#process.pid; @@ -249,6 +256,10 @@ export class ChildProcess extends EventEmitter { } } + if (typeof this.#process._pipeFd == "number") { + setupChannel(this, this.#process._pipeFd); + } + (async () => { const status = await this.#process.status; this.exitCode = status.code; @@ -1058,9 +1069,91 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } +export function setupChannel(target, channel) { + const ipc = ops.op_node_ipc_pipe(channel); + + async function readLoop() { + try { + while (true) { + if (!target.connected || target.killed) { + return; + } + const msg = await core.opAsync("op_node_ipc_read", ipc); + if (msg == null) { + // Channel closed. + target.disconnect(); + return; + } + + process.nextTick(handleMessage, msg); + } + } catch (err) { + if ( + err instanceof Deno.errors.Interrupted || + err instanceof Deno.errors.BadResource + ) { + return; + } + } + } + + function handleMessage(msg) { + target.emit("message", msg); + } + + target.send = function (message, handle, options, callback) { + if (typeof handle === "function") { + callback = handle; + handle = undefined; + options = undefined; + } else if (typeof options === "function") { + callback = options; + options = undefined; + } else if (options !== undefined) { + validateObject(options, "options"); + } + + options = { swallowErrors: false, ...options }; + + if (message === undefined) { + throw new TypeError("ERR_MISSING_ARGS", "message"); + } + + if (handle !== undefined) { + notImplemented("ChildProcess.send with handle"); + } + + core.opAsync("op_node_ipc_write", ipc, message) + .then(() => { + if (callback) { + process.nextTick(callback, null); + } + }); + }; + + target.connected = true; + + target.disconnect = function () { + if (!this.connected) { + this.emit("error", new Error("IPC channel is already disconnected")); + return; + } + + this.connected = false; + process.nextTick(() => { + core.close(ipc); + target.emit("disconnect"); + }); + }; + + // Start reading messages from the channel. + readLoop(); +} + export default { ChildProcess, normalizeSpawnArguments, stdioStringToArray, spawnSync, + setupChannel, }; diff --git a/ext/node/polyfills/process.ts b/ext/node/polyfills/process.ts index 575d8dfb1..352d46f42 100644 --- a/ext/node/polyfills/process.ts +++ b/ext/node/polyfills/process.ts @@ -69,7 +69,6 @@ import { buildAllowedFlags } from "ext:deno_node/internal/process/per_thread.mjs const notImplementedEvents = [ "disconnect", - "message", "multipleResolves", "rejectionHandled", "worker", |