diff options
Diffstat (limited to 'src/tokio_util.rs')
-rw-r--r-- | src/tokio_util.rs | 36 |
1 files changed, 11 insertions, 25 deletions
diff --git a/src/tokio_util.rs b/src/tokio_util.rs index ef66f4610..810b826b4 100644 --- a/src/tokio_util.rs +++ b/src/tokio_util.rs @@ -1,6 +1,5 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use crate::resources::Resource; - use futures; use futures::Future; use futures::Poll; @@ -9,7 +8,14 @@ use std::mem; use std::net::SocketAddr; use tokio; use tokio::net::TcpStream; -use tokio_executor; + +pub fn run<F>(future: F) +where + F: Future<Item = (), Error = ()> + Send + 'static, +{ + // tokio::runtime::current_thread::run(future) + tokio::run(future) +} pub fn block_on<F, R, E>(future: F) -> Result<R, E> where @@ -25,10 +31,12 @@ where // Set the default executor so we can use tokio::spawn(). It's difficult to // pass around mut references to the runtime, so using with_default is // preferable. Ideally Tokio would provide this function. +#[cfg(test)] pub fn init<F>(f: F) where F: FnOnce(), { + use tokio_executor; let rt = tokio::runtime::Runtime::new().unwrap(); let mut executor = rt.executor(); let mut enter = tokio_executor::enter().expect("Multiple executors at once"); @@ -63,29 +71,7 @@ impl Future for Accept { fn poll(&mut self) -> Poll<Self::Item, Self::Error> { let (stream, addr) = match self.state { - // Similar to try_ready!, but also track/untrack accept task - // in TcpListener resource. - // In this way, when the listener is closed, the task can be - // notified to error out (instead of stuck forever). - AcceptState::Pending(ref mut r) => match r.poll_accept() { - Ok(futures::prelude::Async::Ready(t)) => { - // Notice: it is possible to be Ready on the first poll. - // When eager accept fails due to WouldBlock, - // a next poll() might still be immediately Ready. - // See https://github.com/denoland/deno/issues/1756. - r.untrack_task(); - t - } - Ok(futures::prelude::Async::NotReady) => { - // Would error out if another accept task is being tracked. - r.track_task()?; - return Ok(futures::prelude::Async::NotReady); - } - Err(e) => { - r.untrack_task(); - return Err(e); - } - }, + AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()), AcceptState::Empty => panic!("poll Accept after it's done"), }; |