// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use bytes::Bytes; use futures::Future; use futures::FutureExt; use futures::Stream; use futures::StreamExt; use http; use http::Request; use http::Response; use http_body_util::combinators::UnsyncBoxBody; use hyper_util::rt::TokioIo; use std::convert::Infallible; use std::io; use std::net::SocketAddr; use std::pin::Pin; use std::result::Result; use tokio::net::TcpListener; #[derive(Debug, Clone, Copy)] pub enum ServerKind { Auto, OnlyHttp1, OnlyHttp2, } #[derive(Debug, Clone, Copy)] pub struct ServerOptions { pub error_msg: &'static str, pub addr: SocketAddr, pub kind: ServerKind, } pub type HandlerOutput = Result>, anyhow::Error>; pub async fn run_server(options: ServerOptions, handler: F) where F: Fn(Request) -> S + Copy + 'static, S: Future + 'static, { let fut: Pin>>> = async move { let listener = TcpListener::bind(options.addr).await?; println!("ready: {}", options.addr); loop { let (stream, _) = listener.accept().await?; let io = TokioIo::new(stream); deno_unsync::spawn(hyper_serve_connection( io, handler, options.error_msg, options.kind, )); } } .boxed_local(); if let Err(e) = fut.await { let err_str = e.to_string(); if !err_str.contains("early eof") { eprintln!("{}: {:?}", options.error_msg, e); } } } pub async fn run_server_with_acceptor<'a, A, F, S>( mut acceptor: Pin>, handler: F, error_msg: &'static str, kind: ServerKind, ) where A: Stream> + ?Sized, F: Fn(Request) -> S + Copy + 'static, S: Future + 'static, { let fut: Pin>>> = async move { while let Some(result) = acceptor.next().await { let stream = result?; let io = TokioIo::new(stream); deno_unsync::spawn(hyper_serve_connection( io, handler, error_msg, kind, )); } Ok(()) } .boxed_local(); if let Err(e) = fut.await { let err_str = e.to_string(); if !err_str.contains("early eof") { eprintln!("{}: {:?}", error_msg, e); } } } async fn hyper_serve_connection( io: I, handler: F, error_msg: &'static str, kind: ServerKind, ) where I: hyper::rt::Read + hyper::rt::Write + Unpin + 'static, F: Fn(Request) -> S + Copy + 'static, S: Future + 'static, { let service = hyper::service::service_fn(handler); let result: Result<(), anyhow::Error> = match kind { ServerKind::Auto => { let builder = hyper_util::server::conn::auto::Builder::new(DenoUnsyncExecutor); builder .serve_connection(io, service) .await .map_err(|e| anyhow::anyhow!("{:?}", e)) } ServerKind::OnlyHttp1 => { let builder = hyper::server::conn::http1::Builder::new(); builder .serve_connection(io, service) .await .map_err(|e| e.into()) } ServerKind::OnlyHttp2 => { let builder = hyper::server::conn::http2::Builder::new(DenoUnsyncExecutor); builder .serve_connection(io, service) .await .map_err(|e| e.into()) } }; if let Err(e) = result { let err_str = e.to_string(); if !err_str.contains("early eof") { eprintln!("{}: {:?}", error_msg, e); } } } #[derive(Clone)] struct DenoUnsyncExecutor; impl hyper::rt::Executor for DenoUnsyncExecutor where Fut: Future + 'static, Fut::Output: 'static, { fn execute(&self, fut: Fut) { deno_unsync::spawn(fut); } }