diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-11-17 01:17:47 +0100 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-16 19:17:47 -0500 |
commit | 8f9a942cb911ed017eb128e9fbeb6f9a48e69601 (patch) | |
tree | 4f56623262f84becac18546d9da3d5d9ce9c8735 /cli/ops/net.rs | |
parent | cb00fd6e988184420f842b1e77ca4cf627d32773 (diff) |
Use futures 0.3 API (#3358)
Diffstat (limited to 'cli/ops/net.rs')
-rw-r--r-- | cli/ops/net.rs | 136 |
1 files changed, 82 insertions, 54 deletions
diff --git a/cli/ops/net.rs b/cli/ops/net.rs index 2fe81e140..929b87dde 100644 --- a/cli/ops/net.rs +++ b/cli/ops/net.rs @@ -7,14 +7,20 @@ use crate::resolve_addr::resolve_addr; use crate::state::ThreadSafeState; use deno::Resource; use deno::*; -use futures::Async; -use futures::Future; -use futures::Poll; +use futures::future::FutureExt; +use futures::future::TryFutureExt; +use futures::stream::StreamExt; +use futures::stream::TryStreamExt; use std; use std::convert::From; +use std::future::Future; use std::net::Shutdown; use std::net::SocketAddr; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; use tokio; +use tokio::net::tcp::Incoming; use tokio::net::TcpListener; use tokio::net::TcpStream; @@ -49,17 +55,17 @@ pub struct Accept { } impl Future for Accept { - type Item = (TcpStream, SocketAddr); - type Error = ErrBox; + type Output = Result<(TcpStream, SocketAddr), ErrBox>; - fn poll(&mut self) -> Poll<Self::Item, Self::Error> { - if self.accept_state == AcceptState::Done { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + if inner.accept_state == AcceptState::Done { panic!("poll Accept after it's done"); } - let mut table = self.state.lock_resource_table(); + let mut table = inner.state.lock_resource_table(); let listener_resource = table - .get_mut::<TcpListenerResource>(self.rid) + .get_mut::<TcpListenerResource>(inner.rid) .ok_or_else(|| { let e = std::io::Error::new( std::io::ErrorKind::Other, @@ -68,44 +74,50 @@ impl Future for Accept { ErrBox::from(e) })?; - let listener = &mut listener_resource.listener; + let mut listener = + futures::compat::Compat01As03::new(&mut listener_resource.listener) + .map_err(ErrBox::from); - if self.accept_state == AcceptState::Eager { + if inner.accept_state == AcceptState::Eager { // Similar to try_ready!, but also track/untrack accept task // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { - self.accept_state = AcceptState::Done; - return Ok((stream, addr).into()); + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { + inner.accept_state = AcceptState::Done; + let addr = stream.peer_addr().unwrap(); + return Poll::Ready(Ok((stream, addr))); } - Ok(Async::NotReady) => { - self.accept_state = AcceptState::Pending; - return Ok(Async::NotReady); + Poll::Pending => { + inner.accept_state = AcceptState::Pending; + return Poll::Pending; } - Err(e) => { - self.accept_state = AcceptState::Done; - return Err(e); + Poll::Ready(Some(Err(e))) => { + inner.accept_state = AcceptState::Done; + return Poll::Ready(Err(e)); } + _ => unreachable!(), } } - match listener.poll_accept().map_err(ErrBox::from) { - Ok(Async::Ready((stream, addr))) => { + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream))) => { listener_resource.untrack_task(); - self.accept_state = AcceptState::Done; - Ok((stream, addr).into()) + inner.accept_state = AcceptState::Done; + let addr = stream.peer_addr().unwrap(); + Poll::Ready(Ok((stream, addr))) } - Ok(Async::NotReady) => { - listener_resource.track_task()?; - Ok(Async::NotReady) + Poll::Pending => { + listener_resource.track_task(cx)?; + Poll::Pending } - Err(e) => { + Poll::Ready(Some(Err(e))) => { listener_resource.untrack_task(); - self.accept_state = AcceptState::Done; - Err(e) + inner.accept_state = AcceptState::Done; + Poll::Ready(Err(e)) } + _ => unreachable!(), } } } @@ -130,12 +142,18 @@ fn op_accept( let op = accept(state, rid) .and_then(move |(tcp_stream, _socket_addr)| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; let mut table = state_.lock_resource_table(); let rid = table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); - Ok((rid, local_addr, remote_addr)) + futures::future::ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) .and_then(move |(rid, local_addr, remote_addr)| { @@ -146,7 +164,7 @@ fn op_accept( })) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -167,15 +185,21 @@ fn op_dial( state.check_net(&args.hostname, args.port)?; let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| { - TcpStream::connect(&addr) + futures::compat::Compat01As03::new(TcpStream::connect(&addr)) .map_err(ErrBox::from) .and_then(move |tcp_stream| { - let local_addr = tcp_stream.local_addr()?; - let remote_addr = tcp_stream.peer_addr()?; + let local_addr = match tcp_stream.local_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; + let remote_addr = match tcp_stream.peer_addr() { + Ok(v) => v, + Err(e) => return futures::future::err(ErrBox::from(e)), + }; let mut table = state_.lock_resource_table(); let rid = table .add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream))); - Ok((rid, local_addr, remote_addr)) + futures::future::ok((rid, local_addr, remote_addr)) }) .map_err(ErrBox::from) .and_then(move |(rid, local_addr, remote_addr)| { @@ -187,7 +211,7 @@ fn op_dial( }) }); - Ok(JsonOp::Async(Box::new(op))) + Ok(JsonOp::Async(op.boxed())) } #[derive(Deserialize)] @@ -235,8 +259,8 @@ struct ListenArgs { #[allow(dead_code)] struct TcpListenerResource { - listener: tokio::net::TcpListener, - task: Option<futures::task::Task>, + listener: Incoming, + waker: Option<futures::task::AtomicWaker>, local_addr: SocketAddr, } @@ -244,7 +268,7 @@ impl Resource for TcpListenerResource {} impl Drop for TcpListenerResource { fn drop(&mut self) { - self.notify_task(); + self.wake_task(); } } @@ -253,12 +277,12 @@ impl TcpListenerResource { /// can be notified when listener is closed. /// /// Throws an error if another task is already tracked. - pub fn track_task(&mut self) -> Result<(), ErrBox> { + pub fn track_task(&mut self, cx: &Context) -> Result<(), ErrBox> { // Currently, we only allow tracking a single accept task for a listener. // This might be changed in the future with multiple workers. // Caveat: TcpListener by itself also only tracks an accept task at a time. // See https://github.com/tokio-rs/tokio/issues/846#issuecomment-454208883 - if self.task.is_some() { + if self.waker.is_some() { let e = std::io::Error::new( std::io::ErrorKind::Other, "Another accept task is ongoing", @@ -266,22 +290,24 @@ impl TcpListenerResource { return Err(ErrBox::from(e)); } - self.task.replace(futures::task::current()); + let waker = futures::task::AtomicWaker::new(); + waker.register(cx.waker()); + self.waker.replace(waker); Ok(()) } /// Notifies a task when listener is closed so accept future can resolve. - pub fn notify_task(&mut self) { - if let Some(task) = self.task.take() { - task.notify(); + pub fn wake_task(&mut self) { + if let Some(waker) = self.waker.as_ref() { + waker.wake(); } } /// Stop tracking a task. /// Happens when the task is done and thus no further tracking is needed. pub fn untrack_task(&mut self) { - if self.task.is_some() { - self.task.take(); + if self.waker.is_some() { + self.waker.take(); } } } @@ -296,17 +322,19 @@ fn op_listen( state.check_net(&args.hostname, args.port)?; - let addr = resolve_addr(&args.hostname, args.port).wait()?; + let addr = + futures::executor::block_on(resolve_addr(&args.hostname, args.port))?; let listener = TcpListener::bind(&addr)?; let local_addr = listener.local_addr()?; let local_addr_str = local_addr.to_string(); let listener_resource = TcpListenerResource { - listener, - task: None, + listener: listener.incoming(), + waker: None, local_addr, }; let mut table = state.lock_resource_table(); let rid = table.add("tcpListener", Box::new(listener_resource)); + debug!("New listener {} {}", rid, local_addr_str); Ok(JsonOp::Sync(json!({ "rid": rid, |