diff options
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r-- | ext/node/ops/ipc.rs | 180 |
1 files changed, 110 insertions, 70 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index afaccc49f..9b2b27c71 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -1,20 +1,17 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -#[cfg(unix)] -pub use unix::*; +pub use impl_::*; -#[cfg(windows)] -pub use windows::*; +pub struct ChildPipeFd(pub i64); -pub struct ChildPipeFd(pub i32); - -#[cfg(unix)] -mod unix { +mod impl_ { use std::cell::RefCell; use std::future::Future; use std::io; use std::mem; + #[cfg(unix)] use std::os::fd::FromRawFd; + #[cfg(unix)] use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; @@ -35,18 +32,16 @@ mod unix { use tokio::io::AsyncBufRead; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; + + #[cfg(unix)] use tokio::net::unix::OwnedReadHalf; + #[cfg(unix)] use tokio::net::unix::OwnedWriteHalf; + #[cfg(unix)] use tokio::net::UnixStream; - #[op2(fast)] - #[smi] - pub fn op_node_ipc_pipe( - state: &mut OpState, - #[smi] fd: i32, - ) -> Result<ResourceId, AnyError> { - Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) - } + #[cfg(windows)] + type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; // Open IPC pipe from bootstrap options. #[op2] @@ -97,9 +92,12 @@ mod unix { Ok(msgs) } - struct IpcJsonStreamResource { + pub struct IpcJsonStreamResource { read_half: AsyncRefCell<IpcJsonStream>, + #[cfg(unix)] write_half: AsyncRefCell<OwnedWriteHalf>, + #[cfg(windows)] + write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, cancel: Rc<CancelHandle>, } @@ -109,14 +107,35 @@ mod unix { } } + #[cfg(unix)] + fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + Ok(unix_stream.into_split()) + } + + #[cfg(windows)] + fn pipe( + handle: i64, + ) -> Result< + ( + tokio::io::ReadHalf<NamedPipeClient>, + tokio::io::WriteHalf<NamedPipeClient>, + ), + io::Error, + > { + // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the + // fd handle map will be the same. + let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; + Ok(tokio::io::split(pipe)) + } + impl IpcJsonStreamResource { - fn new(stream: RawFd) -> Result<Self, std::io::Error> { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - let (read_half, write_half) = unix_stream.into_split(); + pub fn new(stream: i64) -> Result<Self, std::io::Error> { + let (read_half, write_half) = pipe(stream as _)?; Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), @@ -124,8 +143,9 @@ mod unix { }) } + #[cfg(unix)] #[cfg(test)] - fn from_unix_stream(stream: UnixStream) -> Self { + fn from_stream(stream: UnixStream) -> Self { let (read_half, write_half) = stream.into_split(); Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), @@ -134,6 +154,17 @@ mod unix { } } + #[cfg(windows)] + #[cfg(test)] + fn from_stream(pipe: NamedPipeClient) -> Self { + let (read_half, write_half) = tokio::io::split(pipe); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + async fn write_msg( self: Rc<Self>, msg: serde_json::Value, @@ -172,11 +203,15 @@ mod unix { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { + #[cfg(unix)] pipe: BufReader<OwnedReadHalf>, + #[cfg(windows)] + pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>, buffer: Vec<u8>, } impl IpcJsonStream { + #[cfg(unix)] fn new(pipe: OwnedReadHalf) -> Self { Self { pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), @@ -184,6 +219,14 @@ mod unix { } } + #[cfg(windows)] + fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { let mut json = None; let nread = @@ -252,7 +295,6 @@ mod unix { std::task::Poll::Ready(t) => t?, std::task::Poll::Pending => return std::task::Poll::Pending, }; - if let Some(i) = memchr(b'\n', available) { if *read == 0 { // Fast path: parse and put into the json slot directly. @@ -366,6 +408,35 @@ mod unix { use deno_core::RcRef; use std::rc::Rc; + #[cfg(unix)] + pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) { + let (a, b) = tokio::net::UnixStream::pair().unwrap(); + + /* Similar to how ops would use the resource */ + let a = Rc::new(IpcJsonStreamResource::from_stream(a)); + (a, b) + } + + #[cfg(windows)] + pub async fn pair() -> ( + Rc<IpcJsonStreamResource>, + tokio::net::windows::named_pipe::NamedPipeServer, + ) { + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::net::windows::named_pipe::ServerOptions; + + let name = + format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>()); + + let server = ServerOptions::new().create(name.clone()).unwrap(); + let client = ClientOptions::new().open(name).unwrap(); + + server.connect().await.unwrap(); + /* Similar to how ops would use the resource */ + let client = Rc::new(IpcJsonStreamResource::from_stream(client)); + (client, server) + } + #[tokio::test] async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> { // A simple round trip benchmark for quick dev feedback. @@ -375,7 +446,7 @@ mod unix { return Ok(()); } - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncWriteExt; @@ -389,8 +460,6 @@ mod unix { Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - let start = std::time::Instant::now(); let mut bytes = 0; @@ -416,21 +485,20 @@ mod unix { #[tokio::test] async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) }); - /* Similar to how ops would use the resource */ - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - ipc.clone().write_msg(json!("hello")).await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; @@ -444,19 +512,19 @@ mod unix { #[tokio::test] async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); ipc.clone().write_msg(json!("hello")).await?; ipc.clone().write_msg(json!("world")).await?; @@ -471,13 +539,12 @@ mod unix { #[tokio::test] async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; let _err = ipc.read_msg().await.unwrap_err(); @@ -499,30 +566,3 @@ mod unix { } } } - -#[cfg(windows)] -mod windows { - use deno_core::error::AnyError; - use deno_core::op2; - - #[op2(fast)] - pub fn op_node_ipc_pipe() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(fast)] - #[smi] - pub fn op_node_child_ipc_pipe() -> Result<i32, AnyError> { - Ok(-1) - } - - #[op2(async)] - pub async fn op_node_ipc_write() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(async)] - pub async fn op_node_ipc_read() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } -} |