diff options
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r-- | ext/node/ops/ipc.rs | 504 |
1 files changed, 504 insertions, 0 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs new file mode 100644 index 000000000..d1aeeb40c --- /dev/null +++ b/ext/node/ops/ipc.rs @@ -0,0 +1,504 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +#[cfg(unix)] +pub use unix::*; + +#[cfg(windows)] +pub use windows::*; + +#[cfg(unix)] +mod unix { + use std::cell::RefCell; + use std::future::Future; + use std::io; + use std::mem; + use std::os::fd::FromRawFd; + use std::os::fd::RawFd; + use std::pin::Pin; + use std::rc::Rc; + use std::task::Context; + use std::task::Poll; + + use deno_core::error::bad_resource_id; + use deno_core::error::AnyError; + use deno_core::op2; + use deno_core::serde_json; + use deno_core::AsyncRefCell; + use deno_core::CancelFuture; + use deno_core::CancelHandle; + use deno_core::OpState; + use deno_core::RcRef; + use deno_core::ResourceId; + use pin_project_lite::pin_project; + use tokio::io::AsyncBufRead; + use tokio::io::AsyncWriteExt; + use tokio::io::BufReader; + use tokio::net::unix::OwnedReadHalf; + use tokio::net::unix::OwnedWriteHalf; + use tokio::net::UnixStream; + + #[op2(fast)] + #[smi] + pub fn op_node_ipc_pipe( + state: &mut OpState, + #[smi] fd: i32, + ) -> Result<ResourceId, AnyError> { + Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) + } + + #[op2(async)] + pub async fn op_node_ipc_write( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, + #[serde] value: serde_json::Value, + ) -> Result<(), AnyError> { + let stream = state + .borrow() + .resource_table + .get::<IpcJsonStreamResource>(rid) + .map_err(|_| bad_resource_id())?; + stream.write_msg(value).await?; + Ok(()) + } + + #[op2(async)] + #[serde] + pub async fn op_node_ipc_read( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, + ) -> Result<serde_json::Value, AnyError> { + let stream = state + .borrow() + .resource_table + .get::<IpcJsonStreamResource>(rid) + .map_err(|_| bad_resource_id())?; + + let cancel = stream.cancel.clone(); + let mut stream = RcRef::map(stream, |r| &r.read_half).borrow_mut().await; + let msgs = stream.read_msg().or_cancel(cancel).await??; + Ok(msgs) + } + + struct IpcJsonStreamResource { + read_half: AsyncRefCell<IpcJsonStream>, + write_half: AsyncRefCell<OwnedWriteHalf>, + cancel: Rc<CancelHandle>, + } + + impl deno_core::Resource for IpcJsonStreamResource { + fn close(self: Rc<Self>) { + self.cancel.cancel(); + } + } + + impl IpcJsonStreamResource { + fn new(stream: RawFd) -> Result<Self, std::io::Error> { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + let (read_half, write_half) = unix_stream.into_split(); + Ok(Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + }) + } + + #[cfg(test)] + fn from_unix_stream(stream: UnixStream) -> Self { + let (read_half, write_half) = stream.into_split(); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + + async fn write_msg( + self: Rc<Self>, + msg: serde_json::Value, + ) -> Result<(), AnyError> { + let mut write_half = + RcRef::map(self, |r| &r.write_half).borrow_mut().await; + // Perf note: We do not benefit from writev here because + // we are always allocating a buffer for serialization anyways. + let mut buf = Vec::new(); + serde_json::to_writer(&mut buf, &msg)?; + buf.push(b'\n'); + write_half.write_all(&buf).await?; + Ok(()) + } + } + + #[inline] + fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> { + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + // Safety: haystack of valid length. neon_memchr can handle unaligned + // data. + return unsafe { neon::neon_memchr(haystack, needle, haystack.len()) }; + + #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))] + return haystack.iter().position(|&b| b == needle); + } + + // Initial capacity of the buffered reader and the JSON backing buffer. + // + // This is a tradeoff between memory usage and performance on large messages. + // + // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message. + const INITIAL_CAPACITY: usize = 1024 * 64; + + // JSON serialization stream over IPC pipe. + // + // `\n` is used as a delimiter between messages. + struct IpcJsonStream { + pipe: BufReader<OwnedReadHalf>, + buffer: Vec<u8>, + } + + impl IpcJsonStream { + fn new(pipe: OwnedReadHalf) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + + async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { + let mut json = None; + let nread = + read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?; + if nread == 0 { + // EOF. + return Ok(serde_json::Value::Null); + } + + let json = match json { + Some(v) => v, + None => { + // Took more than a single read and some buffering. + simd_json::from_slice(&mut self.buffer[..nread])? + } + }; + + // Safety: Same as `Vec::clear` but without the `drop_in_place` for + // each element (nop for u8). Capacity remains the same. + unsafe { + self.buffer.set_len(0); + } + + Ok(json) + } + } + + pin_project! { + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct ReadMsgInner<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut Vec<u8>, + json: &'a mut Option<serde_json::Value>, + // The number of bytes appended to buf. This can be less than buf.len() if + // the buffer was not empty when the operation was started. + read: usize, + } + } + + fn read_msg_inner<'a, R>( + reader: &'a mut R, + buf: &'a mut Vec<u8>, + json: &'a mut Option<serde_json::Value>, + ) -> ReadMsgInner<'a, R> + where + R: AsyncBufRead + ?Sized + Unpin, + { + ReadMsgInner { + reader, + buf, + json, + read: 0, + } + } + + fn read_msg_internal<R: AsyncBufRead + ?Sized>( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec<u8>, + json: &mut Option<serde_json::Value>, + read: &mut usize, + ) -> Poll<io::Result<usize>> { + loop { + let (done, used) = { + let available = match reader.as_mut().poll_fill_buf(cx) { + std::task::Poll::Ready(t) => t?, + std::task::Poll::Pending => return std::task::Poll::Pending, + }; + + if let Some(i) = memchr(b'\n', available) { + if *read == 0 { + // Fast path: parse and put into the json slot directly. + // + // Safety: It is ok to overwrite the contents because + // we don't need to copy it into the buffer and the length will be reset. + let available = unsafe { + std::slice::from_raw_parts_mut( + available.as_ptr() as *mut u8, + available.len(), + ) + }; + json.replace( + simd_json::from_slice(&mut available[..i + 1]) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, + ); + } else { + // This is not the first read, so we have to copy the data + // to make it contiguous. + buf.extend_from_slice(&available[..=i]); + } + (true, i + 1) + } else { + buf.extend_from_slice(available); + (false, available.len()) + } + }; + + reader.as_mut().consume(used); + *read += used; + if done || used == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } + } + + impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> { + type Output = io::Result<usize>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let me = self.project(); + read_msg_internal(Pin::new(*me.reader), cx, me.buf, me.json, me.read) + } + } + + #[cfg(all(target_os = "macos", target_arch = "aarch64"))] + mod neon { + use std::arch::aarch64::*; + + pub unsafe fn neon_memchr( + str: &[u8], + c: u8, + length: usize, + ) -> Option<usize> { + let end = str.as_ptr().wrapping_add(length); + + // Alignment handling + let mut ptr = str.as_ptr(); + while ptr < end && (ptr as usize) & 0xF != 0 { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + let search_char = vdupq_n_u8(c); + + while ptr.wrapping_add(16) <= end { + let chunk = vld1q_u8(ptr); + let comparison = vceqq_u8(chunk, search_char); + + // Check first 64 bits + let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0); + if result0 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + result0.trailing_zeros() as usize / 8, + ); + } + + // Check second 64 bits + let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1); + if result1 != 0 { + return Some( + (ptr as usize - str.as_ptr() as usize) + + 8 + + result1.trailing_zeros() as usize / 8, + ); + } + + ptr = ptr.wrapping_add(16); + } + + // Handle remaining unaligned characters + while ptr < end { + if *ptr == c { + return Some(ptr as usize - str.as_ptr() as usize); + } + ptr = ptr.wrapping_add(1); + } + + None + } + } + + #[cfg(test)] + mod tests { + use super::IpcJsonStreamResource; + use deno_core::serde_json; + use deno_core::serde_json::json; + use deno_core::RcRef; + use std::rc::Rc; + + #[tokio::test] + async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> { + // A simple round trip benchmark for quick dev feedback. + // + // Only ran when the env var is set. + if std::env::var_os("BENCH_IPC_DENO").is_none() { + return Ok(()); + } + + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncWriteExt; + + let size = 1024 * 1024; + + let stri = "x".repeat(size); + let data = format!("\"{}\"\n", stri); + for _ in 0..100 { + fd2.write_all(data.as_bytes()).await?; + } + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + let start = std::time::Instant::now(); + let mut bytes = 0; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + loop { + let msgs = ipc.read_msg().await?; + if msgs == serde_json::Value::Null { + break; + } + bytes += msgs.as_str().unwrap().len(); + if start.elapsed().as_secs() > 5 { + break; + } + } + let elapsed = start.elapsed(); + let mb = bytes as f64 / 1024.0 / 1024.0; + println!("{} mb/s", mb / elapsed.as_secs_f64()); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n"); + fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + /* Similar to how ops would use the resource */ + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + + ipc.clone().write_msg(json!("hello")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("world")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let mut buf = [0u8; 1024]; + let n = fd2.read(&mut buf).await?; + assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + ipc.clone().write_msg(json!("hello")).await?; + ipc.clone().write_msg(json!("world")).await?; + + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let msgs = ipc.read_msg().await?; + assert_eq!(msgs, json!("foo")); + + child.await??; + + Ok(()) + } + + #[tokio::test] + async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> { + let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let child = tokio::spawn(async move { + tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; + Ok::<_, std::io::Error>(()) + }); + + let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); + let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; + let _err = ipc.read_msg().await.unwrap_err(); + + child.await??; + + Ok(()) + } + + #[test] + fn memchr() { + let str = b"hello world"; + assert_eq!(super::memchr(b'h', str), Some(0)); + assert_eq!(super::memchr(b'w', str), Some(6)); + assert_eq!(super::memchr(b'd', str), Some(10)); + assert_eq!(super::memchr(b'x', str), None); + + let empty = b""; + assert_eq!(super::memchr(b'\n', empty), None); + } + } +} + +#[cfg(windows)] +mod windows { + use deno_core::error::AnyError; + use deno_core::op2; + + #[op2(fast)] + pub fn op_node_ipc_pipe() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_write() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } + + #[op2(async)] + pub async fn op_node_ipc_read() -> Result<(), AnyError> { + Err(deno_core::error::not_supported()) + } +} |