summaryrefslogtreecommitdiff
path: root/cli/tokio_util.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tokio_util.rs')
-rw-r--r--cli/tokio_util.rs29
1 files changed, 27 insertions, 2 deletions
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs
index 678bb8e66..4ee73eef9 100644
--- a/cli/tokio_util.rs
+++ b/cli/tokio_util.rs
@@ -7,6 +7,7 @@ use futures::Poll;
use std::io;
use std::mem;
use std::net::SocketAddr;
+use std::ops::FnOnce;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime;
@@ -78,6 +79,7 @@ where
#[derive(Debug)]
enum AcceptState {
+ Eager(Resource),
Pending(Resource),
Empty,
}
@@ -85,7 +87,7 @@ enum AcceptState {
/// Simply accepts a connection.
pub fn accept(r: Resource) -> Accept {
Accept {
- state: AcceptState::Pending(r),
+ state: AcceptState::Eager(r),
}
}
@@ -107,6 +109,16 @@ impl Future for Accept {
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
+ AcceptState::Eager(ref mut r) => match r.poll_accept() {
+ Ok(futures::prelude::Async::Ready(t)) => t,
+ Ok(futures::prelude::Async::NotReady) => {
+ self.state = AcceptState::Pending(r.to_owned());
+ return Ok(futures::prelude::Async::NotReady);
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ },
AcceptState::Pending(ref mut r) => match r.poll_accept() {
Ok(futures::prelude::Async::Ready(t)) => {
r.untrack_task();
@@ -126,8 +138,8 @@ impl Future for Accept {
};
match mem::replace(&mut self.state, AcceptState::Empty) {
- AcceptState::Pending(_) => Ok((stream, addr).into()),
AcceptState::Empty => panic!("invalid internal state"),
+ _ => Ok((stream, addr).into()),
}
}
}
@@ -166,3 +178,16 @@ where
{
f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
}
+
+#[cfg(test)]
+pub fn run_in_task<F>(f: F)
+where
+ F: FnOnce() + Send + 'static,
+{
+ let fut = futures::future::lazy(move || {
+ f();
+ futures::future::ok(())
+ });
+
+ run(fut)
+}