summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/fetch.rs46
-rw-r--r--cli/ops/files.rs75
-rw-r--r--cli/ops/io.rs143
-rw-r--r--cli/ops/net.rs61
-rw-r--r--cli/ops/process.rs58
-rw-r--r--cli/ops/tls.rs167
-rw-r--r--cli/ops/workers.rs15
7 files changed, 242 insertions, 323 deletions
diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs
index 25cf99812..9db8d68be 100644
--- a/cli/ops/fetch.rs
+++ b/cli/ops/fetch.rs
@@ -7,7 +7,7 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
+use futures::StreamExt;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::Method;
@@ -56,32 +56,32 @@ pub fn op_fetch(
}
debug!("Before fetch {}", url);
let state_ = state.clone();
- let future = futures::compat::Compat01As03::new(request.send())
- .map_err(ErrBox::from)
- .and_then(move |res| {
- debug!("Fetch response {}", url);
- let status = res.status();
- let mut res_headers = Vec::new();
- for (key, val) in res.headers().iter() {
- res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
- }
- let body = HttpBody::from(res.into_body());
- let mut table = state_.lock_resource_table();
- let rid = table.add(
- "httpBody",
- Box::new(StreamResource::HttpBody(Box::new(body))),
- );
+ let future = async move {
+ let res = request.send().await?;
+ debug!("Fetch response {}", url);
+ let status = res.status();
+ let mut res_headers = Vec::new();
+ for (key, val) in res.headers().iter() {
+ res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
+ }
- let json_res = json!({
- "bodyRid": rid,
- "status": status.as_u16(),
- "statusText": status.canonical_reason().unwrap_or(""),
- "headers": res_headers
- });
+ let body = HttpBody::from(res.bytes_stream().boxed());
+ let mut table = state_.lock_resource_table();
+ let rid = table.add(
+ "httpBody",
+ Box::new(StreamResource::HttpBody(Box::new(body))),
+ );
- futures::future::ok(json_res)
+ let json_res = json!({
+ "bodyRid": rid,
+ "status": status.as_u16(),
+ "statusText": status.canonical_reason().unwrap_or(""),
+ "headers": res_headers
});
+ Ok(json_res)
+ };
+
Ok(JsonOp::Async(future.boxed()))
}
diff --git a/cli/ops/files.rs b/cli/ops/files.rs
index 1c041b38d..6f015329b 100644
--- a/cli/ops/files.rs
+++ b/cli/ops/files.rs
@@ -9,14 +9,9 @@ use crate::ops::json_op;
use crate::state::ThreadSafeState;
use deno::*;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
use std;
use std::convert::From;
-use std::future::Future;
use std::io::SeekFrom;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
use tokio;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
@@ -92,21 +87,19 @@ fn op_open(
}
let is_sync = args.promise_id.is_none();
- let op = futures::compat::Compat01As03::new(tokio::prelude::Future::map_err(
- open_options.open(filename),
- ErrBox::from,
- ))
- .and_then(move |fs_file| {
+
+ let fut = async move {
+ let fs_file = open_options.open(filename).await?;
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
- futures::future::ok(json!(rid))
- });
+ Ok(json!(rid))
+ };
if is_sync {
- let buf = futures::executor::block_on(op)?;
+ let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
- Ok(JsonOp::Async(op.boxed()))
+ Ok(JsonOp::Async(fut.boxed()))
}
}
@@ -127,37 +120,6 @@ fn op_close(
Ok(JsonOp::Sync(json!({})))
}
-pub struct SeekFuture {
- seek_from: SeekFrom,
- rid: ResourceId,
- state: ThreadSafeState,
-}
-
-impl Future for SeekFuture {
- type Output = Result<u64, ErrBox>;
-
- fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- let mut table = inner.state.lock_resource_table();
- let resource = table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(bad_resource)?;
-
- let tokio_file = match resource {
- StreamResource::FsFile(ref mut file) => file,
- _ => return Poll::Ready(Err(bad_resource())),
- };
-
- use tokio::prelude::Async::*;
-
- match tokio_file.poll_seek(inner.seek_from).map_err(ErrBox::from) {
- Ok(Ready(v)) => Poll::Ready(Ok(v)),
- Err(err) => Poll::Ready(Err(err)),
- Ok(NotReady) => Poll::Pending,
- }
- }
-}
-
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct SeekArgs {
@@ -189,17 +151,26 @@ fn op_seek(
}
};
- let fut = SeekFuture {
- state: state.clone(),
- seek_from,
- rid,
+ let mut table = state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(rid)
+ .ok_or_else(bad_resource)?;
+
+ let tokio_file = match resource {
+ StreamResource::FsFile(ref mut file) => file,
+ _ => return Err(bad_resource()),
+ };
+ let mut file = futures::executor::block_on(tokio_file.try_clone())?;
+
+ let fut = async move {
+ file.seek(seek_from).await?;
+ Ok(json!({}))
};
- let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {
- let buf = futures::executor::block_on(op)?;
+ let buf = futures::executor::block_on(fut)?;
Ok(JsonOp::Sync(buf))
} else {
- Ok(JsonOp::Async(op.boxed()))
+ Ok(JsonOp::Async(fut.boxed()))
}
}
diff --git a/cli/ops/io.rs b/cli/ops/io.rs
index 11afb1891..f268adc03 100644
--- a/cli/ops/io.rs
+++ b/cli/ops/io.rs
@@ -8,18 +8,15 @@ use deno::ErrBox;
use deno::Resource;
use deno::*;
use futures;
-use futures::compat::AsyncRead01CompatExt;
-use futures::compat::AsyncWrite01CompatExt;
use futures::future::FutureExt;
-use futures::io::{AsyncRead, AsyncWrite};
use std;
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
+use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
-use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
@@ -86,9 +83,9 @@ pub enum StreamResource {
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(Box<HttpBody>),
- ChildStdin(tokio_process::ChildStdin),
- ChildStdout(tokio_process::ChildStdout),
- ChildStderr(tokio_process::ChildStderr),
+ ChildStdin(tokio::process::ChildStdin),
+ ChildStdout(tokio::process::ChildStdout),
+ ChildStderr(tokio::process::ChildStderr),
}
impl Resource for StreamResource {}
@@ -111,22 +108,14 @@ impl DenoAsyncRead for StreamResource {
) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut();
let mut f: Box<dyn AsyncRead + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::Stdin(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::TcpStream(f) => Box::new(AsyncRead01CompatExt::compat(f)),
- StreamResource::ClientTlsStream(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
- StreamResource::ServerTlsStream(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdin(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdout(f) => Box::new(f),
+ StreamResource::ChildStderr(f) => Box::new(f),
StreamResource::HttpBody(f) => Box::new(f),
- StreamResource::ChildStdout(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
- StreamResource::ChildStderr(f) => {
- Box::new(AsyncRead01CompatExt::compat(f))
- }
_ => {
return Poll::Ready(Err(bad_resource()));
}
@@ -145,6 +134,7 @@ impl DenoAsyncRead for StreamResource {
#[derive(Debug, PartialEq)]
enum IoState {
Pending,
+ Flush,
Done,
}
@@ -237,6 +227,11 @@ pub trait DenoAsyncWrite {
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), ErrBox>>;
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>>;
}
impl DenoAsyncWrite for StreamResource {
@@ -247,21 +242,13 @@ impl DenoAsyncWrite for StreamResource {
) -> Poll<Result<usize, ErrBox>> {
let inner = self.get_mut();
let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
- StreamResource::FsFile(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::Stdout(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::Stderr(f) => Box::new(AsyncWrite01CompatExt::compat(f)),
- StreamResource::TcpStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ClientTlsStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ServerTlsStream(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
- StreamResource::ChildStdin(f) => {
- Box::new(AsyncWrite01CompatExt::compat(f))
- }
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdout(f) => Box::new(f),
+ StreamResource::Stderr(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdin(f) => Box::new(f),
_ => {
return Poll::Ready(Err(bad_resource()));
}
@@ -276,6 +263,33 @@ impl DenoAsyncWrite for StreamResource {
}
}
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ ) -> Poll<Result<(), ErrBox>> {
+ let inner = self.get_mut();
+ let mut f: Box<dyn AsyncWrite + Unpin> = match inner {
+ StreamResource::FsFile(f) => Box::new(f),
+ StreamResource::Stdout(f) => Box::new(f),
+ StreamResource::Stderr(f) => Box::new(f),
+ StreamResource::TcpStream(f) => Box::new(f),
+ StreamResource::ClientTlsStream(f) => Box::new(f),
+ StreamResource::ServerTlsStream(f) => Box::new(f),
+ StreamResource::ChildStdin(f) => Box::new(f),
+ _ => {
+ return Poll::Ready(Err(bad_resource()));
+ }
+ };
+
+ let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx);
+
+ match r {
+ Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))),
+ Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
+ Poll::Pending => Poll::Pending,
+ }
+ }
+
fn poll_close(
self: Pin<&mut Self>,
_cx: &mut Context,
@@ -290,6 +304,7 @@ pub struct Write<T> {
buf: T,
io_state: IoState,
state: ThreadSafeState,
+ nwritten: i32,
}
/// Creates a future that will write some of the buffer `buf` to
@@ -306,6 +321,7 @@ where
buf,
io_state: IoState::Pending,
state: state.clone(),
+ nwritten: 0,
}
}
@@ -323,21 +339,44 @@ where
panic!("poll a Read after it's done");
}
- let mut table = inner.state.lock_resource_table();
- let resource = table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(bad_resource)?;
- let nwritten = match DenoAsyncWrite::poll_write(
- Pin::new(resource),
- cx,
- inner.buf.as_ref(),
- ) {
- Poll::Ready(Ok(v)) => v,
- Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
- Poll::Pending => return Poll::Pending,
- };
- inner.io_state = IoState::Done;
- Poll::Ready(Ok(nwritten as i32))
+ if inner.io_state == IoState::Pending {
+ let mut table = inner.state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(inner.rid)
+ .ok_or_else(bad_resource)?;
+
+ let nwritten = match DenoAsyncWrite::poll_write(
+ Pin::new(resource),
+ cx,
+ inner.buf.as_ref(),
+ ) {
+ Poll::Ready(Ok(v)) => v,
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ inner.io_state = IoState::Flush;
+ inner.nwritten = nwritten as i32;
+ }
+
+ // TODO(bartlomieju): this step was added during upgrade to Tokio 0.2
+ // and the reasons for the need to explicitly flush are not fully known.
+ // Figure out why it's needed and preferably remove it.
+ // https://github.com/denoland/deno/issues/3565
+ if inner.io_state == IoState::Flush {
+ let mut table = inner.state.lock_resource_table();
+ let resource = table
+ .get_mut::<StreamResource>(inner.rid)
+ .ok_or_else(bad_resource)?;
+ match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) {
+ Poll::Ready(Ok(_)) => {
+ inner.io_state = IoState::Done;
+ }
+ Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
+ Poll::Pending => return Poll::Pending,
+ };
+ }
+
+ Poll::Ready(Ok(inner.nwritten))
}
}
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 6d843c0ba..a3a1e665e 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -9,8 +9,6 @@ use deno::Resource;
use deno::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
-use futures::stream::StreamExt;
-use futures::stream::TryStreamExt;
use std;
use std::convert::From;
use std::future::Future;
@@ -20,7 +18,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
-use tokio::net::tcp::Incoming;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
@@ -73,27 +70,23 @@ impl Future for Accept {
ErrBox::from(e)
})?;
- let mut listener =
- futures::compat::Compat01As03::new(&mut listener_resource.listener)
- .map_err(ErrBox::from);
+ let listener = &mut listener_resource.listener;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(Ok(stream))) => {
+ match listener.poll_accept(cx).map_err(ErrBox::from) {
+ Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
- let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
- Poll::Ready(Some(Err(e))) => {
+ Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
Poll::Ready(Err(e))
}
- _ => unreachable!(),
}
}
}
@@ -160,32 +153,20 @@ fn op_dial(
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
- let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
- futures::compat::Compat01As03::new(TcpStream::connect(&addr))
- .map_err(ErrBox::from)
- .and_then(move |tcp_stream| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let mut table = state_.lock_resource_table();
- let rid = table
- .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
- futures::future::ok((rid, local_addr, remote_addr))
- })
- .map_err(ErrBox::from)
- .and_then(move |(rid, local_addr, remote_addr)| {
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- })
- });
+ let op = async move {
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut table = state_.lock_resource_table();
+ let rid =
+ table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
@@ -235,7 +216,7 @@ struct ListenArgs {
#[allow(dead_code)]
struct TcpListenerResource {
- listener: Incoming,
+ listener: TcpListener,
waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr,
}
@@ -300,11 +281,11 @@ fn op_listen(
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
- let listener = TcpListener::bind(&addr)?;
+ let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string();
let listener_resource = TcpListenerResource {
- listener: listener.incoming(),
+ listener,
waker: None,
local_addr,
};
diff --git a/cli/ops/process.rs b/cli/ops/process.rs
index a267130ec..40a9877ac 100644
--- a/cli/ops/process.rs
+++ b/cli/ops/process.rs
@@ -14,12 +14,10 @@ use std;
use std::convert::From;
use std::future::Future;
use std::pin::Pin;
-use std::process::Command;
use std::process::ExitStatus;
use std::task::Context;
use std::task::Poll;
-use tokio::prelude::Async;
-use tokio_process::CommandExt;
+use tokio::process::Command;
#[cfg(unix)]
use std::os::unix::process::ExitStatusExt;
@@ -33,42 +31,21 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
}
-struct CloneFileFuture {
- rid: ResourceId,
- state: ThreadSafeState,
-}
-
-impl Future for CloneFileFuture {
- type Output = Result<tokio::fs::File, ErrBox>;
-
- fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- let mut table = inner.state.lock_resource_table();
- let repr = table
- .get_mut::<StreamResource>(inner.rid)
- .ok_or_else(bad_resource)?;
- match repr {
- StreamResource::FsFile(ref mut file) => {
- match file.poll_try_clone().map_err(ErrBox::from) {
- Err(err) => Poll::Ready(Err(err)),
- Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
- Ok(Async::NotReady) => Poll::Pending,
- }
- }
- _ => Poll::Ready(Err(bad_resource())),
- }
- }
-}
-
fn clone_file(
rid: u32,
state: &ThreadSafeState,
) -> Result<std::fs::File, ErrBox> {
- futures::executor::block_on(CloneFileFuture {
- rid,
- state: state.clone(),
- })
- .map(|f| f.into_std())
+ let mut table = state.lock_resource_table();
+ let repr = table
+ .get_mut::<StreamResource>(rid)
+ .ok_or_else(bad_resource)?;
+ let file = match repr {
+ StreamResource::FsFile(ref mut file) => file,
+ _ => return Err(bad_resource()),
+ };
+ let tokio_file = futures::executor::block_on(file.try_clone())?;
+ let std_file = futures::executor::block_on(tokio_file.into_std());
+ Ok(std_file)
}
fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
@@ -95,7 +72,7 @@ struct RunArgs {
}
struct ChildResource {
- child: futures::compat::Compat01As03<tokio_process::Child>,
+ child: tokio::process::Child,
}
impl Resource for ChildResource {}
@@ -149,8 +126,11 @@ fn op_run(
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
}
+ // We want to kill child when it's closed
+ c.kill_on_drop(true);
+
// Spawn the command.
- let mut child = c.spawn_async().map_err(ErrBox::from)?;
+ let mut child = c.spawn()?;
let pid = child.id();
let mut table = state_.lock_resource_table();
@@ -188,9 +168,7 @@ fn op_run(
None => None,
};
- let child_resource = ChildResource {
- child: futures::compat::Compat01As03::new(child),
- };
+ let child_resource = ChildResource { child };
let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
diff --git a/cli/ops/tls.rs b/cli/ops/tls.rs
index 33900d4e0..e1897a86e 100644
--- a/cli/ops/tls.rs
+++ b/cli/ops/tls.rs
@@ -10,9 +10,6 @@ use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
-use futures::stream::StreamExt;
-use futures::stream::TryStreamExt;
use std;
use std::convert::From;
use std::fs::File;
@@ -24,7 +21,6 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use tokio;
-use tokio::net::tcp::Incoming;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::{rustls::ClientConfig, TlsConnector};
@@ -65,7 +61,7 @@ pub fn op_dial_tls(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: DialTLSArgs = serde_json::from_value(args)?;
- let cert_file = args.cert_file;
+ let cert_file = args.cert_file.clone();
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
@@ -77,62 +73,35 @@ pub fn op_dial_tls(
domain.push_str("localhost");
}
- let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
- futures::compat::Compat01As03::new(TcpStream::connect(&addr))
- .and_then(move |tcp_stream| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(e),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(e),
- };
- let mut config = ClientConfig::new();
- config
- .root_store
- .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
-
- if let Some(path) = cert_file {
- let key_file = match File::open(path) {
- Ok(v) => v,
- Err(e) => return futures::future::err(e),
- };
- let reader = &mut BufReader::new(key_file);
- config.root_store.add_pem_file(reader).unwrap();
- }
- let tls_connector = TlsConnector::from(Arc::new(config));
- futures::future::ok((
- tls_connector,
- tcp_stream,
- local_addr,
- remote_addr,
- ))
- })
- .map_err(ErrBox::from)
- .and_then(
- move |(tls_connector, tcp_stream, local_addr, remote_addr)| {
- let dnsname = DNSNameRef::try_from_ascii_str(&domain)
- .expect("Invalid DNS lookup");
- futures::compat::Compat01As03::new(
- tls_connector.connect(dnsname, tcp_stream),
- )
- .map_err(ErrBox::from)
- .and_then(move |tls_stream| {
- let mut table = state_.lock_resource_table();
- let rid = table.add(
- "clientTlsStream",
- Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
- );
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- })
- },
- )
- });
+ let op = async move {
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut config = ClientConfig::new();
+ config
+ .root_store
+ .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
+ if let Some(path) = cert_file {
+ let key_file = File::open(path)?;
+ let reader = &mut BufReader::new(key_file);
+ config.root_store.add_pem_file(reader).unwrap();
+ }
+ let tls_connector = TlsConnector::from(Arc::new(config));
+ let dnsname =
+ DNSNameRef::try_from_ascii_str(&domain).expect("Invalid DNS lookup");
+ let tls_stream = tls_connector.connect(dnsname, tcp_stream).await?;
+ let mut table = state_.lock_resource_table();
+ let rid = table.add(
+ "clientTlsStream",
+ Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
+ );
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
@@ -197,7 +166,7 @@ fn load_keys(path: &str) -> Result<Vec<PrivateKey>, ErrBox> {
#[allow(dead_code)]
pub struct TlsListenerResource {
- listener: Incoming,
+ listener: TcpListener,
tls_acceptor: TlsAcceptor,
waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr,
@@ -283,11 +252,11 @@ fn op_listen_tls(
let tls_acceptor = TlsAcceptor::from(Arc::new(config));
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
- let listener = TcpListener::bind(&addr)?;
+ let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string();
let tls_listener_resource = TlsListenerResource {
- listener: listener.incoming(),
+ listener,
tls_acceptor,
waker: None,
local_addr,
@@ -343,27 +312,23 @@ impl Future for AcceptTls {
ErrBox::from(e)
})?;
- let mut listener =
- futures::compat::Compat01As03::new(&mut listener_resource.listener)
- .map_err(ErrBox::from);
+ let listener = &mut listener_resource.listener;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(Ok(stream))) => {
+ match listener.poll_accept(cx).map_err(ErrBox::from) {
+ Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done;
- let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
- Poll::Ready(Some(Err(e))) => {
+ Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
inner.accept_state = AcceptTlsState::Done;
Poll::Ready(Err(e))
}
- _ => unreachable!(),
}
}
}
@@ -380,47 +345,33 @@ fn op_accept_tls(
) -> Result<JsonOp, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let state1 = state.clone();
- let state2 = state.clone();
- let op = accept_tls(state, rid)
- .and_then(move |(tcp_stream, _socket_addr)| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- futures::future::ok((tcp_stream, local_addr, remote_addr))
- })
- .and_then(move |(tcp_stream, local_addr, remote_addr)| {
- let table = state1.lock_resource_table();
+ let state = state.clone();
+ let op = async move {
+ let (tcp_stream, _socket_addr) = accept_tls(&state.clone(), rid).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let tls_acceptor = {
+ let table = state.lock_resource_table();
let resource = table
.get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource)
.expect("Can't find tls listener");
-
- futures::compat::Compat01As03::new(
- resource.tls_acceptor.accept(tcp_stream),
+ resource.tls_acceptor.clone()
+ };
+ let tls_stream = tls_acceptor.accept(tcp_stream).await?;
+ let rid = {
+ let mut table = state.lock_resource_table();
+ table.add(
+ "serverTlsStream",
+ Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
)
- .map_err(ErrBox::from)
- .and_then(move |tls_stream| {
- let mut table = state2.lock_resource_table();
- let rid = table.add(
- "serverTlsStream",
- Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
- );
- futures::future::ok((rid, local_addr, remote_addr))
- })
- })
- .and_then(move |(rid, local_addr, remote_addr)| {
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- });
+ };
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 467d0bfb2..131283614 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -168,14 +168,13 @@ fn op_create_worker(
// TODO(bartlomieju): this should spawn mod execution on separate tokio task
// and block on receving message on a channel or even use sync channel /shrug
let (sender, receiver) = mpsc::sync_channel::<Result<(), ErrBox>>(1);
- let fut = worker
- .execute_mod_async(&module_specifier, None, false)
- .then(move |result| {
- sender.send(result).expect("Failed to send message");
- futures::future::ok(())
- })
- .boxed()
- .compat();
+ let fut = async move {
+ let result = worker
+ .execute_mod_async(&module_specifier, None, false)
+ .await;
+ sender.send(result).expect("Failed to send message");
+ }
+ .boxed();
tokio::spawn(fut);
let result = receiver.recv().expect("Failed to receive message");