summaryrefslogtreecommitdiff
path: root/ext/websocket/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r--ext/websocket/lib.rs75
1 files changed, 52 insertions, 23 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 798856bc1..71aa66ff3 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -1,11 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
+use crate::stream::WebSocketStream;
+use bytes::Bytes;
use deno_core::error::invalid_hostname;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
-use deno_core::StringOrBuffer;
-
use deno_core::url;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
@@ -15,7 +14,10 @@ use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
+use deno_core::StringOrBuffer;
use deno_core::ZeroCopyBuf;
+use deno_net::raw::take_network_stream_resource;
+use deno_net::raw::NetworkStream;
use deno_tls::create_client_config;
use http::header::CONNECTION;
use http::header::UPGRADE;
@@ -24,9 +26,7 @@ use http::HeaderValue;
use http::Method;
use http::Request;
use http::Uri;
-use hyper::upgrade::Upgraded;
use hyper::Body;
-use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
@@ -52,6 +52,7 @@ use fastwebsockets::Role;
use fastwebsockets::WebSocket;
pub use tokio_tungstenite; // Re-export tokio_tungstenite
+mod stream;
#[derive(Clone)]
pub struct WsRootStore(pub Option<RootCertStore>);
@@ -243,17 +244,21 @@ where
let client =
fastwebsockets::handshake::client(&LocalExecutor, request, socket);
- let (stream, response): (WebSocket<Upgraded>, Response<Body>) =
- if let Some(cancel_resource) = cancel_resource {
- client.or_cancel(cancel_resource.0.to_owned()).await?
- } else {
- client.await
- }
- .map_err(|err| {
- DomExceptionNetworkError::new(&format!(
- "failed to connect to WebSocket: {err}"
- ))
- })?;
+ let (upgraded, response) = if let Some(cancel_resource) = cancel_resource {
+ client.or_cancel(cancel_resource.0.to_owned()).await?
+ } else {
+ client.await
+ }
+ .map_err(|err| {
+ DomExceptionNetworkError::new(&format!(
+ "failed to connect to WebSocket: {err}"
+ ))
+ })?;
+
+ let inner = MaybeTlsStream::Plain(upgraded.into_inner());
+ let stream =
+ WebSocketStream::new(stream::WsStreamKind::Tungstenite(inner), None);
+ let stream = WebSocket::after_handshake(stream, Role::Client);
if let Some(cancel_rid) = cancel_handle {
state.borrow_mut().resource_table.close(cancel_rid).ok();
@@ -294,7 +299,7 @@ pub enum MessageKind {
}
pub struct ServerWebSocket {
- ws: AsyncRefCell<FragmentCollector<Upgraded>>,
+ ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
closed: Rc<Cell<bool>>,
}
@@ -320,11 +325,19 @@ impl Resource for ServerWebSocket {
"serverWebSocket".into()
}
}
-pub async fn ws_create_server_stream(
- state: &Rc<RefCell<OpState>>,
- transport: Upgraded,
+
+pub fn ws_create_server_stream(
+ state: &mut OpState,
+ transport: NetworkStream,
+ read_buf: Bytes,
) -> Result<ResourceId, AnyError> {
- let mut ws = WebSocket::after_handshake(transport, Role::Server);
+ let mut ws = WebSocket::after_handshake(
+ WebSocketStream::new(
+ stream::WsStreamKind::Network(transport),
+ Some(read_buf),
+ ),
+ Role::Server,
+ );
ws.set_writev(true);
ws.set_auto_close(true);
ws.set_auto_pong(true);
@@ -334,12 +347,27 @@ pub async fn ws_create_server_stream(
closed: Rc::new(Cell::new(false)),
};
- let resource_table = &mut state.borrow_mut().resource_table;
- let rid = resource_table.add(ws_resource);
+ let rid = state.resource_table.add(ws_resource);
Ok(rid)
}
#[op]
+pub fn op_ws_server_create(
+ state: &mut OpState,
+ conn: ResourceId,
+ extra_bytes: &[u8],
+) -> Result<ResourceId, AnyError> {
+ let network_stream =
+ take_network_stream_resource(&mut state.resource_table, conn)?;
+ // Copying the extra bytes, but unlikely this will account for much
+ ws_create_server_stream(
+ state,
+ network_stream,
+ Bytes::from(extra_bytes.to_vec()),
+ )
+}
+
+#[op]
pub async fn op_ws_send_binary(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -490,6 +518,7 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
+ op_ws_server_create,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {