diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2023-12-19 18:07:22 +0530 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-19 13:37:22 +0100 |
commit | 55fac9f5ead6d30996400e8597c969b675c5a22b (patch) | |
tree | f7646976d8e96cc3ca9e158f6a99828c648453e0 /runtime/ops/process.rs | |
parent | aefa205f63d6e4d0d56b9fd45b3d25e03509b9fb (diff) |
fix(node): child_process IPC on Windows (#21597)
This PR implements the child_process IPC pipe between parent and child.
The implementation uses Windows named pipes created by parent and passes
the inheritable file handle to the child.
I've also replace parts of the initial implementation which passed the
raw parent fd to JS with resource ids instead. This way no file handle
is exposed to the JS land (both parent and child).
`IpcJsonStreamResource` can stream upto 800MB/s of JSON data on Win 11
AMD Ryzen 7 16GB (without `memchr` vectorization)
Diffstat (limited to 'runtime/ops/process.rs')
-rw-r--r-- | runtime/ops/process.rs | 147 |
1 files changed, 134 insertions, 13 deletions
diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 6f89e5529..e8dc9c690 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -141,7 +141,6 @@ pub struct SpawnArgs { uid: Option<u32>, #[cfg(windows)] windows_raw_arguments: bool, - #[cfg(unix)] ipc: Option<i32>, #[serde(flatten)] @@ -207,12 +206,7 @@ pub struct SpawnOutput { stderr: Option<ToJsBuffer>, } -type CreateCommand = ( - std::process::Command, - // TODO(@littledivy): Ideally this would return Option<ResourceId> but we are dealing with file descriptors - // all the way until setupChannel which makes it easier to share code between parent and child fork. - Option<i32>, -); +type CreateCommand = (std::process::Command, Option<ResourceId>); fn create_command( state: &mut OpState, @@ -337,17 +331,144 @@ fn create_command( }); /* One end returned to parent process (this) */ - let pipe_fd = Some(fd1); + let pipe_rid = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(fd1 as _)?), + ); /* The other end passed to child process via DENO_CHANNEL_FD */ command.env("DENO_CHANNEL_FD", format!("{}", ipc)); - return Ok((command, pipe_fd)); + return Ok((command, pipe_rid)); } Ok((command, None)) } + #[cfg(windows)] + // Safety: We setup a windows named pipe and pass one end to the child process. + unsafe { + use windows_sys::Win32::Foundation::CloseHandle; + use windows_sys::Win32::Foundation::DuplicateHandle; + use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS; + use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; + use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED; + use windows_sys::Win32::Foundation::GENERIC_READ; + use windows_sys::Win32::Foundation::GENERIC_WRITE; + use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE; + use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; + use windows_sys::Win32::Storage::FileSystem::CreateFileW; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING; + use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; + use windows_sys::Win32::System::Pipes::ConnectNamedPipe; + use windows_sys::Win32::System::Pipes::CreateNamedPipeW; + use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE; + use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE; + use windows_sys::Win32::System::Threading::GetCurrentProcess; + + use std::io; + use std::os::windows::ffi::OsStrExt; + use std::path::Path; + use std::ptr; + + if let Some(ipc) = args.ipc { + if ipc < 0 { + return Ok((command, None)); + } + + let (path, hd1) = loop { + let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4()); + let mut path = Path::new(&name) + .as_os_str() + .encode_wide() + .collect::<Vec<_>>(); + path.push(0); + + let hd1 = CreateNamedPipeW( + path.as_ptr(), + PIPE_ACCESS_DUPLEX + | FILE_FLAG_FIRST_PIPE_INSTANCE + | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ); + + if hd1 == INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + /* If the pipe name is already in use, try again. */ + if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { + continue; + } + + return Err(err.into()); + } + + break (path, hd1); + }; + + /* Create child pipe handle. */ + let s = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32, + lpSecurityDescriptor: ptr::null_mut(), + bInheritHandle: 1, + }; + let mut hd2 = CreateFileW( + path.as_ptr(), + GENERIC_READ | GENERIC_WRITE, + 0, + &s, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + 0, + ); + if hd2 == INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error().into()); + } + + // Will not block because we have create the pair. + if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 { + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) { + CloseHandle(hd2); + return Err(err.into()); + } + } + + // Duplicating the handle to allow the child process to use it. + if DuplicateHandle( + GetCurrentProcess(), + hd2, + GetCurrentProcess(), + &mut hd2, + 0, + 1, + DUPLICATE_SAME_ACCESS, + ) == 0 + { + return Err(std::io::Error::last_os_error().into()); + } + + /* One end returned to parent process (this) */ + let pipe_fd = Some( + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new(hd1 as i64)?), + ); + + /* The other end passed to child process via DENO_CHANNEL_FD */ + command.env("DENO_CHANNEL_FD", format!("{}", hd2 as i64)); + + return Ok((command, pipe_fd)); + } + } + #[cfg(not(unix))] return Ok((command, None)); } @@ -360,13 +481,13 @@ struct Child { stdin_rid: Option<ResourceId>, stdout_rid: Option<ResourceId>, stderr_rid: Option<ResourceId>, - pipe_fd: Option<i32>, + pipe_fd: Option<ResourceId>, } fn spawn_child( state: &mut OpState, command: std::process::Command, - pipe_fd: Option<i32>, + pipe_fd: Option<ResourceId>, ) -> Result<Child, AnyError> { let mut command = tokio::process::Command::from(command); // TODO(@crowlkats): allow detaching processes. @@ -459,8 +580,8 @@ fn op_spawn_child( #[serde] args: SpawnArgs, #[string] api_name: String, ) -> Result<Child, AnyError> { - let (command, pipe_fd) = create_command(state, args, &api_name)?; - spawn_child(state, command, pipe_fd) + let (command, pipe_rid) = create_command(state, args, &api_name)?; + spawn_child(state, command, pipe_rid) } #[op2(async)] |