summaryrefslogtreecommitdiff
path: root/ext/websocket
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket')
-rw-r--r--ext/websocket/01_websocket.js52
-rw-r--r--ext/websocket/Cargo.toml1
-rw-r--r--ext/websocket/lib.rs39
-rw-r--r--ext/websocket/server.rs191
4 files changed, 244 insertions, 39 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js
index 5105df24d..2b8ee59a9 100644
--- a/ext/websocket/01_websocket.js
+++ b/ext/websocket/01_websocket.js
@@ -78,6 +78,11 @@ webidl.converters["WebSocketSend"] = (V, opts) => {
return webidl.converters["USVString"](V, opts);
};
+/** role */
+const SERVER = 0;
+const CLIENT = 1;
+
+/** state */
const CONNECTING = 0;
const OPEN = 1;
const CLOSING = 2;
@@ -86,6 +91,7 @@ const CLOSED = 3;
const _readyState = Symbol("[[readyState]]");
const _url = Symbol("[[url]]");
const _rid = Symbol("[[rid]]");
+const _role = Symbol("[[role]]");
const _extensions = Symbol("[[extensions]]");
const _protocol = Symbol("[[protocol]]");
const _binaryType = Symbol("[[binaryType]]");
@@ -98,6 +104,7 @@ const _idleTimeoutTimeout = Symbol("[[idleTimeoutTimeout]]");
const _serverHandleIdleTimeout = Symbol("[[serverHandleIdleTimeout]]");
class WebSocket extends EventTarget {
[_rid];
+ [_role];
[_readyState] = CONNECTING;
get readyState() {
@@ -203,6 +210,7 @@ class WebSocket extends EventTarget {
}
this[_url] = wsURL.href;
+ this[_role] = CLIENT;
ops.op_ws_check_permission_and_cancel_handle(
"WebSocket.abort()",
@@ -312,7 +320,13 @@ class WebSocket extends EventTarget {
const sendTypedArray = (view, byteLength) => {
this[_bufferedAmount] += byteLength;
PromisePrototypeThen(
- core.opAsync2("op_ws_send_binary", this[_rid], view),
+ core.opAsync2(
+ this[_role] === SERVER
+ ? "op_server_ws_send_binary"
+ : "op_ws_send_binary",
+ this[_rid],
+ view,
+ ),
() => {
this[_bufferedAmount] -= byteLength;
},
@@ -346,7 +360,11 @@ class WebSocket extends EventTarget {
const d = core.encode(string);
this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
PromisePrototypeThen(
- core.opAsync2("op_ws_send_text", this[_rid], string),
+ core.opAsync2(
+ this[_role] === SERVER ? "op_server_ws_send_text" : "op_ws_send_text",
+ this[_rid],
+ string,
+ ),
() => {
this[_bufferedAmount] -= TypedArrayPrototypeGetByteLength(d);
},
@@ -401,7 +419,12 @@ class WebSocket extends EventTarget {
this[_readyState] = CLOSING;
PromisePrototypeCatch(
- core.opAsync("op_ws_close", this[_rid], code, reason),
+ core.opAsync(
+ this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
+ this[_rid],
+ code,
+ reason,
+ ),
(err) => {
this[_readyState] = CLOSED;
@@ -422,7 +445,7 @@ class WebSocket extends EventTarget {
async [_eventLoop]() {
while (this[_readyState] !== CLOSED) {
const { 0: kind, 1: value } = await core.opAsync2(
- "op_ws_next_event",
+ this[_role] === SERVER ? "op_server_ws_next_event" : "op_ws_next_event",
this[_rid],
);
@@ -489,7 +512,7 @@ class WebSocket extends EventTarget {
if (prevState === OPEN) {
try {
await core.opAsync(
- "op_ws_close",
+ this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
this[_rid],
code,
value,
@@ -517,14 +540,23 @@ class WebSocket extends EventTarget {
clearTimeout(this[_idleTimeoutTimeout]);
this[_idleTimeoutTimeout] = setTimeout(async () => {
if (this[_readyState] === OPEN) {
- await core.opAsync("op_ws_send", this[_rid], {
- kind: "ping",
- });
+ await core.opAsync(
+ this[_role] === SERVER ? "op_server_ws_send" : "op_ws_send",
+ this[_rid],
+ {
+ kind: "ping",
+ },
+ );
this[_idleTimeoutTimeout] = setTimeout(async () => {
if (this[_readyState] === OPEN) {
this[_readyState] = CLOSING;
const reason = "No response from ping frame.";
- await core.opAsync("op_ws_close", this[_rid], 1001, reason);
+ await core.opAsync(
+ this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
+ this[_rid],
+ 1001,
+ reason,
+ );
this[_readyState] = CLOSED;
const errEvent = new ErrorEvent("error", {
@@ -594,7 +626,9 @@ export {
_protocol,
_readyState,
_rid,
+ _role,
_server,
_serverHandleIdleTimeout,
+ SERVER,
WebSocket,
};
diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml
index 0432c78f8..cf77cccf3 100644
--- a/ext/websocket/Cargo.toml
+++ b/ext/websocket/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
deno_core.workspace = true
deno_tls.workspace = true
+fastwebsockets = "0.1.0"
http.workspace = true
hyper.workspace = true
serde.workspace = true
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 1c586b383..71f176070 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -47,13 +47,16 @@ use tokio_tungstenite::tungstenite::handshake::client::Response;
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::protocol::Message;
-use tokio_tungstenite::tungstenite::protocol::Role;
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::WebSocketStream;
pub use tokio_tungstenite; // Re-export tokio_tungstenite
+mod server;
+
+pub use server::ws_create_server_stream;
+
#[derive(Clone)]
pub struct WsRootStore(pub Option<RootCertStore>);
#[derive(Clone)]
@@ -89,35 +92,6 @@ pub enum WebSocketStreamType {
pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
-pub async fn ws_create_server_stream(
- state: &Rc<RefCell<OpState>>,
- transport: Pin<Box<dyn Upgraded>>,
-) -> Result<ResourceId, AnyError> {
- let ws_stream = WebSocketStream::from_raw_socket(
- transport,
- Role::Server,
- Some(WebSocketConfig {
- max_message_size: Some(128 << 20),
- max_frame_size: Some(32 << 20),
- ..Default::default()
- }),
- )
- .await;
- let (ws_tx, ws_rx) = ws_stream.split();
-
- let ws_resource = WsStreamResource {
- stream: WebSocketStreamType::Server {
- tx: AsyncRefCell::new(ws_tx),
- rx: AsyncRefCell::new(ws_rx),
- },
- cancel: Default::default(),
- };
-
- let resource_table = &mut state.borrow_mut().resource_table;
- let rid = resource_table.add(ws_resource);
- Ok(rid)
-}
-
pub struct WsStreamResource {
pub stream: WebSocketStreamType,
// When a `WsStreamResource` resource is closed, all pending 'read' ops are
@@ -549,6 +523,11 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
+ server::op_server_ws_send,
+ server::op_server_ws_close,
+ server::op_server_ws_next_event,
+ server::op_server_ws_send_binary,
+ server::op_server_ws_send_text,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {
diff --git a/ext/websocket/server.rs b/ext/websocket/server.rs
new file mode 100644
index 000000000..4b47c88c8
--- /dev/null
+++ b/ext/websocket/server.rs
@@ -0,0 +1,191 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use crate::MessageKind;
+use crate::SendValue;
+use crate::Upgraded;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op;
+use deno_core::AsyncRefCell;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_core::StringOrBuffer;
+use deno_core::ZeroCopyBuf;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::pin::Pin;
+use std::rc::Rc;
+
+use fastwebsockets::CloseCode;
+use fastwebsockets::FragmentCollector;
+use fastwebsockets::Frame;
+use fastwebsockets::OpCode;
+use fastwebsockets::WebSocket;
+
+pub struct ServerWebSocket {
+ ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
+}
+
+impl Resource for ServerWebSocket {
+ fn name(&self) -> Cow<str> {
+ "serverWebSocket".into()
+ }
+}
+pub async fn ws_create_server_stream(
+ state: &Rc<RefCell<OpState>>,
+ transport: Pin<Box<dyn Upgraded>>,
+) -> Result<ResourceId, AnyError> {
+ let mut ws = WebSocket::after_handshake(transport);
+ ws.set_writev(false);
+ ws.set_auto_close(true);
+ ws.set_auto_pong(true);
+
+ let ws_resource = ServerWebSocket {
+ ws: AsyncRefCell::new(FragmentCollector::new(ws)),
+ };
+
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let rid = resource_table.add(ws_resource);
+ Ok(rid)
+}
+
+#[op]
+pub async fn op_server_ws_send_binary(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: ZeroCopyBuf,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<ServerWebSocket>(rid)?;
+
+ let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
+ ws.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
+ .await
+ .map_err(|err| type_error(err.to_string()))?;
+ Ok(())
+}
+
+#[op]
+pub async fn op_server_ws_send_text(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ data: String,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<ServerWebSocket>(rid)?;
+ let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
+ ws.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ .await
+ .map_err(|err| type_error(err.to_string()))?;
+ Ok(())
+}
+
+#[op]
+pub async fn op_server_ws_send(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ value: SendValue,
+) -> Result<(), AnyError> {
+ let msg = match value {
+ SendValue::Text(text) => {
+ Frame::new(true, OpCode::Text, None, text.into_bytes())
+ }
+ SendValue::Binary(buf) => {
+ Frame::new(true, OpCode::Binary, None, buf.to_vec())
+ }
+ SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]),
+ SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]),
+ };
+
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<ServerWebSocket>(rid)?;
+ let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
+
+ ws.write_frame(msg)
+ .await
+ .map_err(|err| type_error(err.to_string()))?;
+ Ok(())
+}
+
+#[op(deferred)]
+pub async fn op_server_ws_close(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ code: Option<u16>,
+ reason: Option<String>,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<ServerWebSocket>(rid)?;
+ let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
+ let frame = reason
+ .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
+ .unwrap_or_else(|| Frame::close_raw(vec![]));
+ ws.write_frame(frame)
+ .await
+ .map_err(|err| type_error(err.to_string()))?;
+ Ok(())
+}
+
+#[op]
+pub async fn op_server_ws_next_event(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+) -> Result<(u16, StringOrBuffer), AnyError> {
+ let resource = state
+ .borrow_mut()
+ .resource_table
+ .get::<ServerWebSocket>(rid)?;
+ let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
+ let val = match ws.read_frame().await {
+ Ok(val) => val,
+ Err(err) => {
+ return Ok((
+ MessageKind::Error as u16,
+ StringOrBuffer::String(err.to_string()),
+ ))
+ }
+ };
+
+ let res = match val.opcode {
+ OpCode::Text => (
+ MessageKind::Text as u16,
+ StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
+ ),
+ OpCode::Binary => (
+ MessageKind::Binary as u16,
+ StringOrBuffer::Buffer(val.payload.into()),
+ ),
+ OpCode::Close => {
+ if val.payload.len() < 2 {
+ return Ok((1005, StringOrBuffer::String("".to_string())));
+ }
+
+ let close_code =
+ CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
+ let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
+ (close_code.into(), StringOrBuffer::String(reason))
+ }
+ OpCode::Ping => (
+ MessageKind::Ping as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ),
+ OpCode::Pong => (
+ MessageKind::Pong as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ),
+ OpCode::Continuation => {
+ return Err(type_error("Unexpected continuation frame"))
+ }
+ };
+ Ok(res)
+}