diff options
Diffstat (limited to 'cli/ops/io.rs')
-rw-r--r-- | cli/ops/io.rs | 143 |
1 files changed, 91 insertions, 52 deletions
diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 11afb1891..f268adc03 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -8,18 +8,15 @@ use deno::ErrBox; use deno::Resource; use deno::*; use futures; -use futures::compat::AsyncRead01CompatExt; -use futures::compat::AsyncWrite01CompatExt; use futures::future::FutureExt; -use futures::io::{AsyncRead, AsyncWrite}; use std; use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; -use tokio_process; use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream; @@ -86,9 +83,9 @@ pub enum StreamResource { ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>), HttpBody(Box<HttpBody>), - ChildStdin(tokio_process::ChildStdin), - ChildStdout(tokio_process::ChildStdout), - ChildStderr(tokio_process::ChildStderr), + ChildStdin(tokio::process::ChildStdin), + ChildStdout(tokio::process::ChildStdout), + ChildStderr(tokio::process::ChildStderr), } impl Resource for StreamResource {} @@ -111,22 +108,14 @@ impl DenoAsyncRead for StreamResource { ) -> Poll<Result<usize, ErrBox>> { let inner = self.get_mut(); let mut f: Box<dyn AsyncRead + Unpin> = match inner { - StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)), - StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)), - StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)), - StreamResource::ClientTlsStream(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } - StreamResource::ServerTlsStream(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } + StreamResource::FsFile(f) => Box::new(f), + StreamResource::Stdin(f) => Box::new(f), + StreamResource::TcpStream(f) => Box::new(f), + StreamResource::ClientTlsStream(f) => Box::new(f), + StreamResource::ServerTlsStream(f) => Box::new(f), + StreamResource::ChildStdout(f) => Box::new(f), + StreamResource::ChildStderr(f) => Box::new(f), StreamResource::HttpBody(f) => Box::new(f), - StreamResource::ChildStdout(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } - StreamResource::ChildStderr(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } _ => { return Poll::Ready(Err(bad_resource())); } @@ -145,6 +134,7 @@ impl DenoAsyncRead for StreamResource { #[derive(Debug, PartialEq)] enum IoState { Pending, + Flush, Done, } @@ -237,6 +227,11 @@ pub trait DenoAsyncWrite { self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Result<(), ErrBox>>; + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Result<(), ErrBox>>; } impl DenoAsyncWrite for StreamResource { @@ -247,21 +242,13 @@ impl DenoAsyncWrite for StreamResource { ) -> Poll<Result<usize, ErrBox>> { let inner = self.get_mut(); let mut f: Box<dyn AsyncWrite + Unpin> = match inner { - StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)), - StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)), - StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)), - StreamResource::TcpStream(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } - StreamResource::ClientTlsStream(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } - StreamResource::ServerTlsStream(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } - StreamResource::ChildStdin(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } + StreamResource::FsFile(f) => Box::new(f), + StreamResource::Stdout(f) => Box::new(f), + StreamResource::Stderr(f) => Box::new(f), + StreamResource::TcpStream(f) => Box::new(f), + StreamResource::ClientTlsStream(f) => Box::new(f), + StreamResource::ServerTlsStream(f) => Box::new(f), + StreamResource::ChildStdin(f) => Box::new(f), _ => { return Poll::Ready(Err(bad_resource())); } @@ -276,6 +263,33 @@ impl DenoAsyncWrite for StreamResource { } } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Result<(), ErrBox>> { + let inner = self.get_mut(); + let mut f: Box<dyn AsyncWrite + Unpin> = match inner { + StreamResource::FsFile(f) => Box::new(f), + StreamResource::Stdout(f) => Box::new(f), + StreamResource::Stderr(f) => Box::new(f), + StreamResource::TcpStream(f) => Box::new(f), + StreamResource::ClientTlsStream(f) => Box::new(f), + StreamResource::ServerTlsStream(f) => Box::new(f), + StreamResource::ChildStdin(f) => Box::new(f), + _ => { + return Poll::Ready(Err(bad_resource())); + } + }; + + let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } + fn poll_close( self: Pin<&mut Self>, _cx: &mut Context, @@ -290,6 +304,7 @@ pub struct Write<T> { buf: T, io_state: IoState, state: ThreadSafeState, + nwritten: i32, } /// Creates a future that will write some of the buffer `buf` to @@ -306,6 +321,7 @@ where buf, io_state: IoState::Pending, state: state.clone(), + nwritten: 0, } } @@ -323,21 +339,44 @@ where panic!("poll a Read after it's done"); } - let mut table = inner.state.lock_resource_table(); - let resource = table - .get_mut::<StreamResource>(inner.rid) - .ok_or_else(bad_resource)?; - let nwritten = match DenoAsyncWrite::poll_write( - Pin::new(resource), - cx, - inner.buf.as_ref(), - ) { - Poll::Ready(Ok(v)) => v, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }; - inner.io_state = IoState::Done; - Poll::Ready(Ok(nwritten as i32)) + if inner.io_state == IoState::Pending { + let mut table = inner.state.lock_resource_table(); + let resource = table + .get_mut::<StreamResource>(inner.rid) + .ok_or_else(bad_resource)?; + + let nwritten = match DenoAsyncWrite::poll_write( + Pin::new(resource), + cx, + inner.buf.as_ref(), + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Flush; + inner.nwritten = nwritten as i32; + } + + // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 + // and the reasons for the need to explicitly flush are not fully known. + // Figure out why it's needed and preferably remove it. + // https://github.com/denoland/deno/issues/3565 + if inner.io_state == IoState::Flush { + let mut table = inner.state.lock_resource_table(); + let resource = table + .get_mut::<StreamResource>(inner.rid) + .ok_or_else(bad_resource)?; + match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) { + Poll::Ready(Ok(_)) => { + inner.io_state = IoState::Done; + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + } + + Poll::Ready(Ok(inner.nwritten)) } } |