diff options
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 61 |
1 files changed, 21 insertions, 40 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 6d843c0ba..a3a1e665e 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -9,8 +9,6 @@ use deno::Resource; use deno::*; use futures::future::FutureExt; use futures::future::TryFutureExt; -use futures::stream::StreamExt; -use futures::stream::TryStreamExt; use std; use std::convert::From; use std::future::Future; @@ -20,7 +18,6 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; use tokio; -use tokio::net::tcp::Incoming; use tokio::net::TcpListener; use tokio::net::TcpStream; @@ -73,27 +70,23 @@ impl Future for Accept { ErrBox::from(e) })?; - let mut listener = - futures::compat::Compat01As03::new(&mut listener_resource.listener) - .map_err(ErrBox::from); + let listener = &mut listener_resource.listener; - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(stream))) => { + match listener.poll_accept(cx).map_err(ErrBox::from) { + Poll::Ready(Ok((stream, addr))) => { listener_resource.untrack_task(); inner.accept_state = AcceptState::Done; - let addr = stream.peer_addr().unwrap(); Poll::Ready(Ok((stream, addr))) } Poll::Pending => { listener_resource.track_task(cx)?; Poll::Pending } - Poll::Ready(Some(Err(e))) => { + Poll::Ready(Err(e)) => { listener_resource.untrack_task(); inner.accept_state = AcceptState::Done; Poll::Ready(Err(e)) } - _ => unreachable!(), } } } @@ -160,32 +153,20 @@ fn op_dial( let state_ = state.clone(); state.check_net(&args.hostname, args.port)?; - let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { - futures::compat::Compat01As03::new(TcpStream::connect(&addr)) - .map_err(ErrBox::from) - .and_then(move |tcp_stream| { - let local_addr = match tcp_stream.local_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - let remote_addr = match tcp_stream.peer_addr() { - Ok(v) => v, - Err(e) => return futures::future::err(ErrBox::from(e)), - }; - let mut table = state_.lock_resource_table(); - let rid = table - .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); - futures::future::ok((rid, local_addr, remote_addr)) - }) - .map_err(ErrBox::from) - .and_then(move |(rid, local_addr, remote_addr)| { - futures::future::ok(json!({ - "rid": rid, - "localAddr": local_addr.to_string(), - "remoteAddr": remote_addr.to_string(), - })) - }) - }); + let op = async move { + let addr = resolve_addr(&args.hostname, args.port).await?; + let tcp_stream = TcpStream::connect(&addr).await?; + let local_addr = tcp_stream.local_addr()?; + let remote_addr = tcp_stream.peer_addr()?; + let mut table = state_.lock_resource_table(); + let rid = + table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); + Ok(json!({ + "rid": rid, + "localAddr": local_addr.to_string(), + "remoteAddr": remote_addr.to_string(), + })) + }; Ok(JsonOp::Async(op.boxed())) } @@ -235,7 +216,7 @@ struct ListenArgs { #[allow(dead_code)] struct TcpListenerResource { - listener: Incoming, + listener: TcpListener, waker: Option<futures::task::AtomicWaker>, local_addr: SocketAddr, } @@ -300,11 +281,11 @@ fn op_listen( let addr = futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; - let listener = TcpListener::bind(&addr)?; + let listener = futures::executor::block_on(TcpListener::bind(&addr))?; let local_addr = listener.local_addr()?; let local_addr_str = local_addr.to_string(); let listener_resource = TcpListenerResource { - listener: listener.incoming(), + listener, waker: None, local_addr, }; |