diff options
Diffstat (limited to 'ext/node/ops/ipc.rs')
-rw-r--r-- | ext/node/ops/ipc.rs | 84 |
1 files changed, 19 insertions, 65 deletions
diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index 7cfab65a4..59b6fece1 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -9,10 +9,6 @@ mod impl_ { use std::future::Future; use std::io; use std::mem; - #[cfg(unix)] - use std::os::fd::FromRawFd; - #[cfg(unix)] - use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; @@ -43,15 +39,9 @@ mod impl_ { use tokio::io::AsyncWriteExt; use tokio::io::ReadBuf; - #[cfg(unix)] - use tokio::net::unix::OwnedReadHalf; - #[cfg(unix)] - use tokio::net::unix::OwnedWriteHalf; - #[cfg(unix)] - use tokio::net::UnixStream; - - #[cfg(windows)] - type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; + use deno_io::BiPipe; + use deno_io::BiPipeRead; + use deno_io::BiPipeWrite; /// Wrapper around v8 value that implements Serialize. struct SerializeWrapper<'a, 'b>( @@ -349,10 +339,7 @@ mod impl_ { pub struct IpcJsonStreamResource { read_half: AsyncRefCell<IpcJsonStream>, - #[cfg(unix)] - write_half: AsyncRefCell<OwnedWriteHalf>, - #[cfg(windows)] - write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, + write_half: AsyncRefCell<BiPipeWrite>, cancel: Rc<CancelHandle>, queued_bytes: AtomicUsize, ref_tracker: IpcRefTracker, @@ -364,38 +351,12 @@ mod impl_ { } } - #[cfg(unix)] - fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - Ok(unix_stream.into_split()) - } - - #[cfg(windows)] - fn pipe( - handle: i64, - ) -> Result< - ( - tokio::io::ReadHalf<NamedPipeClient>, - tokio::io::WriteHalf<NamedPipeClient>, - ), - io::Error, - > { - // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the - // fd handle map will be the same. - let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; - Ok(tokio::io::split(pipe)) - } - impl IpcJsonStreamResource { pub fn new( stream: i64, ref_tracker: IpcRefTracker, ) -> Result<Self, std::io::Error> { - let (read_half, write_half) = pipe(stream as _)?; + let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split(); Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), @@ -406,11 +367,14 @@ mod impl_ { } #[cfg(all(unix, test))] - fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + stream: tokio::net::UnixStream, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = stream.into_split(); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -418,11 +382,14 @@ mod impl_ { } #[cfg(all(windows, test))] - fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + pipe: tokio::net::windows::named_pipe::NamedPipeClient, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = tokio::io::split(pipe); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -492,26 +459,13 @@ mod impl_ { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { - #[cfg(unix)] - pipe: OwnedReadHalf, - #[cfg(windows)] - pipe: tokio::io::ReadHalf<NamedPipeClient>, + pipe: BiPipeRead, buffer: Vec<u8>, read_buffer: ReadBuffer, } impl IpcJsonStream { - #[cfg(unix)] - fn new(pipe: OwnedReadHalf) -> Self { - Self { - pipe, - buffer: Vec::with_capacity(INITIAL_CAPACITY), - read_buffer: ReadBuffer::new(), - } - } - - #[cfg(windows)] - fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { + fn new(pipe: BiPipeRead) -> Self { Self { pipe, buffer: Vec::with_capacity(INITIAL_CAPACITY), |