summaryrefslogtreecommitdiff
path: root/cli/ops/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r--cli/ops/io.rs100
1 files changed, 86 insertions, 14 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 2562b4c55..b7f67cea4 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -7,7 +7,9 @@ use deno_core::*;
use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::ready;
+use std::collections::HashMap;
use std::pin::Pin;
+use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Context;
use std::task::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -56,15 +58,23 @@ pub fn init(i: &mut Isolate, s: &State) {
);
}
-pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
- let stdin = StreamResource::Stdin(tokio::io::stdin(), TTYMetadata::default());
- let stdout = StreamResource::Stdout({
+pub fn get_stdio() -> (
+ StreamResourceHolder,
+ StreamResourceHolder,
+ StreamResourceHolder,
+) {
+ let stdin = StreamResourceHolder::new(StreamResource::Stdin(
+ tokio::io::stdin(),
+ TTYMetadata::default(),
+ ));
+ let stdout = StreamResourceHolder::new(StreamResource::Stdout({
let stdout = STDOUT_HANDLE
.try_clone()
.expect("Unable to clone stdout handle");
tokio::fs::File::from_std(stdout)
- });
- let stderr = StreamResource::Stderr(tokio::io::stderr());
+ }));
+ let stderr =
+ StreamResourceHolder::new(StreamResource::Stderr(tokio::io::stderr()));
(stdin, stdout, stderr)
}
@@ -87,6 +97,51 @@ pub struct FileMetadata {
pub tty: TTYMetadata,
}
+pub struct StreamResourceHolder {
+ pub resource: StreamResource,
+ waker: HashMap<usize, futures::task::AtomicWaker>,
+ waker_counter: AtomicUsize,
+}
+
+impl StreamResourceHolder {
+ pub fn new(resource: StreamResource) -> StreamResourceHolder {
+ StreamResourceHolder {
+ resource,
+ // Atleast one task is expecter for the resource
+ waker: HashMap::with_capacity(1),
+ // Tracks wakers Ids
+ waker_counter: AtomicUsize::new(0),
+ }
+ }
+}
+
+impl Drop for StreamResourceHolder {
+ fn drop(&mut self) {
+ self.wake_tasks();
+ }
+}
+
+impl StreamResourceHolder {
+ pub fn track_task(&mut self, cx: &Context) -> Result<usize, OpError> {
+ let waker = futures::task::AtomicWaker::new();
+ waker.register(cx.waker());
+ // Its OK if it overflows
+ let task_waker_id = self.waker_counter.fetch_add(1, Ordering::Relaxed);
+ self.waker.insert(task_waker_id, waker);
+ Ok(task_waker_id)
+ }
+
+ pub fn wake_tasks(&mut self) {
+ for waker in self.waker.values() {
+ waker.wake();
+ }
+ }
+
+ pub fn untrack_task(&mut self, task_waker_id: usize) {
+ self.waker.remove(&task_waker_id);
+ }
+}
+
pub enum StreamResource {
Stdin(tokio::io::Stdin, TTYMetadata),
Stdout(tokio::fs::File),
@@ -150,10 +205,27 @@ pub fn op_read(
poll_fn(move |cx| {
let resource_table = &mut state.borrow_mut().resource_table;
- let resource = resource_table
- .get_mut::<StreamResource>(rid as u32)
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
- let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?;
+
+ let mut task_tracker_id: Option<usize> = None;
+ let nread = match resource_holder
+ .resource
+ .poll_read(cx, &mut buf.as_mut()[..])
+ .map_err(OpError::from)
+ {
+ Poll::Ready(t) => {
+ if let Some(id) = task_tracker_id {
+ resource_holder.untrack_task(id);
+ }
+ t
+ }
+ Poll::Pending => {
+ task_tracker_id.replace(resource_holder.track_task(cx)?);
+ return Poll::Pending;
+ }
+ }?;
Poll::Ready(Ok(nread as i32))
})
.boxed_local()
@@ -233,10 +305,10 @@ pub fn op_write(
async move {
let nwritten = poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
- let resource = resource_table
- .get_mut::<StreamResource>(rid as u32)
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
- resource.poll_write(cx, &buf.as_ref()[..])
+ resource_holder.resource.poll_write(cx, &buf.as_ref()[..])
})
.await?;
@@ -246,10 +318,10 @@ pub fn op_write(
// https://github.com/denoland/deno/issues/3565
poll_fn(|cx| {
let resource_table = &mut state.borrow_mut().resource_table;
- let resource = resource_table
- .get_mut::<StreamResource>(rid as u32)
+ let resource_holder = resource_table
+ .get_mut::<StreamResourceHolder>(rid as u32)
.ok_or_else(OpError::bad_resource_id)?;
- resource.poll_flush(cx)
+ resource_holder.resource.poll_flush(cx)
})
.await?;