diff options
Diffstat (limited to 'cli/tokio_util.rs')
-rw-r--r-- | cli/tokio_util.rs | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs new file mode 100644 index 000000000..810b826b4 --- /dev/null +++ b/cli/tokio_util.rs @@ -0,0 +1,118 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::resources::Resource; +use futures; +use futures::Future; +use futures::Poll; +use std::io; +use std::mem; +use std::net::SocketAddr; +use tokio; +use tokio::net::TcpStream; + +pub fn run<F>(future: F) +where + F: Future<Item = (), Error = ()> + Send + 'static, +{ + // tokio::runtime::current_thread::run(future) + tokio::run(future) +} + +pub fn block_on<F, R, E>(future: F) -> Result<R, E> +where + F: Send + 'static + Future<Item = R, Error = E>, + R: Send + 'static, + E: Send + 'static, +{ + let (tx, rx) = futures::sync::oneshot::channel(); + tokio::spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + rx.wait().unwrap() +} + +// Set the default executor so we can use tokio::spawn(). It's difficult to +// pass around mut references to the runtime, so using with_default is +// preferable. Ideally Tokio would provide this function. +#[cfg(test)] +pub fn init<F>(f: F) +where + F: FnOnce(), +{ + use tokio_executor; + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut executor = rt.executor(); + let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f()); +} + +#[derive(Debug)] +enum AcceptState { + Pending(Resource), + Empty, +} + +/// Simply accepts a connection. +pub fn accept(r: Resource) -> Accept { + Accept { + state: AcceptState::Pending(r), + } +} + +/// A future which can be used to easily read available number of bytes to fill +/// a buffer. +/// +/// Created by the [`read`] function. +#[derive(Debug)] +pub struct Accept { + state: AcceptState, +} + +impl Future for Accept { + type Item = (TcpStream, SocketAddr); + type Error = io::Error; + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + let (stream, addr) = match self.state { + AcceptState::Pending(ref mut r) => try_ready!(r.poll_accept()), + AcceptState::Empty => panic!("poll Accept after it's done"), + }; + + match mem::replace(&mut self.state, AcceptState::Empty) { + AcceptState::Pending(_) => Ok((stream, addr).into()), + AcceptState::Empty => panic!("invalid internal state"), + } + } +} + +/// `futures::future::poll_fn` only support `F: FnMut()->Poll<T, E>` +/// However, we require that `F: FnOnce()->Poll<T, E>`. +/// Therefore, we created our version of `poll_fn`. +pub fn poll_fn<T, E, F>(f: F) -> PollFn<F> +where + F: FnOnce() -> Poll<T, E>, +{ + PollFn { inner: Some(f) } +} + +pub struct PollFn<F> { + inner: Option<F>, +} + +impl<T, E, F> Future for PollFn<F> +where + F: FnOnce() -> Poll<T, E>, +{ + type Item = T; + type Error = E; + + fn poll(&mut self) -> Poll<T, E> { + let f = self.inner.take().expect("Inner fn has been taken."); + f() + } +} + +pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Item = I, Error = ()> +where + F: Future<Item = I, Error = E>, + E: std::fmt::Debug, +{ + f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) +} |