diff options
Diffstat (limited to 'test_util/src')
-rw-r--r-- | test_util/src/lib.rs | 65 |
1 files changed, 28 insertions, 37 deletions
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index 3d5c60175..3f54fa9ee 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -5,28 +5,28 @@ #[macro_use] extern crate lazy_static; +use core::mem::replace; use futures::FutureExt; use futures::Stream; use futures::StreamExt; -use futures::TryStreamExt; use hyper::header::HeaderValue; +use hyper::server::Server; use hyper::service::make_service_fn; use hyper::service::service_fn; use hyper::Body; use hyper::Request; use hyper::Response; -use hyper::Server; use hyper::StatusCode; use os_pipe::pipe; #[cfg(unix)] pub use pty; use regex::Regex; use std::collections::HashMap; +use std::convert::Infallible; use std::env; use std::io; use std::io::Read; use std::io::Write; -use std::mem::replace; use std::net::SocketAddr; use std::path::PathBuf; use std::pin::Pin; @@ -146,11 +146,9 @@ async fn hyper_hello(port: u16) { println!("hyper hello"); let addr = SocketAddr::from(([127, 0, 0, 1], port)); let hello_svc = make_service_fn(|_| async move { - Ok::<_, hyper::error::Error>(service_fn( - move |_: Request<Body>| async move { - Ok::<_, hyper::error::Error>(Response::new(Body::from("Hello World!"))) - }, - )) + Ok::<_, Infallible>(service_fn(move |_: Request<Body>| async move { + Ok::<_, Infallible>(Response::new(Body::from("Hello World!"))) + })) }); let server = Server::bind(&addr).serve(hello_svc); @@ -203,7 +201,7 @@ async fn another_redirect(req: Request<Body>) -> hyper::Result<Response<Body>> { } async fn run_ws_server(addr: &SocketAddr) { - let mut listener = TcpListener::bind(addr).await.unwrap(); + let listener = TcpListener::bind(addr).await.unwrap(); while let Ok((stream, _addr)) = listener.accept().await { tokio::spawn(async move { let ws_stream_fut = accept_async(stream); @@ -278,7 +276,7 @@ async fn run_wss_server(addr: &SocketAddr) { let tls_config = get_tls_config(cert_file, key_file).await.unwrap(); let tls_acceptor = TlsAcceptor::from(tls_config); - let mut listener = TcpListener::bind(addr).await.unwrap(); + let listener = TcpListener::bind(addr).await.unwrap(); while let Ok((stream, _addr)) = listener.accept().await { let acceptor = tls_acceptor.clone(); @@ -620,7 +618,7 @@ unsafe impl std::marker::Send for HyperAcceptor<'_> {} async fn wrap_redirect_server() { let redirect_svc = - make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(redirect)) }); + make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(redirect)) }); let redirect_addr = SocketAddr::from(([127, 0, 0, 1], REDIRECT_PORT)); let redirect_server = Server::bind(&redirect_addr).serve(redirect_svc); if let Err(e) = redirect_server.await { @@ -630,7 +628,7 @@ async fn wrap_redirect_server() { async fn wrap_double_redirect_server() { let double_redirects_svc = make_service_fn(|_| async { - Ok::<_, hyper::Error>(service_fn(double_redirects)) + Ok::<_, Infallible>(service_fn(double_redirects)) }); let double_redirects_addr = SocketAddr::from(([127, 0, 0, 1], DOUBLE_REDIRECTS_PORT)); @@ -643,7 +641,7 @@ async fn wrap_double_redirect_server() { async fn wrap_inf_redirect_server() { let inf_redirects_svc = make_service_fn(|_| async { - Ok::<_, hyper::Error>(service_fn(inf_redirects)) + Ok::<_, Infallible>(service_fn(inf_redirects)) }); let inf_redirects_addr = SocketAddr::from(([127, 0, 0, 1], INF_REDIRECTS_PORT)); @@ -656,7 +654,7 @@ async fn wrap_inf_redirect_server() { async fn wrap_another_redirect_server() { let another_redirect_svc = make_service_fn(|_| async { - Ok::<_, hyper::Error>(service_fn(another_redirect)) + Ok::<_, Infallible>(service_fn(another_redirect)) }); let another_redirect_addr = SocketAddr::from(([127, 0, 0, 1], ANOTHER_REDIRECT_PORT)); @@ -669,7 +667,7 @@ async fn wrap_another_redirect_server() { async fn wrap_abs_redirect_server() { let abs_redirect_svc = make_service_fn(|_| async { - Ok::<_, hyper::Error>(service_fn(absolute_redirect)) + Ok::<_, Infallible>(service_fn(absolute_redirect)) }); let abs_redirect_addr = SocketAddr::from(([127, 0, 0, 1], REDIRECT_ABSOLUTE_PORT)); @@ -681,9 +679,8 @@ async fn wrap_abs_redirect_server() { } async fn wrap_main_server() { - let main_server_svc = make_service_fn(|_| async { - Ok::<_, hyper::Error>(service_fn(main_server)) - }); + let main_server_svc = + make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) }); let main_server_addr = SocketAddr::from(([127, 0, 0, 1], PORT)); let main_server = Server::bind(&main_server_addr).serve(main_server_svc); if let Err(e) = main_server.await { @@ -698,29 +695,23 @@ async fn wrap_main_https_server() { let tls_config = get_tls_config(cert_file, key_file) .await .expect("Cannot get TLS config"); - let mut tcp = TcpListener::bind(&main_server_https_addr) - .await - .expect("Cannot bind TCP"); loop { + let tcp = TcpListener::bind(&main_server_https_addr) + .await + .expect("Cannot bind TCP"); let tls_acceptor = TlsAcceptor::from(tls_config.clone()); // Prepare a long-running future stream to accept and serve cients. - let incoming_tls_stream = tcp - .incoming() - .map_err(|e| { - eprintln!("Error Incoming: {:?}", e); - io::Error::new(io::ErrorKind::Other, e) - }) - .and_then(move |s| { - use futures::TryFutureExt; - tls_acceptor.accept(s).map_err(|e| { - eprintln!("TLS Error {:?}", e); - e - }) - }) - .boxed(); + let incoming_tls_stream = async_stream::stream! { + loop { + let (socket, _) = tcp.accept().await?; + let stream = tls_acceptor.accept(socket); + yield stream.await; + } + } + .boxed(); let main_server_https_svc = make_service_fn(|_| async { - Ok::<_, hyper::Error>(service_fn(main_server)) + Ok::<_, Infallible>(service_fn(main_server)) }); let main_server_https = Server::builder(HyperAcceptor { acceptor: incoming_tls_stream, @@ -737,7 +728,7 @@ async fn wrap_main_https_server() { // Use the single-threaded scheduler. The hyper server is used as a point of // comparison for the (single-threaded!) benchmarks in cli/bench. We're not // comparing apples to apples if we use the default multi-threaded scheduler. -#[tokio::main(basic_scheduler)] +#[tokio::main(flavor = "current_thread")] pub async fn run_all_servers() { if let Some(port) = env::args().nth(1) { return hyper_hello(port.parse::<u16>().unwrap()).await; |