diff options
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r-- | ext/websocket/lib.rs | 75 |
1 files changed, 52 insertions, 23 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 798856bc1..71aa66ff3 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -1,11 +1,10 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - +use crate::stream::WebSocketStream; +use bytes::Bytes; use deno_core::error::invalid_hostname; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; -use deno_core::StringOrBuffer; - use deno_core::url; use deno_core::AsyncRefCell; use deno_core::ByteString; @@ -15,7 +14,10 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::StringOrBuffer; use deno_core::ZeroCopyBuf; +use deno_net::raw::take_network_stream_resource; +use deno_net::raw::NetworkStream; use deno_tls::create_client_config; use http::header::CONNECTION; use http::header::UPGRADE; @@ -24,9 +26,7 @@ use http::HeaderValue; use http::Method; use http::Request; use http::Uri; -use hyper::upgrade::Upgraded; use hyper::Body; -use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; @@ -52,6 +52,7 @@ use fastwebsockets::Role; use fastwebsockets::WebSocket; pub use tokio_tungstenite; // Re-export tokio_tungstenite +mod stream; #[derive(Clone)] pub struct WsRootStore(pub Option<RootCertStore>); @@ -243,17 +244,21 @@ where let client = fastwebsockets::handshake::client(&LocalExecutor, request, socket); - let (stream, response): (WebSocket<Upgraded>, Response<Body>) = - if let Some(cancel_resource) = cancel_resource { - client.or_cancel(cancel_resource.0.to_owned()).await? - } else { - client.await - } - .map_err(|err| { - DomExceptionNetworkError::new(&format!( - "failed to connect to WebSocket: {err}" - )) - })?; + let (upgraded, response) = if let Some(cancel_resource) = cancel_resource { + client.or_cancel(cancel_resource.0.to_owned()).await? + } else { + client.await + } + .map_err(|err| { + DomExceptionNetworkError::new(&format!( + "failed to connect to WebSocket: {err}" + )) + })?; + + let inner = MaybeTlsStream::Plain(upgraded.into_inner()); + let stream = + WebSocketStream::new(stream::WsStreamKind::Tungstenite(inner), None); + let stream = WebSocket::after_handshake(stream, Role::Client); if let Some(cancel_rid) = cancel_handle { state.borrow_mut().resource_table.close(cancel_rid).ok(); @@ -294,7 +299,7 @@ pub enum MessageKind { } pub struct ServerWebSocket { - ws: AsyncRefCell<FragmentCollector<Upgraded>>, + ws: AsyncRefCell<FragmentCollector<WebSocketStream>>, closed: Rc<Cell<bool>>, } @@ -320,11 +325,19 @@ impl Resource for ServerWebSocket { "serverWebSocket".into() } } -pub async fn ws_create_server_stream( - state: &Rc<RefCell<OpState>>, - transport: Upgraded, + +pub fn ws_create_server_stream( + state: &mut OpState, + transport: NetworkStream, + read_buf: Bytes, ) -> Result<ResourceId, AnyError> { - let mut ws = WebSocket::after_handshake(transport, Role::Server); + let mut ws = WebSocket::after_handshake( + WebSocketStream::new( + stream::WsStreamKind::Network(transport), + Some(read_buf), + ), + Role::Server, + ); ws.set_writev(true); ws.set_auto_close(true); ws.set_auto_pong(true); @@ -334,12 +347,27 @@ pub async fn ws_create_server_stream( closed: Rc::new(Cell::new(false)), }; - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add(ws_resource); + let rid = state.resource_table.add(ws_resource); Ok(rid) } #[op] +pub fn op_ws_server_create( + state: &mut OpState, + conn: ResourceId, + extra_bytes: &[u8], +) -> Result<ResourceId, AnyError> { + let network_stream = + take_network_stream_resource(&mut state.resource_table, conn)?; + // Copying the extra bytes, but unlikely this will account for much + ws_create_server_stream( + state, + network_stream, + Bytes::from(extra_bytes.to_vec()), + ) +} + +#[op] pub async fn op_ws_send_binary( state: Rc<RefCell<OpState>>, rid: ResourceId, @@ -490,6 +518,7 @@ deno_core::extension!(deno_websocket, op_ws_next_event, op_ws_send_binary, op_ws_send_text, + op_ws_server_create, ], esm = [ "01_websocket.js", "02_websocketstream.js" ], options = { |