diff options
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r-- | cli/ops/io.rs | 100 |
1 files changed, 86 insertions, 14 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 2562b4c55..b7f67cea4 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -7,7 +7,9 @@ use deno_core::*; use futures::future::poll_fn; use futures::future::FutureExt; use futures::ready; +use std::collections::HashMap; use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::Context; use std::task::Poll; use tokio::io::{AsyncRead, AsyncWrite}; @@ -56,15 +58,23 @@ pub fn init(i: &mut Isolate, s: &State) { ); } -pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) { - let stdin = StreamResource::Stdin(tokio::io::stdin(), TTYMetadata::default()); - let stdout = StreamResource::Stdout({ +pub fn get_stdio() -> ( + StreamResourceHolder, + StreamResourceHolder, + StreamResourceHolder, +) { + let stdin = StreamResourceHolder::new(StreamResource::Stdin( + tokio::io::stdin(), + TTYMetadata::default(), + )); + let stdout = StreamResourceHolder::new(StreamResource::Stdout({ let stdout = STDOUT_HANDLE .try_clone() .expect("Unable to clone stdout handle"); tokio::fs::File::from_std(stdout) - }); - let stderr = StreamResource::Stderr(tokio::io::stderr()); + })); + let stderr = + StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr())); (stdin, stdout, stderr) } @@ -87,6 +97,51 @@ pub struct FileMetadata { pub tty: TTYMetadata, } +pub struct StreamResourceHolder { + pub resource: StreamResource, + waker: HashMap<usize, futures::task::AtomicWaker>, + waker_counter: AtomicUsize, +} + +impl StreamResourceHolder { + pub fn new(resource: StreamResource) -> StreamResourceHolder { + StreamResourceHolder { + resource, + // Atleast one task is expecter for the resource + waker: HashMap::with_capacity(1), + // Tracks wakers Ids + waker_counter: AtomicUsize::new(0), + } + } +} + +impl Drop for StreamResourceHolder { + fn drop(&mut self) { + self.wake_tasks(); + } +} + +impl StreamResourceHolder { + pub fn track_task(&mut self, cx: &Context) -> Result<usize, OpError> { + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + // Its OK if it overflows + let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed); + self.waker.insert(task_waker_id, waker); + Ok(task_waker_id) + } + + pub fn wake_tasks(&mut self) { + for waker in self.waker.values() { + waker.wake(); + } + } + + pub fn untrack_task(&mut self, task_waker_id: usize) { + self.waker.remove(&task_waker_id); + } +} + pub enum StreamResource { Stdin(tokio::io::Stdin, TTYMetadata), Stdout(tokio::fs::File), @@ -150,10 +205,27 @@ pub fn op_read( poll_fn(move |cx| { let resource_table = &mut state.borrow_mut().resource_table; - let resource = resource_table - .get_mut::<StreamResource>(rid as u32) + let resource_holder = resource_table + .get_mut::<StreamResourceHolder>(rid as u32) .ok_or_else(OpError::bad_resource_id)?; - let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?; + + let mut task_tracker_id: Option<usize> = None; + let nread = match resource_holder + .resource + .poll_read(cx, &mut buf.as_mut()[..]) + .map_err(OpError::from) + { + Poll::Ready(t) => { + if let Some(id) = task_tracker_id { + resource_holder.untrack_task(id); + } + t + } + Poll::Pending => { + task_tracker_id.replace(resource_holder.track_task(cx)?); + return Poll::Pending; + } + }?; Poll::Ready(Ok(nread as i32)) }) .boxed_local() @@ -233,10 +305,10 @@ pub fn op_write( async move { let nwritten = poll_fn(|cx| { let resource_table = &mut state.borrow_mut().resource_table; - let resource = resource_table - .get_mut::<StreamResource>(rid as u32) + let resource_holder = resource_table + .get_mut::<StreamResourceHolder>(rid as u32) .ok_or_else(OpError::bad_resource_id)?; - resource.poll_write(cx, &buf.as_ref()[..]) + resource_holder.resource.poll_write(cx, &buf.as_ref()[..]) }) .await?; @@ -246,10 +318,10 @@ pub fn op_write( // https://github.com/denoland/deno/issues/3565 poll_fn(|cx| { let resource_table = &mut state.borrow_mut().resource_table; - let resource = resource_table - .get_mut::<StreamResource>(rid as u32) + let resource_holder = resource_table + .get_mut::<StreamResourceHolder>(rid as u32) .ok_or_else(OpError::bad_resource_id)?; - resource.poll_flush(cx) + resource_holder.resource.poll_flush(cx) }) .await?; |