diff options
-rw-r--r-- | Cargo.lock | 2 | ||||
-rw-r--r-- | cli/args/mod.rs | 4 | ||||
-rw-r--r-- | cli/worker.rs | 4 | ||||
-rw-r--r-- | ext/node/Cargo.toml | 4 | ||||
-rw-r--r-- | ext/node/lib.rs | 2 | ||||
-rw-r--r-- | ext/node/ops/ipc.rs | 180 | ||||
-rw-r--r-- | ext/node/polyfills/internal/child_process.ts | 5 | ||||
-rw-r--r-- | runtime/Cargo.toml | 1 | ||||
-rw-r--r-- | runtime/ops/process.rs | 147 | ||||
-rw-r--r-- | runtime/worker_bootstrap.rs | 2 |
10 files changed, 258 insertions, 93 deletions
diff --git a/Cargo.lock b/Cargo.lock index d6fe5d296..384e84579 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1543,6 +1543,7 @@ dependencies = [ "typenum", "url", "winapi", + "windows-sys 0.48.0", "x25519-dalek", "x509-parser", ] @@ -1635,6 +1636,7 @@ dependencies = [ "uuid", "which", "winapi", + "windows-sys 0.48.0", "winres", ] diff --git a/cli/args/mod.rs b/cli/args/mod.rs index 187d3d604..0c1bd6e0a 100644 --- a/cli/args/mod.rs +++ b/cli/args/mod.rs @@ -917,12 +917,12 @@ impl CliOptions { .map(Some) } - pub fn node_ipc_fd(&self) -> Option<i32> { + pub fn node_ipc_fd(&self) -> Option<i64> { let maybe_node_channel_fd = std::env::var("DENO_CHANNEL_FD").ok(); if let Some(node_channel_fd) = maybe_node_channel_fd { // Remove so that child processes don't inherit this environment variable. std::env::remove_var("DENO_CHANNEL_FD"); - node_channel_fd.parse::<i32>().ok() + node_channel_fd.parse::<i64>().ok() } else { None } diff --git a/cli/worker.rs b/cli/worker.rs index 2f0016581..bc611a05c 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -124,7 +124,7 @@ struct SharedWorkerState { maybe_inspector_server: Option<Arc<InspectorServer>>, maybe_lockfile: Option<Arc<Mutex<Lockfile>>>, feature_checker: Arc<FeatureChecker>, - node_ipc: Option<i32>, + node_ipc: Option<i64>, } impl SharedWorkerState { @@ -404,7 +404,7 @@ impl CliMainWorkerFactory { maybe_lockfile: Option<Arc<Mutex<Lockfile>>>, feature_checker: Arc<FeatureChecker>, options: CliMainWorkerOptions, - node_ipc: Option<i32>, + node_ipc: Option<i64>, ) -> Self { Self { shared: Arc::new(SharedWorkerState { diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index c7ca63097..b72449f5c 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -74,3 +74,7 @@ url.workspace = true winapi.workspace = true x25519-dalek = "2.0.0" x509-parser = "0.15.0" + +[target.'cfg(windows)'.dependencies] +windows-sys.workspace = true +winapi = { workspace = true, features = ["consoleapi"] } diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 42a752791..0922c6986 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -31,6 +31,7 @@ mod polyfill; mod resolution; pub use ops::ipc::ChildPipeFd; +pub use ops::ipc::IpcJsonStreamResource; pub use ops::v8::VM_CONTEXT_INDEX; pub use package_json::PackageJson; pub use path::PathClean; @@ -307,7 +308,6 @@ deno_core::extension!(deno_node, ops::require::op_require_break_on_next_statement, ops::util::op_node_guess_handle_type, ops::crypto::op_node_create_private_key, - ops::ipc::op_node_ipc_pipe, ops::ipc::op_node_child_ipc_pipe, ops::ipc::op_node_ipc_write, ops::ipc::op_node_ipc_read, diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index afaccc49f..9b2b27c71 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -1,20 +1,17 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -#[cfg(unix)] -pub use unix::*; +pub use impl_::*; -#[cfg(windows)] -pub use windows::*; +pub struct ChildPipeFd(pub i64); -pub struct ChildPipeFd(pub i32); - -#[cfg(unix)] -mod unix { +mod impl_ { use std::cell::RefCell; use std::future::Future; use std::io; use std::mem; + #[cfg(unix)] use std::os::fd::FromRawFd; + #[cfg(unix)] use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; @@ -35,18 +32,16 @@ mod unix { use tokio::io::AsyncBufRead; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; + + #[cfg(unix)] use tokio::net::unix::OwnedReadHalf; + #[cfg(unix)] use tokio::net::unix::OwnedWriteHalf; + #[cfg(unix)] use tokio::net::UnixStream; - #[op2(fast)] - #[smi] - pub fn op_node_ipc_pipe( - state: &mut OpState, - #[smi] fd: i32, - ) -> Result<ResourceId, AnyError> { - Ok(state.resource_table.add(IpcJsonStreamResource::new(fd)?)) - } + #[cfg(windows)] + type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; // Open IPC pipe from bootstrap options. #[op2] @@ -97,9 +92,12 @@ mod unix { Ok(msgs) } - struct IpcJsonStreamResource { + pub struct IpcJsonStreamResource { read_half: AsyncRefCell<IpcJsonStream>, + #[cfg(unix)] write_half: AsyncRefCell<OwnedWriteHalf>, + #[cfg(windows)] + write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>, cancel: Rc<CancelHandle>, } @@ -109,14 +107,35 @@ mod unix { } } + #[cfg(unix)] + fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { + // Safety: The fd is part of a pair of connected sockets create by child process + // implementation. + let unix_stream = UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + Ok(unix_stream.into_split()) + } + + #[cfg(windows)] + fn pipe( + handle: i64, + ) -> Result< + ( + tokio::io::ReadHalf<NamedPipeClient>, + tokio::io::WriteHalf<NamedPipeClient>, + ), + io::Error, + > { + // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the + // fd handle map will be the same. + let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; + Ok(tokio::io::split(pipe)) + } + impl IpcJsonStreamResource { - fn new(stream: RawFd) -> Result<Self, std::io::Error> { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - let (read_half, write_half) = unix_stream.into_split(); + pub fn new(stream: i64) -> Result<Self, std::io::Error> { + let (read_half, write_half) = pipe(stream as _)?; Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), @@ -124,8 +143,9 @@ mod unix { }) } + #[cfg(unix)] #[cfg(test)] - fn from_unix_stream(stream: UnixStream) -> Self { + fn from_stream(stream: UnixStream) -> Self { let (read_half, write_half) = stream.into_split(); Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), @@ -134,6 +154,17 @@ mod unix { } } + #[cfg(windows)] + #[cfg(test)] + fn from_stream(pipe: NamedPipeClient) -> Self { + let (read_half, write_half) = tokio::io::split(pipe); + Self { + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), + write_half: AsyncRefCell::new(write_half), + cancel: Default::default(), + } + } + async fn write_msg( self: Rc<Self>, msg: serde_json::Value, @@ -172,11 +203,15 @@ mod unix { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { + #[cfg(unix)] pipe: BufReader<OwnedReadHalf>, + #[cfg(windows)] + pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>, buffer: Vec<u8>, } impl IpcJsonStream { + #[cfg(unix)] fn new(pipe: OwnedReadHalf) -> Self { Self { pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), @@ -184,6 +219,14 @@ mod unix { } } + #[cfg(windows)] + fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self { + Self { + pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe), + buffer: Vec::with_capacity(INITIAL_CAPACITY), + } + } + async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> { let mut json = None; let nread = @@ -252,7 +295,6 @@ mod unix { std::task::Poll::Ready(t) => t?, std::task::Poll::Pending => return std::task::Poll::Pending, }; - if let Some(i) = memchr(b'\n', available) { if *read == 0 { // Fast path: parse and put into the json slot directly. @@ -366,6 +408,35 @@ mod unix { use deno_core::RcRef; use std::rc::Rc; + #[cfg(unix)] + pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) { + let (a, b) = tokio::net::UnixStream::pair().unwrap(); + + /* Similar to how ops would use the resource */ + let a = Rc::new(IpcJsonStreamResource::from_stream(a)); + (a, b) + } + + #[cfg(windows)] + pub async fn pair() -> ( + Rc<IpcJsonStreamResource>, + tokio::net::windows::named_pipe::NamedPipeServer, + ) { + use tokio::net::windows::named_pipe::ClientOptions; + use tokio::net::windows::named_pipe::ServerOptions; + + let name = + format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>()); + + let server = ServerOptions::new().create(name.clone()).unwrap(); + let client = ClientOptions::new().open(name).unwrap(); + + server.connect().await.unwrap(); + /* Similar to how ops would use the resource */ + let client = Rc::new(IpcJsonStreamResource::from_stream(client)); + (client, server) + } + #[tokio::test] async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> { // A simple round trip benchmark for quick dev feedback. @@ -375,7 +446,7 @@ mod unix { return Ok(()); } - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncWriteExt; @@ -389,8 +460,6 @@ mod unix { Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - let start = std::time::Instant::now(); let mut bytes = 0; @@ -416,21 +485,20 @@ mod unix { #[tokio::test] async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"world\"\n").await?; + Ok::<_, std::io::Error>(()) }); - /* Similar to how ops would use the resource */ - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); - ipc.clone().write_msg(json!("hello")).await?; let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; @@ -444,19 +512,19 @@ mod unix { #[tokio::test] async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; - let mut buf = [0u8; 1024]; - let n = fd2.read(&mut buf).await?; - assert_eq!(&buf[..n], b"\"hello\"\n\"world\"\n"); + const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n"; + let mut buf = [0u8; EXPECTED.len()]; + let n = fd2.read_exact(&mut buf).await?; + assert_eq!(&buf[..n], EXPECTED); fd2.write_all(b"\"foo\"\n\"bar\"\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); ipc.clone().write_msg(json!("hello")).await?; ipc.clone().write_msg(json!("world")).await?; @@ -471,13 +539,12 @@ mod unix { #[tokio::test] async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> { - let (fd1, mut fd2) = tokio::net::UnixStream::pair()?; + let (ipc, mut fd2) = pair().await; let child = tokio::spawn(async move { tokio::io::AsyncWriteExt::write_all(&mut fd2, b"\n\n").await?; Ok::<_, std::io::Error>(()) }); - let ipc = Rc::new(IpcJsonStreamResource::from_unix_stream(fd1)); let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await; let _err = ipc.read_msg().await.unwrap_err(); @@ -499,30 +566,3 @@ mod unix { } } } - -#[cfg(windows)] -mod windows { - use deno_core::error::AnyError; - use deno_core::op2; - - #[op2(fast)] - pub fn op_node_ipc_pipe() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(fast)] - #[smi] - pub fn op_node_child_ipc_pipe() -> Result<i32, AnyError> { - Ok(-1) - } - - #[op2(async)] - pub async fn op_node_ipc_write() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } - - #[op2(async)] - pub async fn op_node_ipc_read() -> Result<(), AnyError> { - Err(deno_core::error::not_supported()) - } -} diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index b9bf13396..0e93e22d3 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -45,7 +45,6 @@ import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; const core = globalThis.__bootstrap.core; -const ops = core.ops; export function mapValues<T, O>( record: Readonly<Record<string, T>>, @@ -1069,9 +1068,7 @@ function toDenoArgs(args: string[]): string[] { return denoArgs; } -export function setupChannel(target, channel) { - const ipc = ops.op_node_ipc_pipe(channel); - +export function setupChannel(target, ipc) { async function readLoop() { try { while (true) { diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index e47d2b59e..ad9586390 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -122,6 +122,7 @@ which = "4.2.5" fwdansi.workspace = true winapi = { workspace = true, features = ["commapi", "knownfolders", "mswsock", "objbase", "psapi", "shlobj", "tlhelp32", "winbase", "winerror", "winuser", "winsock2"] } ntapi = "0.4.0" +windows-sys.workspace = true [target.'cfg(unix)'.dependencies] nix.workspace = true diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 6f89e5529..e8dc9c690 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -141,7 +141,6 @@ pub struct SpawnArgs { uid: Option<u32>, #[cfg(windows)] windows_raw_arguments: bool, - #[cfg(unix)] ipc: Option<i32>, #[serde(flatten)] @@ -207,12 +206,7 @@ pub struct SpawnOutput { stderr: Option<ToJsBuffer>, } -type CreateCommand = ( - std::process::Command, - // TODO(@littledivy): Ideally this would return Option<ResourceId> but we are dealing with file descriptors - // all the way until setupChannel which makes it easier to share code between parent and child fork. - Option<i32>, -); +type CreateCommand = (std::process::Command, Option<ResourceId>); fn create_command( state: &mut OpState, @@ -337,17 +331,144 @@ fn create_command( }); /* One end returned to parent process (this) */ - let pipe_fd = Some(fd1); + let pipe_rid = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(fd1 as _)?), + ); /* The other end passed to child process via DENO_CHANNEL_FD */ command.env("DENO_CHANNEL_FD", format!("{}", ipc)); - return Ok((command, pipe_fd)); + return Ok((command, pipe_rid)); } Ok((command, None)) } + #[cfg(windows)] + // Safety: We setup a windows named pipe and pass one end to the child process. + unsafe { + use windows_sys::Win32::Foundation::CloseHandle; + use windows_sys::Win32::Foundation::DuplicateHandle; + use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS; + use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; + use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED; + use windows_sys::Win32::Foundation::GENERIC_READ; + use windows_sys::Win32::Foundation::GENERIC_WRITE; + use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE; + use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; + use windows_sys::Win32::Storage::FileSystem::CreateFileW; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING; + use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; + use windows_sys::Win32::System::Pipes::ConnectNamedPipe; + use windows_sys::Win32::System::Pipes::CreateNamedPipeW; + use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE; + use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE; + use windows_sys::Win32::System::Threading::GetCurrentProcess; + + use std::io; + use std::os::windows::ffi::OsStrExt; + use std::path::Path; + use std::ptr; + + if let Some(ipc) = args.ipc { + if ipc < 0 { + return Ok((command, None)); + } + + let (path, hd1) = loop { + let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4()); + let mut path = Path::new(&name) + .as_os_str() + .encode_wide() + .collect::<Vec<_>>(); + path.push(0); + + let hd1 = CreateNamedPipeW( + path.as_ptr(), + PIPE_ACCESS_DUPLEX + | FILE_FLAG_FIRST_PIPE_INSTANCE + | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ); + + if hd1 == INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + /* If the pipe name is already in use, try again. */ + if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { + continue; + } + + return Err(err.into()); + } + + break (path, hd1); + }; + + /* Create child pipe handle. */ + let s = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32, + lpSecurityDescriptor: ptr::null_mut(), + bInheritHandle: 1, + }; + let mut hd2 = CreateFileW( + path.as_ptr(), + GENERIC_READ | GENERIC_WRITE, + 0, + &s, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + 0, + ); + if hd2 == INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error().into()); + } + + // Will not block because we have create the pair. + if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 { + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) { + CloseHandle(hd2); + return Err(err.into()); + } + } + + // Duplicating the handle to allow the child process to use it. + if DuplicateHandle( + GetCurrentProcess(), + hd2, + GetCurrentProcess(), + &mut hd2, + 0, + 1, + DUPLICATE_SAME_ACCESS, + ) == 0 + { + return Err(std::io::Error::last_os_error().into()); + } + + /* One end returned to parent process (this) */ + let pipe_fd = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?), + ); + + /* The other end passed to child process via DENO_CHANNEL_FD */ + command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64)); + + return Ok((command, pipe_fd)); + } + } + #[cfg(not(unix))] return Ok((command, None)); } @@ -360,13 +481,13 @@ struct Child { stdin_rid: Option<ResourceId>, stdout_rid: Option<ResourceId>, stderr_rid: Option<ResourceId>, - pipe_fd: Option<i32>, + pipe_fd: Option<ResourceId>, } fn spawn_child( state: &mut OpState, command: std::process::Command, - pipe_fd: Option<i32>, + pipe_fd: Option<ResourceId>, ) -> Result<Child, AnyError> { let mut command = tokio::process::Command::from(command); // TODO(@crowlkats): allow detaching processes. @@ -459,8 +580,8 @@ fn op_spawn_child( #[serde] args: SpawnArgs, #[string] api_name: String, ) -> Result<Child, AnyError> { - let (command, pipe_fd) = create_command(state, args, &api_name)?; - spawn_child(state, command, pipe_fd) + let (command, pipe_rid) = create_command(state, args, &api_name)?; + spawn_child(state, command, pipe_rid) } #[op2(async)] diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs index 97d66158b..8f7f05888 100644 --- a/runtime/worker_bootstrap.rs +++ b/runtime/worker_bootstrap.rs @@ -59,7 +59,7 @@ pub struct BootstrapOptions { pub inspect: bool, pub has_node_modules_dir: bool, pub maybe_binary_npm_command_name: Option<String>, - pub node_ipc_fd: Option<i32>, + pub node_ipc_fd: Option<i64>, } impl Default for BootstrapOptions { |