diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-11-01 15:11:01 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-01 21:11:01 +0000 |
commit | 42c426e7695a0037032d1ac5237830800eeaaed4 (patch) | |
tree | 242f9aa30187464f1b6314387654a76d8dc76fc0 /ext/websocket/lib.rs | |
parent | 587f2e0800a55e58b2579758d4278a4129b609c0 (diff) |
feat(ext/websocket): websockets over http2 (#21040)
Implements `WebSocket` over http/2. This requires a conformant http/2
server supporting the extended connect protocol.
Passes approximately 100 new WPT tests (mostly `?wpt_flags=h2` versions
of existing websockets APIs).
This is implemented as a fallback when http/1.1 fails, so a server that
supports both h1 and h2 WebSockets will still end up on the http/1.1
upgrade path.
The patch also cleas up the websockets handshake to split it up into
http, https+http1 and https+http2, making it a little less intertwined.
This uncovered a likely bug in the WPT test server:
https://github.com/web-platform-tests/wpt/issues/42896
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r-- | ext/websocket/lib.rs | 332 |
1 files changed, 228 insertions, 104 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index ac40b8304..c2599f6f6 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -1,16 +1,19 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use crate::stream::WebSocketStream; use bytes::Bytes; +use deno_core::anyhow::bail; use deno_core::error::invalid_hostname; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::TryFutureExt; use deno_core::op2; +use deno_core::unsync::spawn; use deno_core::url; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::ByteString; -use deno_core::CancelFuture; use deno_core::CancelHandle; +use deno_core::CancelTryFuture; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; @@ -19,13 +22,16 @@ use deno_core::ResourceId; use deno_core::ToJsBuffer; use deno_net::raw::NetworkStream; use deno_tls::create_client_config; +use deno_tls::rustls::ClientConfig; use deno_tls::RootCertStoreProvider; +use deno_tls::SocketUse; use http::header::CONNECTION; use http::header::UPGRADE; use http::HeaderName; use http::HeaderValue; use http::Method; use http::Request; +use http::StatusCode; use http::Uri; use hyper::Body; use once_cell::sync::Lazy; @@ -146,66 +152,175 @@ pub struct CreateResponse { extensions: String, } -async fn handshake<S: AsyncRead + AsyncWrite + Send + Unpin + 'static>( - cancel_resource: Option<Rc<CancelHandle>>, +async fn handshake_websocket( + state: &Rc<RefCell<OpState>>, + uri: &Uri, + protocols: &str, + headers: Option<Vec<(ByteString, ByteString)>>, +) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { + let mut request = Request::builder().method(Method::GET).uri( + uri + .path_and_query() + .ok_or(type_error("Missing path in url".to_string()))? + .as_str(), + ); + + let authority = uri.authority().unwrap().as_str(); + let host = authority + .find('@') + .map(|idx| authority.split_at(idx + 1).1) + .unwrap_or_else(|| authority); + request = request + .header("Host", host) + .header(UPGRADE, "websocket") + .header(CONNECTION, "Upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ); + + let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); + request = + populate_common_request_headers(request, &user_agent, protocols, &headers)?; + + let request = request.body(Body::empty())?; + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { + Some("wss") => 443, + Some("ws") => 80, + _ => unreachable!(), + }); + let addr = format!("{domain}:{port}"); + + let res = match uri.scheme_str() { + Some("ws") => handshake_http1_ws(request, &addr).await?, + Some("wss") => { + match handshake_http1_wss(state, request, domain, &addr).await { + Ok(res) => res, + Err(_) => { + handshake_http2_wss( + state, + uri, + authority, + &user_agent, + protocols, + domain, + &headers, + &addr, + ) + .await? + } + } + } + _ => unreachable!(), + }; + Ok(res) +} + +async fn handshake_http1_ws( request: Request<Body>, - socket: S, -) -> Result<(WebSocket<WebSocketStream>, http::Response<Body>), AnyError> { - let client = - fastwebsockets::handshake::client(&LocalExecutor, request, socket); + addr: &String, +) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { + let tcp_socket = TcpStream::connect(addr).await?; + handshake_connection(request, tcp_socket).await +} - let (upgraded, response) = if let Some(cancel_resource) = cancel_resource { - client.or_cancel(cancel_resource).await? - } else { - client.await +async fn handshake_http1_wss( + state: &Rc<RefCell<OpState>>, + request: Request<Body>, + domain: &str, + addr: &str, +) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { + let tcp_socket = TcpStream::connect(addr).await?; + let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?; + let dnsname = + ServerName::try_from(domain).map_err(|_| invalid_hostname(domain))?; + let mut tls_connector = TlsStream::new_client_side( + tcp_socket, + tls_config.into(), + dnsname, + NonZeroUsize::new(65536), + ); + // If we can bail on an http/1.1 ALPN mismatch here, we can avoid doing extra work + tls_connector.handshake().await?; + handshake_connection(request, tls_connector).await +} + +#[allow(clippy::too_many_arguments)] +async fn handshake_http2_wss( + state: &Rc<RefCell<OpState>>, + uri: &Uri, + authority: &str, + user_agent: &str, + protocols: &str, + domain: &str, + headers: &Option<Vec<(ByteString, ByteString)>>, + addr: &str, +) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { + let tcp_socket = TcpStream::connect(addr).await?; + let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?; + let dnsname = + ServerName::try_from(domain).map_err(|_| invalid_hostname(domain))?; + // We need to better expose the underlying errors here + let mut tls_connector = + TlsStream::new_client_side(tcp_socket, tls_config.into(), dnsname, None); + let handshake = tls_connector.handshake().await?; + if handshake.alpn.is_none() { + bail!("Didn't receive h2 alpn, aborting connection"); } - .map_err(|err| { - DomExceptionNetworkError::new(&format!( - "failed to connect to WebSocket: {err}" - )) - })?; + let h2 = h2::client::Builder::new(); + let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?; + spawn(conn); + let mut request = Request::builder(); + request = request.method(Method::CONNECT); + let uri = Uri::builder() + .authority(authority) + .path_and_query(uri.path_and_query().unwrap().as_str()) + .scheme("https") + .build()?; + request = request.uri(uri); + request = + populate_common_request_headers(request, user_agent, protocols, headers)?; + request = request.extension(h2::ext::Protocol::from("websocket")); + let (resp, send) = send.send_request(request.body(())?, false)?; + let resp = resp.await?; + if resp.status() != StatusCode::OK { + bail!("Invalid status code: {}", resp.status()); + } + let (http::response::Parts { headers, .. }, recv) = resp.into_parts(); + let mut stream = WebSocket::after_handshake( + WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None), + Role::Client, + ); + // We currently don't support vectored writes in the H2 streams + stream.set_writev(false); + // TODO(mmastrac): we should be able to use a zero masking key over HTTPS + // stream.set_auto_apply_mask(false); + Ok((stream, headers)) +} + +async fn handshake_connection< + S: AsyncRead + AsyncWrite + Send + Unpin + 'static, +>( + request: Request<Body>, + socket: S, +) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { + let (upgraded, response) = + fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?; let upgraded = upgraded.into_inner(); let stream = WebSocketStream::new(stream::WsStreamKind::Upgraded(upgraded), None); let stream = WebSocket::after_handshake(stream, Role::Client); - Ok((stream, response)) + Ok((stream, response.into_parts().0.headers)) } -#[op2(async)] -#[serde] -pub async fn op_ws_create<WP>( - state: Rc<RefCell<OpState>>, - #[string] api_name: String, - #[string] url: String, - #[string] protocols: String, - #[smi] cancel_handle: Option<ResourceId>, - #[serde] headers: Option<Vec<(ByteString, ByteString)>>, -) -> Result<CreateResponse, AnyError> -where - WP: WebSocketPermissions + 'static, -{ - { - let mut s = state.borrow_mut(); - s.borrow_mut::<WP>() - .check_net_url(&url::Url::parse(&url)?, &api_name) - .expect( - "Permission check should have been done in op_ws_check_permission", - ); - } - - let cancel_resource = if let Some(cancel_rid) = cancel_handle { - let r = state - .borrow_mut() - .resource_table - .get::<WsCancelResource>(cancel_rid)?; - Some(r.0.clone()) - } else { - None - }; - - let unsafely_ignore_certificate_errors = state +pub fn create_ws_client_config( + state: &Rc<RefCell<OpState>>, + socket_use: SocketUse, +) -> Result<ClientConfig, AnyError> { + let unsafely_ignore_certificate_errors: Option<Vec<String>> = state .borrow() .try_borrow::<UnsafelyIgnoreCertificateErrors>() .and_then(|it| it.0.clone()); @@ -213,29 +328,25 @@ where .borrow() .borrow::<WsRootStoreProvider>() .get_or_try_init()?; - let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); - let uri: Uri = url.parse()?; - let mut request = Request::builder().method(Method::GET).uri( - uri - .path_and_query() - .ok_or(type_error("Missing path in url".to_string()))? - .as_str(), - ); - let authority = uri.authority().unwrap().as_str(); - let host = authority - .find('@') - .map(|idx| authority.split_at(idx + 1).1) - .unwrap_or_else(|| authority); + create_client_config( + root_cert_store, + vec![], + unsafely_ignore_certificate_errors, + None, + socket_use, + ) +} + +/// Headers common to both http/1.1 and h2 requests. +fn populate_common_request_headers( + mut request: http::request::Builder, + user_agent: &str, + protocols: &str, + headers: &Option<Vec<(ByteString, ByteString)>>, +) -> Result<http::request::Builder, AnyError> { request = request .header("User-Agent", user_agent) - .header("Host", host) - .header(UPGRADE, "websocket") - .header(CONNECTION, "Upgrade") - .header( - "Sec-WebSocket-Key", - fastwebsockets::handshake::generate_key(), - ) .header("Sec-WebSocket-Version", "13"); if !protocols.is_empty() { @@ -244,9 +355,9 @@ where if let Some(headers) = headers { for (key, value) in headers { - let name = HeaderName::from_bytes(&key) + let name = HeaderName::from_bytes(key) .map_err(|err| type_error(err.to_string()))?; - let v = HeaderValue::from_bytes(&value) + let v = HeaderValue::from_bytes(value) .map_err(|err| type_error(err.to_string()))?; let is_disallowed_header = matches!( @@ -265,40 +376,54 @@ where } } } + Ok(request) +} - let request = request.body(Body::empty())?; - let domain = &uri.host().unwrap().to_string(); - let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { - Some("wss") => 443, - Some("ws") => 80, - _ => unreachable!(), - }); - let addr = format!("{domain}:{port}"); - let tcp_socket = TcpStream::connect(addr).await?; - - let (stream, response) = match uri.scheme_str() { - Some("ws") => handshake(cancel_resource, request, tcp_socket).await?, - Some("wss") => { - let tls_config = create_client_config( - root_cert_store, - vec![], - unsafely_ignore_certificate_errors, - None, - )?; - let dnsname = ServerName::try_from(domain.as_str()) - .map_err(|_| invalid_hostname(domain))?; - let mut tls_connector = TlsStream::new_client_side( - tcp_socket, - tls_config.into(), - dnsname, - NonZeroUsize::new(65536), +#[op2(async)] +#[serde] +pub async fn op_ws_create<WP>( + state: Rc<RefCell<OpState>>, + #[string] api_name: String, + #[string] url: String, + #[string] protocols: String, + #[smi] cancel_handle: Option<ResourceId>, + #[serde] headers: Option<Vec<(ByteString, ByteString)>>, +) -> Result<CreateResponse, AnyError> +where + WP: WebSocketPermissions + 'static, +{ + { + let mut s = state.borrow_mut(); + s.borrow_mut::<WP>() + .check_net_url(&url::Url::parse(&url)?, &api_name) + .expect( + "Permission check should have been done in op_ws_check_permission", ); - let _hs = tls_connector.handshake().await?; - handshake(cancel_resource, request, tls_connector).await? - } - _ => unreachable!(), + } + + let cancel_resource = if let Some(cancel_rid) = cancel_handle { + let r = state + .borrow_mut() + .resource_table + .get::<WsCancelResource>(cancel_rid)?; + Some(r.0.clone()) + } else { + None }; + let uri: Uri = url.parse()?; + + let handshake = handshake_websocket(&state, &uri, &protocols, headers) + .map_err(|err| { + AnyError::from(DomExceptionNetworkError::new(&format!( + "failed to connect to WebSocket: {err}" + ))) + }); + let (stream, response) = match cancel_resource { + Some(rc) => handshake.try_or_cancel(rc).await, + None => handshake.await, + }?; + if let Some(cancel_rid) = cancel_handle { if let Ok(res) = state.borrow_mut().resource_table.take_any(cancel_rid) { res.close(); @@ -308,12 +433,11 @@ where let mut state = state.borrow_mut(); let rid = state.resource_table.add(ServerWebSocket::new(stream)); - let protocol = match response.headers().get("Sec-WebSocket-Protocol") { + let protocol = match response.get("Sec-WebSocket-Protocol") { Some(header) => header.to_str().unwrap(), None => "", }; let extensions = response - .headers() .get_all("Sec-WebSocket-Extensions") .iter() .map(|header| header.to_str().unwrap()) |