summaryrefslogtreecommitdiff
path: root/ext/node/ops/ipc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r--ext/node/ops/ipc.rs84
1 files changed, 19 insertions, 65 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs
index 7cfab65a4..59b6fece1 100644
--- a/ext/node/ops/ipc.rs
+++ b/ext/node/ops/ipc.rs
@@ -9,10 +9,6 @@ mod impl_ {
use std::future::Future;
use std::io;
use std::mem;
- #[cfg(unix)]
- use std::os::fd::FromRawFd;
- #[cfg(unix)]
- use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
@@ -43,15 +39,9 @@ mod impl_ {
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;
- #[cfg(unix)]
- use tokio::net::unix::OwnedReadHalf;
- #[cfg(unix)]
- use tokio::net::unix::OwnedWriteHalf;
- #[cfg(unix)]
- use tokio::net::UnixStream;
-
- #[cfg(windows)]
- type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
+ use deno_io::BiPipe;
+ use deno_io::BiPipeRead;
+ use deno_io::BiPipeWrite;
/// Wrapper around v8 value that implements Serialize.
struct SerializeWrapper<'a, 'b>(
@@ -349,10 +339,7 @@ mod impl_ {
pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
- #[cfg(unix)]
- write_half: AsyncRefCell<OwnedWriteHalf>,
- #[cfg(windows)]
- write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
+ write_half: AsyncRefCell<BiPipeWrite>,
cancel: Rc<CancelHandle>,
queued_bytes: AtomicUsize,
ref_tracker: IpcRefTracker,
@@ -364,38 +351,12 @@ mod impl_ {
}
}
- #[cfg(unix)]
- fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
- // Safety: The fd is part of a pair of connected sockets create by child process
- // implementation.
- let unix_stream = UnixStream::from_std(unsafe {
- std::os::unix::net::UnixStream::from_raw_fd(stream)
- })?;
- Ok(unix_stream.into_split())
- }
-
- #[cfg(windows)]
- fn pipe(
- handle: i64,
- ) -> Result<
- (
- tokio::io::ReadHalf<NamedPipeClient>,
- tokio::io::WriteHalf<NamedPipeClient>,
- ),
- io::Error,
- > {
- // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
- // fd handle map will be the same.
- let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
- Ok(tokio::io::split(pipe))
- }
-
impl IpcJsonStreamResource {
pub fn new(
stream: i64,
ref_tracker: IpcRefTracker,
) -> Result<Self, std::io::Error> {
- let (read_half, write_half) = pipe(stream as _)?;
+ let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split();
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
@@ -406,11 +367,14 @@ mod impl_ {
}
#[cfg(all(unix, test))]
- fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self {
+ fn from_stream(
+ stream: tokio::net::UnixStream,
+ ref_tracker: IpcRefTracker,
+ ) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
- read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
- write_half: AsyncRefCell::new(write_half),
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
+ write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@@ -418,11 +382,14 @@ mod impl_ {
}
#[cfg(all(windows, test))]
- fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self {
+ fn from_stream(
+ pipe: tokio::net::windows::named_pipe::NamedPipeClient,
+ ref_tracker: IpcRefTracker,
+ ) -> Self {
let (read_half, write_half) = tokio::io::split(pipe);
Self {
- read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
- write_half: AsyncRefCell::new(write_half),
+ read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
+ write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@@ -492,26 +459,13 @@ mod impl_ {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
- #[cfg(unix)]
- pipe: OwnedReadHalf,
- #[cfg(windows)]
- pipe: tokio::io::ReadHalf<NamedPipeClient>,
+ pipe: BiPipeRead,
buffer: Vec<u8>,
read_buffer: ReadBuffer,
}
impl IpcJsonStream {
- #[cfg(unix)]
- fn new(pipe: OwnedReadHalf) -> Self {
- Self {
- pipe,
- buffer: Vec::with_capacity(INITIAL_CAPACITY),
- read_buffer: ReadBuffer::new(),
- }
- }
-
- #[cfg(windows)]
- fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
+ fn new(pipe: BiPipeRead) -> Self {
Self {
pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY),