summaryrefslogtreecommitdiff
path: root/ext/node/ops/ipc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r--ext/node/ops/ipc.rs180
1 files changed, 110 insertions, 70 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
index afaccc49f..9b2b27c71 100644
--- a/ext/node/ops/ipc.rs
+++ b/ext/node/ops/ipc.rs
@@ -1,20 +1,17 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-#[cfg(unix)]
-pub use unix::*;
+pub use impl_::*;
-#[cfg(windows)]
-pub use windows::*;
+pub struct ChildPipeFd(pub i64);
-pub struct ChildPipeFd(pub i32);
-
-#[cfg(unix)]
-mod unix {
+mod impl_ {
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::mem;
+ #[cfg(unix)]
use std::os::fd::FromRawFd;
+ #[cfg(unix)]
use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
@@ -35,18 +32,16 @@ mod unix {
use tokio::io::AsyncBufRead;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
+
+ #[cfg(unix)]
use tokio::net::unix::OwnedReadHalf;
+ #[cfg(unix)]
use tokio::net::unix::OwnedWriteHalf;
+ #[cfg(unix)]
use tokio::net::UnixStream;
- #[op2(fast)]
- #[smi]
- pub fn op_node_ipc_pipe(
- state: &mut OpState,
- #[smi] fd: i32,
- ) -> Result<ResourceId, AnyError> {
- Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?))
- }
+ #[cfg(windows)]
+ type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
// Open IPC pipe from bootstrap options.
#[op2]
@@ -97,9 +92,12 @@ mod unix {
Ok(msgs)
}
- struct IpcJsonStreamResource {
+ pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
+ #[cfg(unix)]
write_half: AsyncRefCell<OwnedWriteHalf>,
+ #[cfg(windows)]
+ write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
cancel: Rc<CancelHandle>,
}
@@ -109,14 +107,35 @@ mod unix {
}
}
+ #[cfg(unix)]
+ fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
+ // Safety: The fd is part of a pair of connected sockets create by child process
+ // implementation.
+ let unix_stream = UnixStream::from_std(unsafe {
+ std::os::unix::net::UnixStream::from_raw_fd(stream)
+ })?;
+ Ok(unix_stream.into_split())
+ }
+
+ #[cfg(windows)]
+ fn pipe(
+ handle: i64,
+ ) -> Result<
+ (
+ tokio::io::ReadHalf<NamedPipeClient>,
+ tokio::io::WriteHalf<NamedPipeClient>,
+ ),
+ io::Error,
+ > {
+ // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
+ // fd handle map will be the same.
+ let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
+ Ok(tokio::io::split(pipe))
+ }
+
impl IpcJsonStreamResource {
- fn new(stream: RawFd) -> Result<Self, std::io::Error> {
- // Safety: The fd is part of a pair of connected sockets create by child process
- // implementation.
- let unix_stream = UnixStream::from_std(unsafe {
- std::os::unix::net::UnixStream::from_raw_fd(stream)
- })?;
- let (read_half, write_half) = unix_stream.into_split();
+ pub fn new(stream: i64) -> Result<Self, std::io::Error> {
+ let (read_half, write_half) = pipe(stream as _)?;
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
@@ -124,8 +143,9 @@ mod unix {
})
}
+ #[cfg(unix)]
#[cfg(test)]
- fn from_unix_stream(stream: UnixStream) -> Self {
+ fn from_stream(stream: UnixStream) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
@@ -134,6 +154,17 @@ mod unix {
}
}
+ #[cfg(windows)]
+ #[cfg(test)]
+ fn from_stream(pipe: NamedPipeClient) -> Self {
+ let (read_half, write_half) = tokio::io::split(pipe);
+ Self {
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
+ write_half: AsyncRefCell::new(write_half),
+ cancel: Default::default(),
+ }
+ }
+
async fn write_msg(
self: Rc<Self>,
msg: serde_json::Value,
@@ -172,11 +203,15 @@ mod unix {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
+ #[cfg(unix)]
pipe: BufReader<OwnedReadHalf>,
+ #[cfg(windows)]
+ pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
buffer: Vec<u8>,
}
impl IpcJsonStream {
+ #[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self {
Self {
pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
@@ -184,6 +219,14 @@ mod unix {
}
}
+ #[cfg(windows)]
+ fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
+ Self {
+ pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
+ buffer: Vec::with_capacity(INITIAL_CAPACITY),
+ }
+ }
+
async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
let mut json = None;
let nread =
@@ -252,7 +295,6 @@ mod unix {
std::task::Poll::Ready(t) => t?,
std::task::Poll::Pending => return std::task::Poll::Pending,
};
-
if let Some(i) = memchr(b'\n', available) {
if *read == 0 {
// Fast path: parse and put into the json slot directly.
@@ -366,6 +408,35 @@ mod unix {
use deno_core::RcRef;
use std::rc::Rc;
+ #[cfg(unix)]
+ pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) {
+ let (a, b) = tokio::net::UnixStream::pair().unwrap();
+
+ /* Similar to how ops would use the resource */
+ let a = Rc::new(IpcJsonStreamResource::from_stream(a));
+ (a, b)
+ }
+
+ #[cfg(windows)]
+ pub async fn pair() -> (
+ Rc<IpcJsonStreamResource>,
+ tokio::net::windows::named_pipe::NamedPipeServer,
+ ) {
+ use tokio::net::windows::named_pipe::ClientOptions;
+ use tokio::net::windows::named_pipe::ServerOptions;
+
+ let name =
+ format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>());
+
+ let server = ServerOptions::new().create(name.clone()).unwrap();
+ let client = ClientOptions::new().open(name).unwrap();
+
+ server.connect().await.unwrap();
+ /* Similar to how ops would use the resource */
+ let client = Rc::new(IpcJsonStreamResource::from_stream(client));
+ (client, server)
+ }
+
#[tokio::test]
async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> {
// A simple round trip benchmark for quick dev feedback.
@@ -375,7 +446,7 @@ mod unix {
return Ok(());
}
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncWriteExt;
@@ -389,8 +460,6 @@ mod unix {
Ok::<_, std::io::Error>(())
});
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
-
let start = std::time::Instant::now();
let mut bytes = 0;
@@ -416,21 +485,20 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> {
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
- let mut buf = [0u8; 1024];
- let n = fd2.read(&mut buf).await?;
- assert_eq!(&buf[..n], b"\"hello\"\n");
+ const EXPECTED: &[u8] = b"\"hello\"\n";
+ let mut buf = [0u8; EXPECTED.len()];
+ let n = fd2.read_exact(&mut buf).await?;
+ assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"world\"\n").await?;
+
Ok::<_, std::io::Error>(())
});
- /* Similar to how ops would use the resource */
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
-
ipc.clone().write_msg(json!("hello")).await?;
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
@@ -444,19 +512,19 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
- let mut buf = [0u8; 1024];
- let n = fd2.read(&mut buf).await?;
- assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n");
+ const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n";
+ let mut buf = [0u8; EXPECTED.len()];
+ let n = fd2.read_exact(&mut buf).await?;
+ assert_eq!(&buf[..n], EXPECTED);
fd2.write_all(b"\"foo\"\n\"bar\"\n").await?;
Ok::<_, std::io::Error>(())
});
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
ipc.clone().write_msg(json!("hello")).await?;
ipc.clone().write_msg(json!("world")).await?;
@@ -471,13 +539,12 @@ mod unix {
#[tokio::test]
async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> {
- let (fd1, mut fd2) = tokio::net::UnixStream::pair()?;
+ let (ipc, mut fd2) = pair().await;
let child = tokio::spawn(async move {
tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?;
Ok::<_, std::io::Error>(())
});
- let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1));
let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
let _err = ipc.read_msg().await.unwrap_err();
@@ -499,30 +566,3 @@ mod unix {
}
}
}
-
-#[cfg(windows)]
-mod windows {
- use deno_core::error::AnyError;
- use deno_core::op2;
-
- #[op2(fast)]
- pub fn op_node_ipc_pipe() -> Result<(), AnyError> {
- Err(deno_core::error::not_supported())
- }
-
- #[op2(fast)]
- #[smi]
- pub fn op_node_child_ipc_pipe() -> Result<i32, AnyError> {
- Ok(-1)
- }
-
- #[op2(async)]
- pub async fn op_node_ipc_write() -> Result<(), AnyError> {
- Err(deno_core::error::not_supported())
- }
-
- #[op2(async)]
- pub async fn op_node_ipc_read() -> Result<(), AnyError> {
- Err(deno_core::error::not_supported())
- }
-}