summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--ext/io/Cargo.toml1
-rw-r--r--ext/io/lib.rs150
-rw-r--r--runtime/ops/tty.rs81
-rw-r--r--tests/integration/run_tests.rs14
-rw-r--r--tests/testdata/run/process_stdin_unblock.mjs21
6 files changed, 251 insertions, 17 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 41679e239..37a3fbcbd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1555,6 +1555,7 @@ dependencies = [
"log",
"once_cell",
"os_pipe",
+ "parking_lot 0.12.1",
"rand",
"tokio",
"winapi",
diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml
index be4cf7362..573c41aa6 100644
--- a/ext/io/Cargo.toml
+++ b/ext/io/Cargo.toml
@@ -28,3 +28,4 @@ os_pipe.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] }
rand.workspace = true
+parking_lot.workspace = true
diff --git a/ext/io/lib.rs b/ext/io/lib.rs
index 04ed58ef2..a2f14e0db 100644
--- a/ext/io/lib.rs
+++ b/ext/io/lib.rs
@@ -48,6 +48,13 @@ use winapi::um::processenv::GetStdHandle;
#[cfg(windows)]
use winapi::um::winbase;
+#[cfg(windows)]
+use parking_lot::Condvar;
+#[cfg(windows)]
+use parking_lot::Mutex;
+#[cfg(windows)]
+use std::sync::Arc;
+
pub mod fs;
mod pipe;
#[cfg(windows)]
@@ -106,12 +113,21 @@ deno_core::extension!(deno_io,
},
state = |state, options| {
if let Some(stdio) = options.stdio {
+ #[cfg(windows)]
+ let stdin_state = {
+ let st = Arc::new(Mutex::new(WinTtyState::default()));
+ state.put(st.clone());
+ st
+ };
+ #[cfg(unix)]
+ let stdin_state = ();
+
let t = &mut state.resource_table;
let rid = t.add(fs::FileResource::new(
Rc::new(match stdio.stdin.pipe {
StdioPipeInner::Inherit => StdFileResourceInner::new(
- StdFileResourceKind::Stdin,
+ StdFileResourceKind::Stdin(stdin_state),
STDIN_HANDLE.try_clone().unwrap(),
),
StdioPipeInner::File(pipe) => StdFileResourceInner::file(pipe),
@@ -317,14 +333,27 @@ impl Resource for ChildStderrResource {
}
}
-#[derive(Clone, Copy)]
+#[cfg(windows)]
+#[derive(Default)]
+pub struct WinTtyState {
+ pub cancelled: bool,
+ pub reading: bool,
+ pub screen_buffer_info:
+ Option<winapi::um::wincon::CONSOLE_SCREEN_BUFFER_INFO>,
+ pub cvar: Arc<Condvar>,
+}
+
+#[derive(Clone)]
enum StdFileResourceKind {
File,
// For stdout and stderr, we sometimes instead use std::io::stdout() directly,
// because we get some Windows specific functionality for free by using Rust
// std's wrappers. So we take a bit of a complexity hit in order to not
// have to duplicate the functionality in Rust's std/src/sys/windows/stdio.rs
- Stdin,
+ #[cfg(windows)]
+ Stdin(Arc<Mutex<WinTtyState>>),
+ #[cfg(not(windows))]
+ Stdin(()),
Stdout,
Stderr,
}
@@ -422,6 +451,84 @@ impl StdFileResourceInner {
spawn_blocking(action).await.unwrap()
}
}
+
+ #[cfg(windows)]
+ async fn handle_stdin_read(
+ &self,
+ state: Arc<Mutex<WinTtyState>>,
+ mut buf: BufMutView,
+ ) -> FsResult<(usize, BufMutView)> {
+ loop {
+ let state = state.clone();
+
+ let fut = self.with_inner_blocking_task(move |file| {
+ /* Start reading, and set the reading flag to true */
+ state.lock().reading = true;
+ let nread = match file.read(&mut buf) {
+ Ok(nread) => nread,
+ Err(e) => return Err((e.into(), buf)),
+ };
+
+ let mut state = state.lock();
+ state.reading = false;
+
+ /* If we canceled the read by sending a VK_RETURN event, restore
+ the screen state to undo the visual effect of the VK_RETURN event */
+ if state.cancelled {
+ if let Some(screen_buffer_info) = state.screen_buffer_info {
+ // SAFETY: WinAPI calls to open conout$ and restore visual state.
+ unsafe {
+ let handle = winapi::um::fileapi::CreateFileW(
+ "conout$"
+ .encode_utf16()
+ .chain(Some(0))
+ .collect::<Vec<_>>()
+ .as_ptr(),
+ winapi::um::winnt::GENERIC_READ
+ | winapi::um::winnt::GENERIC_WRITE,
+ winapi::um::winnt::FILE_SHARE_READ
+ | winapi::um::winnt::FILE_SHARE_WRITE,
+ std::ptr::null_mut(),
+ winapi::um::fileapi::OPEN_EXISTING,
+ 0,
+ std::ptr::null_mut(),
+ );
+
+ let mut pos = screen_buffer_info.dwCursorPosition;
+ /* If the cursor was at the bottom line of the screen buffer, the
+ VK_RETURN would have caused the buffer contents to scroll up by
+ one line. The right position to reset the cursor to is therefore one
+ line higher */
+ if pos.Y == screen_buffer_info.dwSize.Y - 1 {
+ pos.Y -= 1;
+ }
+
+ winapi::um::wincon::SetConsoleCursorPosition(handle, pos);
+ winapi::um::handleapi::CloseHandle(handle);
+ }
+ }
+
+ /* Reset the cancelled flag */
+ state.cancelled = false;
+
+ /* Unblock the main thread */
+ state.cvar.notify_one();
+
+ return Err((FsError::FileBusy, buf));
+ }
+
+ Ok((nread, buf))
+ });
+
+ match fut.await {
+ Err((FsError::FileBusy, b)) => {
+ buf = b;
+ continue;
+ }
+ other => return other.map_err(|(e, _)| e),
+ }
+ }
+ }
}
#[async_trait::async_trait(?Send)]
@@ -435,7 +542,7 @@ impl crate::fs::File for StdFileResourceInner {
// std/src/sys/windows/stdio.rs in Rust's source code).
match self.kind {
StdFileResourceKind::File => self.with_sync(|file| Ok(file.write(buf)?)),
- StdFileResourceKind::Stdin => {
+ StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
@@ -457,7 +564,7 @@ impl crate::fs::File for StdFileResourceInner {
fn read_sync(self: Rc<Self>, buf: &mut [u8]) -> FsResult<usize> {
match self.kind {
- StdFileResourceKind::File | StdFileResourceKind::Stdin => {
+ StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
self.with_sync(|file| Ok(file.read(buf)?))
}
StdFileResourceKind::Stdout | StdFileResourceKind::Stderr => {
@@ -471,7 +578,7 @@ impl crate::fs::File for StdFileResourceInner {
StdFileResourceKind::File => {
self.with_sync(|file| Ok(file.write_all(buf)?))
}
- StdFileResourceKind::Stdin => {
+ StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
@@ -497,7 +604,7 @@ impl crate::fs::File for StdFileResourceInner {
.with_inner_blocking_task(move |file| Ok(file.write_all(&buf)?))
.await
}
- StdFileResourceKind::Stdin => {
+ StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
@@ -538,7 +645,7 @@ impl crate::fs::File for StdFileResourceInner {
})
.await
}
- StdFileResourceKind::Stdin => {
+ StdFileResourceKind::Stdin(_) => {
Err(Into::<std::io::Error>::into(ErrorKind::Unsupported).into())
}
StdFileResourceKind::Stdout => {
@@ -568,7 +675,7 @@ impl crate::fs::File for StdFileResourceInner {
fn read_all_sync(self: Rc<Self>) -> FsResult<Vec<u8>> {
match self.kind {
- StdFileResourceKind::File | StdFileResourceKind::Stdin => {
+ StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
let mut buf = Vec::new();
self.with_sync(|file| Ok(file.read_to_end(&mut buf)?))?;
Ok(buf)
@@ -580,7 +687,7 @@ impl crate::fs::File for StdFileResourceInner {
}
async fn read_all_async(self: Rc<Self>) -> FsResult<Vec<u8>> {
match self.kind {
- StdFileResourceKind::File | StdFileResourceKind::Stdin => {
+ StdFileResourceKind::File | StdFileResourceKind::Stdin(_) => {
self
.with_inner_blocking_task(|file| {
let mut buf = Vec::new();
@@ -736,19 +843,28 @@ impl crate::fs::File for StdFileResourceInner {
self: Rc<Self>,
mut buf: BufMutView,
) -> FsResult<(usize, BufMutView)> {
- self
- .with_inner_blocking_task(|file| {
- let nread = file.read(&mut buf)?;
- Ok((nread, buf))
- })
- .await
+ match &self.kind {
+ /* On Windows, we need to handle special read cancellation logic for stdin */
+ #[cfg(windows)]
+ StdFileResourceKind::Stdin(state) => {
+ self.handle_stdin_read(state.clone(), buf).await
+ }
+ _ => {
+ self
+ .with_inner_blocking_task(|file| {
+ let nread = file.read(&mut buf)?;
+ Ok((nread, buf))
+ })
+ .await
+ }
+ }
}
fn try_clone_inner(self: Rc<Self>) -> FsResult<Rc<dyn fs::File>> {
let inner: &Option<_> = &self.cell.borrow();
match inner {
Some(inner) => Ok(Rc::new(StdFileResourceInner {
- kind: self.kind,
+ kind: self.kind.clone(),
cell: RefCell::new(Some(inner.try_clone()?)),
cell_async_task_queue: Default::default(),
handle: self.handle,
diff --git a/runtime/ops/tty.rs b/runtime/ops/tty.rs
index 3d721734c..be22bdd2a 100644
--- a/runtime/ops/tty.rs
+++ b/runtime/ops/tty.rs
@@ -13,6 +13,13 @@ use rustyline::KeyCode;
use rustyline::KeyEvent;
use rustyline::Modifiers;
+#[cfg(windows)]
+use deno_core::parking_lot::Mutex;
+#[cfg(windows)]
+use deno_io::WinTtyState;
+#[cfg(windows)]
+use std::sync::Arc;
+
#[cfg(unix)]
use deno_core::ResourceId;
#[cfg(unix)]
@@ -94,6 +101,7 @@ fn op_set_raw(
#[cfg(windows)]
{
use winapi::shared::minwindef::FALSE;
+
use winapi::um::consoleapi;
let handle = handle_or_fd;
@@ -116,6 +124,79 @@ fn op_set_raw(
mode_raw_input_off(original_mode)
};
+ let stdin_state = state.borrow::<Arc<Mutex<WinTtyState>>>();
+ let mut stdin_state = stdin_state.lock();
+
+ if stdin_state.reading {
+ let cvar = stdin_state.cvar.clone();
+
+ /* Trick to unblock an ongoing line-buffered read operation if not already pending.
+ See https://github.com/libuv/libuv/pull/866 for prior art */
+ if original_mode & COOKED_MODE != 0 && !stdin_state.cancelled {
+ // SAFETY: Write enter key event to force the console wait to return.
+ let record = unsafe {
+ let mut record: wincon::INPUT_RECORD = std::mem::zeroed();
+ record.EventType = wincon::KEY_EVENT;
+ record.Event.KeyEvent_mut().wVirtualKeyCode =
+ winapi::um::winuser::VK_RETURN as u16;
+ record.Event.KeyEvent_mut().bKeyDown = 1;
+ record.Event.KeyEvent_mut().wRepeatCount = 1;
+ *record.Event.KeyEvent_mut().uChar.UnicodeChar_mut() = '\r' as u16;
+ record.Event.KeyEvent_mut().dwControlKeyState = 0;
+ record.Event.KeyEvent_mut().wVirtualScanCode =
+ winapi::um::winuser::MapVirtualKeyW(
+ winapi::um::winuser::VK_RETURN as u32,
+ winapi::um::winuser::MAPVK_VK_TO_VSC,
+ ) as u16;
+ record
+ };
+ stdin_state.cancelled = true;
+
+ // SAFETY: winapi call to open conout$ and save screen state.
+ let active_screen_buffer = unsafe {
+ /* Save screen state before sending the VK_RETURN event */
+ let handle = winapi::um::fileapi::CreateFileW(
+ "conout$"
+ .encode_utf16()
+ .chain(Some(0))
+ .collect::<Vec<_>>()
+ .as_ptr(),
+ winapi::um::winnt::GENERIC_READ | winapi::um::winnt::GENERIC_WRITE,
+ winapi::um::winnt::FILE_SHARE_READ
+ | winapi::um::winnt::FILE_SHARE_WRITE,
+ std::ptr::null_mut(),
+ winapi::um::fileapi::OPEN_EXISTING,
+ 0,
+ std::ptr::null_mut(),
+ );
+
+ let mut active_screen_buffer = std::mem::zeroed();
+ winapi::um::wincon::GetConsoleScreenBufferInfo(
+ handle,
+ &mut active_screen_buffer,
+ );
+ winapi::um::handleapi::CloseHandle(handle);
+ active_screen_buffer
+ };
+ stdin_state.screen_buffer_info = Some(active_screen_buffer);
+
+ // SAFETY: winapi call to write the VK_RETURN event.
+ if unsafe {
+ winapi::um::wincon::WriteConsoleInputW(handle, &record, 1, &mut 0)
+ } == FALSE
+ {
+ return Err(Error::last_os_error().into());
+ }
+
+ /* Wait for read thread to acknowledge the cancellation to ensure that nothing
+ interferes with the screen state.
+ NOTE: `wait_while` automatically unlocks stdin_state */
+ cvar.wait_while(&mut stdin_state, |state: &mut WinTtyState| {
+ state.cancelled
+ });
+ }
+ }
+
// SAFETY: winapi call
if unsafe { consoleapi::SetConsoleMode(handle, new_mode) } == FALSE {
return Err(Error::last_os_error().into());
diff --git a/tests/integration/run_tests.rs b/tests/integration/run_tests.rs
index 91370a87c..f7aaa9daf 100644
--- a/tests/integration/run_tests.rs
+++ b/tests/integration/run_tests.rs
@@ -3142,6 +3142,20 @@ itest!(byte_order_mark {
});
#[test]
+#[cfg(windows)]
+fn process_stdin_read_unblock() {
+ TestContext::default()
+ .new_command()
+ .args_vec(["run", "run/process_stdin_unblock.mjs"])
+ .with_pty(|mut console| {
+ console.write_raw("b");
+ console.human_delay();
+ console.write_line_raw("s");
+ console.expect_all(&["1", "1"]);
+ });
+}
+
+#[test]
fn issue9750() {
TestContext::default()
.new_command()
diff --git a/tests/testdata/run/process_stdin_unblock.mjs b/tests/testdata/run/process_stdin_unblock.mjs
new file mode 100644
index 000000000..bbeea2afb
--- /dev/null
+++ b/tests/testdata/run/process_stdin_unblock.mjs
@@ -0,0 +1,21 @@
+import process from "node:process";
+
+function prompt() {
+ process.stdin.setRawMode(true);
+
+ const { promise, resolve } = Promise.withResolvers();
+
+ const onData = (buf) => {
+ process.stdin.setRawMode(false);
+ process.stdin.removeListener("data", onData);
+ console.log(buf.length);
+ resolve();
+ };
+
+ process.stdin.on("data", onData);
+ return promise;
+}
+
+await prompt();
+await prompt();
+Deno.exit(0);