summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/ops/io.rs156
1 files changed, 50 insertions, 106 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 1d832a70e..8f69fd444 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -7,14 +7,12 @@ use crate::state::ThreadSafeState;
use deno::ErrBox;
use deno::Resource;
use deno::*;
-use futures;
use futures::future::FutureExt;
-use std;
+use futures::ready;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
-use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream as ClientTlsStream;
@@ -94,7 +92,7 @@ impl Resource for StreamResource {}
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
fn poll_read(
- self: Pin<&mut Self>,
+ &mut self,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, ErrBox>>;
@@ -102,32 +100,25 @@ pub trait DenoAsyncRead {
impl DenoAsyncRead for StreamResource {
fn poll_read(
- self: Pin<&mut Self>,
+ &mut self,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, ErrBox>> {
- let inner = self.get_mut();
- let mut f: Box<dyn AsyncRead + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(f),
- StreamResource::Stdin(f) => Box::new(f),
- StreamResource::TcpStream(f) => Box::new(f),
- StreamResource::ClientTlsStream(f) => Box::new(f),
- StreamResource::ServerTlsStream(f) => Box::new(f),
- StreamResource::ChildStdout(f) => Box::new(f),
- StreamResource::ChildStderr(f) => Box::new(f),
- StreamResource::HttpBody(f) => Box::new(f),
- _ => {
- return Poll::Ready(Err(bad_resource()));
- }
+ use StreamResource::*;
+ let mut f: Pin<Box<dyn AsyncRead>> = match self {
+ FsFile(f) => Box::pin(f),
+ Stdin(f) => Box::pin(f),
+ TcpStream(f) => Box::pin(f),
+ ClientTlsStream(f) => Box::pin(f),
+ ServerTlsStream(f) => Box::pin(f),
+ ChildStdout(f) => Box::pin(f),
+ ChildStderr(f) => Box::pin(f),
+ HttpBody(f) => Box::pin(f),
+ _ => return Err(bad_resource()).into(),
};
- let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf);
-
- match r {
- Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
- Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
- Poll::Pending => Poll::Pending,
- }
+ let v = ready!(f.as_mut().poll_read(cx, buf))?;
+ Ok(v).into()
}
}
@@ -182,15 +173,7 @@ where
let resource = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
- let nread = match DenoAsyncRead::poll_read(
- Pin::new(resource),
- cx,
- &mut inner.buf.as_mut()[..],
- ) {
- Poll::Ready(Ok(v)) => v,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
+ let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?;
inner.io_state = IoState::Done;
Poll::Ready(Ok(nread as i32))
}
@@ -218,82 +201,56 @@ pub fn op_read(
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
fn poll_write(
- self: Pin<&mut Self>,
+ &mut self,
cx: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, ErrBox>>;
- fn poll_close(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Result<(), ErrBox>>;
+ fn poll_close(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>>;
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Result<(), ErrBox>>;
+ fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>>;
}
impl DenoAsyncWrite for StreamResource {
fn poll_write(
- self: Pin<&mut Self>,
+ &mut self,
cx: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, ErrBox>> {
- let inner = self.get_mut();
- let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(f),
- StreamResource::Stdout(f) => Box::new(f),
- StreamResource::Stderr(f) => Box::new(f),
- StreamResource::TcpStream(f) => Box::new(f),
- StreamResource::ClientTlsStream(f) => Box::new(f),
- StreamResource::ServerTlsStream(f) => Box::new(f),
- StreamResource::ChildStdin(f) => Box::new(f),
- _ => {
- return Poll::Ready(Err(bad_resource()));
- }
+ use StreamResource::*;
+ let mut f: Pin<Box<dyn AsyncWrite>> = match self {
+ FsFile(f) => Box::pin(f),
+ Stdout(f) => Box::pin(f),
+ Stderr(f) => Box::pin(f),
+ TcpStream(f) => Box::pin(f),
+ ClientTlsStream(f) => Box::pin(f),
+ ServerTlsStream(f) => Box::pin(f),
+ ChildStdin(f) => Box::pin(f),
+ _ => return Err(bad_resource()).into(),
};
- let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf);
-
- match r {
- Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
- Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
- Poll::Pending => Poll::Pending,
- }
+ let v = ready!(f.as_mut().poll_write(cx, buf))?;
+ Ok(v).into()
}
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Result<(), ErrBox>> {
- let inner = self.get_mut();
- let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(f),
- StreamResource::Stdout(f) => Box::new(f),
- StreamResource::Stderr(f) => Box::new(f),
- StreamResource::TcpStream(f) => Box::new(f),
- StreamResource::ClientTlsStream(f) => Box::new(f),
- StreamResource::ServerTlsStream(f) => Box::new(f),
- StreamResource::ChildStdin(f) => Box::new(f),
- _ => {
- return Poll::Ready(Err(bad_resource()));
- }
+ fn poll_flush(&mut self, cx: &mut Context) -> Poll<Result<(), ErrBox>> {
+ use StreamResource::*;
+ let mut f: Pin<Box<dyn AsyncWrite>> = match self {
+ FsFile(f) => Box::pin(f),
+ Stdout(f) => Box::pin(f),
+ Stderr(f) => Box::pin(f),
+ TcpStream(f) => Box::pin(f),
+ ClientTlsStream(f) => Box::pin(f),
+ ServerTlsStream(f) => Box::pin(f),
+ ChildStdin(f) => Box::pin(f),
+ _ => return Err(bad_resource()).into(),
};
- let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx);
-
- match r {
- Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
- Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
- Poll::Pending => Poll::Pending,
- }
+ ready!(f.as_mut().poll_flush(cx))?;
+ Ok(()).into()
}
- fn poll_close(
- self: Pin<&mut Self>,
- _cx: &mut Context,
- ) -> Poll<Result<(), ErrBox>> {
+ fn poll_close(&mut self, _cx: &mut Context) -> Poll<Result<(), ErrBox>> {
unimplemented!()
}
}
@@ -345,15 +302,7 @@ where
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
- let nwritten = match DenoAsyncWrite::poll_write(
- Pin::new(resource),
- cx,
- inner.buf.as_ref(),
- ) {
- Poll::Ready(Ok(v)) => v,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
+ let nwritten = ready!(resource.poll_write(cx, inner.buf.as_ref()))?;
inner.io_state = IoState::Flush;
inner.nwritten = nwritten as i32;
}
@@ -367,13 +316,8 @@ where
let resource = table
.get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
- match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) {
- Poll::Ready(Ok(_)) => {
- inner.io_state = IoState::Done;
- }
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
+ ready!(resource.poll_flush(cx))?;
+ inner.io_state = IoState::Done;
}
Poll::Ready(Ok(inner.nwritten))