summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/ops/io.rs158
1 files changed, 45 insertions, 113 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index ad3949a03..2c3727642 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -4,9 +4,9 @@ use crate::op_error::OpError;
use crate::ops::minimal_op;
use crate::state::State;
use deno_core::*;
+use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::ready;
-use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
@@ -69,6 +69,10 @@ pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
(stdin, stdout, stderr)
}
+fn no_buffer_specified() -> OpError {
+ OpError::type_error("no buffer specified".to_string())
+}
+
#[cfg(unix)]
use nix::sys::termios;
@@ -131,51 +135,6 @@ impl DenoAsyncRead for StreamResource {
}
}
-#[derive(Debug, PartialEq)]
-enum IoState {
- Pending,
- Flush,
- Done,
-}
-
-/// A future which can be used to easily read available number of bytes to fill
-/// a buffer.
-///
-/// Created by the [`read`] function.
-pub struct Read<T> {
- rid: ResourceId,
- buf: T,
- io_state: IoState,
- state: State,
-}
-
-impl<T> Future for Read<T>
-where
- T: AsMut<[u8]> + Unpin,
-{
- type Output = Result<i32, OpError>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- if inner.io_state == IoState::Done {
- panic!("poll a Read after it's done");
- }
-
- let mut state = inner.state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(OpError::bad_resource)?;
- let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?;
- inner.io_state = IoState::Done;
- Poll::Ready(Ok(nread as i32))
- }
-}
-
-fn no_buffer_specified() -> OpError {
- OpError::type_error("no buffer specified".to_string())
-}
-
pub fn op_read(
state: &State,
rid: i32,
@@ -185,13 +144,18 @@ pub fn op_read(
if zero_copy.is_none() {
return futures::future::err(no_buffer_specified()).boxed_local();
}
- // TODO(ry) Probably poll_fn can be used here and the Read struct eliminated.
- Read {
- rid: rid as u32,
- buf: zero_copy.unwrap(),
- io_state: IoState::Pending,
- state: state.clone(),
- }
+
+ let state = state.clone();
+ let mut buf = zero_copy.unwrap();
+
+ poll_fn(move |cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource = resource_table
+ .get_mut::<StreamResource>(rid as u32)
+ .ok_or_else(OpError::bad_resource)?;
+ let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?;
+ Poll::Ready(Ok(nread as i32))
+ })
.boxed_local()
}
@@ -253,59 +217,6 @@ impl DenoAsyncWrite for StreamResource {
}
}
-/// A future used to write some data to a stream.
-pub struct Write<T> {
- rid: ResourceId,
- buf: T,
- io_state: IoState,
- state: State,
- nwritten: i32,
-}
-
-/// This is almost the same implementation as in tokio, difference is
-/// that error type is `OpError` instead of `std::io::Error`.
-impl<T> Future for Write<T>
-where
- T: AsRef<[u8]> + Unpin,
-{
- type Output = Result<i32, OpError>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- if inner.io_state == IoState::Done {
- panic!("poll a Read after it's done");
- }
-
- if inner.io_state == IoState::Pending {
- let mut state = inner.state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(OpError::bad_resource)?;
-
- let nwritten = ready!(resource.poll_write(cx, inner.buf.as_ref()))?;
- inner.io_state = IoState::Flush;
- inner.nwritten = nwritten as i32;
- }
-
- // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
- // and the reasons for the need to explicitly flush are not fully known.
- // Figure out why it's needed and preferably remove it.
- // https://github.com/denoland/deno/issues/3565
- if inner.io_state == IoState::Flush {
- let mut state = inner.state.borrow_mut();
- let resource = state
- .resource_table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(OpError::bad_resource)?;
- ready!(resource.poll_flush(cx))?;
- inner.io_state = IoState::Done;
- }
-
- Poll::Ready(Ok(inner.nwritten))
- }
-}
-
pub fn op_write(
state: &State,
rid: i32,
@@ -315,13 +226,34 @@ pub fn op_write(
if zero_copy.is_none() {
return futures::future::err(no_buffer_specified()).boxed_local();
}
- // TODO(ry) Probably poll_fn can be used here and the Write struct eliminated.
- Write {
- rid: rid as u32,
- buf: zero_copy.unwrap(),
- io_state: IoState::Pending,
- state: state.clone(),
- nwritten: 0,
+
+ let state = state.clone();
+ let buf = zero_copy.unwrap();
+
+ async move {
+ let nwritten = poll_fn(|cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource = resource_table
+ .get_mut::<StreamResource>(rid as u32)
+ .ok_or_else(OpError::bad_resource)?;
+ resource.poll_write(cx, &buf.as_ref()[..])
+ })
+ .await?;
+
+ // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+ // and the reasons for the need to explicitly flush are not fully known.
+ // Figure out why it's needed and preferably remove it.
+ // https://github.com/denoland/deno/issues/3565
+ poll_fn(|cx| {
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let resource = resource_table
+ .get_mut::<StreamResource>(rid as u32)
+ .ok_or_else(OpError::bad_resource)?;
+ resource.poll_flush(cx)
+ })
+ .await?;
+
+ Ok(nwritten as i32)
}
.boxed_local()
}