From 8f9a942cb911ed017eb128e9fbeb6f9a48e69601 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sun, 17 Nov 2019 01:17:47 +0100 Subject: Use futures 0.3 API (#3358) --- cli/ops/io.rs | 197 +++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 133 insertions(+), 64 deletions(-) (limited to 'cli/ops/io.rs') diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 30c933999..11afb1891 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -8,11 +8,16 @@ use deno::ErrBox; use deno::Resource; use deno::*; use futures; -use futures::Future; -use futures::Poll; +use futures::compat::AsyncRead01CompatExt; +use futures::compat::AsyncWrite01CompatExt; +use futures::future::FutureExt; +use futures::io::{AsyncRead, AsyncWrite}; use std; +use std::future::Future; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use tokio; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio_process; use tokio_rustls::client::TlsStream as ClientTlsStream; @@ -80,7 +85,7 @@ pub enum StreamResource { TcpStream(tokio::net::TcpStream), ServerTlsStream(Box>), ClientTlsStream(Box>), - HttpBody(HttpBody), + HttpBody(Box), ChildStdin(tokio_process::ChildStdin), ChildStdout(tokio_process::ChildStdout), ChildStderr(tokio_process::ChildStderr), @@ -91,26 +96,49 @@ impl Resource for StreamResource {} /// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncRead { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll; + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll>; } impl DenoAsyncRead for StreamResource { - fn poll_read(&mut self, buf: &mut [u8]) -> Poll { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_read(buf), - StreamResource::Stdin(ref mut f) => f.poll_read(buf), - StreamResource::TcpStream(ref mut f) => f.poll_read(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf), - StreamResource::HttpBody(ref mut f) => f.poll_read(buf), - StreamResource::ChildStdout(ref mut f) => f.poll_read(buf), - StreamResource::ChildStderr(ref mut f) => f.poll_read(buf), + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let inner = self.get_mut(); + let mut f: Box = match inner { + StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)), + StreamResource::ClientTlsStream(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::ServerTlsStream(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::HttpBody(f) => Box::new(f), + StreamResource::ChildStdout(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } + StreamResource::ChildStderr(f) => { + Box::new(AsyncRead01CompatExt::compat(f)) + } _ => { - return Err(bad_resource()); + return Poll::Ready(Err(bad_resource())); } }; - r.map_err(ErrBox::from) + let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Pending => Poll::Pending, + } } } @@ -150,23 +178,31 @@ pub struct Read { impl Future for Read where - T: AsMut<[u8]>, + T: AsMut<[u8]> + Unpin, { - type Item = usize; - type Error = ErrBox; + type Output = Result; - fn poll(&mut self) -> Poll { - if self.io_state == IoState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + if inner.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(inner.rid) .ok_or_else(bad_resource)?; - let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..])); - self.io_state = IoState::Done; - Ok(nread.into()) + let nread = match DenoAsyncRead::poll_read( + Pin::new(resource), + cx, + &mut inner.buf.as_mut()[..], + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Done; + Poll::Ready(Ok(nread as i32)) } } @@ -174,49 +210,76 @@ pub fn op_read( state: &ThreadSafeState, rid: i32, zero_copy: Option, -) -> Box { +) -> Pin> { debug!("read rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())); + return futures::future::err(deno_error::no_buffer_specified()).boxed() } Some(buf) => buf, }; - let fut = read(state, rid as u32, zero_copy) - .map_err(ErrBox::from) - .and_then(move |nread| Ok(nread as i32)); + let fut = read(state, rid as u32, zero_copy); - Box::new(fut) + fut.boxed() } /// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncWrite { - fn poll_write(&mut self, buf: &[u8]) -> Poll; - - fn shutdown(&mut self) -> Poll<(), ErrBox>; + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll>; + + fn poll_close( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll>; } impl DenoAsyncWrite for StreamResource { - fn poll_write(&mut self, buf: &[u8]) -> Poll { - let r = match self { - StreamResource::FsFile(ref mut f) => f.poll_write(buf), - StreamResource::Stdout(ref mut f) => f.poll_write(buf), - StreamResource::Stderr(ref mut f) => f.poll_write(buf), - StreamResource::TcpStream(ref mut f) => f.poll_write(buf), - StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf), - StreamResource::ChildStdin(ref mut f) => f.poll_write(buf), + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + let inner = self.get_mut(); + let mut f: Box = match inner { + StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)), + StreamResource::TcpStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ClientTlsStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ServerTlsStream(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } + StreamResource::ChildStdin(f) => { + Box::new(AsyncWrite01CompatExt::compat(f)) + } _ => { - return Err(bad_resource()); + return Poll::Ready(Err(bad_resource())); } }; - r.map_err(ErrBox::from) + let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), + Poll::Pending => Poll::Pending, + } } - fn shutdown(&mut self) -> futures::Poll<(), ErrBox> { + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context, + ) -> Poll> { unimplemented!() } } @@ -250,23 +313,31 @@ where /// that error type is `ErrBox` instead of `std::io::Error`. impl Future for Write where - T: AsRef<[u8]>, + T: AsRef<[u8]> + Unpin, { - type Item = usize; - type Error = ErrBox; + type Output = Result; - fn poll(&mut self) -> Poll { - if self.io_state == IoState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + if inner.io_state == IoState::Done { panic!("poll a Read after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let resource = table - .get_mut::(self.rid) + .get_mut::(inner.rid) .ok_or_else(bad_resource)?; - let nwritten = try_ready!(resource.poll_write(self.buf.as_ref())); - self.io_state = IoState::Done; - Ok(nwritten.into()) + let nwritten = match DenoAsyncWrite::poll_write( + Pin::new(resource), + cx, + inner.buf.as_ref(), + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Done; + Poll::Ready(Ok(nwritten as i32)) } } @@ -274,18 +345,16 @@ pub fn op_write( state: &ThreadSafeState, rid: i32, zero_copy: Option, -) -> Box { +) -> Pin> { debug!("write rid={}", rid); let zero_copy = match zero_copy { None => { - return Box::new(futures::future::err(deno_error::no_buffer_specified())); + return futures::future::err(deno_error::no_buffer_specified()).boxed() } Some(buf) => buf, }; - let fut = write(state, rid as u32, zero_copy) - .map_err(ErrBox::from) - .and_then(move |nwritten| Ok(nwritten as i32)); + let fut = write(state, rid as u32, zero_copy); - Box::new(fut) + fut.boxed() } -- cgit v1.2.3