diff options
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r-- | cli/ops/io.rs | 158 |
1 files changed, 45 insertions, 113 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs index ad3949a03..2c3727642 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -4,9 +4,9 @@ use crate::op_error::OpError; use crate::ops::minimal_op; use crate::state::State; use deno_core::*; +use futures::future::poll_fn; use futures::future::FutureExt; use futures::ready; -use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -69,6 +69,10 @@ pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) { (stdin, stdout, stderr) } +fn no_buffer_specified() -> OpError { + OpError::type_error("no buffer specified".to_string()) +} + #[cfg(unix)] use nix::sys::termios; @@ -131,51 +135,6 @@ impl DenoAsyncRead for StreamResource { } } -#[derive(Debug, PartialEq)] -enum IoState { - Pending, - Flush, - Done, -} - -/// A future which can be used to easily read available number of bytes to fill -/// a buffer. -/// -/// Created by the [`read`] function. -pub struct Read<T> { - rid: ResourceId, - buf: T, - io_state: IoState, - state: State, -} - -impl<T> Future for Read<T> -where - T: AsMut<[u8]> + Unpin, -{ - type Output = Result<i32, OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - if inner.io_state == IoState::Done { - panic!("poll a Read after it's done"); - } - - let mut state = inner.state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<StreamResource>(inner.rid) - .ok_or_else(OpError::bad_resource)?; - let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?; - inner.io_state = IoState::Done; - Poll::Ready(Ok(nread as i32)) - } -} - -fn no_buffer_specified() -> OpError { - OpError::type_error("no buffer specified".to_string()) -} - pub fn op_read( state: &State, rid: i32, @@ -185,13 +144,18 @@ pub fn op_read( if zero_copy.is_none() { return futures::future::err(no_buffer_specified()).boxed_local(); } - // TODO(ry) Probably poll_fn can be used here and the Read struct eliminated. - Read { - rid: rid as u32, - buf: zero_copy.unwrap(), - io_state: IoState::Pending, - state: state.clone(), - } + + let state = state.clone(); + let mut buf = zero_copy.unwrap(); + + poll_fn(move |cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let resource = resource_table + .get_mut::<StreamResource>(rid as u32) + .ok_or_else(OpError::bad_resource)?; + let nread = ready!(resource.poll_read(cx, &mut buf.as_mut()[..]))?; + Poll::Ready(Ok(nread as i32)) + }) .boxed_local() } @@ -253,59 +217,6 @@ impl DenoAsyncWrite for StreamResource { } } -/// A future used to write some data to a stream. -pub struct Write<T> { - rid: ResourceId, - buf: T, - io_state: IoState, - state: State, - nwritten: i32, -} - -/// This is almost the same implementation as in tokio, difference is -/// that error type is `OpError` instead of `std::io::Error`. -impl<T> Future for Write<T> -where - T: AsRef<[u8]> + Unpin, -{ - type Output = Result<i32, OpError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - if inner.io_state == IoState::Done { - panic!("poll a Read after it's done"); - } - - if inner.io_state == IoState::Pending { - let mut state = inner.state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<StreamResource>(inner.rid) - .ok_or_else(OpError::bad_resource)?; - - let nwritten = ready!(resource.poll_write(cx, inner.buf.as_ref()))?; - inner.io_state = IoState::Flush; - inner.nwritten = nwritten as i32; - } - - // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 - // and the reasons for the need to explicitly flush are not fully known. - // Figure out why it's needed and preferably remove it. - // https://github.com/denoland/deno/issues/3565 - if inner.io_state == IoState::Flush { - let mut state = inner.state.borrow_mut(); - let resource = state - .resource_table - .get_mut::<StreamResource>(inner.rid) - .ok_or_else(OpError::bad_resource)?; - ready!(resource.poll_flush(cx))?; - inner.io_state = IoState::Done; - } - - Poll::Ready(Ok(inner.nwritten)) - } -} - pub fn op_write( state: &State, rid: i32, @@ -315,13 +226,34 @@ pub fn op_write( if zero_copy.is_none() { return futures::future::err(no_buffer_specified()).boxed_local(); } - // TODO(ry) Probably poll_fn can be used here and the Write struct eliminated. - Write { - rid: rid as u32, - buf: zero_copy.unwrap(), - io_state: IoState::Pending, - state: state.clone(), - nwritten: 0, + + let state = state.clone(); + let buf = zero_copy.unwrap(); + + async move { + let nwritten = poll_fn(|cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let resource = resource_table + .get_mut::<StreamResource>(rid as u32) + .ok_or_else(OpError::bad_resource)?; + resource.poll_write(cx, &buf.as_ref()[..]) + }) + .await?; + + // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 + // and the reasons for the need to explicitly flush are not fully known. + // Figure out why it's needed and preferably remove it. + // https://github.com/denoland/deno/issues/3565 + poll_fn(|cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let resource = resource_table + .get_mut::<StreamResource>(rid as u32) + .ok_or_else(OpError::bad_resource)?; + resource.poll_flush(cx) + }) + .await?; + + Ok(nwritten as i32) } .boxed_local() } |