summaryrefslogtreecommitdiff
path: root/cli/ops/net.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-12-30 14:57:17 +0100
committerGitHub <noreply@github.com>2019-12-30 14:57:17 +0100
commit46d76a7562025374600a7f866dfc68c1b7e268e9 (patch)
tree0681d383781d8a28ac7ea23d75f22b1faeea0208 /cli/ops/net.rs
parentdf1665a8fc92168c3eb115a768ecfeccbe575e18 (diff)
upgrade: Tokio 0.2 (#3418)
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r--cli/ops/net.rs61
1 files changed, 21 insertions, 40 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs
index 6d843c0ba..a3a1e665e 100644
--- a/cli/ops/net.rs
+++ b/cli/ops/net.rs
@@ -9,8 +9,6 @@ use deno::Resource;
use deno::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
-use futures::stream::StreamExt;
-use futures::stream::TryStreamExt;
use std;
use std::convert::From;
use std::future::Future;
@@ -20,7 +18,6 @@ use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use tokio;
-use tokio::net::tcp::Incoming;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
@@ -73,27 +70,23 @@ impl Future for Accept {
ErrBox::from(e)
})?;
- let mut listener =
- futures::compat::Compat01As03::new(&mut listener_resource.listener)
- .map_err(ErrBox::from);
+ let listener = &mut listener_resource.listener;
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(Ok(stream))) => {
+ match listener.poll_accept(cx).map_err(ErrBox::from) {
+ Poll::Ready(Ok((stream, addr))) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
- let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => {
listener_resource.track_task(cx)?;
Poll::Pending
}
- Poll::Ready(Some(Err(e))) => {
+ Poll::Ready(Err(e)) => {
listener_resource.untrack_task();
inner.accept_state = AcceptState::Done;
Poll::Ready(Err(e))
}
- _ => unreachable!(),
}
}
}
@@ -160,32 +153,20 @@ fn op_dial(
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
- let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
- futures::compat::Compat01As03::new(TcpStream::connect(&addr))
- .map_err(ErrBox::from)
- .and_then(move |tcp_stream| {
- let local_addr = match tcp_stream.local_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let remote_addr = match tcp_stream.peer_addr() {
- Ok(v) => v,
- Err(e) => return futures::future::err(ErrBox::from(e)),
- };
- let mut table = state_.lock_resource_table();
- let rid = table
- .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
- futures::future::ok((rid, local_addr, remote_addr))
- })
- .map_err(ErrBox::from)
- .and_then(move |(rid, local_addr, remote_addr)| {
- futures::future::ok(json!({
- "rid": rid,
- "localAddr": local_addr.to_string(),
- "remoteAddr": remote_addr.to_string(),
- }))
- })
- });
+ let op = async move {
+ let addr = resolve_addr(&args.hostname, args.port).await?;
+ let tcp_stream = TcpStream::connect(&addr).await?;
+ let local_addr = tcp_stream.local_addr()?;
+ let remote_addr = tcp_stream.peer_addr()?;
+ let mut table = state_.lock_resource_table();
+ let rid =
+ table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
+ Ok(json!({
+ "rid": rid,
+ "localAddr": local_addr.to_string(),
+ "remoteAddr": remote_addr.to_string(),
+ }))
+ };
Ok(JsonOp::Async(op.boxed()))
}
@@ -235,7 +216,7 @@ struct ListenArgs {
#[allow(dead_code)]
struct TcpListenerResource {
- listener: Incoming,
+ listener: TcpListener,
waker: Option<futures::task::AtomicWaker>,
local_addr: SocketAddr,
}
@@ -300,11 +281,11 @@ fn op_listen(
let addr =
futures::executor::block_on(resolve_addr(&args.hostname, args.port))?;
- let listener = TcpListener::bind(&addr)?;
+ let listener = futures::executor::block_on(TcpListener::bind(&addr))?;
let local_addr = listener.local_addr()?;
let local_addr_str = local_addr.to_string();
let listener_resource = TcpListenerResource {
- listener: listener.incoming(),
+ listener,
waker: None,
local_addr,
};