summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathan Whitaker <17734409+nathanwhit@users.noreply.github.com>2024-07-30 16:13:24 -0700
committerGitHub <noreply@github.com>2024-07-30 16:13:24 -0700
commitcd59fc53a528603112addfe8b10fe4e30d04e7f0 (patch)
tree1abe3976361b39ad3969aabdd2b40380ae79c85d
parent3659781f88236a369aa9ca5142c0fb7d690fc898 (diff)
fix(node): Rework node:child_process IPC (#24763)
Fixes https://github.com/denoland/deno/issues/24756. Fixes https://github.com/denoland/deno/issues/24796. This also gets vitest working when using [`--pool=forks`](https://vitest.dev/guide/improving-performance#pool) (which is the default as of vitest 2.0). Ref https://github.com/denoland/deno/issues/23882. --- This PR resolves a handful of issues with child_process IPC. In particular: - We didn't support sending typed array views over IPC - Opening an IPC channel resulted in the event loop never exiting - Sending a `null` over IPC would terminate the channel - There was some UB in the read implementation (transmuting an `&[u8]` to `&mut [u8]`) - The `send` method wasn't returning anything, so there was no way to signal backpressure (this also resulted in the benchmark `child_process_ipc.mjs` being misleading, as it tried to respect backpressure. That gave node much worse results at larger message sizes, and gave us much worse results at smaller message sizes). - We weren't setting up the `channel` property on the `process` global (or on the `ChildProcess` object), and also didn't have a way to ref/unref the channel - Calling `kill` multiple times (or disconnecting the channel, then calling kill) would throw an error - Node couldn't spawn a deno subprocess and communicate with it over IPC
-rw-r--r--Cargo.lock5
-rw-r--r--cli/args/mod.rs4
-rw-r--r--ext/node/Cargo.toml1
-rw-r--r--ext/node/benchmarks/child_process_ipc.mjs14
-rw-r--r--ext/node/lib.rs3
-rw-r--r--ext/node/ops/ipc.rs579
-rw-r--r--ext/node/ops/vm.rs3
-rw-r--r--ext/node/polyfills/child_process.ts25
-rw-r--r--ext/node/polyfills/internal/child_process.ts220
-rw-r--r--runtime/ops/process.rs32
-rw-r--r--tests/unit_node/child_process_test.ts191
11 files changed, 898 insertions, 179 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 354973874..25a99898f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1770,6 +1770,7 @@ dependencies = [
"libz-sys",
"md-5",
"md4",
+ "memchr",
"node_resolver",
"num-bigint",
"num-bigint-dig",
@@ -4152,9 +4153,9 @@ dependencies = [
[[package]]
name = "memchr"
-version = "2.7.2"
+version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
+checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "memmap2"
diff --git a/cli/args/mod.rs b/cli/args/mod.rs
index b288751a7..76644028c 100644
--- a/cli/args/mod.rs
+++ b/cli/args/mod.rs
@@ -1068,10 +1068,10 @@ impl CliOptions {
}
pub fn node_ipc_fd(&self) -> Option<i64> {
- let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok();
+ let maybe_node_channel_fd = std::env::var("NODE_CHANNEL_FD").ok();
if let Some(node_channel_fd) = maybe_node_channel_fd {
// Remove so that child processes don't inherit this environment variable.
- std::env::remove_var("DENO_CHANNEL_FD");
+ std::env::remove_var("NODE_CHANNEL_FD");
node_channel_fd.parse::<i64>().ok()
} else {
None
diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml
index 44cf81481..dc27f8415 100644
--- a/ext/node/Cargo.toml
+++ b/ext/node/Cargo.toml
@@ -57,6 +57,7 @@ libc.workspace = true
libz-sys.workspace = true
md-5 = { version = "0.10.5", features = ["oid"] }
md4 = "0.10.2"
+memchr = "2.7.4"
node_resolver.workspace = true
num-bigint.workspace = true
num-bigint-dig = "0.8.2"
diff --git a/ext/node/benchmarks/child_process_ipc.mjs b/ext/node/benchmarks/child_process_ipc.mjs
index 0486972dc..39377cd8c 100644
--- a/ext/node/benchmarks/child_process_ipc.mjs
+++ b/ext/node/benchmarks/child_process_ipc.mjs
@@ -5,10 +5,20 @@ import { setImmediate } from "node:timers";
if (process.env.CHILD) {
const len = +process.env.CHILD;
const msg = ".".repeat(len);
+ let waiting = false;
const send = () => {
- while (process.send(msg));
+ while (
+ process.send(msg, undefined, undefined, (_e) => {
+ if (waiting) {
+ waiting = false;
+ setImmediate(send);
+ }
+ })
+ );
// Wait: backlog of unsent messages exceeds threshold
- setImmediate(send);
+ // once the message is sent, the callback will be called
+ // and we'll resume
+ waiting = true;
};
send();
} else {
diff --git a/ext/node/lib.rs b/ext/node/lib.rs
index 899ffc44e..987d52520 100644
--- a/ext/node/lib.rs
+++ b/ext/node/lib.rs
@@ -30,6 +30,7 @@ pub use deno_package_json::PackageJson;
pub use node_resolver::PathClean;
pub use ops::ipc::ChildPipeFd;
pub use ops::ipc::IpcJsonStreamResource;
+pub use ops::ipc::IpcRefTracker;
use ops::vm;
pub use ops::vm::create_v8_context;
pub use ops::vm::init_global_template;
@@ -380,6 +381,8 @@ deno_core::extension!(deno_node,
ops::ipc::op_node_child_ipc_pipe,
ops::ipc::op_node_ipc_write,
ops::ipc::op_node_ipc_read,
+ ops::ipc::op_node_ipc_ref,
+ ops::ipc::op_node_ipc_unref,
ops::process::op_node_process_kill,
ops::process::op_process_abort,
],
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
index dc0c086c1..4849a5c6c 100644
--- a/ext/node/ops/ipc.rs
+++ b/ext/node/ops/ipc.rs
@@ -15,23 +15,33 @@ mod impl_ {
use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
+ use std::sync::atomic::AtomicBool;
+ use std::sync::atomic::AtomicUsize;
+ use std::task::ready;
use std::task::Context;
use std::task::Poll;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::op2;
+ use deno_core::serde;
+ use deno_core::serde::Serializer;
use deno_core::serde_json;
+ use deno_core::v8;
use deno_core::AsyncRefCell;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
+ use deno_core::ExternalOpsTracker;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::ResourceId;
+ use deno_core::ToV8;
+ use memchr::memchr;
use pin_project_lite::pin_project;
- use tokio::io::AsyncBufRead;
+ use serde::Serialize;
+ use tokio::io::AsyncRead;
use tokio::io::AsyncWriteExt;
- use tokio::io::BufReader;
+ use tokio::io::ReadBuf;
#[cfg(unix)]
use tokio::net::unix::OwnedReadHalf;
@@ -43,6 +53,116 @@ mod impl_ {
#[cfg(windows)]
type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
+ /// Wrapper around v8 value that implements Serialize.
+ struct SerializeWrapper<'a, 'b>(
+ RefCell<&'b mut v8::HandleScope<'a>>,
+ v8::Local<'a, v8::Value>,
+ );
+
+ impl<'a, 'b> Serialize for SerializeWrapper<'a, 'b> {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: Serializer,
+ {
+ serialize_v8_value(*self.0.borrow_mut(), self.1, serializer)
+ }
+ }
+
+ /// Serialize a v8 value directly into a serde serializer.
+ /// This allows us to go from v8 values to JSON without having to
+ /// deserialize into a `serde_json::Value` and then reserialize to JSON
+ fn serialize_v8_value<'a, S: Serializer>(
+ scope: &mut v8::HandleScope<'a>,
+ value: v8::Local<'a, v8::Value>,
+ ser: S,
+ ) -> Result<S::Ok, S::Error> {
+ use serde::ser::Error;
+ if value.is_null_or_undefined() {
+ ser.serialize_unit()
+ } else if value.is_number() || value.is_number_object() {
+ let num_value = value.number_value(scope).unwrap();
+ if (num_value as i64 as f64) == num_value {
+ ser.serialize_i64(num_value as i64)
+ } else {
+ ser.serialize_f64(num_value)
+ }
+ } else if value.is_string() {
+ let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope);
+ ser.serialize_str(&str)
+ } else if value.is_string_object() {
+ let str =
+ deno_core::serde_v8::to_utf8(value.to_string(scope).unwrap(), scope);
+ ser.serialize_str(&str)
+ } else if value.is_boolean() {
+ ser.serialize_bool(value.is_true())
+ } else if value.is_boolean_object() {
+ ser.serialize_bool(value.boolean_value(scope))
+ } else if value.is_array() {
+ use serde::ser::SerializeSeq;
+ let array = v8::Local::<v8::Array>::try_from(value).unwrap();
+ let length = array.length();
+ let mut seq = ser.serialize_seq(Some(length as usize))?;
+ for i in 0..length {
+ let element = array.get_index(scope, i).unwrap();
+ seq
+ .serialize_element(&SerializeWrapper(RefCell::new(scope), element))?;
+ }
+ seq.end()
+ } else if value.is_object() {
+ use serde::ser::SerializeMap;
+ if value.is_array_buffer_view() {
+ let buffer = v8::Local::<v8::ArrayBufferView>::try_from(value).unwrap();
+ let mut buf = vec![0u8; buffer.byte_length()];
+ let copied = buffer.copy_contents(&mut buf);
+ assert_eq!(copied, buf.len());
+ return ser.serialize_bytes(&buf);
+ }
+ let object = value.to_object(scope).unwrap();
+ // node uses `JSON.stringify`, so to match its behavior (and allow serializing custom objects)
+ // we need to respect the `toJSON` method if it exists.
+ let to_json_key = v8::String::new_from_utf8(
+ scope,
+ b"toJSON",
+ v8::NewStringType::Internalized,
+ )
+ .unwrap()
+ .into();
+ if let Some(to_json) = object.get(scope, to_json_key) {
+ if to_json.is_function() {
+ let to_json = v8::Local::<v8::Function>::try_from(to_json).unwrap();
+ let json_value = to_json.call(scope, object.into(), &[]).unwrap();
+ return serialize_v8_value(scope, json_value, ser);
+ }
+ }
+
+ let keys = object
+ .get_own_property_names(
+ scope,
+ v8::GetPropertyNamesArgs {
+ ..Default::default()
+ },
+ )
+ .unwrap();
+ let num_keys = keys.length();
+ let mut map = ser.serialize_map(Some(num_keys as usize))?;
+ for i in 0..num_keys {
+ let key = keys.get_index(scope, i).unwrap();
+ let key_str = key.to_rust_string_lossy(scope);
+ let value = object.get(scope, key).unwrap();
+ map.serialize_entry(
+ &key_str,
+ &SerializeWrapper(RefCell::new(scope), value),
+ )?;
+ }
+ map.end()
+ } else {
+ // TODO(nathanwhit): better error message
+ Err(S::Error::custom(deno_core::error::type_error(
+ "Unsupported type",
+ )))
+ }
+ }
+
// Open IPC pipe from bootstrap options.
#[op2]
#[smi]
@@ -53,25 +173,66 @@ mod impl_ {
Some(child_pipe_fd) => child_pipe_fd.0,
None => return Ok(None),
};
-
+ let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone());
Ok(Some(
- state.resource_table.add(IpcJsonStreamResource::new(fd)?),
+ state
+ .resource_table
+ .add(IpcJsonStreamResource::new(fd, ref_tracker)?),
))
}
#[op2(async)]
- pub async fn op_node_ipc_write(
+ pub fn op_node_ipc_write<'a>(
+ scope: &mut v8::HandleScope<'a>,
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
- #[serde] value: serde_json::Value,
- ) -> Result<(), AnyError> {
+ value: v8::Local<'a, v8::Value>,
+ // using an array as an "out parameter".
+ // index 0 is a boolean indicating whether the queue is under the limit.
+ //
+ // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not
+ // supported by `op2` currently.
+ queue_ok: v8::Local<'a, v8::Array>,
+ ) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> {
+ let mut serialized = Vec::with_capacity(64);
+ let mut ser = serde_json::Serializer::new(&mut serialized);
+ serialize_v8_value(scope, value, &mut ser).map_err(|e| {
+ deno_core::error::type_error(format!(
+ "failed to serialize json value: {e}"
+ ))
+ })?;
+ serialized.push(b'\n');
+
let stream = state
.borrow()
.resource_table
.get::<IpcJsonStreamResource>(rid)
.map_err(|_| bad_resource_id())?;
- stream.write_msg(value).await?;
- Ok(())
+ let old = stream
+ .queued_bytes
+ .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed);
+ if old + serialized.len() > 2 * INITIAL_CAPACITY {
+ // sending messages too fast
+ let v = false.to_v8(scope)?;
+ queue_ok.set_index(scope, 0, v);
+ }
+ Ok(async move {
+ stream.clone().write_msg_bytes(&serialized).await?;
+ stream
+ .queued_bytes
+ .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed);
+ Ok(())
+ })
+ }
+
+ /// Value signaling that the other end ipc channel has closed.
+ ///
+ /// Node reserves objects of this form (`{ "cmd": "NODE_<something>"`)
+ /// for internal use, so we use it here as well to avoid breaking anyone.
+ fn stop_sentinel() -> serde_json::Value {
+ serde_json::json!({
+ "cmd": "NODE_CLOSE"
+ })
}
#[op2(async)]
@@ -89,7 +250,92 @@ mod impl_ {
let cancel = stream.cancel.clone();
let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await;
let msgs = stream.read_msg().or_cancel(cancel).await??;
- Ok(msgs)
+ if let Some(msg) = msgs {
+ Ok(msg)
+ } else {
+ Ok(stop_sentinel())
+ }
+ }
+
+ #[op2(fast)]
+ pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) {
+ let stream = state
+ .resource_table
+ .get::<IpcJsonStreamResource>(rid)
+ .expect("Invalid resource ID");
+ stream.ref_tracker.ref_();
+ }
+
+ #[op2(fast)]
+ pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) {
+ let stream = state
+ .resource_table
+ .get::<IpcJsonStreamResource>(rid)
+ .expect("Invalid resource ID");
+ stream.ref_tracker.unref();
+ }
+
+ /// Tracks whether the IPC resources is currently
+ /// refed, and allows refing/unrefing it.
+ pub struct IpcRefTracker {
+ refed: AtomicBool,
+ tracker: OpsTracker,
+ }
+
+ /// A little wrapper so we don't have to get an
+ /// `ExternalOpsTracker` for tests. When we aren't
+ /// cfg(test), this will get optimized out.
+ enum OpsTracker {
+ External(ExternalOpsTracker),
+ #[cfg(test)]
+ Test,
+ }
+
+ impl OpsTracker {
+ fn ref_(&self) {
+ match self {
+ Self::External(tracker) => tracker.ref_op(),
+ #[cfg(test)]
+ Self::Test => {}
+ }
+ }
+
+ fn unref(&self) {
+ match self {
+ Self::External(tracker) => tracker.unref_op(),
+ #[cfg(test)]
+ Self::Test => {}
+ }
+ }
+ }
+
+ impl IpcRefTracker {
+ pub fn new(tracker: ExternalOpsTracker) -> Self {
+ Self {
+ refed: AtomicBool::new(false),
+ tracker: OpsTracker::External(tracker),
+ }
+ }
+
+ #[cfg(test)]
+ fn new_test() -> Self {
+ Self {
+ refed: AtomicBool::new(false),
+ tracker: OpsTracker::Test,
+ }
+ }
+
+ fn ref_(&self) {
+ if !self.refed.swap(true, std::sync::atomic::Ordering::AcqRel) {
+ self.tracker.ref_();
+ }
+ }
+
+ fn unref(&self) {
+ if self.refed.swap(false, std::sync::atomic::Ordering::AcqRel) {
+ self.tracker.unref();
+ }
+ }
}
pub struct IpcJsonStreamResource {
@@ -99,6 +345,8 @@ mod impl_ {
#[cfg(windows)]
write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
cancel: Rc<CancelHandle>,
+ queued_bytes: AtomicUsize,
+ ref_tracker: IpcRefTracker,
}
impl deno_core::Resource for IpcJsonStreamResource {
@@ -134,64 +382,56 @@ mod impl_ {
}
impl IpcJsonStreamResource {
- pub fn new(stream: i64) -> Result<Self, std::io::Error> {
+ pub fn new(
+ stream: i64,
+ ref_tracker: IpcRefTracker,
+ ) -> Result<Self, std::io::Error> {
let (read_half, write_half) = pipe(stream as _)?;
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
+ queued_bytes: Default::default(),
+ ref_tracker,
})
}
- #[cfg(unix)]
- #[cfg(test)]
- fn from_stream(stream: UnixStream) -> Self {
+ #[cfg(all(unix, test))]
+ fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
+ queued_bytes: Default::default(),
+ ref_tracker,
}
}
- #[cfg(windows)]
- #[cfg(test)]
- fn from_stream(pipe: NamedPipeClient) -> Self {
+ #[cfg(all(windows, test))]
+ fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self {
let (read_half, write_half) = tokio::io::split(pipe);
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
cancel: Default::default(),
+ queued_bytes: Default::default(),
+ ref_tracker,
}
}
- async fn write_msg(
+ /// writes _newline terminated_ JSON message to the IPC pipe.
+ async fn write_msg_bytes(
self: Rc<Self>,
- msg: serde_json::Value,
+ msg: &[u8],
) -> Result<(), AnyError> {
let mut write_half =
RcRef::map(self, |r| &r.write_half).borrow_mut().await;
- // Perf note: We do not benefit from writev here because
- // we are always allocating a buffer for serialization anyways.
- let mut buf = Vec::new();
- serde_json::to_writer(&mut buf, &msg)?;
- buf.push(b'\n');
- write_half.write_all(&buf).await?;
+ write_half.write_all(msg).await?;
Ok(())
}
}
- #[inline]
- fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
- #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
- // Safety: haystack of valid length. neon_memchr can handle unaligned
- // data.
- return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) };
-
- #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
- return haystack.iter().position(|&b| b == needle);
- }
-
// Initial capacity of the buffered reader and the JSON backing buffer.
//
// This is a tradeoff between memory usage and performance on large messages.
@@ -199,41 +439,91 @@ mod impl_ {
// 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message.
const INITIAL_CAPACITY: usize = 1024 * 64;
+ /// A buffer for reading from the IPC pipe.
+ /// Similar to the internal buffer of `tokio::io::BufReader`.
+ ///
+ /// This exists to provide buffered reading while granting mutable access
+ /// to the internal buffer (which isn't exposed through `tokio::io::BufReader`
+ /// or the `AsyncBufRead` trait). `simd_json` requires mutable access to an input
+ /// buffer for parsing, so this allows us to use the read buffer directly as the
+ /// input buffer without a copy (provided the message fits).
+ struct ReadBuffer {
+ buffer: Box<[u8]>,
+ pos: usize,
+ cap: usize,
+ }
+
+ impl ReadBuffer {
+ fn new() -> Self {
+ Self {
+ buffer: vec![0; INITIAL_CAPACITY].into_boxed_slice(),
+ pos: 0,
+ cap: 0,
+ }
+ }
+
+ fn get_mut(&mut self) -> &mut [u8] {
+ &mut self.buffer
+ }
+
+ fn available_mut(&mut self) -> &mut [u8] {
+ &mut self.buffer[self.pos..self.cap]
+ }
+
+ fn consume(&mut self, n: usize) {
+ self.pos = std::cmp::min(self.pos + n, self.cap);
+ }
+
+ fn needs_fill(&self) -> bool {
+ self.pos >= self.cap
+ }
+ }
+
// JSON serialization stream over IPC pipe.
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
#[cfg(unix)]
- pipe: BufReader<OwnedReadHalf>,
+ pipe: OwnedReadHalf,
#[cfg(windows)]
- pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
+ pipe: tokio::io::ReadHalf<NamedPipeClient>,
buffer: Vec<u8>,
+ read_buffer: ReadBuffer,
}
impl IpcJsonStream {
#[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self {
Self {
- pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
+ pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY),
+ read_buffer: ReadBuffer::new(),
}
}
#[cfg(windows)]
fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
Self {
- pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
+ pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY),
+ read_buffer: ReadBuffer::new(),
}
}
- async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
+ async fn read_msg(
+ &mut self,
+ ) -> Result<Option<serde_json::Value>, AnyError> {
let mut json = None;
- let nread =
- read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?;
+ let nread = read_msg_inner(
+ &mut self.pipe,
+ &mut self.buffer,
+ &mut json,
+ &mut self.read_buffer,
+ )
+ .await?;
if nread == 0 {
// EOF.
- return Ok(serde_json::Value::Null);
+ return Ok(None);
}
let json = match json {
@@ -250,7 +540,7 @@ mod impl_ {
self.buffer.set_len(0);
}
- Ok(json)
+ Ok(Some(json))
}
}
@@ -263,6 +553,7 @@ mod impl_ {
// The number of bytes appended to buf. This can be less than buf.len() if
// the buffer was not empty when the operation was started.
read: usize,
+ read_buffer: &'a mut ReadBuffer,
}
}
@@ -270,43 +561,41 @@ mod impl_ {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
json: &'a mut Option<serde_json::Value>,
+ read_buffer: &'a mut ReadBuffer,
) -> ReadMsgInner<'a, R>
where
- R: AsyncBufRead + ?Sized + Unpin,
+ R: AsyncRead + ?Sized + Unpin,
{
ReadMsgInner {
reader,
buf,
json,
read: 0,
+ read_buffer,
}
}
- fn read_msg_internal<R: AsyncBufRead + ?Sized>(
+ fn read_msg_internal<R: AsyncRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
+ read_buffer: &mut ReadBuffer,
json: &mut Option<serde_json::Value>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
loop {
let (done, used) = {
- let available = match reader.as_mut().poll_fill_buf(cx) {
- std::task::Poll::Ready(t) => t?,
- std::task::Poll::Pending => return std::task::Poll::Pending,
- };
+ // effectively a tiny `poll_fill_buf`, but allows us to get a mutable reference to the buffer.
+ if read_buffer.needs_fill() {
+ let mut read_buf = ReadBuf::new(read_buffer.get_mut());
+ ready!(reader.as_mut().poll_read(cx, &mut read_buf))?;
+ read_buffer.cap = read_buf.filled().len();
+ read_buffer.pos = 0;
+ }
+ let available = read_buffer.available_mut();
if let Some(i) = memchr(b'\n', available) {
if *read == 0 {
// Fast path: parse and put into the json slot directly.
- //
- // Safety: It is ok to overwrite the contents because
- // we don't need to copy it into the buffer and the length will be reset.
- let available = unsafe {
- std::slice::from_raw_parts_mut(
- available.as_ptr() as *mut u8,
- available.len(),
- )
- };
json.replace(
simd_json::from_slice(&mut available[..i + 1])
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
@@ -323,7 +612,7 @@ mod impl_ {
}
};
- reader.as_mut().consume(used);
+ read_buffer.consume(used);
*read += used;
if done || used == 0 {
return Poll::Ready(Ok(mem::replace(read, 0)));
@@ -331,81 +620,30 @@ mod impl_ {
}
}
- impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
+ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
- read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read)
- }
- }
-
- #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
- mod neon {
- use std::arch::aarch64::*;
-
- pub unsafe fn neon_memchr(
- str: &[u8],
- c: u8,
- length: usize,
- ) -> Option<usize> {
- let end = str.as_ptr().wrapping_add(length);
-
- // Alignment handling
- let mut ptr = str.as_ptr();
- while ptr < end && (ptr as usize) & 0xF != 0 {
- if *ptr == c {
- return Some(ptr as usize - str.as_ptr() as usize);
- }
- ptr = ptr.wrapping_add(1);
- }
-
- let search_char = vdupq_n_u8(c);
-
- while ptr.wrapping_add(16) <= end {
- let chunk = vld1q_u8(ptr);
- let comparison = vceqq_u8(chunk, search_char);
-
- // Check first 64 bits
- let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0);
- if result0 != 0 {
- return Some(
- (ptr as usize - str.as_ptr() as usize)
- + result0.trailing_zeros() as usize / 8,
- );
- }
-
- // Check second 64 bits
- let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1);
- if result1 != 0 {
- return Some(
- (ptr as usize - str.as_ptr() as usize)
- + 8
- + result1.trailing_zeros() as usize / 8,
- );
- }
-
- ptr = ptr.wrapping_add(16);
- }
-
- // Handle remaining unaligned characters
- while ptr < end {
- if *ptr == c {
- return Some(ptr as usize - str.as_ptr() as usize);
- }
- ptr = ptr.wrapping_add(1);
- }
-
- None
+ read_msg_internal(
+ Pin::new(*me.reader),
+ cx,
+ me.buf,
+ me.read_buffer,
+ me.json,
+ me.read,
+ )
}
}
#[cfg(test)]
mod tests {
use super::IpcJsonStreamResource;
- use deno_core::serde_json;
use deno_core::serde_json::json;
+ use deno_core::v8;
+ use deno_core::JsRuntime;
use deno_core::RcRef;
+ use deno_core::RuntimeOptions;
use std::rc::Rc;
#[allow(clippy::unused_async)]
@@ -414,7 +652,10 @@ mod impl_ {
let (a, b) = tokio::net::UnixStream::pair().unwrap();
/* Similar to how ops would use the resource */
- let a = Rc::new(IpcJsonStreamResource::from_stream(a));
+ let a = Rc::new(IpcJsonStreamResource::from_stream(
+ a,
+ super::IpcRefTracker::new_test(),
+ ));
(a, b)
}
@@ -434,7 +675,10 @@ mod impl_ {
server.connect().await.unwrap();
/* Similar to how ops would use the resource */
- let client = Rc::new(IpcJsonStreamResource::from_stream(client));
+ let client = Rc::new(IpcJsonStreamResource::from_stream(
+ client,
+ super::IpcRefTracker::new_test(),
+ ));
(client, server)
}
@@ -467,10 +711,9 @@ mod impl_ {
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
loop {
- let msgs = ipc.read_msg().await?;
- if msgs == serde_json::Value::Null {
+ let Some(msgs) = ipc.read_msg().await? else {
break;
- }
+ };
bytes += msgs.as_str().unwrap().len();
if start.elapsed().as_secs() > 5 {
break;
@@ -501,10 +744,13 @@ mod impl_ {
Ok::<_, std::io::Error>(())
});
- ipc.clone().write_msg(json!("hello")).await?;
+ ipc
+ .clone()
+ .write_msg_bytes(&json_to_bytes(json!("hello")))
+ .await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
- let msgs = ipc.read_msg().await?;
+ let msgs = ipc.read_msg().await?.unwrap();
assert_eq!(msgs, json!("world"));
child.await??;
@@ -512,6 +758,12 @@ mod impl_ {
Ok(())
}
+ fn json_to_bytes(v: deno_core::serde_json::Value) -> Vec<u8> {
+ let mut buf = deno_core::serde_json::to_vec(&v).unwrap();
+ buf.push(b'\n');
+ buf
+ }
+
#[tokio::test]
async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
let (ipc, mut fd2) = pair().await;
@@ -527,11 +779,17 @@ mod impl_ {
Ok::<_, std::io::Error>(())
});
- ipc.clone().write_msg(json!("hello")).await?;
- ipc.clone().write_msg(json!("world")).await?;
+ ipc
+ .clone()
+ .write_msg_bytes(&json_to_bytes(json!("hello")))
+ .await?;
+ ipc
+ .clone()
+ .write_msg_bytes(&json_to_bytes(json!("world")))
+ .await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
- let msgs = ipc.read_msg().await?;
+ let msgs = ipc.read_msg().await?.unwrap();
assert_eq!(msgs, json!("foo"));
child.await??;
@@ -566,5 +824,58 @@ mod impl_ {
let empty = b"";
assert_eq!(super::memchr(b'\n', empty), None);
}
+
+ fn wrap_expr(s: &str) -> String {
+ format!("(function () {{ return {s}; }})()")
+ }
+
+ fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String {
+ let val = runtime.execute_script("", js).unwrap();
+ let scope = &mut runtime.handle_scope();
+ let val = v8::Local::new(scope, val);
+ let mut buf = Vec::new();
+ let mut ser = deno_core::serde_json::Serializer::new(&mut buf);
+ super::serialize_v8_value(scope, val, &mut ser).unwrap();
+ String::from_utf8(buf).unwrap()
+ }
+
+ #[test]
+ fn ipc_serialization() {
+ let mut runtime = JsRuntime::new(RuntimeOptions::default());
+
+ let cases = [
+ ("'hello'", "\"hello\""),
+ ("1", "1"),
+ ("1.5", "1.5"),
+ ("Number.NaN", "null"),
+ ("Infinity", "null"),
+ ("Number.MAX_SAFE_INTEGER", &(2i64.pow(53) - 1).to_string()),
+ (
+ "Number.MIN_SAFE_INTEGER",
+ &(-(2i64.pow(53) - 1)).to_string(),
+ ),
+ ("[1, 2, 3]", "[1,2,3]"),
+ ("new Uint8Array([1,2,3])", "[1,2,3]"),
+ (
+ "{ a: 1.5, b: { c: new ArrayBuffer(5) }}",
+ r#"{"a":1.5,"b":{"c":{}}}"#,
+ ),
+ ("new Number(1)", "1"),
+ ("new Boolean(true)", "true"),
+ ("true", "true"),
+ (r#"new String("foo")"#, "\"foo\""),
+ ("null", "null"),
+ (
+ r#"{ a: "field", toJSON() { return "custom"; } }"#,
+ "\"custom\"",
+ ),
+ ];
+
+ for (input, expect) in cases {
+ let js = wrap_expr(input);
+ let actual = serialize_js_to_json(&mut runtime, js);
+ assert_eq!(actual, expect);
+ }
+ }
}
}
diff --git a/ext/node/ops/vm.rs b/ext/node/ops/vm.rs
index df631a51f..a44d84975 100644
--- a/ext/node/ops/vm.rs
+++ b/ext/node/ops/vm.rs
@@ -140,8 +140,7 @@ mod tests {
#[test]
fn test_run_in_this_context() {
let platform = v8::new_default_platform(0, false).make_shared();
- v8::V8::initialize_platform(platform);
- v8::V8::initialize();
+ deno_core::JsRuntime::init_platform(Some(platform));
let isolate = &mut v8::Isolate::new(Default::default());
diff --git a/ext/node/polyfills/child_process.ts b/ext/node/polyfills/child_process.ts
index 2aba88d63..bb38b746c 100644
--- a/ext/node/polyfills/child_process.ts
+++ b/ext/node/polyfills/child_process.ts
@@ -115,7 +115,8 @@ export function fork(
// more
const v8Flags: string[] = [];
if (Array.isArray(execArgv)) {
- for (let index = 0; index < execArgv.length; index++) {
+ let index = 0;
+ while (index < execArgv.length) {
const flag = execArgv[index];
if (flag.startsWith("--max-old-space-size")) {
execArgv.splice(index, 1);
@@ -123,6 +124,16 @@ export function fork(
} else if (flag.startsWith("--enable-source-maps")) {
// https://github.com/denoland/deno/issues/21750
execArgv.splice(index, 1);
+ } else if (flag.startsWith("-C") || flag.startsWith("--conditions")) {
+ let rm = 1;
+ if (flag.indexOf("=") === -1) {
+ // --conditions foo
+ // so remove the next argument as well.
+ rm = 2;
+ }
+ execArgv.splice(index, rm);
+ } else {
+ index++;
}
}
}
@@ -825,7 +836,17 @@ export function execFileSync(
function setupChildProcessIpcChannel() {
const fd = op_node_child_ipc_pipe();
if (typeof fd != "number" || fd < 0) return;
- setupChannel(process, fd);
+ const control = setupChannel(process, fd);
+ process.on("newListener", (name: string) => {
+ if (name === "message" || name === "disconnect") {
+ control.refCounted();
+ }
+ });
+ process.on("removeListener", (name: string) => {
+ if (name === "message" || name === "disconnect") {
+ control.unrefCounted();
+ }
+ });
}
internals.__setupChildProcessIpcChannel = setupChildProcessIpcChannel;
diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts
index cabae63ee..2dcf0e782 100644
--- a/ext/node/polyfills/internal/child_process.ts
+++ b/ext/node/polyfills/internal/child_process.ts
@@ -7,7 +7,12 @@
// deno-lint-ignore-file prefer-primordials
import { core, internals } from "ext:core/mod.js";
-import { op_node_ipc_read, op_node_ipc_write } from "ext:core/ops";
+import {
+ op_node_ipc_read,
+ op_node_ipc_ref,
+ op_node_ipc_unref,
+ op_node_ipc_write,
+} from "ext:core/ops";
import {
ArrayIsArray,
ArrayPrototypeFilter,
@@ -17,13 +22,14 @@ import {
ArrayPrototypeSort,
ArrayPrototypeUnshift,
ObjectHasOwn,
+ StringPrototypeStartsWith,
StringPrototypeToUpperCase,
} from "ext:deno_node/internal/primordials.mjs";
import { assert } from "ext:deno_node/_util/asserts.ts";
import { EventEmitter } from "node:events";
import { os } from "ext:deno_node/internal_binding/constants.ts";
-import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
+import { notImplemented } from "ext:deno_node/_utils.ts";
import { Readable, Stream, Writable } from "node:stream";
import { isWindows } from "ext:deno_node/_util/os.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts";
@@ -31,6 +37,7 @@ import {
AbortError,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
+ ERR_IPC_CHANNEL_CLOSED,
ERR_UNKNOWN_SIGNAL,
} from "ext:deno_node/internal/errors.ts";
import { Buffer } from "node:buffer";
@@ -46,6 +53,7 @@ import {
import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
+import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs";
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
@@ -97,6 +105,19 @@ export function stdioStringToArray(
return options;
}
+const kClosesNeeded = Symbol("_closesNeeded");
+const kClosesReceived = Symbol("_closesReceived");
+
+// We only want to emit a close event for the child process when all of
+// the writable streams have closed. The value of `child[kClosesNeeded]` should be 1 +
+// the number of opened writable streams (note this excludes `stdin`).
+function maybeClose(child: ChildProcess) {
+ child[kClosesReceived]++;
+ if (child[kClosesNeeded] === child[kClosesReceived]) {
+ child.emit("close", child.exitCode, child.signalCode);
+ }
+}
+
export class ChildProcess extends EventEmitter {
/**
* The exit code of the child process. This property will be `null` until the child process exits.
@@ -152,8 +173,13 @@ export class ChildProcess extends EventEmitter {
null,
];
+ disconnect?: () => void;
+
#process!: Deno.ChildProcess;
#spawned = Promise.withResolvers<void>();
+ [kClosesNeeded] = 1;
+ [kClosesReceived] = 0;
+ canDisconnect = false;
constructor(
command: string,
@@ -218,13 +244,23 @@ export class ChildProcess extends EventEmitter {
if (stdout === "pipe") {
assert(this.#process.stdout);
+ this[kClosesNeeded]++;
this.stdout = Readable.fromWeb(this.#process.stdout);
+ this.stdout.on("close", () => {
+ maybeClose(this);
+ });
}
if (stderr === "pipe") {
assert(this.#process.stderr);
+ this[kClosesNeeded]++;
this.stderr = Readable.fromWeb(this.#process.stderr);
+ this.stderr.on("close", () => {
+ maybeClose(this);
+ });
}
+ // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their
+ // close events (like above)
this.stdio[0] = this.stdin;
this.stdio[1] = this.stdout;
@@ -259,6 +295,10 @@ export class ChildProcess extends EventEmitter {
const pipeFd = internals.getPipeFd(this.#process);
if (typeof pipeFd == "number") {
setupChannel(this, pipeFd);
+ this[kClosesNeeded]++;
+ this.on("disconnect", () => {
+ maybeClose(this);
+ });
}
(async () => {
@@ -271,7 +311,7 @@ export class ChildProcess extends EventEmitter {
this.emit("exit", exitCode, signalCode);
await this.#_waitForChildStreamsToClose();
this.#closePipes();
- this.emit("close", exitCode, signalCode);
+ maybeClose(this);
});
})();
} catch (err) {
@@ -304,7 +344,7 @@ export class ChildProcess extends EventEmitter {
}
/* Cancel any pending IPC I/O */
- if (this.implementsDisconnect) {
+ if (this.canDisconnect) {
this.disconnect?.();
}
@@ -321,10 +361,6 @@ export class ChildProcess extends EventEmitter {
this.#process.unref();
}
- disconnect() {
- warnNotImplemented("ChildProcess.prototype.disconnect");
- }
-
async #_waitForChildStreamsToClose() {
const promises = [] as Array<Promise<void>>;
// Don't close parent process stdin if that's passed through
@@ -359,6 +395,16 @@ export class ChildProcess extends EventEmitter {
assert(this.stdin);
this.stdin.destroy();
}
+ /// TODO(nathanwhit): for some reason when the child process exits
+ /// and the child end of the named pipe closes, reads still just return `Pending`
+ /// instead of returning that 0 bytes were read (to signal the pipe died).
+ /// For now, just forcibly disconnect, but in theory I think we could miss messages
+ /// that haven't been read yet.
+ if (Deno.build.os === "windows") {
+ if (this.canDisconnect) {
+ this.disconnect?.();
+ }
+ }
}
}
@@ -1099,18 +1145,109 @@ function toDenoArgs(args: string[]): string[] {
return denoArgs;
}
-export function setupChannel(target, ipc) {
+const kControlDisconnect = Symbol("kControlDisconnect");
+const kPendingMessages = Symbol("kPendingMessages");
+
+// controls refcounting for the IPC channel
+class Control extends EventEmitter {
+ #channel: number;
+ #refs: number = 0;
+ #refExplicitlySet = false;
+ #connected = true;
+ [kPendingMessages] = [];
+ constructor(channel: number) {
+ super();
+ this.#channel = channel;
+ }
+
+ #ref() {
+ if (this.#connected) {
+ op_node_ipc_ref(this.#channel);
+ }
+ }
+
+ #unref() {
+ if (this.#connected) {
+ op_node_ipc_unref(this.#channel);
+ }
+ }
+
+ [kControlDisconnect]() {
+ this.#unref();
+ this.#connected = false;
+ }
+
+ refCounted() {
+ if (++this.#refs === 1 && !this.#refExplicitlySet) {
+ this.#ref();
+ }
+ }
+
+ unrefCounted() {
+ if (--this.#refs === 0 && !this.#refExplicitlySet) {
+ this.#unref();
+ this.emit("unref");
+ }
+ }
+
+ ref() {
+ this.#refExplicitlySet = true;
+ this.#ref();
+ }
+
+ unref() {
+ this.#refExplicitlySet = false;
+ this.#unref();
+ }
+}
+
+type InternalMessage = {
+ cmd: `NODE_${string}`;
+};
+
+// deno-lint-ignore no-explicit-any
+function isInternal(msg: any): msg is InternalMessage {
+ if (msg && typeof msg === "object") {
+ const cmd = msg["cmd"];
+ if (typeof cmd === "string") {
+ return StringPrototypeStartsWith(cmd, "NODE_");
+ }
+ }
+ return false;
+}
+
+function internalCmdName(msg: InternalMessage): string {
+ return StringPrototypeSlice(msg.cmd, 5);
+}
+
+// deno-lint-ignore no-explicit-any
+export function setupChannel(target: any, ipc: number) {
+ const control = new Control(ipc);
+ target.channel = control;
+
async function readLoop() {
try {
while (true) {
if (!target.connected || target.killed) {
return;
}
- const msg = await op_node_ipc_read(ipc);
- if (msg == null) {
- // Channel closed.
- target.disconnect();
- return;
+ const prom = op_node_ipc_read(ipc);
+ // there will always be a pending read promise,
+ // but it shouldn't keep the event loop from exiting
+ core.unrefOpPromise(prom);
+ const msg = await prom;
+ if (isInternal(msg)) {
+ const cmd = internalCmdName(msg);
+ if (cmd === "CLOSE") {
+ // Channel closed.
+ target.disconnect();
+ return;
+ } else {
+ // TODO(nathanwhit): once we add support for sending
+ // handles, if we want to support deno-node IPC interop,
+ // we'll need to handle the NODE_HANDLE_* messages here.
+ continue;
+ }
}
process.nextTick(handleMessage, msg);
@@ -1126,9 +1263,29 @@ export function setupChannel(target, ipc) {
}
function handleMessage(msg) {
- target.emit("message", msg);
+ if (!target.channel) {
+ return;
+ }
+ if (target.listenerCount("message") !== 0) {
+ target.emit("message", msg);
+ return;
+ }
+
+ ArrayPrototypePush(target.channel[kPendingMessages], msg);
}
+ target.on("newListener", () => {
+ nextTick(() => {
+ if (!target.channel || !target.listenerCount("message")) {
+ return;
+ }
+ for (const msg of target.channel[kPendingMessages]) {
+ target.emit("message", msg);
+ }
+ target.channel[kPendingMessages] = [];
+ });
+ });
+
target.send = function (message, handle, options, callback) {
if (typeof handle === "function") {
callback = handle;
@@ -1151,32 +1308,55 @@ export function setupChannel(target, ipc) {
notImplemented("ChildProcess.send with handle");
}
- op_node_ipc_write(ipc, message)
+ if (!target.connected) {
+ const err = new ERR_IPC_CHANNEL_CLOSED();
+ if (typeof callback === "function") {
+ console.error("ChildProcess.send with callback");
+ process.nextTick(callback, err);
+ } else {
+ nextTick(() => target.emit("error", err));
+ }
+ return false;
+ }
+
+ // signals whether the queue is within the limit.
+ // if false, the sender should slow down.
+ // this acts as a backpressure mechanism.
+ const queueOk = [true];
+ control.refCounted();
+ op_node_ipc_write(ipc, message, queueOk)
.then(() => {
+ control.unrefCounted();
if (callback) {
process.nextTick(callback, null);
}
});
+ return queueOk[0];
};
target.connected = true;
target.disconnect = function () {
- if (!this.connected) {
- this.emit("error", new Error("IPC channel is already disconnected"));
+ if (!target.connected) {
+ target.emit("error", new Error("IPC channel is already disconnected"));
return;
}
- this.connected = false;
+ target.connected = false;
+ target.canDisconnect = false;
+ control[kControlDisconnect]();
process.nextTick(() => {
+ target.channel = null;
core.close(ipc);
target.emit("disconnect");
});
};
- target.implementsDisconnect = true;
+ target.canDisconnect = true;
// Start reading messages from the channel.
readLoop();
+
+ return control;
}
export default {
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs
index ecf6ef49b..69fb5cf29 100644
--- a/runtime/ops/process.rs
+++ b/runtime/ops/process.rs
@@ -345,14 +345,15 @@ fn create_command(
});
/* One end returned to parent process (this) */
- let pipe_rid = Some(
- state
- .resource_table
- .add(deno_node::IpcJsonStreamResource::new(fd1 as _)?),
- );
+ let pipe_rid = Some(state.resource_table.add(
+ deno_node::IpcJsonStreamResource::new(
+ fd1 as _,
+ deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
+ )?,
+ ));
- /* The other end passed to child process via DENO_CHANNEL_FD */
- command.env("DENO_CHANNEL_FD", format!("{}", ipc));
+ /* The other end passed to child process via NODE_CHANNEL_FD */
+ command.env("NODE_CHANNEL_FD", format!("{}", ipc));
return Ok((command, pipe_rid));
}
@@ -470,14 +471,15 @@ fn create_command(
}
/* One end returned to parent process (this) */
- let pipe_fd = Some(
- state
- .resource_table
- .add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?),
- );
-
- /* The other end passed to child process via DENO_CHANNEL_FD */
- command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64));
+ let pipe_fd = Some(state.resource_table.add(
+ deno_node::IpcJsonStreamResource::new(
+ hd1 as i64,
+ deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
+ )?,
+ ));
+
+ /* The other end passed to child process via NODE_CHANNEL_FD */
+ command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64));
return Ok((command, pipe_fd));
}
diff --git a/tests/unit_node/child_process_test.ts b/tests/unit_node/child_process_test.ts
index cfac0b5a9..d613d2989 100644
--- a/tests/unit_node/child_process_test.ts
+++ b/tests/unit_node/child_process_test.ts
@@ -9,8 +9,10 @@ import {
assertNotStrictEquals,
assertStrictEquals,
assertStringIncludes,
+ assertThrows,
} from "@std/assert";
import * as path from "@std/path";
+import { setTimeout } from "node:timers";
const { spawn, spawnSync, execFile, execFileSync, ChildProcess } = CP;
@@ -63,6 +65,7 @@ Deno.test("[node/child_process disconnect] the method exists", async () => {
const deferred = withTimeout<void>();
const childProcess = spawn(Deno.execPath(), ["--help"], {
env: { NO_COLOR: "true" },
+ stdio: ["pipe", "pipe", "pipe", "ipc"],
});
try {
childProcess.disconnect();
@@ -855,3 +858,191 @@ Deno.test(
assertEquals(output.stderr, null);
},
);
+
+Deno.test(
+ async function ipcSerialization() {
+ const timeout = withTimeout<void>();
+ const script = `
+ if (typeof process.send !== "function") {
+ console.error("process.send is not a function");
+ process.exit(1);
+ }
+
+ class BigIntWrapper {
+ constructor(value) {
+ this.value = value;
+ }
+ toJSON() {
+ return this.value.toString();
+ }
+ }
+
+ const makeSab = (arr) => {
+ const sab = new SharedArrayBuffer(arr.length);
+ const buf = new Uint8Array(sab);
+ for (let i = 0; i < arr.length; i++) {
+ buf[i] = arr[i];
+ }
+ return buf;
+ };
+
+
+ const inputs = [
+ "foo",
+ {
+ foo: "bar",
+ },
+ 42,
+ true,
+ null,
+ new Uint8Array([1, 2, 3]),
+ {
+ foo: new Uint8Array([1, 2, 3]),
+ bar: makeSab([4, 5, 6]),
+ },
+ [1, { foo: 2 }, [3, 4]],
+ new BigIntWrapper(42n),
+ ];
+ for (const input of inputs) {
+ process.send(input);
+ }
+ `;
+ const file = await Deno.makeTempFile();
+ await Deno.writeTextFile(file, script);
+ const child = CP.fork(file, [], {
+ stdio: ["inherit", "inherit", "inherit", "ipc"],
+ });
+ const expect = [
+ "foo",
+ {
+ foo: "bar",
+ },
+ 42,
+ true,
+ null,
+ [1, 2, 3],
+ {
+ foo: [1, 2, 3],
+ bar: [4, 5, 6],
+ },
+ [1, { foo: 2 }, [3, 4]],
+ "42",
+ ];
+ let i = 0;
+
+ child.on("message", (message) => {
+ assertEquals(message, expect[i]);
+ i++;
+ });
+ child.on("close", () => timeout.resolve());
+ await timeout.promise;
+ assertEquals(i, expect.length);
+ },
+);
+
+Deno.test(async function childProcessExitsGracefully() {
+ const testdataDir = path.join(
+ path.dirname(path.fromFileUrl(import.meta.url)),
+ "testdata",
+ );
+ const script = path.join(
+ testdataDir,
+ "node_modules",
+ "foo",
+ "index.js",
+ );
+ const p = Promise.withResolvers<void>();
+ const cp = CP.fork(script, [], {
+ cwd: testdataDir,
+ stdio: ["inherit", "inherit", "inherit", "ipc"],
+ });
+ cp.on("close", () => p.resolve());
+
+ await p.promise;
+});
+
+Deno.test(async function killMultipleTimesNoError() {
+ const loop = `
+ while (true) {
+ await new Promise((resolve) => setTimeout(resolve, 10000));
+ }
+ `;
+
+ const timeout = withTimeout<void>();
+ const file = await Deno.makeTempFile();
+ await Deno.writeTextFile(file, loop);
+ const child = CP.fork(file, [], {
+ stdio: ["inherit", "inherit", "inherit", "ipc"],
+ });
+ child.on("close", () => {
+ timeout.resolve();
+ });
+ child.kill();
+ child.kill();
+
+ // explicitly calling disconnect after kill should throw
+ assertThrows(() => child.disconnect());
+
+ await timeout.promise;
+});
+
+// Make sure that you receive messages sent before a "message" event listener is set up
+Deno.test(async function bufferMessagesIfNoListener() {
+ const code = `
+ process.on("message", (_) => {
+ process.channel.unref();
+ });
+ process.send("hello");
+ process.send("world");
+ console.error("sent messages");
+ `;
+ const file = await Deno.makeTempFile();
+ await Deno.writeTextFile(file, code);
+ const timeout = withTimeout<void>();
+ const child = CP.fork(file, [], {
+ stdio: ["inherit", "inherit", "pipe", "ipc"],
+ });
+
+ let got = 0;
+ child.on("message", (message) => {
+ if (got++ === 0) {
+ assertEquals(message, "hello");
+ } else {
+ assertEquals(message, "world");
+ }
+ });
+ child.on("close", () => {
+ timeout.resolve();
+ });
+ let stderr = "";
+ child.stderr?.on("data", (data) => {
+ stderr += data;
+ if (stderr.includes("sent messages")) {
+ // now that we've set up the listeners, and the child
+ // has sent the messages, we can let it exit
+ child.send("ready");
+ }
+ });
+ await timeout.promise;
+ assertEquals(got, 2);
+});
+
+Deno.test(async function sendAfterClosedThrows() {
+ const code = ``;
+ const file = await Deno.makeTempFile();
+ await Deno.writeTextFile(file, code);
+ const timeout = withTimeout<void>();
+ const child = CP.fork(file, [], {
+ stdio: ["inherit", "inherit", "inherit", "ipc"],
+ });
+ child.on("error", (err) => {
+ assert("code" in err);
+ assertEquals(err.code, "ERR_IPC_CHANNEL_CLOSED");
+ timeout.resolve();
+ });
+ child.on("close", () => {
+ child.send("ready");
+ });
+
+ await timeout.promise;
+});