diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-09-15 21:51:25 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-15 21:51:25 +0200 |
commit | 5a1505db67d0326bf37b765c8a566584b44a2c1a (patch) | |
tree | 4f1cadc6bc85f8538cc0c21ebc7354e404d48bcf /test_util/src/lib.rs | |
parent | 11f0ccf8052065e37f92d8ba43e9624fcd3b9814 (diff) |
feat(ext/node): http2.connect() API (#19671)
This commit improves compatibility of "node:http2" module by polyfilling
"connect" method and "ClientHttp2Session" class. Basic operations like
streaming, header and trailer handling are working correctly.
Refing/unrefing is still a TODO and "npm:grpc-js/grpc" is not yet working
correctly.
---------
Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'test_util/src/lib.rs')
-rw-r--r-- | test_util/src/lib.rs | 131 |
1 files changed, 119 insertions, 12 deletions
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index 0600867d7..635520b44 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -7,6 +7,7 @@ use futures::FutureExt; use futures::Stream; use futures::StreamExt; use hyper::header::HeaderValue; +use hyper::http; use hyper::server::Server; use hyper::service::make_service_fn; use hyper::service::service_fn; @@ -57,6 +58,7 @@ use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::rustls; +use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; use url::Url; @@ -102,6 +104,8 @@ const WS_PORT: u16 = 4242; const WSS_PORT: u16 = 4243; const WS_CLOSE_PORT: u16 = 4244; const WS_PING_PORT: u16 = 4245; +const H2_GRPC_PORT: u16 = 4246; +const H2S_GRPC_PORT: u16 = 4247; pub const PERMISSION_VARIANTS: [&str; 5] = ["read", "write", "env", "net", "run"]; @@ -1664,17 +1668,7 @@ async fn wrap_https_h1_only_tls_server() { async fn wrap_https_h2_only_tls_server() { let main_server_https_addr = SocketAddr::from(([127, 0, 0, 1], H2_ONLY_TLS_PORT)); - let cert_file = "tls/localhost.crt"; - let key_file = "tls/localhost.key"; - let ca_cert_file = "tls/RootCA.pem"; - let tls_config = get_tls_config( - cert_file, - key_file, - ca_cert_file, - SupportedHttpVersions::Http2Only, - ) - .await - .unwrap(); + let tls_config = create_tls_server_config().await; loop { let tcp = TcpListener::bind(&main_server_https_addr) .await @@ -1707,6 +1701,20 @@ async fn wrap_https_h2_only_tls_server() { } } +async fn create_tls_server_config() -> Arc<rustls::ServerConfig> { + let cert_file = "tls/localhost.crt"; + let key_file = "tls/localhost.key"; + let ca_cert_file = "tls/RootCA.pem"; + get_tls_config( + cert_file, + key_file, + ca_cert_file, + SupportedHttpVersions::Http2Only, + ) + .await + .unwrap() +} + async fn wrap_https_h1_only_server() { let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], H1_ONLY_PORT)); @@ -1729,6 +1737,103 @@ async fn wrap_https_h2_only_server() { let _ = main_server_http.await; } +async fn h2_grpc_server() { + let addr = SocketAddr::from(([127, 0, 0, 1], H2_GRPC_PORT)); + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + + let addr_tls = SocketAddr::from(([127, 0, 0, 1], H2S_GRPC_PORT)); + let listener_tls = tokio::net::TcpListener::bind(addr_tls).await.unwrap(); + let tls_config = create_tls_server_config().await; + + async fn serve(socket: TcpStream) -> Result<(), anyhow::Error> { + let mut connection = h2::server::handshake(socket).await?; + + while let Some(result) = connection.accept().await { + let (request, respond) = result?; + tokio::spawn(async move { + let _ = handle_request(request, respond).await; + }); + } + + Ok(()) + } + + async fn serve_tls( + socket: TlsStream<TcpStream>, + ) -> Result<(), anyhow::Error> { + let mut connection = h2::server::handshake(socket).await?; + + while let Some(result) = connection.accept().await { + let (request, respond) = result?; + tokio::spawn(async move { + let _ = handle_request(request, respond).await; + }); + } + + Ok(()) + } + + async fn handle_request( + mut request: http::Request<h2::RecvStream>, + mut respond: h2::server::SendResponse<bytes::Bytes>, + ) -> Result<(), anyhow::Error> { + let body = request.body_mut(); + while let Some(data) = body.data().await { + let data = data?; + let _ = body.flow_control().release_capacity(data.len()); + } + + let maybe_recv_trailers = body.trailers().await?; + + let response = http::Response::new(()); + let mut send = respond.send_response(response, false)?; + send.send_data(bytes::Bytes::from_static(b"hello "), false)?; + send.send_data(bytes::Bytes::from_static(b"world\n"), false)?; + let mut trailers = http::HeaderMap::new(); + trailers.insert( + http::HeaderName::from_static("abc"), + HeaderValue::from_static("def"), + ); + trailers.insert( + http::HeaderName::from_static("opr"), + HeaderValue::from_static("stv"), + ); + if let Some(recv_trailers) = maybe_recv_trailers { + for (key, value) in recv_trailers { + trailers.insert(key.unwrap(), value); + } + } + send.send_trailers(trailers)?; + + Ok(()) + } + + let http = tokio::spawn(async move { + loop { + if let Ok((socket, _peer_addr)) = listener.accept().await { + tokio::spawn(async move { + let _ = serve(socket).await; + }); + } + } + }); + + let https = tokio::spawn(async move { + loop { + if let Ok((socket, _peer_addr)) = listener_tls.accept().await { + let tls_acceptor = TlsAcceptor::from(tls_config.clone()); + let tls = tls_acceptor.accept(socket).await.unwrap(); + tokio::spawn(async move { + let _ = serve_tls(tls).await; + }); + } + } + }); + + http.await.unwrap(); + https.await.unwrap(); +} + async fn wrap_client_auth_https_server() { let main_server_https_addr = SocketAddr::from(([127, 0, 0, 1], HTTPS_CLIENT_AUTH_PORT)); @@ -1821,6 +1926,7 @@ pub async fn run_all_servers() { let h2_only_server_tls_fut = wrap_https_h2_only_tls_server(); let h1_only_server_fut = wrap_https_h1_only_server(); let h2_only_server_fut = wrap_https_h2_only_server(); + let h2_grpc_server_fut = h2_grpc_server(); let mut server_fut = async { futures::join!( @@ -1843,7 +1949,8 @@ pub async fn run_all_servers() { h1_only_server_tls_fut, h2_only_server_tls_fut, h1_only_server_fut, - h2_only_server_fut + h2_only_server_fut, + h2_grpc_server_fut, ) } .boxed(); |