diff options
Diffstat (limited to 'cli')
-rw-r--r-- | cli/Cargo.toml | 12 | ||||
-rw-r--r-- | cli/deno_error.rs | 5 | ||||
-rw-r--r-- | cli/global_timer.rs | 5 | ||||
-rw-r--r-- | cli/http_body.rs | 27 | ||||
-rw-r--r-- | cli/http_util.rs | 115 | ||||
-rw-r--r-- | cli/js/tls_test.ts | 5 | ||||
-rw-r--r-- | cli/ops/fetch.rs | 46 | ||||
-rw-r--r-- | cli/ops/files.rs | 75 | ||||
-rw-r--r-- | cli/ops/io.rs | 143 | ||||
-rw-r--r-- | cli/ops/net.rs | 61 | ||||
-rw-r--r-- | cli/ops/process.rs | 58 | ||||
-rw-r--r-- | cli/ops/tls.rs | 167 | ||||
-rw-r--r-- | cli/ops/workers.rs | 15 | ||||
-rw-r--r-- | cli/tokio_util.rs | 28 | ||||
-rw-r--r-- | cli/worker.rs | 24 |
15 files changed, 335 insertions, 451 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml index dacefa249..43cccd14f 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -28,12 +28,13 @@ deno_typescript = { path = "../deno_typescript", version = "0.27.0" } ansi_term = "0.12.1" atty = "0.2.13" base64 = "0.11.0" +bytes = "0.5" byteorder = "1.3.2" clap = "2.33.0" dirs = "2.0.2" dlopen = "0.1.8" futures = { version = "0.3", features = [ "compat", "io-compat" ] } -http = "0.1.19" +http = "0.2" hyper = "0.12.35" hyper-rustls = "0.17.1" indexmap = "1.3.0" @@ -43,7 +44,7 @@ log = "0.4.8" rand = "0.7.2" regex = "1.3.1" remove_dir_all = "0.5.2" -reqwest = { version = "0.9.22", default-features = false, features = ["rustls-tls"] } +reqwest = { git = "https://github.com/seanmonstar/reqwest.git", rev = "0ab5df3", features = ["rustls-tls", "stream"] } ring = "0.16.9" rustyline = "5.0.4" serde = { version = "1.0.102", features = ["derive"] } @@ -53,11 +54,10 @@ source-map-mappings = "0.5.0" sys-info = "0.5.8" tempfile = "3.1.0" termcolor = "1.0.5" -tokio = "0.1.22" +tokio = { version = "0.2.6", features = ["full"] } tokio-executor = "0.1.8" -tokio-process = "0.2.4" -tokio-rustls = "0.10.2" -url = "1.7.2" +tokio-rustls = "0.12.0" +url = "2.1" utime = "0.2.1" webpki = "0.21.0" webpki-roots = "0.17.0" diff --git a/cli/deno_error.rs b/cli/deno_error.rs index 8d0eea201..d74483df9 100644 --- a/cli/deno_error.rs +++ b/cli/deno_error.rs @@ -211,6 +211,7 @@ impl GetErrorKind for url::ParseError { } RelativeUrlWithoutBase => ErrorKind::RelativeUrlWithoutBase, SetHostOnCannotBeABaseUrl => ErrorKind::SetHostOnCannotBeABaseUrl, + _ => ErrorKind::Other, } } } @@ -231,7 +232,7 @@ impl GetErrorKind for reqwest::Error { fn kind(&self) -> ErrorKind { use self::GetErrorKind as Get; - match self.get_ref() { + match self.source() { Some(err_ref) => None .or_else(|| err_ref.downcast_ref::<hyper::Error>().map(Get::kind)) .or_else(|| err_ref.downcast_ref::<url::ParseError>().map(Get::kind)) @@ -242,7 +243,7 @@ impl GetErrorKind for reqwest::Error { .map(Get::kind) }) .unwrap_or_else(|| ErrorKind::HttpOther), - _ => ErrorKind::HttpOther, + None => ErrorKind::HttpOther, } } } diff --git a/cli/global_timer.rs b/cli/global_timer.rs index e06cabc48..1dba8d3b5 100644 --- a/cli/global_timer.rs +++ b/cli/global_timer.rs @@ -13,7 +13,7 @@ use futures::channel::oneshot; use futures::future::FutureExt; use std::future::Future; use std::time::Instant; -use tokio::timer::Delay; +use tokio; #[derive(Default)] pub struct GlobalTimer { @@ -43,8 +43,7 @@ impl GlobalTimer { let (tx, rx) = oneshot::channel(); self.tx = Some(tx); - let delay = futures::compat::Compat01As03::new(Delay::new(deadline)) - .map_err(|err| panic!("Unexpected error in timeout {:?}", err)); + let delay = tokio::time::delay_until(deadline.into()); let rx = rx .map_err(|err| panic!("Unexpected error in receiving channel {:?}", err)); diff --git a/cli/http_body.rs b/cli/http_body.rs index 72ec8017e..487306989 100644 --- a/cli/http_body.rs +++ b/cli/http_body.rs @@ -1,28 +1,31 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. - -use futures::io::AsyncRead; -use futures::stream::StreamExt; -use reqwest::r#async::Chunk; -use reqwest::r#async::Decoder; +use bytes::Bytes; +use futures::Stream; +use futures::StreamExt; +use reqwest; use std::cmp::min; use std::io; use std::io::Read; use std::pin::Pin; use std::task::Context; use std::task::Poll; +use tokio::io::AsyncRead; + +// TODO(bartlomieju): most of this stuff can be moved to `cli/ops/fetch.rs` +type ReqwestStream = Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>; -/// Wraps `reqwest::Decoder` so that it can be exposed as an `AsyncRead` and integrated +/// Wraps `ReqwestStream` so that it can be exposed as an `AsyncRead` and integrated /// into resources more easily. pub struct HttpBody { - decoder: futures::compat::Compat01As03<Decoder>, - chunk: Option<Chunk>, + stream: ReqwestStream, + chunk: Option<Bytes>, pos: usize, } impl HttpBody { - pub fn from(body: Decoder) -> Self { + pub fn from(body: ReqwestStream) -> Self { Self { - decoder: futures::compat::Compat01As03::new(body), + stream: body, chunk: None, pos: 0, } @@ -65,10 +68,10 @@ impl AsyncRead for HttpBody { assert_eq!(inner.pos, 0); } - let p = inner.decoder.poll_next_unpin(cx); + let p = inner.stream.poll_next_unpin(cx); match p { Poll::Ready(Some(Err(e))) => Poll::Ready(Err( - // TODO Need to map hyper::Error into std::io::Error. + // TODO(bartlomieju): rewrite it to use ErrBox io::Error::new(io::ErrorKind::Other, e), )), Poll::Ready(Some(Ok(chunk))) => { diff --git a/cli/http_util.rs b/cli/http_util.rs index 41a01f1e3..1be9ad60f 100644 --- a/cli/http_util.rs +++ b/cli/http_util.rs @@ -3,18 +3,15 @@ use crate::deno_error; use crate::deno_error::DenoError; use crate::version; use deno::ErrBox; -use futures::future; use futures::future::FutureExt; -use futures::future::TryFutureExt; use reqwest; use reqwest::header::HeaderMap; use reqwest::header::CONTENT_TYPE; use reqwest::header::LOCATION; use reqwest::header::USER_AGENT; -use reqwest::r#async::Client; -use reqwest::RedirectPolicy; +use reqwest::redirect::Policy; +use reqwest::Client; use std::future::Future; -use std::pin::Pin; use url::Url; /// Create new instance of async reqwest::Client. This client supports @@ -26,9 +23,9 @@ pub fn get_client() -> Client { format!("Deno/{}", version::DENO).parse().unwrap(), ); Client::builder() - .redirect(RedirectPolicy::none()) + .redirect(Policy::none()) .default_headers(headers) - .use_sys_proxy() + .use_rustls_tls() .build() .unwrap() } @@ -75,77 +72,45 @@ pub enum FetchOnceResult { pub fn fetch_string_once( url: &Url, ) -> impl Future<Output = Result<FetchOnceResult, ErrBox>> { - type FetchAttempt = (Option<String>, Option<String>, Option<FetchOnceResult>); - let url = url.clone(); let client = get_client(); - futures::compat::Compat01As03::new(client.get(url.clone()).send()) - .map_err(ErrBox::from) - .and_then( - move |mut response| -> Pin< - Box<dyn Future<Output = Result<FetchAttempt, ErrBox>> + Send>, - > { - if response.status().is_redirection() { - let location_string = response - .headers() - .get(LOCATION) - .expect("url redirection should provide 'location' header") - .to_str() - .unwrap(); - - debug!("Redirecting to {:?}...", &location_string); - let new_url = resolve_url_from_location(&url, location_string); - // Boxed trait object turns out to be the savior for 2+ types yielding same results. - return futures::future::try_join3( - future::ok(None), - future::ok(None), - future::ok(Some(FetchOnceResult::Redirect(new_url))), - ) - .boxed(); - } - - if response.status().is_client_error() - || response.status().is_server_error() - { - return future::err( - DenoError::new( - deno_error::ErrorKind::Other, - format!("Import '{}' failed: {}", &url, response.status()), - ) - .into(), - ) - .boxed(); - } - - let content_type = response - .headers() - .get(CONTENT_TYPE) - .map(|content_type| content_type.to_str().unwrap().to_owned()); - - let body = futures::compat::Compat01As03::new(response.text()) - .map_ok(Some) - .map_err(ErrBox::from); - - futures::future::try_join3( - body, - future::ok(content_type), - future::ok(None), - ) - .boxed() - }, - ) - .and_then(move |(maybe_code, maybe_content_type, maybe_redirect)| { - if let Some(redirect) = maybe_redirect { - future::ok(redirect) - } else { - // maybe_code should always contain code here! - future::ok(FetchOnceResult::Code( - maybe_code.unwrap(), - maybe_content_type, - )) - } - }) + let fut = async move { + let response = client.get(url.clone()).send().await?; + + if response.status().is_redirection() { + let location_string = response + .headers() + .get(LOCATION) + .expect("url redirection should provide 'location' header") + .to_str() + .unwrap(); + + debug!("Redirecting to {:?}...", &location_string); + let new_url = resolve_url_from_location(&url, location_string); + return Ok(FetchOnceResult::Redirect(new_url)); + } + + if response.status().is_client_error() + || response.status().is_server_error() + { + let err = DenoError::new( + deno_error::ErrorKind::Other, + format!("Import '{}' failed: {}", &url, response.status()), + ); + return Err(err.into()); + } + + let content_type = response + .headers() + .get(CONTENT_TYPE) + .map(|content_type| content_type.to_str().unwrap().to_owned()); + + let body = response.text().await?; + return Ok(FetchOnceResult::Code(body, content_type)); + }; + + fut.boxed() } #[cfg(test)] diff --git a/cli/js/tls_test.ts b/cli/js/tls_test.ts index 0fee79978..be3d54503 100644 --- a/cli/js/tls_test.ts +++ b/cli/js/tls_test.ts @@ -166,7 +166,10 @@ testPerm({ read: true, net: true }, async function dialAndListenTLS(): Promise< assert(conn.remoteAddr != null); assert(conn.localAddr != null); await conn.write(response); - conn.close(); + // TODO(bartlomieju): this might be a bug + setTimeout(() => { + conn.close(); + }, 0); } ); diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 25cf99812..9db8d68be 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -7,7 +7,7 @@ use crate::ops::json_op; use crate::state::ThreadSafeState; use deno::*; use futures::future::FutureExt; -use futures::future::TryFutureExt; +use futures::StreamExt; use http::header::HeaderName; use http::header::HeaderValue; use http::Method; @@ -56,32 +56,32 @@ pub fn op_fetch( } debug!("Before fetch {}", url); let state_ = state.clone(); - let future = futures::compat::Compat01As03::new(request.send()) - .map_err(ErrBox::from) - .and_then(move |res| { - debug!("Fetch response {}", url); - let status = res.status(); - let mut res_headers = Vec::new(); - for (key, val) in res.headers().iter() { - res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); - } - let body = HttpBody::from(res.into_body()); - let mut table = state_.lock_resource_table(); - let rid = table.add( - "httpBody", - Box::new(StreamResource::HttpBody(Box::new(body))), - ); + let future = async move { + let res = request.send().await?; + debug!("Fetch response {}", url); + let status = res.status(); + let mut res_headers = Vec::new(); + for (key, val) in res.headers().iter() { + res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); + } - let json_res = json!({ - "bodyRid": rid, - "status": status.as_u16(), - "statusText": status.canonical_reason().unwrap_or(""), - "headers": res_headers - }); + let body = HttpBody::from(res.bytes_stream().boxed()); + let mut table = state_.lock_resource_table(); + let rid = table.add( + "httpBody", + Box::new(StreamResource::HttpBody(Box::new(body))), + ); - futures::future::ok(json_res) + let json_res = json!({ + "bodyRid": rid, + "status": status.as_u16(), + "statusText": status.canonical_reason().unwrap_or(""), + "headers": res_headers }); + Ok(json_res) + }; + Ok(JsonOp::Async(future.boxed())) } diff --git a/cli/ops/files.rs b/cli/ops/files.rs index 1c041b38d..6f015329b 100644 --- a/cli/ops/files.rs +++ b/cli/ops/files.rs @@ -9,14 +9,9 @@ use crate::ops::json_op; use crate::state::ThreadSafeState; use deno::*; use futures::future::FutureExt; -use futures::future::TryFutureExt; use std; use std::convert::From; -use std::future::Future; use std::io::SeekFrom; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; use tokio; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { @@ -92,21 +87,19 @@ fn op_open( } let is_sync = args.promise_id.is_none(); - let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err( - open_options.open(filename), - ErrBox::from, - )) - .and_then(move |fs_file| { + + let fut = async move { + let fs_file = open_options.open(filename).await?; let mut table = state_.lock_resource_table(); let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file))); - futures::future::ok(json!(rid)) - }); + Ok(json!(rid)) + }; if is_sync { - let buf = futures::executor::block_on(op)?; + let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(fut.boxed())) } } @@ -127,37 +120,6 @@ fn op_close( Ok(JsonOp::Sync(json!({}))) } -pub struct SeekFuture { - seek_from: SeekFrom, - rid: ResourceId, - state: ThreadSafeState, -} - -impl Future for SeekFuture { - type Output = Result<u64, ErrBox>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut table = inner.state.lock_resource_table(); - let resource = table - .get_mut::<StreamResource>(inner.rid) - .ok_or_else(bad_resource)?; - - let tokio_file = match resource { - StreamResource::FsFile(ref mut file) => file, - _ => return Poll::Ready(Err(bad_resource())), - }; - - use tokio::prelude::Async::*; - - match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) { - Ok(Ready(v)) => Poll::Ready(Ok(v)), - Err(err) => Poll::Ready(Err(err)), - Ok(NotReady) => Poll::Pending, - } - } -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct SeekArgs { @@ -189,17 +151,26 @@ fn op_seek( } }; - let fut = SeekFuture { - state: state.clone(), - seek_from, - rid, + let mut table = state.lock_resource_table(); + let resource = table + .get_mut::<StreamResource>(rid) + .ok_or_else(bad_resource)?; + + let tokio_file = match resource { + StreamResource::FsFile(ref mut file) => file, + _ => return Err(bad_resource()), + }; + let mut file = futures::executor::block_on(tokio_file.try_clone())?; + + let fut = async move { + file.seek(seek_from).await?; + Ok(json!({})) }; - let op = fut.and_then(move |_| futures::future::ok(json!({}))); if args.promise_id.is_none() { - let buf = futures::executor::block_on(op)?; + let buf = futures::executor::block_on(fut)?; Ok(JsonOp::Sync(buf)) } else { - Ok(JsonOp::Async(op.boxed())) + Ok(JsonOp::Async(fut.boxed())) } } diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 11afb1891..f268adc03 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -8,18 +8,15 @@ use deno::ErrBox; use deno::Resource; use deno::*; use futures; -use futures::compat::AsyncRead01CompatExt; -use futures::compat::AsyncWrite01CompatExt; use futures::future::FutureExt; -use futures::io::{AsyncRead, AsyncWrite}; use std; use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; -use tokio_process; use tokio_rustls::client::TlsStream as ClientTlsStream; use tokio_rustls::server::TlsStream as ServerTlsStream; @@ -86,9 +83,9 @@ pub enum StreamResource { ServerTlsStream(Box<ServerTlsStream<TcpStream>>), ClientTlsStream(Box<ClientTlsStream<TcpStream>>), HttpBody(Box<HttpBody>), - ChildStdin(tokio_process::ChildStdin), - ChildStdout(tokio_process::ChildStdout), - ChildStderr(tokio_process::ChildStderr), + ChildStdin(tokio::process::ChildStdin), + ChildStdout(tokio::process::ChildStdout), + ChildStderr(tokio::process::ChildStderr), } impl Resource for StreamResource {} @@ -111,22 +108,14 @@ impl DenoAsyncRead for StreamResource { ) -> Poll<Result<usize, ErrBox>> { let inner = self.get_mut(); let mut f: Box<dyn AsyncRead + Unpin> = match inner { - StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)), - StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)), - StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)), - StreamResource::ClientTlsStream(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } - StreamResource::ServerTlsStream(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } + StreamResource::FsFile(f) => Box::new(f), + StreamResource::Stdin(f) => Box::new(f), + StreamResource::TcpStream(f) => Box::new(f), + StreamResource::ClientTlsStream(f) => Box::new(f), + StreamResource::ServerTlsStream(f) => Box::new(f), + StreamResource::ChildStdout(f) => Box::new(f), + StreamResource::ChildStderr(f) => Box::new(f), StreamResource::HttpBody(f) => Box::new(f), - StreamResource::ChildStdout(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } - StreamResource::ChildStderr(f) => { - Box::new(AsyncRead01CompatExt::compat(f)) - } _ => { return Poll::Ready(Err(bad_resource())); } @@ -145,6 +134,7 @@ impl DenoAsyncRead for StreamResource { #[derive(Debug, PartialEq)] enum IoState { Pending, + Flush, Done, } @@ -237,6 +227,11 @@ pub trait DenoAsyncWrite { self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Result<(), ErrBox>>; + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Result<(), ErrBox>>; } impl DenoAsyncWrite for StreamResource { @@ -247,21 +242,13 @@ impl DenoAsyncWrite for StreamResource { ) -> Poll<Result<usize, ErrBox>> { let inner = self.get_mut(); let mut f: Box<dyn AsyncWrite + Unpin> = match inner { - StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)), - StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)), - StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)), - StreamResource::TcpStream(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } - StreamResource::ClientTlsStream(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } - StreamResource::ServerTlsStream(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } - StreamResource::ChildStdin(f) => { - Box::new(AsyncWrite01CompatExt::compat(f)) - } + StreamResource::FsFile(f) => Box::new(f), + StreamResource::Stdout(f) => Box::new(f), + StreamResource::Stderr(f) => Box::new(f), + StreamResource::TcpStream(f) => Box::new(f), + StreamResource::ClientTlsStream(f) => Box::new(f), + StreamResource::ServerTlsStream(f) => Box::new(f), + StreamResource::ChildStdin(f) => Box::new(f), _ => { return Poll::Ready(Err(bad_resource())); } @@ -276,6 +263,33 @@ impl DenoAsyncWrite for StreamResource { } } + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll<Result<(), ErrBox>> { + let inner = self.get_mut(); + let mut f: Box<dyn AsyncWrite + Unpin> = match inner { + StreamResource::FsFile(f) => Box::new(f), + StreamResource::Stdout(f) => Box::new(f), + StreamResource::Stderr(f) => Box::new(f), + StreamResource::TcpStream(f) => Box::new(f), + StreamResource::ClientTlsStream(f) => Box::new(f), + StreamResource::ServerTlsStream(f) => Box::new(f), + StreamResource::ChildStdin(f) => Box::new(f), + _ => { + return Poll::Ready(Err(bad_resource())); + } + }; + + let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx); + + match r { + Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } + fn poll_close( self: Pin<&mut Self>, _cx: &mut Context, @@ -290,6 +304,7 @@ pub struct Write<T> { buf: T, io_state: IoState, state: ThreadSafeState, + nwritten: i32, } /// Creates a future that will write some of the buffer `buf` to @@ -306,6 +321,7 @@ where buf, io_state: IoState::Pending, state: state.clone(), + nwritten: 0, } } @@ -323,21 +339,44 @@ where panic!("poll a Read after it's done"); } - let mut table = inner.state.lock_resource_table(); - let resource = table - .get_mut::<StreamResource>(inner.rid) - .ok_or_else(bad_resource)?; - let nwritten = match DenoAsyncWrite::poll_write( - Pin::new(resource), - cx, - inner.buf.as_ref(), - ) { - Poll::Ready(Ok(v)) => v, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }; - inner.io_state = IoState::Done; - Poll::Ready(Ok(nwritten as i32)) + if inner.io_state == IoState::Pending { + let mut table = inner.state.lock_resource_table(); + let resource = table + .get_mut::<StreamResource>(inner.rid) + .ok_or_else(bad_resource)?; + + let nwritten = match DenoAsyncWrite::poll_write( + Pin::new(resource), + cx, + inner.buf.as_ref(), + ) { + Poll::Ready(Ok(v)) => v, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + inner.io_state = IoState::Flush; + inner.nwritten = nwritten as i32; + } + + // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2 + // and the reasons for the need to explicitly flush are not fully known. + // Figure out why it's needed and preferably remove it. + // https://github.com/denoland/deno/issues/3565 + if inner.io_state == IoState::Flush { + let mut table = inner.state.lock_resource_table(); + let resource = table + .get_mut::<StreamResource>(inner.rid) + .ok_or_else(bad_resource)?; + match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) { + Poll::Ready(Ok(_)) => { + inner.io_state = IoState::Done; + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + } + + Poll::Ready(Ok(inner.nwritten)) } } diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 6d843c0ba..a3a1e665e 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -9,8 +9,6 @@ use deno::Resource; use deno::*; use futures::future::FutureExt; use futures::future::TryFutureExt; -use futures::stream::StreamExt; -use futures::stream::TryStreamExt; use std; use std::convert::From; use std::future::Future; @@ -20,7 +18,6 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio; -use tokio::net::tcp::Incoming; use tokio::net::TcpListener; use tokio::net::TcpStream; @@ -73,27 +70,23 @@ impl Future for Accept { ErrBox::from(e) })?; - let mut listener = - futures::compat::Compat01As03::new(&mut listener_resource.listener) - .map_err(ErrBox::from); + let listener = &mut listener_resource.listener; - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(stream))) => { + match listener.poll_accept(cx).map_err(ErrBox::from) { + Poll::Ready(Ok((stream, addr))) => { listener_resource.untrack_task(); inner.accept_state = AcceptState::Done; - let addr = stream.peer_addr().unwrap(); Poll::Ready(Ok((stream, addr))) } Poll::Pending => { listener_resource.track_task(cx)?; Poll::Pending } - Poll::Ready(Some(Err(e))) => { + Poll::Ready(Err(e)) => { listener_resource.untrack_task(); inner.accept_state = AcceptState::Done; Poll::Ready(Err(e)) } - _ => unreachable!(), } } } @@ -160,32 +153,20 @@ fn op_dial( let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; - let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { - futures::compat::Compat01As03::new(TcpStream::connect(&addr)) - .map_err(ErrBox::from) - .and_then(move |tcp_stream| { - let local_addr = match tcp_stream.local_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - let remote_addr = match tcp_stream.peer_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - let mut table = state_.lock_resource_table(); - let rid = table - .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); - futures::future::ok((rid, local_addr, remote_addr)) - }) - .map_err(ErrBox::from) - .and_then(move |(rid, local_addr, remote_addr)| { - futures::future::ok(json!({ - "rid": rid, - "localAddr": local_addr.to_string(), - "remoteAddr": remote_addr.to_string(), - })) - }) - }); + let op = async move { + let addr = resolve_addr(&args.hostname, args.port).await?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut table = state_.lock_resource_table(); + let rid = + table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + Ok(json!({ + "rid": rid, + "localAddr": local_addr.to_string(), + "remoteAddr": remote_addr.to_string(), + })) + }; Ok(JsonOp::Async(op.boxed())) } @@ -235,7 +216,7 @@ struct ListenArgs { #[allow(dead_code)] struct TcpListenerResource { - listener: Incoming, + listener: TcpListener, waker: Option<futures::task::AtomicWaker>, local_addr: SocketAddr, } @@ -300,11 +281,11 @@ fn op_listen( let addr = futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; - let listener = TcpListener::bind(&addr)?; + let listener = futures::executor::block_on(TcpListener::bind(&addr))?; let local_addr = listener.local_addr()?; let local_addr_str = local_addr.to_string(); let listener_resource = TcpListenerResource { - listener: listener.incoming(), + listener, waker: None, local_addr, }; diff --git a/cli/ops/process.rs b/cli/ops/process.rs index a267130ec..40a9877ac 100644 --- a/cli/ops/process.rs +++ b/cli/ops/process.rs @@ -14,12 +14,10 @@ use std; use std::convert::From; use std::future::Future; use std::pin::Pin; -use std::process::Command; use std::process::ExitStatus; use std::task::Context; use std::task::Poll; -use tokio::prelude::Async; -use tokio_process::CommandExt; +use tokio::process::Command; #[cfg(unix)] use std::os::unix::process::ExitStatusExt; @@ -33,42 +31,21 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill)))); } -struct CloneFileFuture { - rid: ResourceId, - state: ThreadSafeState, -} - -impl Future for CloneFileFuture { - type Output = Result<tokio::fs::File, ErrBox>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut table = inner.state.lock_resource_table(); - let repr = table - .get_mut::<StreamResource>(inner.rid) - .ok_or_else(bad_resource)?; - match repr { - StreamResource::FsFile(ref mut file) => { - match file.poll_try_clone().map_err(ErrBox::from) { - Err(err) => Poll::Ready(Err(err)), - Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), - Ok(Async::NotReady) => Poll::Pending, - } - } - _ => Poll::Ready(Err(bad_resource())), - } - } -} - fn clone_file( rid: u32, state: &ThreadSafeState, ) -> Result<std::fs::File, ErrBox> { - futures::executor::block_on(CloneFileFuture { - rid, - state: state.clone(), - }) - .map(|f| f.into_std()) + let mut table = state.lock_resource_table(); + let repr = table + .get_mut::<StreamResource>(rid) + .ok_or_else(bad_resource)?; + let file = match repr { + StreamResource::FsFile(ref mut file) => file, + _ => return Err(bad_resource()), + }; + let tokio_file = futures::executor::block_on(file.try_clone())?; + let std_file = futures::executor::block_on(tokio_file.into_std()); + Ok(std_file) } fn subprocess_stdio_map(s: &str) -> std::process::Stdio { @@ -95,7 +72,7 @@ struct RunArgs { } struct ChildResource { - child: futures::compat::Compat01As03<tokio_process::Child>, + child: tokio::process::Child, } impl Resource for ChildResource {} @@ -149,8 +126,11 @@ fn op_run( c.stderr(subprocess_stdio_map(run_args.stderr.as_ref())); } + // We want to kill child when it's closed + c.kill_on_drop(true); + // Spawn the command. - let mut child = c.spawn_async().map_err(ErrBox::from)?; + let mut child = c.spawn()?; let pid = child.id(); let mut table = state_.lock_resource_table(); @@ -188,9 +168,7 @@ fn op_run( None => None, }; - let child_resource = ChildResource { - child: futures::compat::Compat01As03::new(child), - }; + let child_resource = ChildResource { child }; let child_rid = table.add("child", Box::new(child_resource)); Ok(JsonOp::Sync(json!({ diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs index 33900d4e0..e1897a86e 100644 --- a/cli/ops/tls.rs +++ b/cli/ops/tls.rs @@ -10,9 +10,6 @@ use crate::state::ThreadSafeState; use deno::Resource; use deno::*; use futures::future::FutureExt; -use futures::future::TryFutureExt; -use futures::stream::StreamExt; -use futures::stream::TryStreamExt; use std; use std::convert::From; use std::fs::File; @@ -24,7 +21,6 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio; -use tokio::net::tcp::Incoming; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; @@ -65,7 +61,7 @@ pub fn op_dial_tls( _zero_copy: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: DialTLSArgs = serde_json::from_value(args)?; - let cert_file = args.cert_file; + let cert_file = args.cert_file.clone(); let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; if let Some(path) = cert_file.clone() { @@ -77,62 +73,35 @@ pub fn op_dial_tls( domain.push_str("localhost"); } - let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { - futures::compat::Compat01As03::new(TcpStream::connect(&addr)) - .and_then(move |tcp_stream| { - let local_addr = match tcp_stream.local_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(e), - }; - let remote_addr = match tcp_stream.peer_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(e), - }; - let mut config = ClientConfig::new(); - config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - - if let Some(path) = cert_file { - let key_file = match File::open(path) { - Ok(v) => v, - Err(e) => return futures::future::err(e), - }; - let reader = &mut BufReader::new(key_file); - config.root_store.add_pem_file(reader).unwrap(); - } - let tls_connector = TlsConnector::from(Arc::new(config)); - futures::future::ok(( - tls_connector, - tcp_stream, - local_addr, - remote_addr, - )) - }) - .map_err(ErrBox::from) - .and_then( - move |(tls_connector, tcp_stream, local_addr, remote_addr)| { - let dnsname = DNSNameRef::try_from_ascii_str(&domain) - .expect("Invalid DNS lookup"); - futures::compat::Compat01As03::new( - tls_connector.connect(dnsname, tcp_stream), - ) - .map_err(ErrBox::from) - .and_then(move |tls_stream| { - let mut table = state_.lock_resource_table(); - let rid = table.add( - "clientTlsStream", - Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), - ); - futures::future::ok(json!({ - "rid": rid, - "localAddr": local_addr.to_string(), - "remoteAddr": remote_addr.to_string(), - })) - }) - }, - ) - }); + let op = async move { + let addr = resolve_addr(&args.hostname, args.port).await?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut config = ClientConfig::new(); + config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + if let Some(path) = cert_file { + let key_file = File::open(path)?; + let reader = &mut BufReader::new(key_file); + config.root_store.add_pem_file(reader).unwrap(); + } + let tls_connector = TlsConnector::from(Arc::new(config)); + let dnsname = + DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup"); + let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?; + let mut table = state_.lock_resource_table(); + let rid = table.add( + "clientTlsStream", + Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))), + ); + Ok(json!({ + "rid": rid, + "localAddr": local_addr.to_string(), + "remoteAddr": remote_addr.to_string(), + })) + }; Ok(JsonOp::Async(op.boxed())) } @@ -197,7 +166,7 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> { #[allow(dead_code)] pub struct TlsListenerResource { - listener: Incoming, + listener: TcpListener, tls_acceptor: TlsAcceptor, waker: Option<futures::task::AtomicWaker>, local_addr: SocketAddr, @@ -283,11 +252,11 @@ fn op_listen_tls( let tls_acceptor = TlsAcceptor::from(Arc::new(config)); let addr = futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; - let listener = TcpListener::bind(&addr)?; + let listener = futures::executor::block_on(TcpListener::bind(&addr))?; let local_addr = listener.local_addr()?; let local_addr_str = local_addr.to_string(); let tls_listener_resource = TlsListenerResource { - listener: listener.incoming(), + listener, tls_acceptor, waker: None, local_addr, @@ -343,27 +312,23 @@ impl Future for AcceptTls { ErrBox::from(e) })?; - let mut listener = - futures::compat::Compat01As03::new(&mut listener_resource.listener) - .map_err(ErrBox::from); + let listener = &mut listener_resource.listener; - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(stream))) => { + match listener.poll_accept(cx).map_err(ErrBox::from) { + Poll::Ready(Ok((stream, addr))) => { listener_resource.untrack_task(); inner.accept_state = AcceptTlsState::Done; - let addr = stream.peer_addr().unwrap(); Poll::Ready(Ok((stream, addr))) } Poll::Pending => { listener_resource.track_task(cx)?; Poll::Pending } - Poll::Ready(Some(Err(e))) => { + Poll::Ready(Err(e)) => { listener_resource.untrack_task(); inner.accept_state = AcceptTlsState::Done; Poll::Ready(Err(e)) } - _ => unreachable!(), } } } @@ -380,47 +345,33 @@ fn op_accept_tls( ) -> Result<JsonOp, ErrBox> { let args: AcceptTlsArgs = serde_json::from_value(args)?; let rid = args.rid as u32; - let state1 = state.clone(); - let state2 = state.clone(); - let op = accept_tls(state, rid) - .and_then(move |(tcp_stream, _socket_addr)| { - let local_addr = match tcp_stream.local_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - let remote_addr = match tcp_stream.peer_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - futures::future::ok((tcp_stream, local_addr, remote_addr)) - }) - .and_then(move |(tcp_stream, local_addr, remote_addr)| { - let table = state1.lock_resource_table(); + let state = state.clone(); + let op = async move { + let (tcp_stream, _socket_addr) = accept_tls(&state.clone(), rid).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let tls_acceptor = { + let table = state.lock_resource_table(); let resource = table .get::<TlsListenerResource>(rid) .ok_or_else(bad_resource) .expect("Can't find tls listener"); - - futures::compat::Compat01As03::new( - resource.tls_acceptor.accept(tcp_stream), + resource.tls_acceptor.clone() + }; + let tls_stream = tls_acceptor.accept(tcp_stream).await?; + let rid = { + let mut table = state.lock_resource_table(); + table.add( + "serverTlsStream", + Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), ) - .map_err(ErrBox::from) - .and_then(move |tls_stream| { - let mut table = state2.lock_resource_table(); - let rid = table.add( - "serverTlsStream", - Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))), - ); - futures::future::ok((rid, local_addr, remote_addr)) - }) - }) - .and_then(move |(rid, local_addr, remote_addr)| { - futures::future::ok(json!({ - "rid": rid, - "localAddr": local_addr.to_string(), - "remoteAddr": remote_addr.to_string(), - })) - }); + }; + Ok(json!({ + "rid": rid, + "localAddr": local_addr.to_string(), + "remoteAddr": remote_addr.to_string(), + })) + }; Ok(JsonOp::Async(op.boxed())) } diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index 467d0bfb2..131283614 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -168,14 +168,13 @@ fn op_create_worker( // TODO(bartlomieju): this should spawn mod execution on separate tokio task // and block on receving message on a channel or even use sync channel /shrug let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1); - let fut = worker - .execute_mod_async(&module_specifier, None, false) - .then(move |result| { - sender.send(result).expect("Failed to send message"); - futures::future::ok(()) - }) - .boxed() - .compat(); + let fut = async move { + let result = worker + .execute_mod_async(&module_specifier, None, false) + .await; + sender.send(result).expect("Failed to send message"); + } + .boxed(); tokio::spawn(fut); let result = receiver.recv().expect("Failed to receive message"); diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index fea47792e..ec1e0e3cf 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -1,30 +1,30 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use futures; -use futures::future::FutureExt; -use futures::future::TryFutureExt; use std::future::Future; use tokio; use tokio::runtime; -pub fn create_threadpool_runtime( -) -> Result<tokio::runtime::Runtime, tokio::io::Error> { - runtime::Builder::new() - .panic_handler(|err| std::panic::resume_unwind(err)) - .build() -} - pub fn run<F>(future: F) where F: Future<Output = Result<(), ()>> + Send + 'static, { - // tokio::runtime::current_thread::run(future) - let rt = create_threadpool_runtime().expect("Unable to create Tokio runtime"); - rt.block_on_all(future.boxed().compat()).unwrap(); + let mut rt = runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .thread_name("deno") + .build() + .expect("Unable to create Tokio runtime"); + rt.block_on(future).unwrap(); } pub fn run_on_current_thread<F>(future: F) where F: Future<Output = Result<(), ()>> + Send + 'static, { - tokio::runtime::current_thread::run(future.boxed().compat()); + let mut rt = runtime::Builder::new() + .basic_scheduler() + .enable_all() + .thread_name("deno") + .build() + .expect("Unable to create Tokio runtime"); + rt.block_on(future).unwrap(); } diff --git a/cli/worker.rs b/cli/worker.rs index 410d6ee44..c44dfbb39 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -420,10 +420,9 @@ mod tests { let fut = async move { let r = worker.await; r.unwrap(); - Ok(()) }; - tokio::spawn(fut.boxed().compat()); + tokio::spawn(fut); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); @@ -453,26 +452,21 @@ mod tests { .unwrap(); let worker_ = worker.clone(); - let worker_future = worker - .then(move |r| { - println!("workers.rs after resource close"); - r.unwrap(); - futures::future::ok(()) - }) - .shared(); + let worker_future = async move { + let result = worker.await; + println!("workers.rs after resource close"); + result.unwrap(); + } + .shared(); let worker_future_ = worker_future.clone(); - tokio::spawn( - worker_future_ - .then(|_: Result<(), ()>| futures::future::ok(())) - .compat(), - ); + tokio::spawn(worker_future_); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); let r = block_on(worker_.post_message(msg)); assert!(r.is_ok()); - block_on(worker_future).unwrap(); + block_on(worker_future); }) } |