summaryrefslogtreecommitdiff
path: root/test_util/src
diff options
context:
space:
mode:
Diffstat (limited to 'test_util/src')
-rw-r--r--test_util/src/servers/grpc.rs2
-rw-r--r--test_util/src/servers/hyper_utils.rs146
-rw-r--r--test_util/src/servers/mod.rs301
-rw-r--r--test_util/src/servers/registry.rs13
-rw-r--r--test_util/src/servers/ws.rs16
5 files changed, 260 insertions, 218 deletions
diff --git a/test_util/src/servers/grpc.rs b/test_util/src/servers/grpc.rs
index 08d297a4c..096f8c85f 100644
--- a/test_util/src/servers/grpc.rs
+++ b/test_util/src/servers/grpc.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use futures::StreamExt;
-use h2_04 as h2;
+use h2;
use hyper1::header::HeaderName;
use hyper1::header::HeaderValue;
use rustls_tokio_stream::TlsStream;
diff --git a/test_util/src/servers/hyper_utils.rs b/test_util/src/servers/hyper_utils.rs
new file mode 100644
index 000000000..b8e9472a4
--- /dev/null
+++ b/test_util/src/servers/hyper_utils.rs
@@ -0,0 +1,146 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use bytes::Bytes;
+use futures::Future;
+use futures::FutureExt;
+use futures::Stream;
+use futures::StreamExt;
+use http;
+use http::Request;
+use http::Response;
+use http_body_util::combinators::UnsyncBoxBody;
+use hyper1 as hyper;
+use hyper_util::rt::TokioIo;
+use std::convert::Infallible;
+use std::io;
+use std::net::SocketAddr;
+use std::pin::Pin;
+use std::result::Result;
+use tokio::net::TcpListener;
+
+#[derive(Debug, Clone, Copy)]
+pub enum ServerKind {
+ Auto,
+ OnlyHttp1,
+ OnlyHttp2,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub struct ServerOptions {
+ pub error_msg: &'static str,
+ pub addr: SocketAddr,
+ pub kind: ServerKind,
+}
+
+type HandlerOutput =
+ Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error>;
+
+pub async fn run_server<F, S>(options: ServerOptions, handler: F)
+where
+ F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
+ S: Future<Output = HandlerOutput> + 'static,
+{
+ let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
+ async move {
+ let listener = TcpListener::bind(options.addr).await?;
+ loop {
+ let (stream, _) = listener.accept().await?;
+ let io = TokioIo::new(stream);
+ deno_unsync::spawn(hyper1_serve_connection(
+ io,
+ handler,
+ options.error_msg,
+ options.kind,
+ ));
+ }
+ }
+ .boxed_local();
+
+ if let Err(e) = fut.await {
+ eprintln!("{}: {:?}", options.error_msg, e);
+ }
+}
+
+pub async fn run_server_with_acceptor<'a, A, F, S>(
+ mut acceptor: Pin<Box<A>>,
+ handler: F,
+ error_msg: &'static str,
+ kind: ServerKind,
+) where
+ A: Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + ?Sized,
+ F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
+ S: Future<Output = HandlerOutput> + 'static,
+{
+ let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
+ async move {
+ while let Some(result) = acceptor.next().await {
+ let stream = result?;
+ let io = TokioIo::new(stream);
+ deno_unsync::spawn(hyper1_serve_connection(
+ io, handler, error_msg, kind,
+ ));
+ }
+ Ok(())
+ }
+ .boxed_local();
+
+ if let Err(e) = fut.await {
+ eprintln!("{}: {:?}", error_msg, e);
+ }
+}
+
+async fn hyper1_serve_connection<I, F, S>(
+ io: I,
+ handler: F,
+ error_msg: &'static str,
+ kind: ServerKind,
+) where
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
+ S: Future<Output = HandlerOutput> + 'static,
+{
+ let service = hyper1::service::service_fn(handler);
+
+ let result: Result<(), anyhow::Error> = match kind {
+ ServerKind::Auto => {
+ let builder =
+ hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor);
+ builder
+ .serve_connection(io, service)
+ .await
+ .map_err(|e| anyhow::anyhow!("{}", e))
+ }
+ ServerKind::OnlyHttp1 => {
+ let builder = hyper1::server::conn::http1::Builder::new();
+ builder
+ .serve_connection(io, service)
+ .await
+ .map_err(|e| e.into())
+ }
+ ServerKind::OnlyHttp2 => {
+ let builder =
+ hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor);
+ builder
+ .serve_connection(io, service)
+ .await
+ .map_err(|e| e.into())
+ }
+ };
+
+ if let Err(e) = result {
+ eprintln!("{}: {:?}", error_msg, e);
+ }
+}
+
+#[derive(Clone)]
+struct DenoUnsyncExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for DenoUnsyncExecutor
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+{
+ fn execute(&self, fut: Fut) {
+ deno_unsync::spawn(fut);
+ }
+}
diff --git a/test_util/src/servers/mod.rs b/test_util/src/servers/mod.rs
index 18953b7f0..7c4d76e7a 100644
--- a/test_util/src/servers/mod.rs
+++ b/test_util/src/servers/mod.rs
@@ -11,43 +11,44 @@ use denokv_proto::datapath::ReadRangeOutput;
use denokv_proto::datapath::SnapshotRead;
use denokv_proto::datapath::SnapshotReadOutput;
use denokv_proto::datapath::SnapshotReadStatus;
-use futures::Future;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
+use http;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::Response;
use http::StatusCode;
-use http_1 as http;
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::BodyExt;
use http_body_util::Empty;
use http_body_util::Full;
use hyper1 as hyper;
-use hyper_util::rt::TokioIo;
use pretty_assertions::assert_eq;
use prost::Message;
use std::collections::HashMap;
use std::convert::Infallible;
use std::env;
-use std::io;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV6;
use std::path::PathBuf;
-use std::pin::Pin;
use std::result::Result;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
-use tokio::net::TcpListener;
use tokio::net::TcpStream;
mod grpc;
+mod hyper_utils;
mod registry;
mod ws;
+use hyper_utils::run_server;
+use hyper_utils::run_server_with_acceptor;
+use hyper_utils::ServerKind;
+use hyper_utils::ServerOptions;
+
use super::https::get_tls_listener_stream;
use super::https::SupportedHttpVersions;
use super::npm::CUSTOM_NPM_PACKAGE_CACHE;
@@ -94,8 +95,6 @@ pub async fn run_all_servers() {
return hyper_hello(port.parse::<u16>().unwrap()).await;
}
- // TODO(bartlomieju): in a follow up all these `wrap_` handlers could be removed
- // in favor of spawning a hyper server directly with a config object
let redirect_server_fut = wrap_redirect_server(REDIRECT_PORT);
let double_redirects_server_fut =
wrap_double_redirect_server(DOUBLE_REDIRECTS_PORT);
@@ -185,7 +184,15 @@ async fn hyper_hello(port: u16) {
http_body_util::Full::new(Bytes::from("Hello World!")),
)))
};
- run_hyper1_server(addr, handler, "server error").await;
+ run_server(
+ ServerOptions {
+ addr,
+ error_msg: "server error",
+ kind: ServerKind::Auto,
+ },
+ handler,
+ )
+ .await;
}
fn redirect_resp(url: String) -> Response<UnsyncBoxBody<Bytes, Infallible>> {
@@ -1159,221 +1166,93 @@ async fn download_npm_registry_file(
Ok(())
}
-#[derive(Clone)]
-struct DenoUnsyncExecutor;
-
-impl<Fut> hyper::rt::Executor<Fut> for DenoUnsyncExecutor
-where
- Fut: Future + 'static,
- Fut::Output: 'static,
-{
- fn execute(&self, fut: Fut) {
- deno_unsync::spawn(fut);
- }
-}
-
-#[derive(Debug, Clone, Copy)]
-enum Hyper1ServerKind {
- Auto,
- OnlyHttp1,
- OnlyHttp2,
-}
-
-async fn hyper1_serve_connection<I, F, S>(
- io: I,
- service_fn_handler: F,
- error_msg: &'static str,
- kind: Hyper1ServerKind,
-) where
- I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
- F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
- S: Future<
- Output = Result<
- Response<UnsyncBoxBody<Bytes, Infallible>>,
- anyhow::Error,
- >,
- > + 'static,
-{
- let service = hyper1::service::service_fn(service_fn_handler);
-
- let result: Result<(), anyhow::Error> = match kind {
- Hyper1ServerKind::Auto => {
- let builder =
- hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor);
- builder
- .serve_connection(io, service)
- .await
- .map_err(|e| anyhow::anyhow!("{}", e))
- }
- Hyper1ServerKind::OnlyHttp1 => {
- let builder = hyper1::server::conn::http1::Builder::new();
- builder
- .serve_connection(io, service)
- .await
- .map_err(|e| e.into())
- }
- Hyper1ServerKind::OnlyHttp2 => {
- let builder =
- hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor);
- builder
- .serve_connection(io, service)
- .await
- .map_err(|e| e.into())
- }
- };
-
- if let Err(e) = result {
- eprintln!("{}: {:?}", error_msg, e);
- }
-}
-
-async fn run_hyper1_server_inner<F, S>(
- addr: SocketAddr,
- service_fn_handler: F,
- error_msg: &'static str,
- kind: Hyper1ServerKind,
-) where
- F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
- S: Future<
- Output = Result<
- Response<UnsyncBoxBody<Bytes, Infallible>>,
- anyhow::Error,
- >,
- > + 'static,
-{
- let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
- async move {
- let listener = TcpListener::bind(addr).await?;
- loop {
- let (stream, _) = listener.accept().await?;
- let io = TokioIo::new(stream);
- deno_unsync::spawn(hyper1_serve_connection(
- io,
- service_fn_handler,
- error_msg,
- kind,
- ));
- }
- }
- .boxed_local();
-
- if let Err(e) = fut.await {
- eprintln!("{}: {:?}", error_msg, e);
- }
-}
-
-async fn run_hyper1_server<F, S>(
- addr: SocketAddr,
- service_fn_handler: F,
- error_msg: &'static str,
-) where
- F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
- S: Future<
- Output = Result<
- Response<UnsyncBoxBody<Bytes, Infallible>>,
- anyhow::Error,
- >,
- > + 'static,
-{
- run_hyper1_server_inner(
- addr,
- service_fn_handler,
- error_msg,
- Hyper1ServerKind::Auto,
- )
- .await
-}
-
-// TODO(bartlomieju): dedup with `run_hyper1_server_inner`
-async fn run_hyper1_server_with_acceptor<'a, A, F, S>(
- mut acceptor: Pin<Box<A>>,
- service_fn_handler: F,
- error_msg: &'static str,
- kind: Hyper1ServerKind,
-) where
- A: Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + ?Sized,
- F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
- S: Future<
- Output = Result<
- Response<UnsyncBoxBody<Bytes, Infallible>>,
- anyhow::Error,
- >,
- > + 'static,
-{
- let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
- async move {
- while let Some(result) = acceptor.next().await {
- let stream = result?;
- let io = TokioIo::new(stream);
- deno_unsync::spawn(hyper1_serve_connection(
- io,
- service_fn_handler,
- error_msg,
- kind,
- ));
- }
- Ok(())
- }
- .boxed_local();
-
- if let Err(e) = fut.await {
- eprintln!("{}: {:?}", error_msg, e);
- }
-}
-
async fn wrap_redirect_server(port: u16) {
let redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(redirect_addr, redirect, "Redirect error").await;
+ run_server(
+ ServerOptions {
+ addr: redirect_addr,
+ error_msg: "Redirect error",
+ kind: ServerKind::Auto,
+ },
+ redirect,
+ )
+ .await;
}
async fn wrap_double_redirect_server(port: u16) {
let double_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(
- double_redirects_addr,
+ run_server(
+ ServerOptions {
+ addr: double_redirects_addr,
+ error_msg: "Double redirect error",
+ kind: ServerKind::Auto,
+ },
double_redirects,
- "Double redirect error",
)
.await;
}
async fn wrap_inf_redirect_server(port: u16) {
let inf_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(inf_redirects_addr, inf_redirects, "Inf redirect error")
- .await;
+ run_server(
+ ServerOptions {
+ addr: inf_redirects_addr,
+ error_msg: "Inf redirect error",
+ kind: ServerKind::Auto,
+ },
+ inf_redirects,
+ )
+ .await;
}
async fn wrap_another_redirect_server(port: u16) {
let another_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(
- another_redirect_addr,
+ run_server(
+ ServerOptions {
+ addr: another_redirect_addr,
+ error_msg: "Another redirect error",
+ kind: ServerKind::Auto,
+ },
another_redirect,
- "Another redirect error",
)
.await;
}
async fn wrap_auth_redirect_server(port: u16) {
let auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(auth_redirect_addr, auth_redirect, "Auth redirect error")
- .await;
+ run_server(
+ ServerOptions {
+ addr: auth_redirect_addr,
+ error_msg: "Auth redirect error",
+ kind: ServerKind::Auto,
+ },
+ auth_redirect,
+ )
+ .await;
}
async fn wrap_basic_auth_redirect_server(port: u16) {
let basic_auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(
- basic_auth_redirect_addr,
+ run_server(
+ ServerOptions {
+ addr: basic_auth_redirect_addr,
+ error_msg: "Basic auth redirect error",
+ kind: ServerKind::Auto,
+ },
basic_auth_redirect,
- "Basic auth redirect error",
)
.await;
}
async fn wrap_abs_redirect_server(port: u16) {
let abs_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(
- abs_redirect_addr,
+ run_server(
+ ServerOptions {
+ addr: abs_redirect_addr,
+ error_msg: "Absolute redirect error",
+ kind: ServerKind::Auto,
+ },
absolute_redirect,
- "Absolute redirect error",
)
.await;
}
@@ -1392,17 +1271,25 @@ async fn wrap_main_ipv6_server(port: u16) {
}
async fn wrap_main_server_for_addr(main_server_addr: &SocketAddr) {
- run_hyper1_server(*main_server_addr, main_server, "HTTP server error").await;
+ run_server(
+ ServerOptions {
+ addr: *main_server_addr,
+ kind: ServerKind::Auto,
+ error_msg: "HTTP server error",
+ },
+ main_server,
+ )
+ .await;
}
async fn wrap_main_https_server(port: u16) {
let tls = get_tls_listener_stream("https", port, Default::default()).await;
let tls_acceptor = tls.boxed_local();
- run_hyper1_server_with_acceptor(
+ run_server_with_acceptor(
tls_acceptor,
main_server,
"HTTPS server error",
- Hyper1ServerKind::Auto,
+ ServerKind::Auto,
)
.await
}
@@ -1415,11 +1302,11 @@ async fn wrap_https_h1_only_tls_server(port: u16) {
)
.await;
- run_hyper1_server_with_acceptor(
+ run_server_with_acceptor(
tls.boxed_local(),
main_server,
"HTTP1 only TLS server error",
- Hyper1ServerKind::OnlyHttp1,
+ ServerKind::OnlyHttp1,
)
.await
}
@@ -1432,33 +1319,37 @@ async fn wrap_https_h2_only_tls_server(port: u16) {
)
.await;
- run_hyper1_server_with_acceptor(
+ run_server_with_acceptor(
tls.boxed_local(),
main_server,
"HTTP2 only TLS server error",
- Hyper1ServerKind::OnlyHttp2,
+ ServerKind::OnlyHttp2,
)
.await
}
async fn wrap_http_h1_only_server(port: u16) {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server_inner(
- main_server_http_addr,
+ run_server(
+ ServerOptions {
+ addr: main_server_http_addr,
+ error_msg: "HTTP1 only server error:",
+ kind: ServerKind::OnlyHttp1,
+ },
main_server,
- "HTTP1 only server error:",
- Hyper1ServerKind::OnlyHttp1,
)
.await;
}
async fn wrap_http_h2_only_server(port: u16) {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server_inner(
- main_server_http_addr,
+ run_server(
+ ServerOptions {
+ addr: main_server_http_addr,
+ error_msg: "HTTP1 only server error:",
+ kind: ServerKind::OnlyHttp2,
+ },
main_server,
- "HTTP1 only server error:",
- Hyper1ServerKind::OnlyHttp2,
)
.await;
}
@@ -1480,11 +1371,11 @@ async fn wrap_client_auth_https_server(port: u16) {
}
};
- run_hyper1_server_with_acceptor(
+ run_server_with_acceptor(
tls.boxed_local(),
main_server,
"Auth TLS server error",
- Hyper1ServerKind::Auto,
+ ServerKind::Auto,
)
.await
}
diff --git a/test_util/src/servers/registry.rs b/test_util/src/servers/registry.rs
index 8f6fd32aa..3e0457253 100644
--- a/test_util/src/servers/registry.rs
+++ b/test_util/src/servers/registry.rs
@@ -1,6 +1,8 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use super::run_hyper1_server;
+use super::run_server;
+use super::ServerKind;
+use super::ServerOptions;
use bytes::Bytes;
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::Empty;
@@ -16,10 +18,13 @@ use std::net::SocketAddr;
pub async fn registry_server(port: u16) {
let registry_server_addr = SocketAddr::from(([127, 0, 0, 1], port));
- run_hyper1_server(
- registry_server_addr,
+ run_server(
+ ServerOptions {
+ addr: registry_server_addr,
+ error_msg: "Registry server error",
+ kind: ServerKind::Auto,
+ },
registry_server_handler,
- "Registry server error",
)
.await
}
diff --git a/test_util/src/servers/ws.rs b/test_util/src/servers/ws.rs
index 188216c86..f820c8bbd 100644
--- a/test_util/src/servers/ws.rs
+++ b/test_util/src/servers/ws.rs
@@ -11,10 +11,10 @@ use futures::future::join3;
use futures::future::poll_fn;
use futures::Future;
use futures::StreamExt;
-use h2_04::server::Handshake;
-use h2_04::server::SendResponse;
-use h2_04::Reason;
-use h2_04::RecvStream;
+use h2::server::Handshake;
+use h2::server::SendResponse;
+use h2::Reason;
+use h2::RecvStream;
use hyper1::upgrade::Upgraded;
use hyper1::Method;
use hyper1::Request;
@@ -70,7 +70,7 @@ pub async fn run_wss2_server(port: u16) {
.await;
while let Some(Ok(tls)) = tls.next().await {
tokio::spawn(async move {
- let mut h2 = h2_04::server::Builder::new();
+ let mut h2 = h2::server::Builder::new();
h2.enable_connect_protocol();
// Using Bytes is pretty alloc-heavy but this is a test server
let server: Handshake<_, Bytes> = h2.handshake(tls);
@@ -127,7 +127,7 @@ where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let service = hyper1::service::service_fn(
- move |mut req: http_1::Request<hyper1::body::Incoming>| async move {
+ move |mut req: http::Request<hyper1::body::Incoming>| async move {
let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req)
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?;
@@ -161,13 +161,13 @@ where
async fn handle_wss_stream(
recv: Request<RecvStream>,
mut send: SendResponse<Bytes>,
-) -> Result<(), h2_04::Error> {
+) -> Result<(), h2::Error> {
if recv.method() != Method::CONNECT {
eprintln!("wss2: refusing non-CONNECT stream");
send.send_reset(Reason::REFUSED_STREAM);
return Ok(());
}
- let Some(protocol) = recv.extensions().get::<h2_04::ext::Protocol>() else {
+ let Some(protocol) = recv.extensions().get::<h2::ext::Protocol>() else {
eprintln!("wss2: refusing no-:protocol stream");
send.send_reset(Reason::REFUSED_STREAM);
return Ok(());