diff options
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r-- | cli/ops/io.rs | 197 |
1 files changed, 133 insertions, 64 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 30c933999..11afb1891 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -8,11 +8,16 @@ use deno::ErrBox; use deno::Resource; use deno::*; use futures; -use futures::Future; -use futures::Poll; +use futures::compat::AsyncRead01CompatExt; +use futures::compat::AsyncWrite01CompatExt; +use futures::future::FutureExt; +use futures::io::{AsyncRead, AsyncWrite}; use std; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use tokio; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio_process; use tokio_rustls::client::TlsStream as ClientTlsStream; @@ -80,7 +85,7 @@ pub enum StreamResource { TcpStream(tokio::net::TcpStream), ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>), - HttpBody(HttpBody), + HttpBody(Box<HttpBody>), ChildStdin(tokio_process::ChildStdin), ChildStdout(tokio_process::ChildStdout), ChildStderr(tokio_process::ChildStderr), @@ -91,26 +96,49 @@ impl Resource for StreamResource {} /// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncRead { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>; + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, ErrBox>>; } impl DenoAsyncRead for StreamResource { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_read(buf), - StreamResource::Stdin(ref mut f) => f.poll_read(buf), - StreamResource::TcpStream(ref mut f) => f.poll_read(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::HttpBody(ref mut f) => f.poll_read(buf), - StreamResource::ChildStdout(ref mut f) => f.poll_read(buf), - StreamResource::ChildStderr(ref mut f) => f.poll_read(buf), + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll<Result<usize, ErrBox>> { + let inner = self.get_mut(); + let mut f: Box<dyn AsyncRead + Unpin> = match inner { + StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::ClientTlsStream(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::ServerTlsStream(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::HttpBody(f) => Box::new(f), + StreamResource::ChildStdout(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::ChildStderr(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } _ => { - return Err(bad_resource()); + return Poll::Ready(Err(bad_resource())); } }; - r.map_err(ErrBox::from) + let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Pending => Poll::Pending, + } } } @@ -150,23 +178,31 @@ pub struct Read<T> { impl<T> Future for Read<T> where - T: AsMut<[u8]>, + T: AsMut<[u8]> + Unpin, { - type Item = usize; - type Error = ErrBox; + type Output = Result<i32, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.io_state == IoState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let resource = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; - let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); - self.io_state = IoState::Done; - Ok(nread.into()) + let nread = match DenoAsyncRead::poll_read( + Pin::new(resource), + cx, + &mut inner.buf.as_mut()[..], + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Done; + Poll::Ready(Ok(nread as i32)) } } @@ -174,49 +210,76 @@ pub fn op_read( state: &ThreadSafeState, rid: i32, zero_copy: Option<PinnedBuf>, -) -> Box<MinimalOp> { +) -> Pin<Box<MinimalOp>> { debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())); + return futures::future::err(deno_error::no_buffer_specified()).boxed() } Some(buf) => buf, }; - let fut = read(state, rid as u32, zero_copy) - .map_err(ErrBox::from) - .and_then(move |nread| Ok(nread as i32)); + let fut = read(state, rid as u32, zero_copy); - Box::new(fut) + fut.boxed() } /// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncWrite { - fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>; - - fn shutdown(&mut self) -> Poll<(), ErrBox>; + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll<Result<usize, ErrBox>>; + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Result<(), ErrBox>>; } impl DenoAsyncWrite for StreamResource { - fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_write(buf), - StreamResource::Stdout(ref mut f) => f.poll_write(buf), - StreamResource::Stderr(ref mut f) => f.poll_write(buf), - StreamResource::TcpStream(ref mut f) => f.poll_write(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ChildStdin(ref mut f) => f.poll_write(buf), + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll<Result<usize, ErrBox>> { + let inner = self.get_mut(); + let mut f: Box<dyn AsyncWrite + Unpin> = match inner { + StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::TcpStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ClientTlsStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ServerTlsStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ChildStdin(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } _ => { - return Err(bad_resource()); + return Poll::Ready(Err(bad_resource())); } }; - r.map_err(ErrBox::from) + let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Pending => Poll::Pending, + } } - fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll<Result<(), ErrBox>> { unimplemented!() } } @@ -250,23 +313,31 @@ where /// that error type is `ErrBox` instead of `std::io::Error`. impl<T> Future for Write<T> where - T: AsRef<[u8]>, + T: AsRef<[u8]> + Unpin, { - type Item = usize; - type Error = ErrBox; + type Output = Result<i32, ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.io_state == IoState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let resource = table - .get_mut::<StreamResource>(self.rid) + .get_mut::<StreamResource>(inner.rid) .ok_or_else(bad_resource)?; - let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); - self.io_state = IoState::Done; - Ok(nwritten.into()) + let nwritten = match DenoAsyncWrite::poll_write( + Pin::new(resource), + cx, + inner.buf.as_ref(), + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Done; + Poll::Ready(Ok(nwritten as i32)) } } @@ -274,18 +345,16 @@ pub fn op_write( state: &ThreadSafeState, rid: i32, zero_copy: Option<PinnedBuf>, -) -> Box<MinimalOp> { +) -> Pin<Box<MinimalOp>> { debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())); + return futures::future::err(deno_error::no_buffer_specified()).boxed() } Some(buf) => buf, }; - let fut = write(state, rid as u32, zero_copy) - .map_err(ErrBox::from) - .and_then(move |nwritten| Ok(nwritten as i32)); + let fut = write(state, rid as u32, zero_copy); - Box::new(fut) + fut.boxed() } |