diff options
Diffstat (limited to 'test_util/src')
-rw-r--r-- | test_util/src/servers/grpc.rs | 2 | ||||
-rw-r--r-- | test_util/src/servers/hyper_utils.rs | 146 | ||||
-rw-r--r-- | test_util/src/servers/mod.rs | 301 | ||||
-rw-r--r-- | test_util/src/servers/registry.rs | 13 | ||||
-rw-r--r-- | test_util/src/servers/ws.rs | 16 |
5 files changed, 260 insertions, 218 deletions
diff --git a/test_util/src/servers/grpc.rs b/test_util/src/servers/grpc.rs index 08d297a4c..096f8c85f 100644 --- a/test_util/src/servers/grpc.rs +++ b/test_util/src/servers/grpc.rs @@ -1,7 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use futures::StreamExt; -use h2_04 as h2; +use h2; use hyper1::header::HeaderName; use hyper1::header::HeaderValue; use rustls_tokio_stream::TlsStream; diff --git a/test_util/src/servers/hyper_utils.rs b/test_util/src/servers/hyper_utils.rs new file mode 100644 index 000000000..b8e9472a4 --- /dev/null +++ b/test_util/src/servers/hyper_utils.rs @@ -0,0 +1,146 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use bytes::Bytes; +use futures::Future; +use futures::FutureExt; +use futures::Stream; +use futures::StreamExt; +use http; +use http::Request; +use http::Response; +use http_body_util::combinators::UnsyncBoxBody; +use hyper1 as hyper; +use hyper_util::rt::TokioIo; +use std::convert::Infallible; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::result::Result; +use tokio::net::TcpListener; + +#[derive(Debug, Clone, Copy)] +pub enum ServerKind { + Auto, + OnlyHttp1, + OnlyHttp2, +} + +#[derive(Debug, Clone, Copy)] +pub struct ServerOptions { + pub error_msg: &'static str, + pub addr: SocketAddr, + pub kind: ServerKind, +} + +type HandlerOutput = + Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error>; + +pub async fn run_server<F, S>(options: ServerOptions, handler: F) +where + F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static, + S: Future<Output = HandlerOutput> + 'static, +{ + let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> = + async move { + let listener = TcpListener::bind(options.addr).await?; + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + deno_unsync::spawn(hyper1_serve_connection( + io, + handler, + options.error_msg, + options.kind, + )); + } + } + .boxed_local(); + + if let Err(e) = fut.await { + eprintln!("{}: {:?}", options.error_msg, e); + } +} + +pub async fn run_server_with_acceptor<'a, A, F, S>( + mut acceptor: Pin<Box<A>>, + handler: F, + error_msg: &'static str, + kind: ServerKind, +) where + A: Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + ?Sized, + F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static, + S: Future<Output = HandlerOutput> + 'static, +{ + let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> = + async move { + while let Some(result) = acceptor.next().await { + let stream = result?; + let io = TokioIo::new(stream); + deno_unsync::spawn(hyper1_serve_connection( + io, handler, error_msg, kind, + )); + } + Ok(()) + } + .boxed_local(); + + if let Err(e) = fut.await { + eprintln!("{}: {:?}", error_msg, e); + } +} + +async fn hyper1_serve_connection<I, F, S>( + io: I, + handler: F, + error_msg: &'static str, + kind: ServerKind, +) where + I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, + F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static, + S: Future<Output = HandlerOutput> + 'static, +{ + let service = hyper1::service::service_fn(handler); + + let result: Result<(), anyhow::Error> = match kind { + ServerKind::Auto => { + let builder = + hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor); + builder + .serve_connection(io, service) + .await + .map_err(|e| anyhow::anyhow!("{}", e)) + } + ServerKind::OnlyHttp1 => { + let builder = hyper1::server::conn::http1::Builder::new(); + builder + .serve_connection(io, service) + .await + .map_err(|e| e.into()) + } + ServerKind::OnlyHttp2 => { + let builder = + hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor); + builder + .serve_connection(io, service) + .await + .map_err(|e| e.into()) + } + }; + + if let Err(e) = result { + eprintln!("{}: {:?}", error_msg, e); + } +} + +#[derive(Clone)] +struct DenoUnsyncExecutor; + +impl<Fut> hyper::rt::Executor<Fut> for DenoUnsyncExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + deno_unsync::spawn(fut); + } +} diff --git a/test_util/src/servers/mod.rs b/test_util/src/servers/mod.rs index 18953b7f0..7c4d76e7a 100644 --- a/test_util/src/servers/mod.rs +++ b/test_util/src/servers/mod.rs @@ -11,43 +11,44 @@ use denokv_proto::datapath::ReadRangeOutput; use denokv_proto::datapath::SnapshotRead; use denokv_proto::datapath::SnapshotReadOutput; use denokv_proto::datapath::SnapshotReadStatus; -use futures::Future; use futures::FutureExt; use futures::Stream; use futures::StreamExt; +use http; use http::HeaderValue; use http::Method; use http::Request; use http::Response; use http::StatusCode; -use http_1 as http; use http_body_util::combinators::UnsyncBoxBody; use http_body_util::BodyExt; use http_body_util::Empty; use http_body_util::Full; use hyper1 as hyper; -use hyper_util::rt::TokioIo; use pretty_assertions::assert_eq; use prost::Message; use std::collections::HashMap; use std::convert::Infallible; use std::env; -use std::io; use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::PathBuf; -use std::pin::Pin; use std::result::Result; use std::time::Duration; use tokio::io::AsyncWriteExt; -use tokio::net::TcpListener; use tokio::net::TcpStream; mod grpc; +mod hyper_utils; mod registry; mod ws; +use hyper_utils::run_server; +use hyper_utils::run_server_with_acceptor; +use hyper_utils::ServerKind; +use hyper_utils::ServerOptions; + use super::https::get_tls_listener_stream; use super::https::SupportedHttpVersions; use super::npm::CUSTOM_NPM_PACKAGE_CACHE; @@ -94,8 +95,6 @@ pub async fn run_all_servers() { return hyper_hello(port.parse::<u16>().unwrap()).await; } - // TODO(bartlomieju): in a follow up all these `wrap_` handlers could be removed - // in favor of spawning a hyper server directly with a config object let redirect_server_fut = wrap_redirect_server(REDIRECT_PORT); let double_redirects_server_fut = wrap_double_redirect_server(DOUBLE_REDIRECTS_PORT); @@ -185,7 +184,15 @@ async fn hyper_hello(port: u16) { http_body_util::Full::new(Bytes::from("Hello World!")), ))) }; - run_hyper1_server(addr, handler, "server error").await; + run_server( + ServerOptions { + addr, + error_msg: "server error", + kind: ServerKind::Auto, + }, + handler, + ) + .await; } fn redirect_resp(url: String) -> Response<UnsyncBoxBody<Bytes, Infallible>> { @@ -1159,221 +1166,93 @@ async fn download_npm_registry_file( Ok(()) } -#[derive(Clone)] -struct DenoUnsyncExecutor; - -impl<Fut> hyper::rt::Executor<Fut> for DenoUnsyncExecutor -where - Fut: Future + 'static, - Fut::Output: 'static, -{ - fn execute(&self, fut: Fut) { - deno_unsync::spawn(fut); - } -} - -#[derive(Debug, Clone, Copy)] -enum Hyper1ServerKind { - Auto, - OnlyHttp1, - OnlyHttp2, -} - -async fn hyper1_serve_connection<I, F, S>( - io: I, - service_fn_handler: F, - error_msg: &'static str, - kind: Hyper1ServerKind, -) where - I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, - F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static, - S: Future< - Output = Result< - Response<UnsyncBoxBody<Bytes, Infallible>>, - anyhow::Error, - >, - > + 'static, -{ - let service = hyper1::service::service_fn(service_fn_handler); - - let result: Result<(), anyhow::Error> = match kind { - Hyper1ServerKind::Auto => { - let builder = - hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor); - builder - .serve_connection(io, service) - .await - .map_err(|e| anyhow::anyhow!("{}", e)) - } - Hyper1ServerKind::OnlyHttp1 => { - let builder = hyper1::server::conn::http1::Builder::new(); - builder - .serve_connection(io, service) - .await - .map_err(|e| e.into()) - } - Hyper1ServerKind::OnlyHttp2 => { - let builder = - hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor); - builder - .serve_connection(io, service) - .await - .map_err(|e| e.into()) - } - }; - - if let Err(e) = result { - eprintln!("{}: {:?}", error_msg, e); - } -} - -async fn run_hyper1_server_inner<F, S>( - addr: SocketAddr, - service_fn_handler: F, - error_msg: &'static str, - kind: Hyper1ServerKind, -) where - F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static, - S: Future< - Output = Result< - Response<UnsyncBoxBody<Bytes, Infallible>>, - anyhow::Error, - >, - > + 'static, -{ - let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> = - async move { - let listener = TcpListener::bind(addr).await?; - loop { - let (stream, _) = listener.accept().await?; - let io = TokioIo::new(stream); - deno_unsync::spawn(hyper1_serve_connection( - io, - service_fn_handler, - error_msg, - kind, - )); - } - } - .boxed_local(); - - if let Err(e) = fut.await { - eprintln!("{}: {:?}", error_msg, e); - } -} - -async fn run_hyper1_server<F, S>( - addr: SocketAddr, - service_fn_handler: F, - error_msg: &'static str, -) where - F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static, - S: Future< - Output = Result< - Response<UnsyncBoxBody<Bytes, Infallible>>, - anyhow::Error, - >, - > + 'static, -{ - run_hyper1_server_inner( - addr, - service_fn_handler, - error_msg, - Hyper1ServerKind::Auto, - ) - .await -} - -// TODO(bartlomieju): dedup with `run_hyper1_server_inner` -async fn run_hyper1_server_with_acceptor<'a, A, F, S>( - mut acceptor: Pin<Box<A>>, - service_fn_handler: F, - error_msg: &'static str, - kind: Hyper1ServerKind, -) where - A: Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + ?Sized, - F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static, - S: Future< - Output = Result< - Response<UnsyncBoxBody<Bytes, Infallible>>, - anyhow::Error, - >, - > + 'static, -{ - let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> = - async move { - while let Some(result) = acceptor.next().await { - let stream = result?; - let io = TokioIo::new(stream); - deno_unsync::spawn(hyper1_serve_connection( - io, - service_fn_handler, - error_msg, - kind, - )); - } - Ok(()) - } - .boxed_local(); - - if let Err(e) = fut.await { - eprintln!("{}: {:?}", error_msg, e); - } -} - async fn wrap_redirect_server(port: u16) { let redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server(redirect_addr, redirect, "Redirect error").await; + run_server( + ServerOptions { + addr: redirect_addr, + error_msg: "Redirect error", + kind: ServerKind::Auto, + }, + redirect, + ) + .await; } async fn wrap_double_redirect_server(port: u16) { let double_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server( - double_redirects_addr, + run_server( + ServerOptions { + addr: double_redirects_addr, + error_msg: "Double redirect error", + kind: ServerKind::Auto, + }, double_redirects, - "Double redirect error", ) .await; } async fn wrap_inf_redirect_server(port: u16) { let inf_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server(inf_redirects_addr, inf_redirects, "Inf redirect error") - .await; + run_server( + ServerOptions { + addr: inf_redirects_addr, + error_msg: "Inf redirect error", + kind: ServerKind::Auto, + }, + inf_redirects, + ) + .await; } async fn wrap_another_redirect_server(port: u16) { let another_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server( - another_redirect_addr, + run_server( + ServerOptions { + addr: another_redirect_addr, + error_msg: "Another redirect error", + kind: ServerKind::Auto, + }, another_redirect, - "Another redirect error", ) .await; } async fn wrap_auth_redirect_server(port: u16) { let auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server(auth_redirect_addr, auth_redirect, "Auth redirect error") - .await; + run_server( + ServerOptions { + addr: auth_redirect_addr, + error_msg: "Auth redirect error", + kind: ServerKind::Auto, + }, + auth_redirect, + ) + .await; } async fn wrap_basic_auth_redirect_server(port: u16) { let basic_auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server( - basic_auth_redirect_addr, + run_server( + ServerOptions { + addr: basic_auth_redirect_addr, + error_msg: "Basic auth redirect error", + kind: ServerKind::Auto, + }, basic_auth_redirect, - "Basic auth redirect error", ) .await; } async fn wrap_abs_redirect_server(port: u16) { let abs_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server( - abs_redirect_addr, + run_server( + ServerOptions { + addr: abs_redirect_addr, + error_msg: "Absolute redirect error", + kind: ServerKind::Auto, + }, absolute_redirect, - "Absolute redirect error", ) .await; } @@ -1392,17 +1271,25 @@ async fn wrap_main_ipv6_server(port: u16) { } async fn wrap_main_server_for_addr(main_server_addr: &SocketAddr) { - run_hyper1_server(*main_server_addr, main_server, "HTTP server error").await; + run_server( + ServerOptions { + addr: *main_server_addr, + kind: ServerKind::Auto, + error_msg: "HTTP server error", + }, + main_server, + ) + .await; } async fn wrap_main_https_server(port: u16) { let tls = get_tls_listener_stream("https", port, Default::default()).await; let tls_acceptor = tls.boxed_local(); - run_hyper1_server_with_acceptor( + run_server_with_acceptor( tls_acceptor, main_server, "HTTPS server error", - Hyper1ServerKind::Auto, + ServerKind::Auto, ) .await } @@ -1415,11 +1302,11 @@ async fn wrap_https_h1_only_tls_server(port: u16) { ) .await; - run_hyper1_server_with_acceptor( + run_server_with_acceptor( tls.boxed_local(), main_server, "HTTP1 only TLS server error", - Hyper1ServerKind::OnlyHttp1, + ServerKind::OnlyHttp1, ) .await } @@ -1432,33 +1319,37 @@ async fn wrap_https_h2_only_tls_server(port: u16) { ) .await; - run_hyper1_server_with_acceptor( + run_server_with_acceptor( tls.boxed_local(), main_server, "HTTP2 only TLS server error", - Hyper1ServerKind::OnlyHttp2, + ServerKind::OnlyHttp2, ) .await } async fn wrap_http_h1_only_server(port: u16) { let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server_inner( - main_server_http_addr, + run_server( + ServerOptions { + addr: main_server_http_addr, + error_msg: "HTTP1 only server error:", + kind: ServerKind::OnlyHttp1, + }, main_server, - "HTTP1 only server error:", - Hyper1ServerKind::OnlyHttp1, ) .await; } async fn wrap_http_h2_only_server(port: u16) { let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server_inner( - main_server_http_addr, + run_server( + ServerOptions { + addr: main_server_http_addr, + error_msg: "HTTP1 only server error:", + kind: ServerKind::OnlyHttp2, + }, main_server, - "HTTP1 only server error:", - Hyper1ServerKind::OnlyHttp2, ) .await; } @@ -1480,11 +1371,11 @@ async fn wrap_client_auth_https_server(port: u16) { } }; - run_hyper1_server_with_acceptor( + run_server_with_acceptor( tls.boxed_local(), main_server, "Auth TLS server error", - Hyper1ServerKind::Auto, + ServerKind::Auto, ) .await } diff --git a/test_util/src/servers/registry.rs b/test_util/src/servers/registry.rs index 8f6fd32aa..3e0457253 100644 --- a/test_util/src/servers/registry.rs +++ b/test_util/src/servers/registry.rs @@ -1,6 +1,8 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use super::run_hyper1_server; +use super::run_server; +use super::ServerKind; +use super::ServerOptions; use bytes::Bytes; use http_body_util::combinators::UnsyncBoxBody; use http_body_util::Empty; @@ -16,10 +18,13 @@ use std::net::SocketAddr; pub async fn registry_server(port: u16) { let registry_server_addr = SocketAddr::from(([127, 0, 0, 1], port)); - run_hyper1_server( - registry_server_addr, + run_server( + ServerOptions { + addr: registry_server_addr, + error_msg: "Registry server error", + kind: ServerKind::Auto, + }, registry_server_handler, - "Registry server error", ) .await } diff --git a/test_util/src/servers/ws.rs b/test_util/src/servers/ws.rs index 188216c86..f820c8bbd 100644 --- a/test_util/src/servers/ws.rs +++ b/test_util/src/servers/ws.rs @@ -11,10 +11,10 @@ use futures::future::join3; use futures::future::poll_fn; use futures::Future; use futures::StreamExt; -use h2_04::server::Handshake; -use h2_04::server::SendResponse; -use h2_04::Reason; -use h2_04::RecvStream; +use h2::server::Handshake; +use h2::server::SendResponse; +use h2::Reason; +use h2::RecvStream; use hyper1::upgrade::Upgraded; use hyper1::Method; use hyper1::Request; @@ -70,7 +70,7 @@ pub async fn run_wss2_server(port: u16) { .await; while let Some(Ok(tls)) = tls.next().await { tokio::spawn(async move { - let mut h2 = h2_04::server::Builder::new(); + let mut h2 = h2::server::Builder::new(); h2.enable_connect_protocol(); // Using Bytes is pretty alloc-heavy but this is a test server let server: Handshake<_, Bytes> = h2.handshake(tls); @@ -127,7 +127,7 @@ where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, { let service = hyper1::service::service_fn( - move |mut req: http_1::Request<hyper1::body::Incoming>| async move { + move |mut req: http::Request<hyper1::body::Incoming>| async move { let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req) .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?; @@ -161,13 +161,13 @@ where async fn handle_wss_stream( recv: Request<RecvStream>, mut send: SendResponse<Bytes>, -) -> Result<(), h2_04::Error> { +) -> Result<(), h2::Error> { if recv.method() != Method::CONNECT { eprintln!("wss2: refusing non-CONNECT stream"); send.send_reset(Reason::REFUSED_STREAM); return Ok(()); } - let Some(protocol) = recv.extensions().get::<h2_04::ext::Protocol>() else { + let Some(protocol) = recv.extensions().get::<h2::ext::Protocol>() else { eprintln!("wss2: refusing no-:protocol stream"); send.send_reset(Reason::REFUSED_STREAM); return Ok(()); |