diff options
Diffstat (limited to 'cli/tokio_util.rs')
-rw-r--r-- | cli/tokio_util.rs | 29 |
1 files changed, 27 insertions, 2 deletions
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index 678bb8e66..4ee73eef9 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -7,6 +7,7 @@ use futures::Poll; use std::io; use std::mem; use std::net::SocketAddr; +use std::ops::FnOnce; use tokio; use tokio::net::TcpStream; use tokio::runtime; @@ -78,6 +79,7 @@ where #[derive(Debug)] enum AcceptState { + Eager(Resource), Pending(Resource), Empty, } @@ -85,7 +87,7 @@ enum AcceptState { /// Simply accepts a connection. pub fn accept(r: Resource) -> Accept { Accept { - state: AcceptState::Pending(r), + state: AcceptState::Eager(r), } } @@ -107,6 +109,16 @@ impl Future for Accept { // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). + AcceptState::Eager(ref mut r) => match r.poll_accept() { + Ok(futures::prelude::Async::Ready(t)) => t, + Ok(futures::prelude::Async::NotReady) => { + self.state = AcceptState::Pending(r.to_owned()); + return Ok(futures::prelude::Async::NotReady); + } + Err(e) => { + return Err(e); + } + }, AcceptState::Pending(ref mut r) => match r.poll_accept() { Ok(futures::prelude::Async::Ready(t)) => { r.untrack_task(); @@ -126,8 +138,8 @@ impl Future for Accept { }; match mem::replace(&mut self.state, AcceptState::Empty) { - AcceptState::Pending(_) => Ok((stream, addr).into()), AcceptState::Empty => panic!("invalid internal state"), + _ => Ok((stream, addr).into()), } } } @@ -166,3 +178,16 @@ where { f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) } + +#[cfg(test)] +pub fn run_in_task<F>(f: F) +where + F: FnOnce() + Send + 'static, +{ + let fut = futures::future::lazy(move || { + f(); + futures::future::ok(()) + }); + + run(fut) +} |