summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2024-02-26 21:21:14 -0700
committerGitHub <noreply@github.com>2024-02-26 21:21:14 -0700
commitd722de886b85093eeef08d1e9fd6f3193405762d (patch)
treee7b5e9ac1992c7e36ea9c1b833dfb0eb51699c5a
parenteaad94687b8ed253362d9c0272840259306a2a38 (diff)
fix(io): create_named_pipe parallelism (#22597)
Investigating https://github.com/denoland/deno/issues/22574 Unable to reproduce with a unit test, but assuming that it's a name collision or create pipe/open pipe race, and adding some additional diagnostics.
-rw-r--r--ext/io/Cargo.toml2
-rw-r--r--ext/io/winpipe.rs123
2 files changed, 102 insertions, 23 deletions
diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml
index 1b7f5126f..2fa4d0d87 100644
--- a/ext/io/Cargo.toml
+++ b/ext/io/Cargo.toml
@@ -25,5 +25,5 @@ tokio.workspace = true
os_pipe.workspace = true
[target.'cfg(windows)'.dependencies]
-winapi = { workspace = true, features = ["winbase", "processenv"] }
+winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] }
rand.workspace = true
diff --git a/ext/io/winpipe.rs b/ext/io/winpipe.rs
index 01272300d..1495cbed1 100644
--- a/ext/io/winpipe.rs
+++ b/ext/io/winpipe.rs
@@ -3,7 +3,11 @@ use rand::thread_rng;
use rand::RngCore;
use std::io;
use std::os::windows::io::RawHandle;
+use std::sync::atomic::AtomicU32;
+use std::sync::atomic::Ordering;
+use std::time::Duration;
use winapi::shared::minwindef::DWORD;
+use winapi::um::errhandlingapi::GetLastError;
use winapi::um::fileapi::CreateFileA;
use winapi::um::fileapi::OPEN_EXISTING;
use winapi::um::handleapi::CloseHandle;
@@ -15,7 +19,6 @@ use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
use winapi::um::winbase::PIPE_READMODE_BYTE;
use winapi::um::winbase::PIPE_TYPE_BYTE;
-use winapi::um::winnt::FILE_ATTRIBUTE_NORMAL;
use winapi::um::winnt::GENERIC_READ;
use winapi::um::winnt::GENERIC_WRITE;
@@ -28,10 +31,23 @@ use winapi::um::winnt::GENERIC_WRITE;
/// well as offering a complex NTAPI solution if we decide to try to make these pipes truely
/// anonymous: https://stackoverflow.com/questions/60645/overlapped-i-o-on-anonymous-pipe
pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> {
+ // Silently retry up to 10 times.
+ for _ in 0..10 {
+ if let Ok(res) = create_named_pipe_inner() {
+ return Ok(res);
+ }
+ }
+ create_named_pipe_inner()
+}
+
+fn create_named_pipe_inner() -> io::Result<(RawHandle, RawHandle)> {
+ static NEXT_ID: AtomicU32 = AtomicU32::new(0);
+ // Create an extremely-likely-unique pipe name from randomness, identity and a serial counter.
let pipe_name = format!(
- r#"\\.\pipe\deno_pipe_{:x}_{:x}\0"#,
+ r#"\\.\pipe\deno_pipe_{:x}.{:x}.{:x}\0"#,
+ thread_rng().next_u64(),
std::process::id(),
- thread_rng().next_u64()
+ NEXT_ID.fetch_add(1, Ordering::SeqCst),
);
// Create security attributes to make the pipe handles non-inheritable
@@ -62,32 +78,58 @@ pub fn create_named_pipe() -> io::Result<(RawHandle, RawHandle)> {
};
if server_handle == INVALID_HANDLE_VALUE {
+ // This should not happen, so we would like to get some better diagnostics here.
+ // SAFETY: Printing last error for diagnostics
+ unsafe {
+ eprintln!("*** Unexpected server pipe failure: {:x}", GetLastError());
+ }
return Err(io::Error::last_os_error());
}
- // SAFETY: Create the pipe client with non-inheritable handle
- let client_handle = unsafe {
- CreateFileA(
- pipe_name.as_ptr() as *const i8,
- GENERIC_READ | GENERIC_WRITE | FILE_FLAG_OVERLAPPED,
- 0,
- &mut security_attributes,
- OPEN_EXISTING,
- FILE_ATTRIBUTE_NORMAL,
- std::ptr::null_mut(),
- )
- };
+ // The pipe might not be ready yet in rare cases, so we loop for a bit
+ for i in 0..10 {
+ // SAFETY: Create the pipe client with non-inheritable handle
+ let client_handle = unsafe {
+ CreateFileA(
+ pipe_name.as_ptr() as *const i8,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ &mut security_attributes,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ std::ptr::null_mut(),
+ )
+ };
- if client_handle == INVALID_HANDLE_VALUE {
- let err = io::Error::last_os_error();
- // SAFETY: Close the handles if we failed
- unsafe {
- CloseHandle(server_handle);
+ // There is a very rare case where the pipe is not ready to open. If we get `ERROR_PATH_NOT_FOUND`,
+ // we spin and try again in 1-10ms.
+ if client_handle == INVALID_HANDLE_VALUE {
+ // SAFETY: Getting last error for diagnostics
+ let error = unsafe { GetLastError() };
+ if error == winapi::shared::winerror::ERROR_FILE_NOT_FOUND
+ || error == winapi::shared::winerror::ERROR_PATH_NOT_FOUND
+ {
+ // Exponential backoff, but don't sleep longer than 10ms
+ eprintln!("*** Unexpected client pipe not found failure: {:x}", error);
+ std::thread::sleep(Duration::from_millis(10.min(2_u64.pow(i) + 1)));
+ continue;
+ }
+
+ // This should not happen, so we would like to get some better diagnostics here.
+ eprintln!("*** Unexpected client pipe failure: {:x}", error);
+ let err = io::Error::last_os_error();
+ // SAFETY: Close the handles if we failed
+ unsafe {
+ CloseHandle(server_handle);
+ }
+ return Err(err);
}
- return Err(err);
+
+ return Ok((server_handle, client_handle));
}
- Ok((server_handle, client_handle))
+ // We failed to open the pipe despite sleeping
+ Err(std::io::ErrorKind::NotFound.into())
}
#[cfg(test)]
@@ -97,6 +139,8 @@ mod tests {
use std::io::Read;
use std::io::Write;
use std::os::windows::io::FromRawHandle;
+ use std::sync::Arc;
+ use std::sync::Barrier;
#[test]
fn make_named_pipe() {
@@ -112,4 +156,39 @@ mod tests {
client.read_exact(&mut buf).unwrap();
assert_eq!(&buf, b"hello");
}
+
+ #[test]
+ fn make_many_named_pipes_serial() {
+ let mut handles = vec![];
+ for _ in 0..100 {
+ let (server, client) = create_named_pipe().unwrap();
+ // SAFETY: For testing
+ let server = unsafe { File::from_raw_handle(server) };
+ // SAFETY: For testing
+ let client = unsafe { File::from_raw_handle(client) };
+ handles.push((server, client))
+ }
+ }
+
+ #[test]
+ fn make_many_named_pipes_parallel() {
+ let mut handles = vec![];
+ let barrier = Arc::new(Barrier::new(50));
+ for _ in 0..50 {
+ let barrier = barrier.clone();
+ handles.push(std::thread::spawn(move || {
+ barrier.wait();
+ let (server, client) = create_named_pipe().unwrap();
+ // SAFETY: For testing
+ let server = unsafe { File::from_raw_handle(server) };
+ // SAFETY: For testing
+ let client = unsafe { File::from_raw_handle(client) };
+ std::thread::sleep(std::time::Duration::from_millis(100));
+ drop((server, client));
+ }));
+ }
+ for handle in handles.drain(..) {
+ handle.join().unwrap();
+ }
+ }
}