summaryrefslogtreecommitdiff
path: root/ext/node/ops/ipc.rs
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-12-19 18:07:22 +0530
committerGitHub <noreply@github.com>2023-12-19 13:37:22 +0100
commit55fac9f5ead6d30996400e8597c969b675c5a22b (patch)
treef7646976d8e96cc3ca9e158f6a99828c648453e0 /ext/node/ops/ipc.rs
parentaefa205f63d6e4d0d56b9fd45b3d25e03509b9fb (diff)
fix(node): child_process IPC on Windows (#21597)
This PR implements the child_process IPC pipe between parent and child. The implementation uses Windows named pipes created by parent and passes the inheritable file handle to the child. I've also replace parts of the initial implementation which passed the raw parent fd to JS with resource ids instead. This way no file handle is exposed to the JS land (both parent and child). `IpcJsonStreamResource` can stream upto 800MB/s of JSON data on Win 11 AMD Ryzen 7 16GB (without `memchr` vectorization)
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r--ext/node/ops/ipc.rs180
1 files changed, 110 insertions, 70 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
index afaccc49f..9b2b27c71 100644
--- a/ext/node/ops/ipc.rs
+++ b/ext/node/ops/ipc.rs
@@ -1,20 +1,17 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-#[cfg(unix)]
-pub use unix::*;
+pub use impl_::*;
-#[cfg(windows)]
-pub use windows::*;
+pub struct ChildPipeFd(pub i64);
-pub struct ChildPipeFd(pub i32);
-
-#[cfg(unix)]
-mod unix {
+mod impl_ {
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::mem;
+ #[cfg(unix)]
use std::os::fd::FromRawFd;
+ #[cfg(unix)]
use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
@@ -35,18 +32,16 @@ mod unix {
use tokio::io::AsyncBufRead;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
+
+ #[cfg(unix)]
use tokio::net::unix::OwnedReadHalf;
+ #[cfg(unix)]
use tokio::net::unix::OwnedWriteHalf;
+ #[cfg(unix)]
use tokio::net::UnixStream;
- #[op2(fast)]
- #[smi]
- pub fn op_node_ipc_pipe(
- state: &mut OpState,
- #[smi] fd: i32,
- ) -> Result<ResourceId, AnyError> {
- Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?))
- }
+ #[cfg(windows)]
+ type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
// Open IPC pipe from bootstrap options.
#[op2]
@@ -97,9 +92,12 @@ mod unix {
Ok(msgs)
}
- struct IpcJsonStreamResource {
+ pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
+ #[cfg(unix)]
write_half: AsyncRefCell<OwnedWriteHalf>,
+ #[cfg(windows)]
+ write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
cancel: Rc<CancelHandle>,
}
@@ -109,14 +107,35 @@ mod unix {
}
}
+ #[cfg(unix)]
+ fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
+ // Safety: The fd is part of a pair of connected sockets create by child process
+ // implementation.
+ let unix_stream = UnixStream::from_std(unsafe {
+ std::os::unix::net::UnixStream::from_raw_fd(stream)
+ })?;
+ Ok(unix_stream.into_split())
+ }
+
+ #[cfg(windows)]
+ fn pipe(
+ handle: i64,
+ ) -> Result<
+ (
+ tokio::io::ReadHalf<NamedPipeClient>,
+ tokio::io::WriteHalf<NamedPipeClient>,
+ ),
+ io::Error,
+ > {
+ // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
+ // fd handle map will be the same.
+ let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
+ Ok(tokio::io::split(pipe))
+ }
+
impl IpcJsonStreamResource {
- fn new(stream: RawFd) -> Result<Self, std::io::Error> {
- // Safety: The fd is part of a pair of connected sockets create by child process
- // implementation.
- let unix_stream = UnixStream::from_std(unsafe {
- std::os::unix::net::UnixStream::from_raw_fd(stream)
- })?;
- let (read_half, write_half) = unix_stream.into_split();
+ pub fn new(stream: i64) -> Result<Self, std::io::Error> {
+ let (read_half, write_half) = pipe(stream as _)?;
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
@@ -124,8 +143,9 @@ mod unix {
})
}
+ #[cfg(unix)]
#[cfg(test)]
- fn from_unix_stream(stream: UnixStream) -> Self {
+ fn from_stream(stream: UnixStream) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
@@ -134,6 +154,17 @@ mod unix {
}
}
+ #[cfg(windows)]
+ #[cfg(test)]
+ fn from_stream(pipe: NamedPipeClient) -> Self {
+ let (read_half, write_half) = tokio::io::split(pipe);
+ Self {
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
+ write_half: AsyncRefCell::new(write_half),
+ cancel: Default::default(),
+ }
+ }
+
async fn write_msg(
self: Rc<Self>,
msg: serde_json::Value,
@@ -172,11 +203,15 @@ mod unix {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
+ #[cfg(unix)]
pipe: BufReader<OwnedReadHalf>,
+ #[cfg(windows)]
+ pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
buffer: Vec<u8>,
}
impl IpcJsonStream {
+ #[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
@@ -184,6 +219,14 @@ mod unix {
}
}
+ #[cfg(windows)]
+ fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
+ Self {
+ pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
+ buffer: Vec::with_capacity(INITIAL_CAPACITY),
+ }
+ }
+
async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
let mut json = None;
let nread =
@@ -252,7 +295,6 @@ mod unix {
std::task::Poll::Ready(t) => t?,
std::task::Poll::Pending => return std::task::Poll::Pending,
};
-
if let Some(i) = memchr(b'\n', available) {
if *read == 0 {
// Fast path: parse and put into the json slot directly.
@@ -366,6 +408,35 @@ mod unix {
use deno_core::RcRef;
use std::rc::Rc;
+ #[cfg(unix)]
+ pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) {
+ let (a, b) = tokio::net::UnixStream::pair().unwrap();
+
+ /* Similar to how ops would use the resource */
+ let a = Rc::new(IpcJsonStreamResource::from_stream(a));
+ (a, b)
+ }
+
+ #[cfg(windows)]
+ pub async fn pair() -> (
+ Rc<IpcJsonStreamResource>,
+ tokio::net::windows::named_pipe::NamedPipeServer,
+ ) {
+ use tokio::net::windows::named_pipe::ClientOptions;
+ use tokio::net::windows::named_pipe::ServerOptions;
+
+ let name =
+ format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>());
+
+ let server = ServerOptions::new().create(name.clone()).unwrap();
+ let client = ClientOptions::new().open(name).unwrap();
+
+ server.connect().await.unwrap();
+ /* Similar to how ops would use the resource */
+ let client = Rc::new(IpcJsonStreamResource::from_stream(client));
+ (client, server)
+ }
+
#[tokio::test]
async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> {
// A simple round trip benchmark for quick dev feedback.
@@ -375,7 +446,7 @@ mod unix {
return Ok(());
}
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
@@ -389,8 +460,6 @@ mod unix {
Ok::<_, std::io::Error>(())
});
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
-
let start = std::time::Instant::now();
let mut bytes = 0;
@@ -416,21 +485,20 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> {
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
- let mut buf = [0u8; 1024];
- let n = fd2.read(&mut buf).await?;
- assert_eq!(&buf[..n], b"\"hello\"\n");
+ const EXPECTED: &[u8] = b"\"hello\"\n";
+ let mut buf = [0u8; EXPECTED.len()];
+ let n = fd2.read_exact(&mut buf).await?;
+ assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"world\"\n").await?;
+
Ok::<_, std::io::Error>(())
});
- /* Similar to how ops would use the resource */
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
-
ipc.clone().write_msg(json!("hello")).await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
@@ -444,19 +512,19 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
- let mut buf = [0u8; 1024];
- let n = fd2.read(&mut buf).await?;
- assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n");
+ const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n";
+ let mut buf = [0u8; EXPECTED.len()];
+ let n = fd2.read_exact(&mut buf).await?;
+ assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"foo\"\n\"bar\"\n").await?;
Ok::<_, std::io::Error>(())
});
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
ipc.clone().write_msg(json!("hello")).await?;
ipc.clone().write_msg(json!("world")).await?;
@@ -471,13 +539,12 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> {
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?;
Ok::<_, std::io::Error>(())
});
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let _err = ipc.read_msg().await.unwrap_err();
@@ -499,30 +566,3 @@ mod unix {
}
}
}
-
-#[cfg(windows)]
-mod windows {
- use deno_core::error::AnyError;
- use deno_core::op2;
-
- #[op2(fast)]
- pub fn op_node_ipc_pipe() -> Result<(), AnyError> {
- Err(deno_core::error::not_supported())
- }
-
- #[op2(fast)]
- #[smi]
- pub fn op_node_child_ipc_pipe() -> Result<i32, AnyError> {
- Ok(-1)
- }
-
- #[op2(async)]
- pub async fn op_node_ipc_write() -> Result<(), AnyError> {
- Err(deno_core::error::not_supported())
- }
-
- #[op2(async)]
- pub async fn op_node_ipc_read() -> Result<(), AnyError> {
- Err(deno_core::error::not_supported())
- }
-}