summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-12-27 11:45:12 +0100
committerGitHub <noreply@github.com>2023-12-27 11:45:12 +0100
commit4f4dcf52916b5e9d200d223d046cef332b5a598e (patch)
treeb7be6be7f0032d59969924c30ae4b1cda3db39a5
parent0efe438f7c191d8504355e03b27fe7e3055c9387 (diff)
refactor: rewrite remaining test server to Hyper 1.1 (#21708)
Ref https://github.com/denoland/deno/issues/21578
-rw-r--r--Cargo.lock4
-rw-r--r--test_util/Cargo.toml4
-rw-r--r--test_util/src/servers/mod.rs683
-rw-r--r--test_util/src/servers/registry.rs47
4 files changed, 440 insertions, 298 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 2a7d188de..3772c739d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5969,15 +5969,15 @@ dependencies = [
"base64 0.21.5",
"bytes",
"console_static_text",
+ "deno_unsync 0.3.0",
"denokv_proto",
"fastwebsockets",
"flate2",
"futures",
"glob",
- "h2 0.3.22",
"h2 0.4.0",
"http 1.0.0",
- "hyper 0.14.27",
+ "http-body-util",
"hyper 1.1.0",
"hyper-util",
"lazy-regex",
diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml
index bb0f84d76..d51252a3b 100644
--- a/test_util/Cargo.toml
+++ b/test_util/Cargo.toml
@@ -19,15 +19,15 @@ async-stream = "0.3.3"
base64.workspace = true
bytes.workspace = true
console_static_text.workspace = true
+deno_unsync = "0.3.0"
denokv_proto.workspace = true
fastwebsockets.workspace = true
flate2 = { workspace = true, features = ["default"] }
futures.workspace = true
glob.workspace = true
-h2.workspace = true
h2_04 = { package = "h2", version = "0.4" }
+http-body-util = "0.1"
http_1 = { package = "http", version = "1.0" }
-hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] }
hyper-util.workspace = true
hyper1.workspace = true
lazy-regex.workspace = true
diff --git a/test_util/src/servers/mod.rs b/test_util/src/servers/mod.rs
index bf85aaed0..18953b7f0 100644
--- a/test_util/src/servers/mod.rs
+++ b/test_util/src/servers/mod.rs
@@ -3,6 +3,7 @@
// otherwise this starts multiple servers on many ports for test endpoints.
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
+use bytes::Bytes;
use denokv_proto::datapath::AtomicWrite;
use denokv_proto::datapath::AtomicWriteOutput;
use denokv_proto::datapath::AtomicWriteStatus;
@@ -10,17 +11,22 @@ use denokv_proto::datapath::ReadRangeOutput;
use denokv_proto::datapath::SnapshotRead;
use denokv_proto::datapath::SnapshotReadOutput;
use denokv_proto::datapath::SnapshotReadStatus;
+use futures::Future;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
-use hyper::header::HeaderValue;
-use hyper::server::Server;
-use hyper::service::make_service_fn;
-use hyper::service::service_fn;
-use hyper::Body;
-use hyper::Request;
-use hyper::Response;
-use hyper::StatusCode;
+use http::HeaderValue;
+use http::Method;
+use http::Request;
+use http::Response;
+use http::StatusCode;
+use http_1 as http;
+use http_body_util::combinators::UnsyncBoxBody;
+use http_body_util::BodyExt;
+use http_body_util::Empty;
+use http_body_util::Full;
+use hyper1 as hyper;
+use hyper_util::rt::TokioIo;
use pretty_assertions::assert_eq;
use prost::Message;
use std::collections::HashMap;
@@ -33,10 +39,9 @@ use std::net::SocketAddrV6;
use std::path::PathBuf;
use std::pin::Pin;
use std::result::Result;
-use std::task::Context;
-use std::task::Poll;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
+use tokio::net::TcpListener;
use tokio::net::TcpStream;
mod grpc;
@@ -89,6 +94,8 @@ pub async fn run_all_servers() {
return hyper_hello(port.parse::<u16>().unwrap()).await;
}
+ // TODO(bartlomieju): in a follow up all these `wrap_` handlers could be removed
+ // in favor of spawning a hyper server directly with a config object
let redirect_server_fut = wrap_redirect_server(REDIRECT_PORT);
let double_redirects_server_fut =
wrap_double_redirect_server(DOUBLE_REDIRECTS_PORT);
@@ -156,34 +163,45 @@ pub async fn run_all_servers() {
server_fut.await;
}
+fn empty_body() -> UnsyncBoxBody<Bytes, Infallible> {
+ UnsyncBoxBody::new(Empty::new())
+}
+
+fn string_body(str_: &str) -> UnsyncBoxBody<Bytes, Infallible> {
+ UnsyncBoxBody::new(Full::new(Bytes::from(str_.to_string())))
+}
+
+fn json_body(value: serde_json::Value) -> UnsyncBoxBody<Bytes, Infallible> {
+ let str_ = value.to_string();
+ string_body(&str_)
+}
+
/// Benchmark server that just serves "hello world" responses.
async fn hyper_hello(port: u16) {
println!("hyper hello");
let addr = SocketAddr::from(([127, 0, 0, 1], port));
- let hello_svc = make_service_fn(|_| async move {
- Ok::<_, Infallible>(service_fn(move |_: Request<Body>| async move {
- Ok::<_, Infallible>(Response::new(Body::from("Hello World!")))
- }))
- });
-
- let server = Server::bind(&addr).serve(hello_svc);
- if let Err(e) = server.await {
- eprintln!("server error: {e}");
- }
+ let handler = move |_: Request<hyper::body::Incoming>| async move {
+ Ok::<_, anyhow::Error>(Response::new(UnsyncBoxBody::new(
+ http_body_util::Full::new(Bytes::from("Hello World!")),
+ )))
+ };
+ run_hyper1_server(addr, handler, "server error").await;
}
-fn redirect_resp(url: String) -> Response<Body> {
- let mut redirect_resp = Response::new(Body::empty());
+fn redirect_resp(url: String) -> Response<UnsyncBoxBody<Bytes, Infallible>> {
+ let mut redirect_resp = Response::new(UnsyncBoxBody::new(Empty::new()));
*redirect_resp.status_mut() = StatusCode::MOVED_PERMANENTLY;
redirect_resp.headers_mut().insert(
- hyper::header::LOCATION,
+ http::header::LOCATION,
HeaderValue::from_str(&url[..]).unwrap(),
);
redirect_resp
}
-async fn redirect(req: Request<Body>) -> hyper::Result<Response<Body>> {
+async fn redirect(
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
let p = req.uri().path();
assert_eq!(&p[0..1], "/");
let url = format!("http://localhost:{PORT}{p}");
@@ -191,7 +209,9 @@ async fn redirect(req: Request<Body>) -> hyper::Result<Response<Body>> {
Ok(redirect_resp(url))
}
-async fn double_redirects(req: Request<Body>) -> hyper::Result<Response<Body>> {
+async fn double_redirects(
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
let p = req.uri().path();
assert_eq!(&p[0..1], "/");
let url = format!("http://localhost:{REDIRECT_PORT}{p}");
@@ -199,7 +219,9 @@ async fn double_redirects(req: Request<Body>) -> hyper::Result<Response<Body>> {
Ok(redirect_resp(url))
}
-async fn inf_redirects(req: Request<Body>) -> hyper::Result<Response<Body>> {
+async fn inf_redirects(
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
let p = req.uri().path();
assert_eq!(&p[0..1], "/");
let url = format!("http://localhost:{INF_REDIRECTS_PORT}{p}");
@@ -207,7 +229,9 @@ async fn inf_redirects(req: Request<Body>) -> hyper::Result<Response<Body>> {
Ok(redirect_resp(url))
}
-async fn another_redirect(req: Request<Body>) -> hyper::Result<Response<Body>> {
+async fn another_redirect(
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
let p = req.uri().path();
assert_eq!(&p[0..1], "/");
let url = format!("http://localhost:{PORT}/subdir{p}");
@@ -215,7 +239,9 @@ async fn another_redirect(req: Request<Body>) -> hyper::Result<Response<Body>> {
Ok(redirect_resp(url))
}
-async fn auth_redirect(req: Request<Body>) -> hyper::Result<Response<Body>> {
+async fn auth_redirect(
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
if let Some(auth) = req
.headers()
.get("authorization")
@@ -229,14 +255,14 @@ async fn auth_redirect(req: Request<Body>) -> hyper::Result<Response<Body>> {
}
}
- let mut resp = Response::new(Body::empty());
+ let mut resp = Response::new(UnsyncBoxBody::new(Empty::new()));
*resp.status_mut() = StatusCode::NOT_FOUND;
Ok(resp)
}
async fn basic_auth_redirect(
- req: Request<Body>,
-) -> hyper::Result<Response<Body>> {
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
if let Some(auth) = req
.headers()
.get("authorization")
@@ -252,7 +278,7 @@ async fn basic_auth_redirect(
}
}
- let mut resp = Response::new(Body::empty());
+ let mut resp = Response::new(UnsyncBoxBody::new(Empty::new()));
*resp.status_mut() = StatusCode::NOT_FOUND;
Ok(resp)
}
@@ -331,8 +357,8 @@ async fn run_tls_server(port: u16) {
}
async fn absolute_redirect(
- req: Request<Body>,
-) -> hyper::Result<Response<Body>> {
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
let path = req.uri().path();
if path == "/" {
@@ -373,7 +399,7 @@ async fn absolute_redirect(
let file_path = testdata_path().join(&req.uri().path()[1..]);
if file_path.is_dir() || !file_path.exists() {
- let mut not_found_resp = Response::new(Body::empty());
+ let mut not_found_resp = Response::new(UnsyncBoxBody::new(Empty::new()));
*not_found_resp.status_mut() = StatusCode::NOT_FOUND;
return Ok(not_found_resp);
}
@@ -384,12 +410,14 @@ async fn absolute_redirect(
}
async fn main_server(
- req: Request<Body>,
-) -> Result<Response<Body>, hyper::http::Error> {
+ req: Request<hyper::body::Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
return match (req.method(), req.uri().path()) {
(_, "/echo_server") => {
let (parts, body) = req.into_parts();
- let mut response = Response::new(body);
+ let mut response = Response::new(UnsyncBoxBody::new(Full::new(
+ body.collect().await?.to_bytes(),
+ )));
if let Some(status) = parts.headers.get("x-status") {
*response.status_mut() =
@@ -398,9 +426,9 @@ async fn main_server(
response.headers_mut().extend(parts.headers);
Ok(response)
}
- (&hyper::Method::POST, "/echo_multipart_file") => {
+ (&Method::POST, "/echo_multipart_file") => {
let body = req.into_body();
- let bytes = &hyper::body::to_bytes(body).await.unwrap()[0..];
+ let bytes = &body.collect().await.unwrap().to_bytes()[0..];
let start = b"--boundary\t \r\n\
Content-Disposition: form-data; name=\"field_1\"\r\n\
\r\n\
@@ -413,7 +441,8 @@ async fn main_server(
let end = b"\r\n--boundary--\r\n";
let b = [start as &[u8], bytes, end].concat();
- let mut response = Response::new(Body::from(b));
+ let mut response =
+ Response::new(UnsyncBoxBody::new(Full::new(Bytes::from(b))));
response.headers_mut().insert(
"content-type",
HeaderValue::from_static("multipart/form-data;boundary=boundary"),
@@ -434,7 +463,7 @@ async fn main_server(
console.log(\"Hi\")\
\r\n--boundary--\r\n\
Epilogue";
- let mut res = Response::new(Body::from(b));
+ let mut res = Response::new(string_body(b));
res.headers_mut().insert(
"content-type",
HeaderValue::from_static("multipart/form-data;boundary=boundary"),
@@ -455,7 +484,7 @@ async fn main_server(
console.log(\"Hi\")\
\r\n--boundary--\r\n\
Epilogue";
- let mut res = Response::new(Body::from(b));
+ let mut res = Response::new(string_body(b));
res.headers_mut().insert(
"content-type",
HeaderValue::from_static("multipart/form-datatststs;boundary=boundary"),
@@ -463,17 +492,17 @@ async fn main_server(
Ok(res)
}
(_, "/bad_redirect") => {
- let mut res = Response::new(Body::empty());
+ let mut res = Response::new(empty_body());
*res.status_mut() = StatusCode::FOUND;
Ok(res)
}
(_, "/server_error") => {
- let mut res = Response::new(Body::empty());
+ let mut res = Response::new(empty_body());
*res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(res)
}
(_, "/x_deno_warning.js") => {
- let mut res = Response::new(Body::empty());
+ let mut res = Response::new(empty_body());
*res.status_mut() = StatusCode::MOVED_PERMANENTLY;
res
.headers_mut()
@@ -485,7 +514,7 @@ async fn main_server(
Ok(res)
}
(_, "/non_ascii_redirect") => {
- let mut res = Response::new(Body::empty());
+ let mut res = Response::new(empty_body());
*res.status_mut() = StatusCode::MOVED_PERMANENTLY;
res.headers_mut().insert(
"location",
@@ -497,7 +526,7 @@ async fn main_server(
let if_none_match = req.headers().get("if-none-match");
if if_none_match == Some(&HeaderValue::from_static("33a64df551425fcc55e"))
{
- let mut resp = Response::new(Body::empty());
+ let mut resp = Response::new(empty_body());
*resp.status_mut() = StatusCode::NOT_MODIFIED;
resp.headers_mut().insert(
"Content-type",
@@ -509,7 +538,7 @@ async fn main_server(
Ok(resp)
} else {
- let mut resp = Response::new(Body::from("console.log('etag')"));
+ let mut resp = Response::new(string_body("console.log('etag')"));
resp.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/typescript"),
@@ -521,7 +550,7 @@ async fn main_server(
}
}
(_, "/xTypeScriptTypes.js") => {
- let mut res = Response::new(Body::from("export const foo = 'foo';"));
+ let mut res = Response::new(string_body("export const foo = 'foo';"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/javascript"),
@@ -533,7 +562,7 @@ async fn main_server(
Ok(res)
}
(_, "/xTypeScriptTypes.jsx") => {
- let mut res = Response::new(Body::from("export const foo = 'foo';"));
+ let mut res = Response::new(string_body("export const foo = 'foo';"));
res
.headers_mut()
.insert("Content-type", HeaderValue::from_static("text/jsx"));
@@ -545,7 +574,7 @@ async fn main_server(
}
(_, "/xTypeScriptTypes.ts") => {
let mut res =
- Response::new(Body::from("export const foo: string = 'foo';"));
+ Response::new(string_body("export const foo: string = 'foo';"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/typescript"),
@@ -557,7 +586,7 @@ async fn main_server(
Ok(res)
}
(_, "/xTypeScriptTypes.d.ts") => {
- let mut res = Response::new(Body::from("export const foo: 'foo';"));
+ let mut res = Response::new(string_body("export const foo: 'foo';"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/typescript"),
@@ -565,7 +594,7 @@ async fn main_server(
Ok(res)
}
(_, "/run/type_directives_redirect.js") => {
- let mut res = Response::new(Body::from("export const foo = 'foo';"));
+ let mut res = Response::new(string_body("export const foo = 'foo';"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/javascript"),
@@ -579,7 +608,7 @@ async fn main_server(
Ok(res)
}
(_, "/run/type_headers_deno_types.foo.js") => {
- let mut res = Response::new(Body::from(
+ let mut res = Response::new(string_body(
"export function foo(text) { console.log(text); }",
));
res.headers_mut().insert(
@@ -596,7 +625,7 @@ async fn main_server(
}
(_, "/run/type_headers_deno_types.d.ts") => {
let mut res =
- Response::new(Body::from("export function foo(text: number): void;"));
+ Response::new(string_body("export function foo(text: number): void;"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/typescript"),
@@ -605,7 +634,7 @@ async fn main_server(
}
(_, "/run/type_headers_deno_types.foo.d.ts") => {
let mut res =
- Response::new(Body::from("export function foo(text: string): void;"));
+ Response::new(string_body("export function foo(text: string): void;"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/typescript"),
@@ -613,7 +642,7 @@ async fn main_server(
Ok(res)
}
(_, "/subdir/xTypeScriptTypesRedirect.d.ts") => {
- let mut res = Response::new(Body::from(
+ let mut res = Response::new(string_body(
"import './xTypeScriptTypesRedirected.d.ts';",
));
res.headers_mut().insert(
@@ -623,7 +652,7 @@ async fn main_server(
Ok(res)
}
(_, "/subdir/xTypeScriptTypesRedirected.d.ts") => {
- let mut res = Response::new(Body::from("export const foo: 'foo';"));
+ let mut res = Response::new(string_body("export const foo: 'foo';"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/typescript"),
@@ -631,7 +660,7 @@ async fn main_server(
Ok(res)
}
(_, "/referenceTypes.js") => {
- let mut res = Response::new(Body::from("/// <reference types=\"./xTypeScriptTypes.d.ts\" />\r\nexport const foo = \"foo\";\r\n"));
+ let mut res = Response::new(string_body("/// <reference types=\"./xTypeScriptTypes.d.ts\" />\r\nexport const foo = \"foo\";\r\n"));
res.headers_mut().insert(
"Content-type",
HeaderValue::from_static("application/javascript"),
@@ -639,7 +668,7 @@ async fn main_server(
Ok(res)
}
(_, "/subdir/file_with_:_in_name.ts") => {
- let mut res = Response::new(Body::from(
+ let mut res = Response::new(string_body(
"console.log('Hello from file_with_:_in_name.ts');",
));
res.headers_mut().insert(
@@ -650,7 +679,7 @@ async fn main_server(
}
(_, "/v1/extensionless") => {
let mut res =
- Response::new(Body::from(r#"export * from "/subdir/mod1.ts";"#));
+ Response::new(string_body(r#"export * from "/subdir/mod1.ts";"#));
res.headers_mut().insert(
"content-type",
HeaderValue::from_static("application/typescript"),
@@ -658,7 +687,7 @@ async fn main_server(
Ok(res)
}
(_, "/subdir/no_js_ext@1.0.0") => {
- let mut res = Response::new(Body::from(
+ let mut res = Response::new(string_body(
r#"import { printHello } from "./mod2.ts";
printHello();
"#,
@@ -678,19 +707,19 @@ async fn main_server(
body,
))
} else {
- Ok(Response::new(Body::empty()))
+ Ok(Response::new(empty_body()))
}
}
(_, "/http_version") => {
let version = format!("{:?}", req.version());
- Ok(Response::new(version.into()))
+ Ok(Response::new(string_body(&version)))
}
(_, "/content_length") => {
let content_length = format!("{:?}", req.headers().get("content-length"));
- Ok(Response::new(content_length.into()))
+ Ok(Response::new(string_body(&content_length)))
}
(_, "/jsx/jsx-runtime") | (_, "/jsx/jsx-dev-runtime") => {
- let mut res = Response::new(Body::from(
+ let mut res = Response::new(string_body(
r#"export function jsx(
_type,
_props,
@@ -711,8 +740,8 @@ async fn main_server(
Ok(res)
}
(_, "/dynamic") => {
- let mut res = Response::new(Body::from(
- serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(),
+ let mut res = Response::new(string_body(
+ &serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(),
));
res
.headers_mut()
@@ -720,8 +749,8 @@ async fn main_server(
Ok(res)
}
(_, "/dynamic_cache") => {
- let mut res = Response::new(Body::from(
- serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(),
+ let mut res = Response::new(string_body(
+ &serde_json::to_string_pretty(&std::time::SystemTime::now()).unwrap(),
));
res.headers_mut().insert(
"cache-control",
@@ -730,7 +759,7 @@ async fn main_server(
Ok(res)
}
(_, "/dynamic_module.ts") => {
- let mut res = Response::new(Body::from(format!(
+ let mut res = Response::new(string_body(&format!(
r#"export const time = {};"#,
std::time::SystemTime::now().elapsed().unwrap().as_nanos()
)));
@@ -742,17 +771,16 @@ async fn main_server(
}
(_, "/echo_accept") => {
let accept = req.headers().get("accept").map(|v| v.to_str().unwrap());
- let res = Response::new(Body::from(
- serde_json::json!({ "accept": accept }).to_string(),
- ));
+ let res =
+ Response::new(json_body(serde_json::json!({ "accept": accept })));
Ok(res)
}
(_, "/search_params") => {
let query = req.uri().query().map(|s| s.to_string());
- let res = Response::new(Body::from(query.unwrap_or_default()));
+ let res = Response::new(string_body(&query.unwrap_or_default()));
Ok(res)
}
- (&hyper::Method::POST, "/kv_remote_authorize") => {
+ (&Method::POST, "/kv_remote_authorize") => {
if req
.headers()
.get("authorization")
@@ -763,7 +791,7 @@ async fn main_server(
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
}
@@ -771,25 +799,22 @@ async fn main_server(
Ok(
Response::builder()
.header("content-type", "application/json")
- .body(Body::from(
- serde_json::json!({
- "version": 1,
- "databaseId": KV_DATABASE_ID,
- "endpoints": [
- {
- "url": format!("http://localhost:{}/kv_blackhole", PORT),
- "consistency": "strong",
- }
- ],
- "token": KV_DATABASE_TOKEN,
- "expiresAt": "2099-01-01T00:00:00Z",
- })
- .to_string(),
- ))
+ .body(json_body(serde_json::json!({
+ "version": 1,
+ "databaseId": KV_DATABASE_ID,
+ "endpoints": [
+ {
+ "url": format!("http://localhost:{}/kv_blackhole", PORT),
+ "consistency": "strong",
+ }
+ ],
+ "token": KV_DATABASE_TOKEN,
+ "expiresAt": "2099-01-01T00:00:00Z",
+ })))
.unwrap(),
)
}
- (&hyper::Method::POST, "/kv_remote_authorize_invalid_format") => {
+ (&Method::POST, "/kv_remote_authorize_invalid_format") => {
if req
.headers()
.get("authorization")
@@ -800,7 +825,7 @@ async fn main_server(
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
}
@@ -808,17 +833,14 @@ async fn main_server(
Ok(
Response::builder()
.header("content-type", "application/json")
- .body(Body::from(
- serde_json::json!({
- "version": 1,
- "databaseId": KV_DATABASE_ID,
- })
- .to_string(),
- ))
+ .body(json_body(serde_json::json!({
+ "version": 1,
+ "databaseId": KV_DATABASE_ID,
+ })))
.unwrap(),
)
}
- (&hyper::Method::POST, "/kv_remote_authorize_invalid_version") => {
+ (&Method::POST, "/kv_remote_authorize_invalid_version") => {
if req
.headers()
.get("authorization")
@@ -829,7 +851,7 @@ async fn main_server(
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
}
@@ -837,25 +859,22 @@ async fn main_server(
Ok(
Response::builder()
.header("content-type", "application/json")
- .body(Body::from(
- serde_json::json!({
- "version": 1000,
- "databaseId": KV_DATABASE_ID,
- "endpoints": [
- {
- "url": format!("http://localhost:{}/kv_blackhole", PORT),
- "consistency": "strong",
- }
- ],
- "token": KV_DATABASE_TOKEN,
- "expiresAt": "2099-01-01T00:00:00Z",
- })
- .to_string(),
- ))
+ .body(json_body(serde_json::json!({
+ "version": 1000,
+ "databaseId": KV_DATABASE_ID,
+ "endpoints": [
+ {
+ "url": format!("http://localhost:{}/kv_blackhole", PORT),
+ "consistency": "strong",
+ }
+ ],
+ "token": KV_DATABASE_TOKEN,
+ "expiresAt": "2099-01-01T00:00:00Z",
+ })))
.unwrap(),
)
}
- (&hyper::Method::POST, "/kv_blackhole/snapshot_read") => {
+ (&Method::POST, "/kv_blackhole/snapshot_read") => {
if req
.headers()
.get("authorization")
@@ -866,20 +885,23 @@ async fn main_server(
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
}
- let body = hyper::body::to_bytes(req.into_body())
+ let body = req
+ .into_body()
+ .collect()
.await
- .unwrap_or_default();
+ .unwrap_or_default()
+ .to_bytes();
let Ok(body): Result<SnapshotRead, _> = prost::Message::decode(&body[..])
else {
return Ok(
Response::builder()
.status(StatusCode::BAD_REQUEST)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
};
@@ -887,13 +909,13 @@ async fn main_server(
return Ok(
Response::builder()
.status(StatusCode::BAD_REQUEST)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
}
Ok(
Response::builder()
- .body(Body::from(
+ .body(UnsyncBoxBody::new(Full::new(Bytes::from(
SnapshotReadOutput {
ranges: body
.ranges
@@ -905,11 +927,11 @@ async fn main_server(
status: SnapshotReadStatus::SrSuccess.into(),
}
.encode_to_vec(),
- ))
+ ))))
.unwrap(),
)
}
- (&hyper::Method::POST, "/kv_blackhole/atomic_write") => {
+ (&Method::POST, "/kv_blackhole/atomic_write") => {
if req
.headers()
.get("authorization")
@@ -920,68 +942,71 @@ async fn main_server(
return Ok(
Response::builder()
.status(StatusCode::UNAUTHORIZED)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
}
- let body = hyper::body::to_bytes(req.into_body())
+ let body = req
+ .into_body()
+ .collect()
.await
- .unwrap_or_default();
+ .unwrap_or_default()
+ .to_bytes();
let Ok(_body): Result<AtomicWrite, _> = prost::Message::decode(&body[..])
else {
return Ok(
Response::builder()
.status(StatusCode::BAD_REQUEST)
- .body(Body::empty())
+ .body(empty_body())
.unwrap(),
);
};
Ok(
Response::builder()
- .body(Body::from(
+ .body(UnsyncBoxBody::new(Full::new(Bytes::from(
AtomicWriteOutput {
status: AtomicWriteStatus::AwSuccess.into(),
versionstamp: vec![0u8; 10],
failed_checks: vec![],
}
.encode_to_vec(),
- ))
+ ))))
.unwrap(),
)
}
- (&hyper::Method::GET, "/upgrade/sleep/release-latest.txt") => {
+ (&Method::GET, "/upgrade/sleep/release-latest.txt") => {
tokio::time::sleep(Duration::from_secs(95)).await;
return Ok(
Response::builder()
.status(StatusCode::OK)
- .body(Body::from("99999.99.99"))
+ .body(string_body("99999.99.99"))
.unwrap(),
);
}
- (&hyper::Method::GET, "/upgrade/sleep/canary-latest.txt") => {
+ (&Method::GET, "/upgrade/sleep/canary-latest.txt") => {
tokio::time::sleep(Duration::from_secs(95)).await;
return Ok(
Response::builder()
.status(StatusCode::OK)
- .body(Body::from("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd"))
+ .body(string_body("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd"))
.unwrap(),
);
}
- (&hyper::Method::GET, "/release-latest.txt") => {
+ (&Method::GET, "/release-latest.txt") => {
return Ok(
Response::builder()
.status(StatusCode::OK)
// use a deno version that will never happen
- .body(Body::from("99999.99.99"))
+ .body(string_body("99999.99.99"))
.unwrap(),
);
}
- (&hyper::Method::GET, "/canary-latest.txt") => {
+ (&Method::GET, "/canary-latest.txt") => {
return Ok(
Response::builder()
.status(StatusCode::OK)
- .body(Body::from("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd"))
+ .body(string_body("bda3850f84f24b71e02512c1ba2d6bf2e3daa2fd"))
.unwrap(),
);
}
@@ -1008,7 +1033,8 @@ async fn main_server(
Err(err) => {
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
- .body(format!("{err:#}").into());
+ .body(string_body(&format!("{err:#}")))
+ .map_err(|e| e.into());
}
}
} else if req.uri().path().starts_with("/npm/registry/") {
@@ -1026,7 +1052,8 @@ async fn main_server(
{
return Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
- .body(format!("{err:#}").into());
+ .body(string_body(&format!("{err:#}")))
+ .map_err(|e| e.into());
};
// serve the file
@@ -1047,19 +1074,21 @@ async fn main_server(
return Response::builder()
.status(StatusCode::OK)
.header("content-type", "application/typescript")
- .body(Body::empty());
+ .body(empty_body())
+ .map_err(|e| e.into());
}
Response::builder()
.status(StatusCode::NOT_FOUND)
- .body(Body::empty())
+ .body(empty_body())
+ .map_err(|e| e.into())
}
};
}
fn handle_custom_npm_registry_path(
path: &str,
-) -> Result<Option<Response<Body>>, anyhow::Error> {
+) -> Result<Option<Response<UnsyncBoxBody<Bytes, Infallible>>>, anyhow::Error> {
let parts = path
.split('/')
.filter(|p| !p.is_empty())
@@ -1130,109 +1159,223 @@ async fn download_npm_registry_file(
Ok(())
}
-/// Taken from example in https://github.com/ctz/hyper-rustls/blob/a02ef72a227dcdf102f86e905baa7415c992e8b3/examples/server.rs
-struct HyperAcceptor<'a> {
- acceptor: Pin<
- Box<dyn Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + 'a>,
- >,
+#[derive(Clone)]
+struct DenoUnsyncExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for DenoUnsyncExecutor
+where
+ Fut: Future + 'static,
+ Fut::Output: 'static,
+{
+ fn execute(&self, fut: Fut) {
+ deno_unsync::spawn(fut);
+ }
}
-impl hyper::server::accept::Accept for HyperAcceptor<'_> {
- type Conn = rustls_tokio_stream::TlsStream;
- type Error = io::Error;
+#[derive(Debug, Clone, Copy)]
+enum Hyper1ServerKind {
+ Auto,
+ OnlyHttp1,
+ OnlyHttp2,
+}
- fn poll_accept(
- mut self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
- Pin::new(&mut self.acceptor).poll_next(cx)
+async fn hyper1_serve_connection<I, F, S>(
+ io: I,
+ service_fn_handler: F,
+ error_msg: &'static str,
+ kind: Hyper1ServerKind,
+) where
+ I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static,
+ F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
+ S: Future<
+ Output = Result<
+ Response<UnsyncBoxBody<Bytes, Infallible>>,
+ anyhow::Error,
+ >,
+ > + 'static,
+{
+ let service = hyper1::service::service_fn(service_fn_handler);
+
+ let result: Result<(), anyhow::Error> = match kind {
+ Hyper1ServerKind::Auto => {
+ let builder =
+ hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor);
+ builder
+ .serve_connection(io, service)
+ .await
+ .map_err(|e| anyhow::anyhow!("{}", e))
+ }
+ Hyper1ServerKind::OnlyHttp1 => {
+ let builder = hyper1::server::conn::http1::Builder::new();
+ builder
+ .serve_connection(io, service)
+ .await
+ .map_err(|e| e.into())
+ }
+ Hyper1ServerKind::OnlyHttp2 => {
+ let builder =
+ hyper1::server::conn::http2::Builder::new(DenoUnsyncExecutor);
+ builder
+ .serve_connection(io, service)
+ .await
+ .map_err(|e| e.into())
+ }
+ };
+
+ if let Err(e) = result {
+ eprintln!("{}: {:?}", error_msg, e);
+ }
+}
+
+async fn run_hyper1_server_inner<F, S>(
+ addr: SocketAddr,
+ service_fn_handler: F,
+ error_msg: &'static str,
+ kind: Hyper1ServerKind,
+) where
+ F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
+ S: Future<
+ Output = Result<
+ Response<UnsyncBoxBody<Bytes, Infallible>>,
+ anyhow::Error,
+ >,
+ > + 'static,
+{
+ let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
+ async move {
+ let listener = TcpListener::bind(addr).await?;
+ loop {
+ let (stream, _) = listener.accept().await?;
+ let io = TokioIo::new(stream);
+ deno_unsync::spawn(hyper1_serve_connection(
+ io,
+ service_fn_handler,
+ error_msg,
+ kind,
+ ));
+ }
+ }
+ .boxed_local();
+
+ if let Err(e) = fut.await {
+ eprintln!("{}: {:?}", error_msg, e);
}
}
-#[allow(clippy::non_send_fields_in_send_ty)]
-// SAFETY: unsafe trait must have unsafe implementation
-unsafe impl std::marker::Send for HyperAcceptor<'_> {}
+async fn run_hyper1_server<F, S>(
+ addr: SocketAddr,
+ service_fn_handler: F,
+ error_msg: &'static str,
+) where
+ F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
+ S: Future<
+ Output = Result<
+ Response<UnsyncBoxBody<Bytes, Infallible>>,
+ anyhow::Error,
+ >,
+ > + 'static,
+{
+ run_hyper1_server_inner(
+ addr,
+ service_fn_handler,
+ error_msg,
+ Hyper1ServerKind::Auto,
+ )
+ .await
+}
+
+// TODO(bartlomieju): dedup with `run_hyper1_server_inner`
+async fn run_hyper1_server_with_acceptor<'a, A, F, S>(
+ mut acceptor: Pin<Box<A>>,
+ service_fn_handler: F,
+ error_msg: &'static str,
+ kind: Hyper1ServerKind,
+) where
+ A: Stream<Item = io::Result<rustls_tokio_stream::TlsStream>> + ?Sized,
+ F: Fn(Request<hyper::body::Incoming>) -> S + Copy + 'static,
+ S: Future<
+ Output = Result<
+ Response<UnsyncBoxBody<Bytes, Infallible>>,
+ anyhow::Error,
+ >,
+ > + 'static,
+{
+ let fut: Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>> =
+ async move {
+ while let Some(result) = acceptor.next().await {
+ let stream = result?;
+ let io = TokioIo::new(stream);
+ deno_unsync::spawn(hyper1_serve_connection(
+ io,
+ service_fn_handler,
+ error_msg,
+ kind,
+ ));
+ }
+ Ok(())
+ }
+ .boxed_local();
+
+ if let Err(e) = fut.await {
+ eprintln!("{}: {:?}", error_msg, e);
+ }
+}
async fn wrap_redirect_server(port: u16) {
- let redirect_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(redirect)) });
let redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let redirect_server = Server::bind(&redirect_addr).serve(redirect_svc);
- if let Err(e) = redirect_server.await {
- eprintln!("Redirect error: {e:?}");
- }
+ run_hyper1_server(redirect_addr, redirect, "Redirect error").await;
}
async fn wrap_double_redirect_server(port: u16) {
- let double_redirects_svc = make_service_fn(|_| async {
- Ok::<_, Infallible>(service_fn(double_redirects))
- });
let double_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let double_redirects_server =
- Server::bind(&double_redirects_addr).serve(double_redirects_svc);
- if let Err(e) = double_redirects_server.await {
- eprintln!("Double redirect error: {e:?}");
- }
+ run_hyper1_server(
+ double_redirects_addr,
+ double_redirects,
+ "Double redirect error",
+ )
+ .await;
}
async fn wrap_inf_redirect_server(port: u16) {
- let inf_redirects_svc = make_service_fn(|_| async {
- Ok::<_, Infallible>(service_fn(inf_redirects))
- });
let inf_redirects_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let inf_redirects_server =
- Server::bind(&inf_redirects_addr).serve(inf_redirects_svc);
- if let Err(e) = inf_redirects_server.await {
- eprintln!("Inf redirect error: {e:?}");
- }
+ run_hyper1_server(inf_redirects_addr, inf_redirects, "Inf redirect error")
+ .await;
}
async fn wrap_another_redirect_server(port: u16) {
- let another_redirect_svc = make_service_fn(|_| async {
- Ok::<_, Infallible>(service_fn(another_redirect))
- });
let another_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let another_redirect_server =
- Server::bind(&another_redirect_addr).serve(another_redirect_svc);
- if let Err(e) = another_redirect_server.await {
- eprintln!("Another redirect error: {e:?}");
- }
+ run_hyper1_server(
+ another_redirect_addr,
+ another_redirect,
+ "Another redirect error",
+ )
+ .await;
}
async fn wrap_auth_redirect_server(port: u16) {
- let auth_redirect_svc = make_service_fn(|_| async {
- Ok::<_, Infallible>(service_fn(auth_redirect))
- });
let auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let auth_redirect_server =
- Server::bind(&auth_redirect_addr).serve(auth_redirect_svc);
- if let Err(e) = auth_redirect_server.await {
- eprintln!("Auth redirect error: {e:?}");
- }
+ run_hyper1_server(auth_redirect_addr, auth_redirect, "Auth redirect error")
+ .await;
}
async fn wrap_basic_auth_redirect_server(port: u16) {
- let basic_auth_redirect_svc = make_service_fn(|_| async {
- Ok::<_, Infallible>(service_fn(basic_auth_redirect))
- });
let basic_auth_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let basic_auth_redirect_server =
- Server::bind(&basic_auth_redirect_addr).serve(basic_auth_redirect_svc);
- if let Err(e) = basic_auth_redirect_server.await {
- eprintln!("Basic auth redirect error: {e:?}");
- }
+ run_hyper1_server(
+ basic_auth_redirect_addr,
+ basic_auth_redirect,
+ "Basic auth redirect error",
+ )
+ .await;
}
async fn wrap_abs_redirect_server(port: u16) {
- let abs_redirect_svc = make_service_fn(|_| async {
- Ok::<_, Infallible>(service_fn(absolute_redirect))
- });
let abs_redirect_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let abs_redirect_server =
- Server::bind(&abs_redirect_addr).serve(abs_redirect_svc);
- if let Err(e) = abs_redirect_server.await {
- eprintln!("Absolute redirect error: {e:?}");
- }
+ run_hyper1_server(
+ abs_redirect_addr,
+ absolute_redirect,
+ "Absolute redirect error",
+ )
+ .await;
}
async fn wrap_main_server(port: u16) {
@@ -1249,23 +1392,19 @@ async fn wrap_main_ipv6_server(port: u16) {
}
async fn wrap_main_server_for_addr(main_server_addr: &SocketAddr) {
- let main_server_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
- let main_server = Server::bind(main_server_addr).serve(main_server_svc);
- if let Err(e) = main_server.await {
- eprintln!("HTTP server error: {e:?}");
- }
+ run_hyper1_server(*main_server_addr, main_server, "HTTP server error").await;
}
async fn wrap_main_https_server(port: u16) {
let tls = get_tls_listener_stream("https", port, Default::default()).await;
- let main_server_https_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
- let main_server_https = Server::builder(HyperAcceptor {
- acceptor: tls.boxed_local(),
- })
- .serve(main_server_https_svc);
- let _ = main_server_https.await;
+ let tls_acceptor = tls.boxed_local();
+ run_hyper1_server_with_acceptor(
+ tls_acceptor,
+ main_server,
+ "HTTPS server error",
+ Hyper1ServerKind::Auto,
+ )
+ .await
}
async fn wrap_https_h1_only_tls_server(port: u16) {
@@ -1276,15 +1415,13 @@ async fn wrap_https_h1_only_tls_server(port: u16) {
)
.await;
- let main_server_https_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
- let main_server_https = Server::builder(HyperAcceptor {
- acceptor: tls.boxed_local(),
- })
- .http1_only(true)
- .serve(main_server_https_svc);
-
- let _ = main_server_https.await;
+ run_hyper1_server_with_acceptor(
+ tls.boxed_local(),
+ main_server,
+ "HTTP1 only TLS server error",
+ Hyper1ServerKind::OnlyHttp1,
+ )
+ .await
}
async fn wrap_https_h2_only_tls_server(port: u16) {
@@ -1295,37 +1432,35 @@ async fn wrap_https_h2_only_tls_server(port: u16) {
)
.await;
- let main_server_https_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
- let main_server_https = Server::builder(HyperAcceptor {
- acceptor: tls.boxed_local(),
- })
- .http2_only(true)
- .serve(main_server_https_svc);
-
- let _ = main_server_https.await;
+ run_hyper1_server_with_acceptor(
+ tls.boxed_local(),
+ main_server,
+ "HTTP2 only TLS server error",
+ Hyper1ServerKind::OnlyHttp2,
+ )
+ .await
}
async fn wrap_http_h1_only_server(port: u16) {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port));
-
- let main_server_http_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
- let main_server_http = Server::bind(&main_server_http_addr)
- .http1_only(true)
- .serve(main_server_http_svc);
- let _ = main_server_http.await;
+ run_hyper1_server_inner(
+ main_server_http_addr,
+ main_server,
+ "HTTP1 only server error:",
+ Hyper1ServerKind::OnlyHttp1,
+ )
+ .await;
}
async fn wrap_http_h2_only_server(port: u16) {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], port));
-
- let main_server_http_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
- let main_server_http = Server::bind(&main_server_http_addr)
- .http2_only(true)
- .serve(main_server_http_svc);
- let _ = main_server_http.await;
+ run_hyper1_server_inner(
+ main_server_http_addr,
+ main_server,
+ "HTTP1 only server error:",
+ Hyper1ServerKind::OnlyHttp2,
+ )
+ .await;
}
async fn wrap_client_auth_https_server(port: u16) {
@@ -1345,18 +1480,22 @@ async fn wrap_client_auth_https_server(port: u16) {
}
};
- let main_server_https_svc =
- make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
- let main_server_https = Server::builder(HyperAcceptor {
- acceptor: tls.boxed_local(),
- })
- .serve(main_server_https_svc);
-
- let _ = main_server_https.await;
+ run_hyper1_server_with_acceptor(
+ tls.boxed_local(),
+ main_server,
+ "Auth TLS server error",
+ Hyper1ServerKind::Auto,
+ )
+ .await
}
-fn custom_headers(p: &str, body: Vec<u8>) -> Response<Body> {
- let mut response = Response::new(Body::from(body));
+fn custom_headers(
+ p: &str,
+ body: Vec<u8>,
+) -> Response<UnsyncBoxBody<Bytes, Infallible>> {
+ let mut response = Response::new(UnsyncBoxBody::new(
+ http_body_util::Full::new(Bytes::from(body)),
+ ));
if p.ends_with("/run/import_compression/brotli") {
response
diff --git a/test_util/src/servers/registry.rs b/test_util/src/servers/registry.rs
index 56a9e31b0..8f6fd32aa 100644
--- a/test_util/src/servers/registry.rs
+++ b/test_util/src/servers/registry.rs
@@ -1,37 +1,38 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use hyper::server::Server;
-use hyper::service::make_service_fn;
-use hyper::service::service_fn;
-use hyper::Body;
-use hyper::Request;
-use hyper::Response;
-use hyper::StatusCode;
+use super::run_hyper1_server;
+use bytes::Bytes;
+use http_body_util::combinators::UnsyncBoxBody;
+use http_body_util::Empty;
+use http_body_util::Full;
+use hyper1::body::Incoming;
+use hyper1::Request;
+use hyper1::Response;
+use hyper1::StatusCode;
use serde_json::json;
use std::convert::Infallible;
use std::net::SocketAddr;
pub async fn registry_server(port: u16) {
let registry_server_addr = SocketAddr::from(([127, 0, 0, 1], port));
- let registry_server_svc = make_service_fn(|_| async {
- Ok::<_, Infallible>(service_fn(registry_server_handler))
- });
- let registry_server =
- Server::bind(&registry_server_addr).serve(registry_server_svc);
- if let Err(e) = registry_server.await {
- eprintln!("Registry server error: {:?}", e);
- }
+
+ run_hyper1_server(
+ registry_server_addr,
+ registry_server_handler,
+ "Registry server error",
+ )
+ .await
}
async fn registry_server_handler(
- req: Request<Body>,
-) -> Result<Response<Body>, hyper::http::Error> {
+ req: Request<Incoming>,
+) -> Result<Response<UnsyncBoxBody<Bytes, Infallible>>, anyhow::Error> {
let path = req.uri().path();
// TODO(bartlomieju): add a proper router here
if path.starts_with("/api/scope/") {
let body = serde_json::to_string_pretty(&json!({})).unwrap();
- let res = Response::new(Body::from(body));
+ let res = Response::new(UnsyncBoxBody::new(Full::from(body)));
return Ok(res);
} else if path.starts_with("/api/scopes/") {
let body = serde_json::to_string_pretty(&json!({
@@ -40,7 +41,7 @@ async fn registry_server_handler(
"error": null
}))
.unwrap();
- let res = Response::new(Body::from(body));
+ let res = Response::new(UnsyncBoxBody::new(Full::from(body)));
return Ok(res);
} else if path.starts_with("/api/publish_status/") {
let body = serde_json::to_string_pretty(&json!({
@@ -49,11 +50,13 @@ async fn registry_server_handler(
"error": null
}))
.unwrap();
- let res = Response::new(Body::from(body));
+ let res = Response::new(UnsyncBoxBody::new(Full::from(body)));
return Ok(res);
}
- Response::builder()
+ let empty_body = UnsyncBoxBody::new(Empty::new());
+ let res = Response::builder()
.status(StatusCode::NOT_FOUND)
- .body(Body::empty())
+ .body(empty_body)?;
+ Ok(res)
}