summaryrefslogtreecommitdiff
path: root/cli/ops/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r--cli/ops/io.rs143
1 files changed, 91 insertions, 52 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 11afb1891..f268adc03 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -8,18 +8,15 @@ use deno::ErrBox;
use deno::Resource;
use deno::*;
use futures;
-use futures::compat::AsyncRead01CompatExt;
-use futures::compat::AsyncWrite01CompatExt;
use futures::future::FutureExt;
-use futures::io::{AsyncRead, AsyncWrite};
use std;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
+use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
-use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
@@ -86,9 +83,9 @@ pub enum StreamResource {
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(Box<HttpBody>),
- ChildStdin(tokio_process::ChildStdin),
- ChildStdout(tokio_process::ChildStdout),
- ChildStderr(tokio_process::ChildStderr),
+ ChildStdin(tokio::process::ChildStdin),
+ ChildStdout(tokio::process::ChildStdout),
+ ChildStderr(tokio::process::ChildStderr),
}
impl Resource for StreamResource {}
@@ -111,22 +108,14 @@ impl DenoAsyncRead for StreamResource {
) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut();
let mut f: Box<dyn AsyncRead + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::ClientTlsStream(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
- StreamResource::ServerTlsStream(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdin(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdout(f) => Box::new(f),
+ StreamResource::ChildStderr(f) => Box::new(f),
StreamResource::HttpBody(f) => Box::new(f),
- StreamResource::ChildStdout(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
- StreamResource::ChildStderr(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
_ => {
return Poll::Ready(Err(bad_resource()));
}
@@ -145,6 +134,7 @@ impl DenoAsyncRead for StreamResource {
#[derive(Debug, PartialEq)]
enum IoState {
Pending,
+ Flush,
Done,
}
@@ -237,6 +227,11 @@ pub trait DenoAsyncWrite {
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), ErrBox>>;
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>>;
}
impl DenoAsyncWrite for StreamResource {
@@ -247,21 +242,13 @@ impl DenoAsyncWrite for StreamResource {
) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut();
let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::TcpStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ClientTlsStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ServerTlsStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ChildStdin(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdout(f) => Box::new(f),
+ StreamResource::Stderr(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdin(f) => Box::new(f),
_ => {
return Poll::Ready(Err(bad_resource()));
}
@@ -276,6 +263,33 @@ impl DenoAsyncWrite for StreamResource {
}
}
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>> {
+ let inner = self.get_mut();
+ let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdout(f) => Box::new(f),
+ StreamResource::Stderr(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdin(f) => Box::new(f),
+ _ => {
+ return Poll::Ready(Err(bad_resource()));
+ }
+ };
+
+ let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx);
+
+ match r {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
+ Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+
fn poll_close(
self: Pin<&mut Self>,
_cx: &mut Context,
@@ -290,6 +304,7 @@ pub struct Write<T> {
buf: T,
io_state: IoState,
state: ThreadSafeState,
+ nwritten: i32,
}
/// Creates a future that will write some of the buffer `buf` to
@@ -306,6 +321,7 @@ where
buf,
io_state: IoState::Pending,
state: state.clone(),
+ nwritten: 0,
}
}
@@ -323,21 +339,44 @@ where
panic!("poll a Read after it's done");
}
- let mut table = inner.state.lock_resource_table();
- let resource = table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(bad_resource)?;
- let nwritten = match DenoAsyncWrite::poll_write(
- Pin::new(resource),
- cx,
- inner.buf.as_ref(),
- ) {
- Poll::Ready(Ok(v)) => v,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
- inner.io_state = IoState::Done;
- Poll::Ready(Ok(nwritten as i32))
+ if inner.io_state == IoState::Pending {
+ let mut table = inner.state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(inner.rid)
+ .ok_or_else(bad_resource)?;
+
+ let nwritten = match DenoAsyncWrite::poll_write(
+ Pin::new(resource),
+ cx,
+ inner.buf.as_ref(),
+ ) {
+ Poll::Ready(Ok(v)) => v,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ inner.io_state = IoState::Flush;
+ inner.nwritten = nwritten as i32;
+ }
+
+ // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+ // and the reasons for the need to explicitly flush are not fully known.
+ // Figure out why it's needed and preferably remove it.
+ // https://github.com/denoland/deno/issues/3565
+ if inner.io_state == IoState::Flush {
+ let mut table = inner.state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(inner.rid)
+ .ok_or_else(bad_resource)?;
+ match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) {
+ Poll::Ready(Ok(_)) => {
+ inner.io_state = IoState::Done;
+ }
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ }
+
+ Poll::Ready(Ok(inner.nwritten))
}
}