diff options
author | Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> | 2024-07-30 16:13:24 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-30 16:13:24 -0700 |
commit | cd59fc53a528603112addfe8b10fe4e30d04e7f0 (patch) | |
tree | 1abe3976361b39ad3969aabdd2b40380ae79c85d /ext/node/ops/ipc.rs | |
parent | 3659781f88236a369aa9ca5142c0fb7d690fc898 (diff) |
fix(node): Rework node:child_process IPC (#24763)
Fixes https://github.com/denoland/deno/issues/24756. Fixes
https://github.com/denoland/deno/issues/24796.
This also gets vitest working when using
[`--pool=forks`](https://vitest.dev/guide/improving-performance#pool)
(which is the default as of vitest 2.0). Ref
https://github.com/denoland/deno/issues/23882.
---
This PR resolves a handful of issues with child_process IPC. In
particular:
- We didn't support sending typed array views over IPC
- Opening an IPC channel resulted in the event loop never exiting
- Sending a `null` over IPC would terminate the channel
- There was some UB in the read implementation (transmuting an `&[u8]`
to `&mut [u8]`)
- The `send` method wasn't returning anything, so there was no way to
signal backpressure (this also resulted in the benchmark
`child_process_ipc.mjs` being misleading, as it tried to respect
backpressure. That gave node much worse results at larger message sizes,
and gave us much worse results at smaller message sizes).
- We weren't setting up the `channel` property on the `process` global
(or on the `ChildProcess` object), and also didn't have a way to
ref/unref the channel
- Calling `kill` multiple times (or disconnecting the channel, then
calling kill) would throw an error
- Node couldn't spawn a deno subprocess and communicate with it over IPC
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r-- | ext/node/ops/ipc.rs | 579 |
1 files changed, 445 insertions, 134 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index dc0c086c1..4849a5c6c 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -15,23 +15,33 @@ mod impl_ { use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::AtomicUsize; + use std::task::ready; use std::task::Context; use std::task::Poll; use deno_core::error::bad_resource_id; use deno_core::error::AnyError; use deno_core::op2; + use deno_core::serde; + use deno_core::serde::Serializer; use deno_core::serde_json; + use deno_core::v8; use deno_core::AsyncRefCell; use deno_core::CancelFuture; use deno_core::CancelHandle; + use deno_core::ExternalOpsTracker; use deno_core::OpState; use deno_core::RcRef; use deno_core::ResourceId; + use deno_core::ToV8; + use memchr::memchr; use pin_project_lite::pin_project; - use tokio::io::AsyncBufRead; + use serde::Serialize; + use tokio::io::AsyncRead; use tokio::io::AsyncWriteExt; - use tokio::io::BufReader; + use tokio::io::ReadBuf; #[cfg(unix)] use tokio::net::unix::OwnedReadHalf; @@ -43,6 +53,116 @@ mod impl_ { #[cfg(windows)] type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; + /// Wrapper around v8 value that implements Serialize. + struct SerializeWrapper<'a, 'b>( + RefCell<&'b mut v8::HandleScope<'a>>, + v8::Local<'a, v8::Value>, + ); + + impl<'a, 'b> Serialize for SerializeWrapper<'a, 'b> { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + serialize_v8_value(*self.0.borrow_mut(), self.1, serializer) + } + } + + /// Serialize a v8 value directly into a serde serializer. + /// This allows us to go from v8 values to JSON without having to + /// deserialize into a `serde_json::Value` and then reserialize to JSON + fn serialize_v8_value<'a, S: Serializer>( + scope: &mut v8::HandleScope<'a>, + value: v8::Local<'a, v8::Value>, + ser: S, + ) -> Result<S::Ok, S::Error> { + use serde::ser::Error; + if value.is_null_or_undefined() { + ser.serialize_unit() + } else if value.is_number() || value.is_number_object() { + let num_value = value.number_value(scope).unwrap(); + if (num_value as i64 as f64) == num_value { + ser.serialize_i64(num_value as i64) + } else { + ser.serialize_f64(num_value) + } + } else if value.is_string() { + let str = deno_core::serde_v8::to_utf8(value.try_into().unwrap(), scope); + ser.serialize_str(&str) + } else if value.is_string_object() { + let str = + deno_core::serde_v8::to_utf8(value.to_string(scope).unwrap(), scope); + ser.serialize_str(&str) + } else if value.is_boolean() { + ser.serialize_bool(value.is_true()) + } else if value.is_boolean_object() { + ser.serialize_bool(value.boolean_value(scope)) + } else if value.is_array() { + use serde::ser::SerializeSeq; + let array = v8::Local::<v8::Array>::try_from(value).unwrap(); + let length = array.length(); + let mut seq = ser.serialize_seq(Some(length as usize))?; + for i in 0..length { + let element = array.get_index(scope, i).unwrap(); + seq + .serialize_element(&SerializeWrapper(RefCell::new(scope), element))?; + } + seq.end() + } else if value.is_object() { + use serde::ser::SerializeMap; + if value.is_array_buffer_view() { + let buffer = v8::Local::<v8::ArrayBufferView>::try_from(value).unwrap(); + let mut buf = vec![0u8; buffer.byte_length()]; + let copied = buffer.copy_contents(&mut buf); + assert_eq!(copied, buf.len()); + return ser.serialize_bytes(&buf); + } + let object = value.to_object(scope).unwrap(); + // node uses `JSON.stringify`, so to match its behavior (and allow serializing custom objects) + // we need to respect the `toJSON` method if it exists. + let to_json_key = v8::String::new_from_utf8( + scope, + b"toJSON", + v8::NewStringType::Internalized, + ) + .unwrap() + .into(); + if let Some(to_json) = object.get(scope, to_json_key) { + if to_json.is_function() { + let to_json = v8::Local::<v8::Function>::try_from(to_json).unwrap(); + let json_value = to_json.call(scope, object.into(), &[]).unwrap(); + return serialize_v8_value(scope, json_value, ser); + } + } + + let keys = object + .get_own_property_names( + scope, + v8::GetPropertyNamesArgs { + ..Default::default() + }, + ) + .unwrap(); + let num_keys = keys.length(); + let mut map = ser.serialize_map(Some(num_keys as usize))?; + for i in 0..num_keys { + let key = keys.get_index(scope, i).unwrap(); + let key_str = key.to_rust_string_lossy(scope); + let value = object.get(scope, key).unwrap(); + map.serialize_entry( + &key_str, + &SerializeWrapper(RefCell::new(scope), value), + )?; + } + map.end() + } else { + // TODO(nathanwhit): better error message + Err(S::Error::custom(deno_core::error::type_error( + "Unsupported type", + ))) + } + } + // Open IPC pipe from bootstrap options. #[op2] #[smi] @@ -53,25 +173,66 @@ mod impl_ { Some(child_pipe_fd) => child_pipe_fd.0, None => return Ok(None), }; - + let ref_tracker = IpcRefTracker::new(state.external_ops_tracker.clone()); Ok(Some( - state.resource_table.add(IpcJsonStreamResource::new(fd)?), + state + .resource_table + .add(IpcJsonStreamResource::new(fd, ref_tracker)?), )) } #[op2(async)] - pub async fn op_node_ipc_write( + pub fn op_node_ipc_write<'a>( + scope: &mut v8::HandleScope<'a>, state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, - #[serde] value: serde_json::Value, - ) -> Result<(), AnyError> { + value: v8::Local<'a, v8::Value>, + // using an array as an "out parameter". + // index 0 is a boolean indicating whether the queue is under the limit. + // + // ideally we would just return `Result<(impl Future, bool), ..>`, but that's not + // supported by `op2` currently. + queue_ok: v8::Local<'a, v8::Array>, + ) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> { + let mut serialized = Vec::with_capacity(64); + let mut ser = serde_json::Serializer::new(&mut serialized); + serialize_v8_value(scope, value, &mut ser).map_err(|e| { + deno_core::error::type_error(format!( + "failed to serialize json value: {e}" + )) + })?; + serialized.push(b'\n'); + let stream = state .borrow() .resource_table .get::<IpcJsonStreamResource>(rid) .map_err(|_| bad_resource_id())?; - stream.write_msg(value).await?; - Ok(()) + let old = stream + .queued_bytes + .fetch_add(serialized.len(), std::sync::atomic::Ordering::Relaxed); + if old + serialized.len() > 2 * INITIAL_CAPACITY { + // sending messages too fast + let v = false.to_v8(scope)?; + queue_ok.set_index(scope, 0, v); + } + Ok(async move { + stream.clone().write_msg_bytes(&serialized).await?; + stream + .queued_bytes + .fetch_sub(serialized.len(), std::sync::atomic::Ordering::Relaxed); + Ok(()) + }) + } + + /// Value signaling that the other end ipc channel has closed. + /// + /// Node reserves objects of this form (`{ "cmd": "NODE_<something>"`) + /// for internal use, so we use it here as well to avoid breaking anyone. + fn stop_sentinel() -> serde_json::Value { + serde_json::json!({ + "cmd": "NODE_CLOSE" + }) } #[op2(async)] @@ -89,7 +250,92 @@ mod impl_ { let cancel = stream.cancel.clone(); let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; let msgs = stream.read_msg().or_cancel(cancel).await??; - Ok(msgs) + if let Some(msg) = msgs { + Ok(msg) + } else { + Ok(stop_sentinel()) + } + } + + #[op2(fast)] + pub fn op_node_ipc_ref(state: &mut OpState, #[smi] rid: ResourceId) { + let stream = state + .resource_table + .get::<IpcJsonStreamResource>(rid) + .expect("Invalid resource ID"); + stream.ref_tracker.ref_(); + } + + #[op2(fast)] + pub fn op_node_ipc_unref(state: &mut OpState, #[smi] rid: ResourceId) { + let stream = state + .resource_table + .get::<IpcJsonStreamResource>(rid) + .expect("Invalid resource ID"); + stream.ref_tracker.unref(); + } + + /// Tracks whether the IPC resources is currently + /// refed, and allows refing/unrefing it. + pub struct IpcRefTracker { + refed: AtomicBool, + tracker: OpsTracker, + } + + /// A little wrapper so we don't have to get an + /// `ExternalOpsTracker` for tests. When we aren't + /// cfg(test), this will get optimized out. + enum OpsTracker { + External(ExternalOpsTracker), + #[cfg(test)] + Test, + } + + impl OpsTracker { + fn ref_(&self) { + match self { + Self::External(tracker) => tracker.ref_op(), + #[cfg(test)] + Self::Test => {} + } + } + + fn unref(&self) { + match self { + Self::External(tracker) => tracker.unref_op(), + #[cfg(test)] + Self::Test => {} + } + } + } + + impl IpcRefTracker { + pub fn new(tracker: ExternalOpsTracker) -> Self { + Self { + refed: AtomicBool::new(false), + tracker: OpsTracker::External(tracker), + } + } + + #[cfg(test)] + fn new_test() -> Self { + Self { + refed: AtomicBool::new(false), + tracker: OpsTracker::Test, + } + } + + fn ref_(&self) { + if !self.refed.swap(true, std::sync::atomic::Ordering::AcqRel) { + self.tracker.ref_(); + } + } + + fn unref(&self) { + if self.refed.swap(false, std::sync::atomic::Ordering::AcqRel) { + self.tracker.unref(); + } + } } pub struct IpcJsonStreamResource { @@ -99,6 +345,8 @@ mod impl_ { #[cfg(windows)] write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, cancel: Rc<CancelHandle>, + queued_bytes: AtomicUsize, + ref_tracker: IpcRefTracker, } impl deno_core::Resource for IpcJsonStreamResource { @@ -134,64 +382,56 @@ mod impl_ { } impl IpcJsonStreamResource { - pub fn new(stream: i64) -> Result<Self, std::io::Error> { + pub fn new( + stream: i64, + ref_tracker: IpcRefTracker, + ) -> Result<Self, std::io::Error> { let (read_half, write_half) = pipe(stream as _)?; Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), cancel: Default::default(), + queued_bytes: Default::default(), + ref_tracker, }) } - #[cfg(unix)] - #[cfg(test)] - fn from_stream(stream: UnixStream) -> Self { + #[cfg(all(unix, test))] + fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self { let (read_half, write_half) = stream.into_split(); Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), cancel: Default::default(), + queued_bytes: Default::default(), + ref_tracker, } } - #[cfg(windows)] - #[cfg(test)] - fn from_stream(pipe: NamedPipeClient) -> Self { + #[cfg(all(windows, test))] + fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self { let (read_half, write_half) = tokio::io::split(pipe); Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), cancel: Default::default(), + queued_bytes: Default::default(), + ref_tracker, } } - async fn write_msg( + /// writes _newline terminated_ JSON message to the IPC pipe. + async fn write_msg_bytes( self: Rc<Self>, - msg: serde_json::Value, + msg: &[u8], ) -> Result<(), AnyError> { let mut write_half = RcRef::map(self, |r| &r.write_half).borrow_mut().await; - // Perf note: We do not benefit from writev here because - // we are always allocating a buffer for serialization anyways. - let mut buf = Vec::new(); - serde_json::to_writer(&mut buf, &msg)?; - buf.push(b'\n'); - write_half.write_all(&buf).await?; + write_half.write_all(msg).await?; Ok(()) } } - #[inline] - fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> { - #[cfg(all(target_os = "macos", target_arch = "aarch64"))] - // Safety: haystack of valid length. neon_memchr can handle unaligned - // data. - return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) }; - - #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))] - return haystack.iter().position(|&b| b == needle); - } - // Initial capacity of the buffered reader and the JSON backing buffer. // // This is a tradeoff between memory usage and performance on large messages. @@ -199,41 +439,91 @@ mod impl_ { // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. const INITIAL_CAPACITY: usize = 1024 * 64; + /// A buffer for reading from the IPC pipe. + /// Similar to the internal buffer of `tokio::io::BufReader`. + /// + /// This exists to provide buffered reading while granting mutable access + /// to the internal buffer (which isn't exposed through `tokio::io::BufReader` + /// or the `AsyncBufRead` trait). `simd_json` requires mutable access to an input + /// buffer for parsing, so this allows us to use the read buffer directly as the + /// input buffer without a copy (provided the message fits). + struct ReadBuffer { + buffer: Box<[u8]>, + pos: usize, + cap: usize, + } + + impl ReadBuffer { + fn new() -> Self { + Self { + buffer: vec![0; INITIAL_CAPACITY].into_boxed_slice(), + pos: 0, + cap: 0, + } + } + + fn get_mut(&mut self) -> &mut [u8] { + &mut self.buffer + } + + fn available_mut(&mut self) -> &mut [u8] { + &mut self.buffer[self.pos..self.cap] + } + + fn consume(&mut self, n: usize) { + self.pos = std::cmp::min(self.pos + n, self.cap); + } + + fn needs_fill(&self) -> bool { + self.pos >= self.cap + } + } + // JSON serialization stream over IPC pipe. // // `\n` is used as a delimiter between messages. struct IpcJsonStream { #[cfg(unix)] - pipe: BufReader<OwnedReadHalf>, + pipe: OwnedReadHalf, #[cfg(windows)] - pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>, + pipe: tokio::io::ReadHalf<NamedPipeClient>, buffer: Vec<u8>, + read_buffer: ReadBuffer, } impl IpcJsonStream { #[cfg(unix)] fn new(pipe: OwnedReadHalf) -> Self { Self { - pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + pipe, buffer: Vec::with_capacity(INITIAL_CAPACITY), + read_buffer: ReadBuffer::new(), } } #[cfg(windows)] fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { Self { - pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + pipe, buffer: Vec::with_capacity(INITIAL_CAPACITY), + read_buffer: ReadBuffer::new(), } } - async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { + async fn read_msg( + &mut self, + ) -> Result<Option<serde_json::Value>, AnyError> { let mut json = None; - let nread = - read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?; + let nread = read_msg_inner( + &mut self.pipe, + &mut self.buffer, + &mut json, + &mut self.read_buffer, + ) + .await?; if nread == 0 { // EOF. - return Ok(serde_json::Value::Null); + return Ok(None); } let json = match json { @@ -250,7 +540,7 @@ mod impl_ { self.buffer.set_len(0); } - Ok(json) + Ok(Some(json)) } } @@ -263,6 +553,7 @@ mod impl_ { // The number of bytes appended to buf. This can be less than buf.len() if // the buffer was not empty when the operation was started. read: usize, + read_buffer: &'a mut ReadBuffer, } } @@ -270,43 +561,41 @@ mod impl_ { reader: &'a mut R, buf: &'a mut Vec<u8>, json: &'a mut Option<serde_json::Value>, + read_buffer: &'a mut ReadBuffer, ) -> ReadMsgInner<'a, R> where - R: AsyncBufRead + ?Sized + Unpin, + R: AsyncRead + ?Sized + Unpin, { ReadMsgInner { reader, buf, json, read: 0, + read_buffer, } } - fn read_msg_internal<R: AsyncBufRead + ?Sized>( + fn read_msg_internal<R: AsyncRead + ?Sized>( mut reader: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut Vec<u8>, + read_buffer: &mut ReadBuffer, json: &mut Option<serde_json::Value>, read: &mut usize, ) -> Poll<io::Result<usize>> { loop { let (done, used) = { - let available = match reader.as_mut().poll_fill_buf(cx) { - std::task::Poll::Ready(t) => t?, - std::task::Poll::Pending => return std::task::Poll::Pending, - }; + // effectively a tiny `poll_fill_buf`, but allows us to get a mutable reference to the buffer. + if read_buffer.needs_fill() { + let mut read_buf = ReadBuf::new(read_buffer.get_mut()); + ready!(reader.as_mut().poll_read(cx, &mut read_buf))?; + read_buffer.cap = read_buf.filled().len(); + read_buffer.pos = 0; + } + let available = read_buffer.available_mut(); if let Some(i) = memchr(b'\n', available) { if *read == 0 { // Fast path: parse and put into the json slot directly. - // - // Safety: It is ok to overwrite the contents because - // we don't need to copy it into the buffer and the length will be reset. - let available = unsafe { - std::slice::from_raw_parts_mut( - available.as_ptr() as *mut u8, - available.len(), - ) - }; json.replace( simd_json::from_slice(&mut available[..i + 1]) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, @@ -323,7 +612,7 @@ mod impl_ { } }; - reader.as_mut().consume(used); + read_buffer.consume(used); *read += used; if done || used == 0 { return Poll::Ready(Ok(mem::replace(read, 0))); @@ -331,81 +620,30 @@ mod impl_ { } } - impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> { + impl<R: AsyncRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> { type Output = io::Result<usize>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let me = self.project(); - read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read) - } - } - - #[cfg(all(target_os = "macos", target_arch = "aarch64"))] - mod neon { - use std::arch::aarch64::*; - - pub unsafe fn neon_memchr( - str: &[u8], - c: u8, - length: usize, - ) -> Option<usize> { - let end = str.as_ptr().wrapping_add(length); - - // Alignment handling - let mut ptr = str.as_ptr(); - while ptr < end && (ptr as usize) & 0xF != 0 { - if *ptr == c { - return Some(ptr as usize - str.as_ptr() as usize); - } - ptr = ptr.wrapping_add(1); - } - - let search_char = vdupq_n_u8(c); - - while ptr.wrapping_add(16) <= end { - let chunk = vld1q_u8(ptr); - let comparison = vceqq_u8(chunk, search_char); - - // Check first 64 bits - let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0); - if result0 != 0 { - return Some( - (ptr as usize - str.as_ptr() as usize) - + result0.trailing_zeros() as usize / 8, - ); - } - - // Check second 64 bits - let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1); - if result1 != 0 { - return Some( - (ptr as usize - str.as_ptr() as usize) - + 8 - + result1.trailing_zeros() as usize / 8, - ); - } - - ptr = ptr.wrapping_add(16); - } - - // Handle remaining unaligned characters - while ptr < end { - if *ptr == c { - return Some(ptr as usize - str.as_ptr() as usize); - } - ptr = ptr.wrapping_add(1); - } - - None + read_msg_internal( + Pin::new(*me.reader), + cx, + me.buf, + me.read_buffer, + me.json, + me.read, + ) } } #[cfg(test)] mod tests { use super::IpcJsonStreamResource; - use deno_core::serde_json; use deno_core::serde_json::json; + use deno_core::v8; + use deno_core::JsRuntime; use deno_core::RcRef; + use deno_core::RuntimeOptions; use std::rc::Rc; #[allow(clippy::unused_async)] @@ -414,7 +652,10 @@ mod impl_ { let (a, b) = tokio::net::UnixStream::pair().unwrap(); /* Similar to how ops would use the resource */ - let a = Rc::new(IpcJsonStreamResource::from_stream(a)); + let a = Rc::new(IpcJsonStreamResource::from_stream( + a, + super::IpcRefTracker::new_test(), + )); (a, b) } @@ -434,7 +675,10 @@ mod impl_ { server.connect().await.unwrap(); /* Similar to how ops would use the resource */ - let client = Rc::new(IpcJsonStreamResource::from_stream(client)); + let client = Rc::new(IpcJsonStreamResource::from_stream( + client, + super::IpcRefTracker::new_test(), + )); (client, server) } @@ -467,10 +711,9 @@ mod impl_ { let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; loop { - let msgs = ipc.read_msg().await?; - if msgs == serde_json::Value::Null { + let Some(msgs) = ipc.read_msg().await? else { break; - } + }; bytes += msgs.as_str().unwrap().len(); if start.elapsed().as_secs() > 5 { break; @@ -501,10 +744,13 @@ mod impl_ { Ok::<_, std::io::Error>(()) }); - ipc.clone().write_msg(json!("hello")).await?; + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("hello"))) + .await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - let msgs = ipc.read_msg().await?; + let msgs = ipc.read_msg().await?.unwrap(); assert_eq!(msgs, json!("world")); child.await??; @@ -512,6 +758,12 @@ mod impl_ { Ok(()) } + fn json_to_bytes(v: deno_core::serde_json::Value) -> Vec<u8> { + let mut buf = deno_core::serde_json::to_vec(&v).unwrap(); + buf.push(b'\n'); + buf + } + #[tokio::test] async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { let (ipc, mut fd2) = pair().await; @@ -527,11 +779,17 @@ mod impl_ { Ok::<_, std::io::Error>(()) }); - ipc.clone().write_msg(json!("hello")).await?; - ipc.clone().write_msg(json!("world")).await?; + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("hello"))) + .await?; + ipc + .clone() + .write_msg_bytes(&json_to_bytes(json!("world"))) + .await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; - let msgs = ipc.read_msg().await?; + let msgs = ipc.read_msg().await?.unwrap(); assert_eq!(msgs, json!("foo")); child.await??; @@ -566,5 +824,58 @@ mod impl_ { let empty = b""; assert_eq!(super::memchr(b'\n', empty), None); } + + fn wrap_expr(s: &str) -> String { + format!("(function () {{ return {s}; }})()") + } + + fn serialize_js_to_json(runtime: &mut JsRuntime, js: String) -> String { + let val = runtime.execute_script("", js).unwrap(); + let scope = &mut runtime.handle_scope(); + let val = v8::Local::new(scope, val); + let mut buf = Vec::new(); + let mut ser = deno_core::serde_json::Serializer::new(&mut buf); + super::serialize_v8_value(scope, val, &mut ser).unwrap(); + String::from_utf8(buf).unwrap() + } + + #[test] + fn ipc_serialization() { + let mut runtime = JsRuntime::new(RuntimeOptions::default()); + + let cases = [ + ("'hello'", "\"hello\""), + ("1", "1"), + ("1.5", "1.5"), + ("Number.NaN", "null"), + ("Infinity", "null"), + ("Number.MAX_SAFE_INTEGER", &(2i64.pow(53) - 1).to_string()), + ( + "Number.MIN_SAFE_INTEGER", + &(-(2i64.pow(53) - 1)).to_string(), + ), + ("[1, 2, 3]", "[1,2,3]"), + ("new Uint8Array([1,2,3])", "[1,2,3]"), + ( + "{ a: 1.5, b: { c: new ArrayBuffer(5) }}", + r#"{"a":1.5,"b":{"c":{}}}"#, + ), + ("new Number(1)", "1"), + ("new Boolean(true)", "true"), + ("true", "true"), + (r#"new String("foo")"#, "\"foo\""), + ("null", "null"), + ( + r#"{ a: "field", toJSON() { return "custom"; } }"#, + "\"custom\"", + ), + ]; + + for (input, expect) in cases { + let js = wrap_expr(input); + let actual = serialize_js_to_json(&mut runtime, js); + assert_eq!(actual, expect); + } + } } } |