summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
Diffstat (limited to 'cli')
-rw-r--r--cli/Cargo.toml12
-rw-r--r--cli/deno_error.rs5
-rw-r--r--cli/global_timer.rs5
-rw-r--r--cli/http_body.rs27
-rw-r--r--cli/http_util.rs115
-rw-r--r--cli/js/tls_test.ts5
-rw-r--r--cli/ops/fetch.rs46
-rw-r--r--cli/ops/files.rs75
-rw-r--r--cli/ops/io.rs143
-rw-r--r--cli/ops/net.rs61
-rw-r--r--cli/ops/process.rs58
-rw-r--r--cli/ops/tls.rs167
-rw-r--r--cli/ops/workers.rs15
-rw-r--r--cli/tokio_util.rs28
-rw-r--r--cli/worker.rs24
15 files changed, 335 insertions, 451 deletions
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index dacefa249..43cccd14f 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -28,12 +28,13 @@ deno_typescript = { path = "../deno_typescript", version = "0.27.0" }
ansi_term = "0.12.1"
atty = "0.2.13"
base64 = "0.11.0"
+bytes = "0.5"
byteorder = "1.3.2"
clap = "2.33.0"
dirs = "2.0.2"
dlopen = "0.1.8"
futures = { version = "0.3", features = [ "compat", "io-compat" ] }
-http = "0.1.19"
+http = "0.2"
hyper = "0.12.35"
hyper-rustls = "0.17.1"
indexmap = "1.3.0"
@@ -43,7 +44,7 @@ log = "0.4.8"
rand = "0.7.2"
regex = "1.3.1"
remove_dir_all = "0.5.2"
-reqwest = { version = "0.9.22", default-features = false, features = ["rustls-tls"] }
+reqwest = { git = "https://github.com/seanmonstar/reqwest.git", rev = "0ab5df3", features = ["rustls-tls", "stream"] }
ring = "0.16.9"
rustyline = "5.0.4"
serde = { version = "1.0.102", features = ["derive"] }
@@ -53,11 +54,10 @@ source-map-mappings = "0.5.0"
sys-info = "0.5.8"
tempfile = "3.1.0"
termcolor = "1.0.5"
-tokio = "0.1.22"
+tokio = { version = "0.2.6", features = ["full"] }
tokio-executor = "0.1.8"
-tokio-process = "0.2.4"
-tokio-rustls = "0.10.2"
-url = "1.7.2"
+tokio-rustls = "0.12.0"
+url = "2.1"
utime = "0.2.1"
webpki = "0.21.0"
webpki-roots = "0.17.0"
diff --git a/cli/deno_error.rs b/cli/deno_error.rs
index 8d0eea201..d74483df9 100644
--- a/cli/deno_error.rs
+++ b/cli/deno_error.rs
@@ -211,6 +211,7 @@ impl GetErrorKind for url::ParseError {
}
RelativeUrlWithoutBase => ErrorKind::RelativeUrlWithoutBase,
SetHostOnCannotBeABaseUrl => ErrorKind::SetHostOnCannotBeABaseUrl,
+ _ => ErrorKind::Other,
}
}
}
@@ -231,7 +232,7 @@ impl GetErrorKind for reqwest::Error {
fn kind(&self) -> ErrorKind {
use self::GetErrorKind as Get;
- match self.get_ref() {
+ match self.source() {
Some(err_ref) => None
.or_else(|| err_ref.downcast_ref::<hyper::Error>().map(Get::kind))
.or_else(|| err_ref.downcast_ref::<url::ParseError>().map(Get::kind))
@@ -242,7 +243,7 @@ impl GetErrorKind for reqwest::Error {
.map(Get::kind)
})
.unwrap_or_else(|| ErrorKind::HttpOther),
- _ => ErrorKind::HttpOther,
+ None => ErrorKind::HttpOther,
}
}
}
diff --git a/cli/global_timer.rs b/cli/global_timer.rs
index e06cabc48..1dba8d3b5 100644
--- a/cli/global_timer.rs
+++ b/cli/global_timer.rs
@@ -13,7 +13,7 @@ use futures::channel::oneshot;
use futures::future::FutureExt;
use std::future::Future;
use std::time::Instant;
-use tokio::timer::Delay;
+use tokio;
#[derive(Default)]
pub struct GlobalTimer {
@@ -43,8 +43,7 @@ impl GlobalTimer {
let (tx, rx) = oneshot::channel();
self.tx = Some(tx);
- let delay = futures::compat::Compat01As03::new(Delay::new(deadline))
- .map_err(|err| panic!("Unexpected error in timeout {:?}", err));
+ let delay = tokio::time::delay_until(deadline.into());
let rx = rx
.map_err(|err| panic!("Unexpected error in receiving channel {:?}", err));
diff --git a/cli/http_body.rs b/cli/http_body.rs
index 72ec8017e..487306989 100644
--- a/cli/http_body.rs
+++ b/cli/http_body.rs
@@ -1,28 +1,31 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-
-use futures::io::AsyncRead;
-use futures::stream::StreamExt;
-use reqwest::r#async::Chunk;
-use reqwest::r#async::Decoder;
+use bytes::Bytes;
+use futures::Stream;
+use futures::StreamExt;
+use reqwest;
use std::cmp::min;
use std::io;
use std::io::Read;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
+use tokio::io::AsyncRead;
+
+// TODO(bartlomieju): most of this stuff can be moved to `cli/ops/fetch.rs`
+type ReqwestStream = Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + Send>>;
-/// Wraps `reqwest::Decoder` so that it can be exposed as an `AsyncRead` and integrated
+/// Wraps `ReqwestStream` so that it can be exposed as an `AsyncRead` and integrated
/// into resources more easily.
pub struct HttpBody {
- decoder: futures::compat::Compat01As03<Decoder>,
- chunk: Option<Chunk>,
+ stream: ReqwestStream,
+ chunk: Option<Bytes>,
pos: usize,
}
impl HttpBody {
- pub fn from(body: Decoder) -> Self {
+ pub fn from(body: ReqwestStream) -> Self {
Self {
- decoder: futures::compat::Compat01As03::new(body),
+ stream: body,
chunk: None,
pos: 0,
}
@@ -65,10 +68,10 @@ impl AsyncRead for HttpBody {
assert_eq!(inner.pos, 0);
}
- let p = inner.decoder.poll_next_unpin(cx);
+ let p = inner.stream.poll_next_unpin(cx);
match p {
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(
- // TODO Need to map hyper::Error into std::io::Error.
+ // TODO(bartlomieju): rewrite it to use ErrBox
io::Error::new(io::ErrorKind::Other, e),
)),
Poll::Ready(Some(Ok(chunk))) => {
diff --git a/cli/http_util.rs b/cli/http_util.rs
index 41a01f1e3..1be9ad60f 100644
--- a/cli/http_util.rs
+++ b/cli/http_util.rs
@@ -3,18 +3,15 @@ use crate::deno_error;
use crate::deno_error::DenoError;
use crate::version;
use deno::ErrBox;
-use futures::future;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
use reqwest;
use reqwest::header::HeaderMap;
use reqwest::header::CONTENT_TYPE;
use reqwest::header::LOCATION;
use reqwest::header::USER_AGENT;
-use reqwest::r#async::Client;
-use reqwest::RedirectPolicy;
+use reqwest::redirect::Policy;
+use reqwest::Client;
use std::future::Future;
-use std::pin::Pin;
use url::Url;
/// Create new instance of async reqwest::Client. This client supports
@@ -26,9 +23,9 @@ pub fn get_client() -> Client {
format!("Deno/{}", version::DENO).parse().unwrap(),
);
Client::builder()
- .redirect(RedirectPolicy::none())
+ .redirect(Policy::none())
.default_headers(headers)
- .use_sys_proxy()
+ .use_rustls_tls()
.build()
.unwrap()
}
@@ -75,77 +72,45 @@ pub enum FetchOnceResult {
pub fn fetch_string_once(
url: &Url,
) -> impl Future<Output = Result<FetchOnceResult, ErrBox>> {
- type FetchAttempt = (Option<String>, Option<String>, Option<FetchOnceResult>);
-
let url = url.clone();
let client = get_client();
- futures::compat::Compat01As03::new(client.get(url.clone()).send())
- .map_err(ErrBox::from)
- .and_then(
- move |mut response| -> Pin<
- Box<dyn Future<Output = Result<FetchAttempt, ErrBox>> + Send>,
- > {
- if response.status().is_redirection() {
- let location_string = response
- .headers()
- .get(LOCATION)
- .expect("url redirection should provide 'location' header")
- .to_str()
- .unwrap();
-
- debug!("Redirecting to {:?}...", &location_string);
- let new_url = resolve_url_from_location(&url, location_string);
- // Boxed trait object turns out to be the savior for 2+ types yielding same results.
- return futures::future::try_join3(
- future::ok(None),
- future::ok(None),
- future::ok(Some(FetchOnceResult::Redirect(new_url))),
- )
- .boxed();
- }
-
- if response.status().is_client_error()
- || response.status().is_server_error()
- {
- return future::err(
- DenoError::new(
- deno_error::ErrorKind::Other,
- format!("Import '{}' failed: {}", &url, response.status()),
- )
- .into(),
- )
- .boxed();
- }
-
- let content_type = response
- .headers()
- .get(CONTENT_TYPE)
- .map(|content_type| content_type.to_str().unwrap().to_owned());
-
- let body = futures::compat::Compat01As03::new(response.text())
- .map_ok(Some)
- .map_err(ErrBox::from);
-
- futures::future::try_join3(
- body,
- future::ok(content_type),
- future::ok(None),
- )
- .boxed()
- },
- )
- .and_then(move |(maybe_code, maybe_content_type, maybe_redirect)| {
- if let Some(redirect) = maybe_redirect {
- future::ok(redirect)
- } else {
- // maybe_code should always contain code here!
- future::ok(FetchOnceResult::Code(
- maybe_code.unwrap(),
- maybe_content_type,
- ))
- }
- })
+ let fut = async move {
+ let response = client.get(url.clone()).send().await?;
+
+ if response.status().is_redirection() {
+ let location_string = response
+ .headers()
+ .get(LOCATION)
+ .expect("url redirection should provide 'location' header")
+ .to_str()
+ .unwrap();
+
+ debug!("Redirecting to {:?}...", &location_string);
+ let new_url = resolve_url_from_location(&url, location_string);
+ return Ok(FetchOnceResult::Redirect(new_url));
+ }
+
+ if response.status().is_client_error()
+ || response.status().is_server_error()
+ {
+ let err = DenoError::new(
+ deno_error::ErrorKind::Other,
+ format!("Import '{}' failed: {}", &url, response.status()),
+ );
+ return Err(err.into());
+ }
+
+ let content_type = response
+ .headers()
+ .get(CONTENT_TYPE)
+ .map(|content_type| content_type.to_str().unwrap().to_owned());
+
+ let body = response.text().await?;
+ return Ok(FetchOnceResult::Code(body, content_type));
+ };
+
+ fut.boxed()
}
#[cfg(test)]
diff --git a/cli/js/tls_test.ts b/cli/js/tls_test.ts
index 0fee79978..be3d54503 100644
--- a/cli/js/tls_test.ts
+++ b/cli/js/tls_test.ts
@@ -166,7 +166,10 @@ testPerm({ read: true, net: true }, async function dialAndListenTLS(): Promise<
assert(conn.remoteAddr != null);
assert(conn.localAddr != null);
await conn.write(response);
- conn.close();
+ // TODO(bartlomieju): this might be a bug
+ setTimeout(() => {
+ conn.close();
+ }, 0);
}
);
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs
index 25cf99812..9db8d68be 100644
--- a/cli/ops/fetch.rs
+++ b/cli/ops/fetch.rs
@@ -7,7 +7,7 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
+use futures::StreamExt;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::Method;
@@ -56,32 +56,32 @@ pub fn op_fetch(
}
debug!("Before fetch {}", url);
let state_ = state.clone();
- let future = futures::compat::Compat01As03::new(request.send())
- .map_err(ErrBox::from)
- .and_then(move |res| {
- debug!("Fetch response {}", url);
- let status = res.status();
- let mut res_headers = Vec::new();
- for (key, val) in res.headers().iter() {
- res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
- }
- let body = HttpBody::from(res.into_body());
- let mut table = state_.lock_resource_table();
- let rid = table.add(
- "httpBody",
- Box::new(StreamResource::HttpBody(Box::new(body))),
- );
+ let future = async move {
+ let res = request.send().await?;
+ debug!("Fetch response {}", url);
+ let status = res.status();
+ let mut res_headers = Vec::new();
+ for (key, val) in res.headers().iter() {
+ res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
+ }
- let json_res = json!({
- "bodyRid": rid,
- "status": status.as_u16(),
- "statusText": status.canonical_reason().unwrap_or(""),
- "headers": res_headers
- });
+ let body = HttpBody::from(res.bytes_stream().boxed());
+ let mut table = state_.lock_resource_table();
+ let rid = table.add(
+ "httpBody",
+ Box::new(StreamResource::HttpBody(Box::new(body))),
+ );
- futures::future::ok(json_res)
+ let json_res = json!({
+ "bodyRid": rid,
+ "status": status.as_u16(),
+ "statusText": status.canonical_reason().unwrap_or(""),
+ "headers": res_headers
});
+ Ok(json_res)
+ };
+
Ok(JsonOp::Async(future.boxed()))
}
diff --git a/cli/ops/files.rs b/cli/ops/files.rs
index 1c041b38d..6f015329b 100644
--- a/cli/ops/files.rs
+++ b/cli/ops/files.rs
@@ -9,14 +9,9 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
use std;
use std::convert::From;
-use std::future::Future;
use std::io::SeekFrom;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
use tokio;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
@@ -92,21 +87,19 @@ fn op_open(
}
let is_sync = args.promise_id.is_none();
- let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err(
- open_options.open(filename),
- ErrBox::from,
- ))
- .and_then(move |fs_file| {
+
+ let fut = async move {
+ let fs_file = open_options.open(filename).await?;
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
- futures::future::ok(json!(rid))
- });
+ Ok(json!(rid))
+ };
if is_sync {
- let buf = futures::executor::block_on(op)?;
+ let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
- Ok(JsonOp::Async(op.boxed()))
+ Ok(JsonOp::Async(fut.boxed()))
}
}
@@ -127,37 +120,6 @@ fn op_close(
Ok(JsonOp::Sync(json!({})))
}
-pub struct SeekFuture {
- seek_from: SeekFrom,
- rid: ResourceId,
- state: ThreadSafeState,
-}
-
-impl Future for SeekFuture {
- type Output = Result<u64, ErrBox>;
-
- fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- let mut table = inner.state.lock_resource_table();
- let resource = table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(bad_resource)?;
-
- let tokio_file = match resource {
- StreamResource::FsFile(ref mut file) => file,
- _ => return Poll::Ready(Err(bad_resource())),
- };
-
- use tokio::prelude::Async::*;
-
- match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) {
- Ok(Ready(v)) => Poll::Ready(Ok(v)),
- Err(err) => Poll::Ready(Err(err)),
- Ok(NotReady) => Poll::Pending,
- }
- }
-}
-
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct SeekArgs {
@@ -189,17 +151,26 @@ fn op_seek(
}
};
- let fut = SeekFuture {
- state: state.clone(),
- seek_from,
- rid,
+ let mut table = state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(rid)
+ .ok_or_else(bad_resource)?;
+
+ let tokio_file = match resource {
+ StreamResource::FsFile(ref mut file) => file,
+ _ => return Err(bad_resource()),
+ };
+ let mut file = futures::executor::block_on(tokio_file.try_clone())?;
+
+ let fut = async move {
+ file.seek(seek_from).await?;
+ Ok(json!({}))
};
- let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
- let buf = futures::executor::block_on(op)?;
+ let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
- Ok(JsonOp::Async(op.boxed()))
+ Ok(JsonOp::Async(fut.boxed()))
}
}
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 11afb1891..f268adc03 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -8,18 +8,15 @@ use deno::ErrBox;
use deno::Resource;
use deno::*;
use futures;
-use futures::compat::AsyncRead01CompatExt;
-use futures::compat::AsyncWrite01CompatExt;
use futures::future::FutureExt;
-use futures::io::{AsyncRead, AsyncWrite};
use std;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
+use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
-use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
@@ -86,9 +83,9 @@ pub enum StreamResource {
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(Box<HttpBody>),
- ChildStdin(tokio_process::ChildStdin),
- ChildStdout(tokio_process::ChildStdout),
- ChildStderr(tokio_process::ChildStderr),
+ ChildStdin(tokio::process::ChildStdin),
+ ChildStdout(tokio::process::ChildStdout),
+ ChildStderr(tokio::process::ChildStderr),
}
impl Resource for StreamResource {}
@@ -111,22 +108,14 @@ impl DenoAsyncRead for StreamResource {
) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut();
let mut f: Box<dyn AsyncRead + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::ClientTlsStream(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
- StreamResource::ServerTlsStream(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdin(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdout(f) => Box::new(f),
+ StreamResource::ChildStderr(f) => Box::new(f),
StreamResource::HttpBody(f) => Box::new(f),
- StreamResource::ChildStdout(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
- StreamResource::ChildStderr(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
_ => {
return Poll::Ready(Err(bad_resource()));
}
@@ -145,6 +134,7 @@ impl DenoAsyncRead for StreamResource {
#[derive(Debug, PartialEq)]
enum IoState {
Pending,
+ Flush,
Done,
}
@@ -237,6 +227,11 @@ pub trait DenoAsyncWrite {
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), ErrBox>>;
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>>;
}
impl DenoAsyncWrite for StreamResource {
@@ -247,21 +242,13 @@ impl DenoAsyncWrite for StreamResource {
) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut();
let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::TcpStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ClientTlsStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ServerTlsStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ChildStdin(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdout(f) => Box::new(f),
+ StreamResource::Stderr(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdin(f) => Box::new(f),
_ => {
return Poll::Ready(Err(bad_resource()));
}
@@ -276,6 +263,33 @@ impl DenoAsyncWrite for StreamResource {
}
}
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>> {
+ let inner = self.get_mut();
+ let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdout(f) => Box::new(f),
+ StreamResource::Stderr(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdin(f) => Box::new(f),
+ _ => {
+ return Poll::Ready(Err(bad_resource()));
+ }
+ };
+
+ let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx);
+
+ match r {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
+ Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+
fn poll_close(
self: Pin<&mut Self>,
_cx: &mut Context,
@@ -290,6 +304,7 @@ pub struct Write<T> {
buf: T,
io_state: IoState,
state: ThreadSafeState,
+ nwritten: i32,
}
/// Creates a future that will write some of the buffer `buf` to
@@ -306,6 +321,7 @@ where
buf,
io_state: IoState::Pending,
state: state.clone(),
+ nwritten: 0,
}
}
@@ -323,21 +339,44 @@ where
panic!("poll a Read after it's done");
}
- let mut table = inner.state.lock_resource_table();
- let resource = table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(bad_resource)?;
- let nwritten = match DenoAsyncWrite::poll_write(
- Pin::new(resource),
- cx,
- inner.buf.as_ref(),
- ) {
- Poll::Ready(Ok(v)) => v,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
- inner.io_state = IoState::Done;
- Poll::Ready(Ok(nwritten as i32))
+ if inner.io_state == IoState::Pending {
+ let mut table = inner.state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(inner.rid)
+ .ok_or_else(bad_resource)?;
+
+ let nwritten = match DenoAsyncWrite::poll_write(
+ Pin::new(resource),
+ cx,
+ inner.buf.as_ref(),
+ ) {
+ Poll::Ready(Ok(v)) => v,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ inner.io_state = IoState::Flush;
+ inner.nwritten = nwritten as i32;
+ }
+
+ // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+ // and the reasons for the need to explicitly flush are not fully known.
+ // Figure out why it's needed and preferably remove it.
+ // https://github.com/denoland/deno/issues/3565
+ if inner.io_state == IoState::Flush {
+ let mut table = inner.state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(inner.rid)
+ .ok_or_else(bad_resource)?;
+ match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) {
+ Poll::Ready(Ok(_)) => {
+ inner.io_state = IoState::Done;
+ }
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ }
+
+ Poll::Ready(Ok(inner.nwritten))
}
}
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 6d843c0ba..a3a1e665e 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -9,8 +9,6 @@ use deno::Resource;
use deno::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
-use futures::stream::StreamExt;
-use futures::stream::TryStreamExt;
use std;
use std::convert::From;
use std::future::Future;
@@ -20,7 +18,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
-use tokio::net::tcp::Incoming;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
@@ -73,27 +70,23 @@ impl Future for Accept {
ErrBox::from(e)
})?;
- let mut listener =
- futures::compat::Compat01As03::new(&mut listener_resource.listener)
- .map_err(ErrBox::from);
+ let listener = &mut listener_resource.listener;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(Ok(stream))) => {
+ match listener.poll_accept(cx).map_err(ErrBox::from) {
+ Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
- let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
- Poll::Ready(Some(Err(e))) => {
+ Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
Poll::Ready(Err(e))
}
- _ => unreachable!(),
}
}
}
@@ -160,32 +153,20 @@ fn op_dial(
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
- let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
- futures::compat::Compat01As03::new(TcpStream::connect(&addr))
- .map_err(ErrBox::from)
- .and_then(move |tcp_stream| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let mut table = state_.lock_resource_table();
- let rid = table
- .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
- futures::future::ok((rid, local_addr, remote_addr))
- })
- .map_err(ErrBox::from)
- .and_then(move |(rid, local_addr, remote_addr)| {
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- })
- });
+ let op = async move {
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut table = state_.lock_resource_table();
+ let rid =
+ table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
@@ -235,7 +216,7 @@ struct ListenArgs {
#[allow(dead_code)]
struct TcpListenerResource {
- listener: Incoming,
+ listener: TcpListener,
waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr,
}
@@ -300,11 +281,11 @@ fn op_listen(
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
- let listener = TcpListener::bind(&addr)?;
+ let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string();
let listener_resource = TcpListenerResource {
- listener: listener.incoming(),
+ listener,
waker: None,
local_addr,
};
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index a267130ec..40a9877ac 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -14,12 +14,10 @@ use std;
use std::convert::From;
use std::future::Future;
use std::pin::Pin;
-use std::process::Command;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
-use tokio::prelude::Async;
-use tokio_process::CommandExt;
+use tokio::process::Command;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
@@ -33,42 +31,21 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
}
-struct CloneFileFuture {
- rid: ResourceId,
- state: ThreadSafeState,
-}
-
-impl Future for CloneFileFuture {
- type Output = Result<tokio::fs::File, ErrBox>;
-
- fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- let mut table = inner.state.lock_resource_table();
- let repr = table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(bad_resource)?;
- match repr {
- StreamResource::FsFile(ref mut file) => {
- match file.poll_try_clone().map_err(ErrBox::from) {
- Err(err) => Poll::Ready(Err(err)),
- Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
- Ok(Async::NotReady) => Poll::Pending,
- }
- }
- _ => Poll::Ready(Err(bad_resource())),
- }
- }
-}
-
fn clone_file(
rid: u32,
state: &ThreadSafeState,
) -> Result<std::fs::File, ErrBox> {
- futures::executor::block_on(CloneFileFuture {
- rid,
- state: state.clone(),
- })
- .map(|f| f.into_std())
+ let mut table = state.lock_resource_table();
+ let repr = table
+ .get_mut::<StreamResource>(rid)
+ .ok_or_else(bad_resource)?;
+ let file = match repr {
+ StreamResource::FsFile(ref mut file) => file,
+ _ => return Err(bad_resource()),
+ };
+ let tokio_file = futures::executor::block_on(file.try_clone())?;
+ let std_file = futures::executor::block_on(tokio_file.into_std());
+ Ok(std_file)
}
fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
@@ -95,7 +72,7 @@ struct RunArgs {
}
struct ChildResource {
- child: futures::compat::Compat01As03<tokio_process::Child>,
+ child: tokio::process::Child,
}
impl Resource for ChildResource {}
@@ -149,8 +126,11 @@ fn op_run(
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
}
+ // We want to kill child when it's closed
+ c.kill_on_drop(true);
+
// Spawn the command.
- let mut child = c.spawn_async().map_err(ErrBox::from)?;
+ let mut child = c.spawn()?;
let pid = child.id();
let mut table = state_.lock_resource_table();
@@ -188,9 +168,7 @@ fn op_run(
None => None,
};
- let child_resource = ChildResource {
- child: futures::compat::Compat01As03::new(child),
- };
+ let child_resource = ChildResource { child };
let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs
index 33900d4e0..e1897a86e 100644
--- a/cli/ops/tls.rs
+++ b/cli/ops/tls.rs
@@ -10,9 +10,6 @@ use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
-use futures::stream::StreamExt;
-use futures::stream::TryStreamExt;
use std;
use std::convert::From;
use std::fs::File;
@@ -24,7 +21,6 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tokio;
-use tokio::net::tcp::Incoming;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
@@ -65,7 +61,7 @@ pub fn op_dial_tls(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: DialTLSArgs = serde_json::from_value(args)?;
- let cert_file = args.cert_file;
+ let cert_file = args.cert_file.clone();
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
@@ -77,62 +73,35 @@ pub fn op_dial_tls(
domain.push_str("localhost");
}
- let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
- futures::compat::Compat01As03::new(TcpStream::connect(&addr))
- .and_then(move |tcp_stream| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(e),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(e),
- };
- let mut config = ClientConfig::new();
- config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
-
- if let Some(path) = cert_file {
- let key_file = match File::open(path) {
- Ok(v) => v,
- Err(e) => return futures::future::err(e),
- };
- let reader = &mut BufReader::new(key_file);
- config.root_store.add_pem_file(reader).unwrap();
- }
- let tls_connector = TlsConnector::from(Arc::new(config));
- futures::future::ok((
- tls_connector,
- tcp_stream,
- local_addr,
- remote_addr,
- ))
- })
- .map_err(ErrBox::from)
- .and_then(
- move |(tls_connector, tcp_stream, local_addr, remote_addr)| {
- let dnsname = DNSNameRef::try_from_ascii_str(&domain)
- .expect("Invalid DNS lookup");
- futures::compat::Compat01As03::new(
- tls_connector.connect(dnsname, tcp_stream),
- )
- .map_err(ErrBox::from)
- .and_then(move |tls_stream| {
- let mut table = state_.lock_resource_table();
- let rid = table.add(
- "clientTlsStream",
- Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
- );
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- })
- },
- )
- });
+ let op = async move {
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
+ }
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
+ let mut table = state_.lock_resource_table();
+ let rid = table.add(
+ "clientTlsStream",
+ Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
@@ -197,7 +166,7 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> {
#[allow(dead_code)]
pub struct TlsListenerResource {
- listener: Incoming,
+ listener: TcpListener,
tls_acceptor: TlsAcceptor,
waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr,
@@ -283,11 +252,11 @@ fn op_listen_tls(
let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
- let listener = TcpListener::bind(&addr)?;
+ let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string();
let tls_listener_resource = TlsListenerResource {
- listener: listener.incoming(),
+ listener,
tls_acceptor,
waker: None,
local_addr,
@@ -343,27 +312,23 @@ impl Future for AcceptTls {
ErrBox::from(e)
})?;
- let mut listener =
- futures::compat::Compat01As03::new(&mut listener_resource.listener)
- .map_err(ErrBox::from);
+ let listener = &mut listener_resource.listener;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(Ok(stream))) => {
+ match listener.poll_accept(cx).map_err(ErrBox::from) {
+ Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done;
- let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
- Poll::Ready(Some(Err(e))) => {
+ Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done;
Poll::Ready(Err(e))
}
- _ => unreachable!(),
}
}
}
@@ -380,47 +345,33 @@ fn op_accept_tls(
) -> Result<JsonOp, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let state1 = state.clone();
- let state2 = state.clone();
- let op = accept_tls(state, rid)
- .and_then(move |(tcp_stream, _socket_addr)| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- futures::future::ok((tcp_stream, local_addr, remote_addr))
- })
- .and_then(move |(tcp_stream, local_addr, remote_addr)| {
- let table = state1.lock_resource_table();
+ let state = state.clone();
+ let op = async move {
+ let (tcp_stream, _socket_addr) = accept_tls(&state.clone(), rid).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let tls_acceptor = {
+ let table = state.lock_resource_table();
let resource = table
.get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource)
.expect("Can't find tls listener");
-
- futures::compat::Compat01As03::new(
- resource.tls_acceptor.accept(tcp_stream),
+ resource.tls_acceptor.clone()
+ };
+ let tls_stream = tls_acceptor.accept(tcp_stream).await?;
+ let rid = {
+ let mut table = state.lock_resource_table();
+ table.add(
+ "serverTlsStream",
+ Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
)
- .map_err(ErrBox::from)
- .and_then(move |tls_stream| {
- let mut table = state2.lock_resource_table();
- let rid = table.add(
- "serverTlsStream",
- Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
- );
- futures::future::ok((rid, local_addr, remote_addr))
- })
- })
- .and_then(move |(rid, local_addr, remote_addr)| {
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- });
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 467d0bfb2..131283614 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -168,14 +168,13 @@ fn op_create_worker(
// TODO(bartlomieju): this should spawn mod execution on separate tokio task
// and block on receving message on a channel or even use sync channel /shrug
let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
- let fut = worker
- .execute_mod_async(&module_specifier, None, false)
- .then(move |result| {
- sender.send(result).expect("Failed to send message");
- futures::future::ok(())
- })
- .boxed()
- .compat();
+ let fut = async move {
+ let result = worker
+ .execute_mod_async(&module_specifier, None, false)
+ .await;
+ sender.send(result).expect("Failed to send message");
+ }
+ .boxed();
tokio::spawn(fut);
let result = receiver.recv().expect("Failed to receive message");
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs
index fea47792e..ec1e0e3cf 100644
--- a/cli/tokio_util.rs
+++ b/cli/tokio_util.rs
@@ -1,30 +1,30 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use futures;
-use futures::future::FutureExt;
-use futures::future::TryFutureExt;
use std::future::Future;
use tokio;
use tokio::runtime;
-pub fn create_threadpool_runtime(
-) -> Result<tokio::runtime::Runtime, tokio::io::Error> {
- runtime::Builder::new()
- .panic_handler(|err| std::panic::resume_unwind(err))
- .build()
-}
-
pub fn run<F>(future: F)
where
F: Future<Output = Result<(), ()>> + Send + 'static,
{
- // tokio::runtime::current_thread::run(future)
- let rt = create_threadpool_runtime().expect("Unable to create Tokio runtime");
- rt.block_on_all(future.boxed().compat()).unwrap();
+ let mut rt = runtime::Builder::new()
+ .threaded_scheduler()
+ .enable_all()
+ .thread_name("deno")
+ .build()
+ .expect("Unable to create Tokio runtime");
+ rt.block_on(future).unwrap();
}
pub fn run_on_current_thread<F>(future: F)
where
F: Future<Output = Result<(), ()>> + Send + 'static,
{
- tokio::runtime::current_thread::run(future.boxed().compat());
+ let mut rt = runtime::Builder::new()
+ .basic_scheduler()
+ .enable_all()
+ .thread_name("deno")
+ .build()
+ .expect("Unable to create Tokio runtime");
+ rt.block_on(future).unwrap();
}
diff --git a/cli/worker.rs b/cli/worker.rs
index 410d6ee44..c44dfbb39 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -420,10 +420,9 @@ mod tests {
let fut = async move {
let r = worker.await;
r.unwrap();
- Ok(())
};
- tokio::spawn(fut.boxed().compat());
+ tokio::spawn(fut);
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
@@ -453,26 +452,21 @@ mod tests {
.unwrap();
let worker_ = worker.clone();
- let worker_future = worker
- .then(move |r| {
- println!("workers.rs after resource close");
- r.unwrap();
- futures::future::ok(())
- })
- .shared();
+ let worker_future = async move {
+ let result = worker.await;
+ println!("workers.rs after resource close");
+ result.unwrap();
+ }
+ .shared();
let worker_future_ = worker_future.clone();
- tokio::spawn(
- worker_future_
- .then(|_: Result<(), ()>| futures::future::ok(()))
- .compat(),
- );
+ tokio::spawn(worker_future_);
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = block_on(worker_.post_message(msg));
assert!(r.is_ok());
- block_on(worker_future).unwrap();
+ block_on(worker_future);
})
}