summaryrefslogtreecommitdiff
path: root/cli/ops/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r--cli/ops/io.rs197
1 files changed, 133 insertions, 64 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 30c933999..11afb1891 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -8,11 +8,16 @@ use deno::ErrBox;
use deno::Resource;
use deno::*;
use futures;
-use futures::Future;
-use futures::Poll;
+use futures::compat::AsyncRead01CompatExt;
+use futures::compat::AsyncWrite01CompatExt;
+use futures::future::FutureExt;
+use futures::io::{AsyncRead, AsyncWrite};
use std;
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
use tokio;
-use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
@@ -80,7 +85,7 @@ pub enum StreamResource {
TcpStream(tokio::net::TcpStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
- HttpBody(HttpBody),
+ HttpBody(Box<HttpBody>),
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
@@ -91,26 +96,49 @@ impl Resource for StreamResource {}
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, ErrBox>>;
}
impl DenoAsyncRead for StreamResource {
- fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- StreamResource::FsFile(ref mut f) => f.poll_read(buf),
- StreamResource::Stdin(ref mut f) => f.poll_read(buf),
- StreamResource::TcpStream(ref mut f) => f.poll_read(buf),
- StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
- StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
- StreamResource::HttpBody(ref mut f) => f.poll_read(buf),
- StreamResource::ChildStdout(ref mut f) => f.poll_read(buf),
- StreamResource::ChildStderr(ref mut f) => f.poll_read(buf),
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize, ErrBox>> {
+ let inner = self.get_mut();
+ let mut f: Box<dyn AsyncRead + Unpin> = match inner {
+ StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)),
+ StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)),
+ StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)),
+ StreamResource::ClientTlsStream(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
+ StreamResource::ServerTlsStream(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
+ StreamResource::HttpBody(f) => Box::new(f),
+ StreamResource::ChildStdout(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
+ StreamResource::ChildStderr(f) => {
+ Box::new(AsyncRead01CompatExt::compat(f))
+ }
_ => {
- return Err(bad_resource());
+ return Poll::Ready(Err(bad_resource()));
}
};
- r.map_err(ErrBox::from)
+ let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf);
+
+ match r {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
+ Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
+ Poll::Pending => Poll::Pending,
+ }
}
}
@@ -150,23 +178,31 @@ pub struct Read<T> {
impl<T> Future for Read<T>
where
- T: AsMut<[u8]>,
+ T: AsMut<[u8]> + Unpin,
{
- type Item = usize;
- type Error = ErrBox;
+ type Output = Result<i32, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.io_state == IoState::Done {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = inner.state.lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
- let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
- self.io_state = IoState::Done;
- Ok(nread.into())
+ let nread = match DenoAsyncRead::poll_read(
+ Pin::new(resource),
+ cx,
+ &mut inner.buf.as_mut()[..],
+ ) {
+ Poll::Ready(Ok(v)) => v,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ inner.io_state = IoState::Done;
+ Poll::Ready(Ok(nread as i32))
}
}
@@ -174,49 +210,76 @@ pub fn op_read(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
-) -> Box<MinimalOp> {
+) -> Pin<Box<MinimalOp>> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
- return Box::new(futures::future::err(deno_error::no_buffer_specified()));
+ return futures::future::err(deno_error::no_buffer_specified()).boxed()
}
Some(buf) => buf,
};
- let fut = read(state, rid as u32, zero_copy)
- .map_err(ErrBox::from)
- .and_then(move |nread| Ok(nread as i32));
+ let fut = read(state, rid as u32, zero_copy);
- Box::new(fut)
+ fut.boxed()
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
-
- fn shutdown(&mut self) -> Poll<(), ErrBox>;
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, ErrBox>>;
+
+ fn poll_close(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>>;
}
impl DenoAsyncWrite for StreamResource {
- fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
- let r = match self {
- StreamResource::FsFile(ref mut f) => f.poll_write(buf),
- StreamResource::Stdout(ref mut f) => f.poll_write(buf),
- StreamResource::Stderr(ref mut f) => f.poll_write(buf),
- StreamResource::TcpStream(ref mut f) => f.poll_write(buf),
- StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
- StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
- StreamResource::ChildStdin(ref mut f) => f.poll_write(buf),
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8],
+ ) -> Poll<Result<usize, ErrBox>> {
+ let inner = self.get_mut();
+ let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
+ StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
+ StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
+ StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
+ StreamResource::TcpStream(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
+ StreamResource::ClientTlsStream(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
+ StreamResource::ServerTlsStream(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
+ StreamResource::ChildStdin(f) => {
+ Box::new(AsyncWrite01CompatExt::compat(f))
+ }
_ => {
- return Err(bad_resource());
+ return Poll::Ready(Err(bad_resource()));
}
};
- r.map_err(ErrBox::from)
+ let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf);
+
+ match r {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
+ Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
+ Poll::Pending => Poll::Pending,
+ }
}
- fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
+ fn poll_close(
+ self: Pin<&mut Self>,
+ _cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>> {
unimplemented!()
}
}
@@ -250,23 +313,31 @@ where
/// that error type is `ErrBox` instead of `std::io::Error`.
impl<T> Future for Write<T>
where
- T: AsRef<[u8]>,
+ T: AsRef<[u8]> + Unpin,
{
- type Item = usize;
- type Error = ErrBox;
+ type Output = Result<i32, ErrBox>;
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- if self.io_state == IoState::Done {
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ if inner.io_state == IoState::Done {
panic!("poll a Read after it's done");
}
- let mut table = self.state.lock_resource_table();
+ let mut table = inner.state.lock_resource_table();
let resource = table
- .get_mut::<StreamResource>(self.rid)
+ .get_mut::<StreamResource>(inner.rid)
.ok_or_else(bad_resource)?;
- let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
- self.io_state = IoState::Done;
- Ok(nwritten.into())
+ let nwritten = match DenoAsyncWrite::poll_write(
+ Pin::new(resource),
+ cx,
+ inner.buf.as_ref(),
+ ) {
+ Poll::Ready(Ok(v)) => v,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ inner.io_state = IoState::Done;
+ Poll::Ready(Ok(nwritten as i32))
}
}
@@ -274,18 +345,16 @@ pub fn op_write(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
-) -> Box<MinimalOp> {
+) -> Pin<Box<MinimalOp>> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
- return Box::new(futures::future::err(deno_error::no_buffer_specified()));
+ return futures::future::err(deno_error::no_buffer_specified()).boxed()
}
Some(buf) => buf,
};
- let fut = write(state, rid as u32, zero_copy)
- .map_err(ErrBox::from)
- .and_then(move |nwritten| Ok(nwritten as i32));
+ let fut = write(state, rid as u32, zero_copy);
- Box::new(fut)
+ fut.boxed()
}