diff options
Diffstat (limited to 'test_util/src/servers/ws.rs')
-rw-r--r-- | test_util/src/servers/ws.rs | 268 |
1 files changed, 0 insertions, 268 deletions
diff --git a/test_util/src/servers/ws.rs b/test_util/src/servers/ws.rs deleted file mode 100644 index 815119b6a..000000000 --- a/test_util/src/servers/ws.rs +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. - -use anyhow::anyhow; -use bytes::Bytes; -use fastwebsockets::FragmentCollector; -use fastwebsockets::Frame; -use fastwebsockets::OpCode; -use fastwebsockets::Role; -use fastwebsockets::WebSocket; -use futures::future::join3; -use futures::future::poll_fn; -use futures::Future; -use futures::StreamExt; -use h2::server::Handshake; -use h2::server::SendResponse; -use h2::Reason; -use h2::RecvStream; -use hyper::upgrade::Upgraded; -use hyper::Method; -use hyper::Request; -use hyper::Response; -use hyper::StatusCode; -use hyper_util::rt::TokioIo; -use pretty_assertions::assert_eq; -use std::pin::Pin; -use std::result::Result; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; - -use super::get_tcp_listener_stream; -use super::get_tls_listener_stream; -use super::SupportedHttpVersions; - -pub async fn run_ws_server(port: u16) { - let mut tcp = get_tcp_listener_stream("ws", port).await; - while let Some(Ok(stream)) = tcp.next().await { - spawn_ws_server(stream, |ws| Box::pin(echo_websocket_handler(ws))); - } -} - -pub async fn run_ws_ping_server(port: u16) { - let mut tcp = get_tcp_listener_stream("ws (ping)", port).await; - while let Some(Ok(stream)) = tcp.next().await { - spawn_ws_server(stream, |ws| Box::pin(ping_websocket_handler(ws))); - } -} - -pub async fn run_wss_server(port: u16) { - let mut tls = get_tls_listener_stream("wss", port, Default::default()).await; - while let Some(Ok(tls_stream)) = tls.next().await { - tokio::spawn(async move { - spawn_ws_server(tls_stream, |ws| Box::pin(echo_websocket_handler(ws))); - }); - } -} - -pub async fn run_ws_close_server(port: u16) { - let mut tcp = get_tcp_listener_stream("ws (close)", port).await; - while let Some(Ok(stream)) = tcp.next().await { - spawn_ws_server(stream, |ws| Box::pin(close_websocket_handler(ws))); - } -} - -pub async fn run_wss2_server(port: u16) { - let mut tls = get_tls_listener_stream( - "wss2 (tls)", - port, - SupportedHttpVersions::Http2Only, - ) - .await; - while let Some(Ok(tls)) = tls.next().await { - tokio::spawn(async move { - let mut h2 = h2::server::Builder::new(); - h2.enable_connect_protocol(); - // Using Bytes is pretty alloc-heavy but this is a test server - let server: Handshake<_, Bytes> = h2.handshake(tls); - let mut server = match server.await { - Ok(server) => server, - Err(e) => { - println!("Failed to handshake h2: {e:?}"); - return; - } - }; - loop { - let Some(conn) = server.accept().await else { - break; - }; - let (recv, send) = match conn { - Ok(conn) => conn, - Err(e) => { - println!("Failed to accept a connection: {e:?}"); - break; - } - }; - tokio::spawn(handle_wss_stream(recv, send)); - } - }); - } -} - -async fn echo_websocket_handler( - ws: fastwebsockets::WebSocket<TokioIo<Upgraded>>, -) -> Result<(), anyhow::Error> { - let mut ws = FragmentCollector::new(ws); - - loop { - let frame = ws.read_frame().await.unwrap(); - match frame.opcode { - OpCode::Close => break, - OpCode::Text | OpCode::Binary => { - ws.write_frame(frame).await.unwrap(); - } - _ => {} - } - } - - Ok(()) -} - -type WsHandler = - fn( - fastwebsockets::WebSocket<TokioIo<Upgraded>>, - ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send>>; - -fn spawn_ws_server<S>(stream: S, handler: WsHandler) -where - S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, -{ - let service = hyper::service::service_fn( - move |mut req: http::Request<hyper::body::Incoming>| async move { - let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req) - .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?; - - tokio::spawn(async move { - let ws = upgrade_fut - .await - .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e)) - .unwrap(); - - if let Err(e) = handler(ws).await { - eprintln!("Error in websocket connection: {}", e); - } - }); - - Ok::<_, anyhow::Error>(response) - }, - ); - - let io = TokioIo::new(stream); - tokio::spawn(async move { - let conn = hyper::server::conn::http1::Builder::new() - .serve_connection(io, service) - .with_upgrades(); - - if let Err(e) = conn.await { - eprintln!("websocket server error: {e:?}"); - } - }); -} - -async fn handle_wss_stream( - recv: Request<RecvStream>, - mut send: SendResponse<Bytes>, -) -> Result<(), h2::Error> { - if recv.method() != Method::CONNECT { - eprintln!("wss2: refusing non-CONNECT stream"); - send.send_reset(Reason::REFUSED_STREAM); - return Ok(()); - } - let Some(protocol) = recv.extensions().get::<h2::ext::Protocol>() else { - eprintln!("wss2: refusing no-:protocol stream"); - send.send_reset(Reason::REFUSED_STREAM); - return Ok(()); - }; - if protocol.as_str() != "websocket" && protocol.as_str() != "WebSocket" { - eprintln!("wss2: refusing non-websocket stream"); - send.send_reset(Reason::REFUSED_STREAM); - return Ok(()); - } - let mut body = recv.into_body(); - let mut response = Response::new(()); - *response.status_mut() = StatusCode::OK; - let mut resp = send.send_response(response, false)?; - // Use a duplex stream to talk to fastwebsockets because it's just faster to implement - let (a, b) = tokio::io::duplex(65536); - let f1 = tokio::spawn(tokio::task::unconstrained(async move { - let ws = WebSocket::after_handshake(a, Role::Server); - let mut ws = FragmentCollector::new(ws); - loop { - let frame = ws.read_frame().await.unwrap(); - if frame.opcode == OpCode::Close { - break; - } - ws.write_frame(frame).await.unwrap(); - } - })); - let (mut br, mut bw) = tokio::io::split(b); - let f2 = tokio::spawn(tokio::task::unconstrained(async move { - loop { - let Some(Ok(data)) = poll_fn(|cx| body.poll_data(cx)).await else { - return; - }; - body.flow_control().release_capacity(data.len()).unwrap(); - let Ok(_) = bw.write_all(&data).await else { - break; - }; - } - })); - let f3 = tokio::spawn(tokio::task::unconstrained(async move { - loop { - let mut buf = [0; 65536]; - let n = br.read(&mut buf).await.unwrap(); - if n == 0 { - break; - } - resp.reserve_capacity(n); - poll_fn(|cx| resp.poll_capacity(cx)).await; - resp - .send_data(Bytes::copy_from_slice(&buf[0..n]), false) - .unwrap(); - } - resp.send_data(Bytes::new(), true).unwrap(); - })); - _ = join3(f1, f2, f3).await; - Ok(()) -} - -async fn close_websocket_handler( - ws: fastwebsockets::WebSocket<TokioIo<Upgraded>>, -) -> Result<(), anyhow::Error> { - let mut ws = FragmentCollector::new(ws); - - ws.write_frame(Frame::close_raw(vec![].into())) - .await - .unwrap(); - - Ok(()) -} - -async fn ping_websocket_handler( - ws: fastwebsockets::WebSocket<TokioIo<Upgraded>>, -) -> Result<(), anyhow::Error> { - let mut ws = FragmentCollector::new(ws); - - for i in 0..9 { - ws.write_frame(Frame::new(true, OpCode::Ping, None, vec![].into())) - .await - .unwrap(); - - let frame = ws.read_frame().await.unwrap(); - assert_eq!(frame.opcode, OpCode::Pong); - assert!(frame.payload.is_empty()); - - ws.write_frame(Frame::text( - format!("hello {}", i).as_bytes().to_vec().into(), - )) - .await - .unwrap(); - - let frame = ws.read_frame().await.unwrap(); - assert_eq!(frame.opcode, OpCode::Text); - assert_eq!(frame.payload, format!("hello {}", i).as_bytes()); - } - - ws.write_frame(Frame::close(1000, b"")).await.unwrap(); - - Ok(()) -} |