summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/http/lib.rs39
-rw-r--r--ext/websocket/01_websocket.js16
-rw-r--r--ext/websocket/02_websocketstream.js2
-rw-r--r--ext/websocket/Cargo.toml2
-rw-r--r--ext/websocket/lib.rs372
-rw-r--r--ext/websocket/server.rs194
6 files changed, 176 insertions, 449 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 289e7bf0f..43e3c130a 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1129,41 +1129,6 @@ async fn op_http_upgrade_early(
Ok(rid)
}
-struct UpgradedStream(hyper::upgrade::Upgraded);
-impl tokio::io::AsyncRead for UpgradedStream {
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &mut tokio::io::ReadBuf,
- ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
- }
-}
-
-impl tokio::io::AsyncWrite for UpgradedStream {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &[u8],
- ) -> std::task::Poll<Result<usize, std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
- }
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_flush(cx)
- }
- fn poll_shutdown(
- self: Pin<&mut Self>,
- cx: &mut Context,
- ) -> std::task::Poll<Result<(), std::io::Error>> {
- Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
- }
-}
-
-impl deno_websocket::Upgraded for UpgradedStream {}
-
#[op]
async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>,
@@ -1183,9 +1148,7 @@ async fn op_http_upgrade_websocket(
};
let transport = hyper::upgrade::on(request).await?;
- let ws_rid =
- ws_create_server_stream(&state, Box::pin(UpgradedStream(transport)))
- .await?;
+ let ws_rid = ws_create_server_stream(&state, transport).await?;
Ok(ws_rid)
}
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js
index 2c6bf46b2..60378b675 100644
--- a/ext/websocket/01_websocket.js
+++ b/ext/websocket/01_websocket.js
@@ -317,9 +317,7 @@ class WebSocket extends EventTarget {
this[_bufferedAmount] += byteLength;
PromisePrototypeThen(
core.opAsync2(
- this[_role] === SERVER
- ? "op_server_ws_send_binary"
- : "op_ws_send_binary",
+ "op_ws_send_binary",
this[_rid],
view,
),
@@ -357,7 +355,7 @@ class WebSocket extends EventTarget {
this[_bufferedAmount] += TypedArrayPrototypeGetByteLength(d);
PromisePrototypeThen(
core.opAsync2(
- this[_role] === SERVER ? "op_server_ws_send_text" : "op_ws_send_text",
+ "op_ws_send_text",
this[_rid],
string,
),
@@ -416,7 +414,7 @@ class WebSocket extends EventTarget {
PromisePrototypeCatch(
core.opAsync(
- this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
+ "op_ws_close",
this[_rid],
code,
reason,
@@ -441,7 +439,7 @@ class WebSocket extends EventTarget {
async [_eventLoop]() {
while (this[_readyState] !== CLOSED) {
const { 0: kind, 1: value } = await core.opAsync2(
- this[_role] === SERVER ? "op_server_ws_next_event" : "op_ws_next_event",
+ "op_ws_next_event",
this[_rid],
);
@@ -508,7 +506,7 @@ class WebSocket extends EventTarget {
if (prevState === OPEN) {
try {
await core.opAsync(
- this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
+ "op_ws_close",
this[_rid],
code,
value,
@@ -537,7 +535,7 @@ class WebSocket extends EventTarget {
this[_idleTimeoutTimeout] = setTimeout(async () => {
if (this[_readyState] === OPEN) {
await core.opAsync(
- this[_role] === SERVER ? "op_server_ws_send" : "op_ws_send",
+ "op_ws_send",
this[_rid],
{
kind: "ping",
@@ -548,7 +546,7 @@ class WebSocket extends EventTarget {
this[_readyState] = CLOSING;
const reason = "No response from ping frame.";
await core.opAsync(
- this[_role] === SERVER ? "op_server_ws_close" : "op_ws_close",
+ "op_ws_close",
this[_rid],
1001,
reason,
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index 0ee7a70aa..0d01e62ee 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -176,7 +176,7 @@ class WebSocketStream {
create.rid,
);
- if (kind > 6) {
+ if (kind > 5) {
/* close */
break;
}
diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml
index 2f5ed95b3..03cb3076a 100644
--- a/ext/websocket/Cargo.toml
+++ b/ext/websocket/Cargo.toml
@@ -16,7 +16,7 @@ path = "lib.rs"
[dependencies]
deno_core.workspace = true
deno_tls.workspace = true
-fastwebsockets = "0.1.3"
+fastwebsockets = { version = "0.2.1", features = ["upgrade"] }
http.workspace = true
hyper.workspace = true
serde.workspace = true
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 71f176070..f63191a8e 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -3,10 +3,6 @@
use deno_core::error::invalid_hostname;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::stream::SplitSink;
-use deno_core::futures::stream::SplitStream;
-use deno_core::futures::SinkExt;
-use deno_core::futures::StreamExt;
use deno_core::op;
use deno_core::StringOrBuffer;
@@ -21,41 +17,40 @@ use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use deno_tls::create_client_config;
+use http::header::CONNECTION;
+use http::header::UPGRADE;
use http::HeaderName;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::Uri;
+use hyper::upgrade::Upgraded;
+use hyper::Body;
+use hyper::Response;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
+use std::cell::Cell;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::fmt;
use std::path::PathBuf;
-use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
-use tokio::io::AsyncRead;
-use tokio::io::AsyncWrite;
use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::rustls::ServerName;
use tokio_rustls::TlsConnector;
-use tokio_tungstenite::client_async_with_config;
-use tokio_tungstenite::tungstenite::handshake::client::Response;
-use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
-use tokio_tungstenite::tungstenite::protocol::CloseFrame;
-use tokio_tungstenite::tungstenite::protocol::Message;
-use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
use tokio_tungstenite::MaybeTlsStream;
-use tokio_tungstenite::WebSocketStream;
-pub use tokio_tungstenite; // Re-export tokio_tungstenite
-
-mod server;
+use fastwebsockets::CloseCode;
+use fastwebsockets::FragmentCollector;
+use fastwebsockets::Frame;
+use fastwebsockets::OpCode;
+use fastwebsockets::Role;
+use fastwebsockets::WebSocket;
-pub use server::ws_create_server_stream;
+pub use tokio_tungstenite; // Re-export tokio_tungstenite
#[derive(Clone)]
pub struct WsRootStore(pub Option<RootCertStore>);
@@ -76,100 +71,6 @@ pub trait WebSocketPermissions {
/// would override previously used alias.
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
-type ClientWsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
-type ServerWsStream = WebSocketStream<Pin<Box<dyn Upgraded>>>;
-
-pub enum WebSocketStreamType {
- Client {
- tx: AsyncRefCell<SplitSink<ClientWsStream, Message>>,
- rx: AsyncRefCell<SplitStream<ClientWsStream>>,
- },
- Server {
- tx: AsyncRefCell<SplitSink<ServerWsStream, Message>>,
- rx: AsyncRefCell<SplitStream<ServerWsStream>>,
- },
-}
-
-pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
-
-pub struct WsStreamResource {
- pub stream: WebSocketStreamType,
- // When a `WsStreamResource` resource is closed, all pending 'read' ops are
- // canceled, while 'write' ops are allowed to complete. Therefore only
- // 'read' futures are attached to this cancel handle.
- pub cancel: CancelHandle,
-}
-
-impl WsStreamResource {
- async fn send(self: &Rc<Self>, message: Message) -> Result<(), AnyError> {
- use tokio_tungstenite::tungstenite::Error;
- let res = match self.stream {
- WebSocketStreamType::Client { .. } => {
- let mut tx = RcRef::map(self, |r| match &r.stream {
- WebSocketStreamType::Client { tx, .. } => tx,
- WebSocketStreamType::Server { .. } => unreachable!(),
- })
- .borrow_mut()
- .await;
- tx.send(message).await
- }
- WebSocketStreamType::Server { .. } => {
- let mut tx = RcRef::map(self, |r| match &r.stream {
- WebSocketStreamType::Client { .. } => unreachable!(),
- WebSocketStreamType::Server { tx, .. } => tx,
- })
- .borrow_mut()
- .await;
- tx.send(message).await
- }
- };
-
- match res {
- Ok(()) => Ok(()),
- Err(Error::ConnectionClosed) => Ok(()),
- Err(tokio_tungstenite::tungstenite::Error::Protocol(
- tokio_tungstenite::tungstenite::error::ProtocolError::SendAfterClosing,
- )) => Ok(()),
- Err(err) => Err(err.into()),
- }
- }
-
- async fn next_message(
- self: &Rc<Self>,
- cancel: RcRef<CancelHandle>,
- ) -> Result<
- Option<Result<Message, tokio_tungstenite::tungstenite::Error>>,
- AnyError,
- > {
- match &self.stream {
- WebSocketStreamType::Client { .. } => {
- let mut rx = RcRef::map(self, |r| match &r.stream {
- WebSocketStreamType::Client { rx, .. } => rx,
- WebSocketStreamType::Server { .. } => unreachable!(),
- })
- .borrow_mut()
- .await;
- rx.next().or_cancel(cancel).await.map_err(AnyError::from)
- }
- WebSocketStreamType::Server { .. } => {
- let mut rx = RcRef::map(self, |r| match &r.stream {
- WebSocketStreamType::Client { .. } => unreachable!(),
- WebSocketStreamType::Server { rx, .. } => rx,
- })
- .borrow_mut()
- .await;
- rx.next().or_cancel(cancel).await.map_err(AnyError::from)
- }
- }
- }
-}
-
-impl Resource for WsStreamResource {
- fn name(&self) -> Cow<str> {
- "webSocketStream".into()
- }
-}
-
pub struct WsCancelResource(Rc<CancelHandle>);
impl Resource for WsCancelResource {
@@ -182,6 +83,15 @@ impl Resource for WsCancelResource {
}
}
+#[derive(Deserialize)]
+#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
+pub enum SendValue {
+ Text(String),
+ Binary(ZeroCopyBuf),
+ Pong,
+ Ping,
+}
+
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
@@ -257,7 +167,21 @@ where
let uri: Uri = url.parse()?;
let mut request = Request::builder().method(Method::GET).uri(&uri);
- request = request.header("User-Agent", user_agent);
+ let authority = uri.authority().unwrap().as_str();
+ let host = authority
+ .find('@')
+ .map(|idx| authority.split_at(idx + 1).1)
+ .unwrap_or_else(|| authority);
+ request = request
+ .header("User-Agent", user_agent)
+ .header("Host", host)
+ .header(UPGRADE, "websocket")
+ .header(CONNECTION, "upgrade")
+ .header(
+ "Sec-WebSocket-Key",
+ fastwebsockets::handshake::generate_key(),
+ )
+ .header("Sec-WebSocket-Version", "13");
if !protocols.is_empty() {
request = request.header("Sec-WebSocket-Protocol", protocols);
@@ -287,7 +211,7 @@ where
}
}
- let request = request.body(())?;
+ let request = request.body(Body::empty())?;
let domain = &uri.host().unwrap().to_string();
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
Some("wss") => 443,
@@ -315,16 +239,9 @@ where
_ => unreachable!(),
};
- let client = client_async_with_config(
- request,
- socket,
- Some(WebSocketConfig {
- max_message_size: Some(128 << 20),
- max_frame_size: Some(32 << 20),
- ..Default::default()
- }),
- );
- let (stream, response): (ClientWsStream, Response) =
+ let client = fastwebsockets::handshake::client(request, socket);
+
+ let (stream, response): (WebSocket<Upgraded>, Response<Body>) =
if let Some(cancel_resource) = cancel_resource {
client.or_cancel(cancel_resource.0.to_owned()).await?
} else {
@@ -340,13 +257,9 @@ where
state.borrow_mut().resource_table.close(cancel_rid).ok();
}
- let (ws_tx, ws_rx) = stream.split();
- let resource = WsStreamResource {
- stream: WebSocketStreamType::Client {
- rx: AsyncRefCell::new(ws_rx),
- tx: AsyncRefCell::new(ws_tx),
- },
- cancel: Default::default(),
+ let resource = ServerWebSocket {
+ ws: AsyncRefCell::new(FragmentCollector::new(stream)),
+ closed: Rc::new(Cell::new(false)),
};
let mut state = state.borrow_mut();
let rid = state.resource_table.add(resource);
@@ -368,13 +281,60 @@ where
})
}
-#[derive(Deserialize)]
-#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
-pub enum SendValue {
- Text(String),
- Binary(ZeroCopyBuf),
- Pong,
- Ping,
+#[repr(u16)]
+pub enum MessageKind {
+ Text = 0,
+ Binary = 1,
+ Pong = 2,
+ Ping = 3,
+ Error = 5,
+ Closed = 6,
+}
+
+pub struct ServerWebSocket {
+ ws: AsyncRefCell<FragmentCollector<Upgraded>>,
+ closed: Rc<Cell<bool>>,
+}
+
+impl ServerWebSocket {
+ #[inline]
+ pub async fn write_frame(
+ self: Rc<Self>,
+ frame: Frame,
+ ) -> Result<(), AnyError> {
+ // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
+ // to populate the write buffer. We encounter an await point when writing
+ // to the socket after the frame has already been written to the buffer.
+ let ws = unsafe { &mut *self.ws.as_ptr() };
+ ws.write_frame(frame)
+ .await
+ .map_err(|err| type_error(err.to_string()))?;
+ Ok(())
+ }
+}
+
+impl Resource for ServerWebSocket {
+ fn name(&self) -> Cow<str> {
+ "serverWebSocket".into()
+ }
+}
+pub async fn ws_create_server_stream(
+ state: &Rc<RefCell<OpState>>,
+ transport: Upgraded,
+) -> Result<ResourceId, AnyError> {
+ let mut ws = WebSocket::after_handshake(transport, Role::Server);
+ ws.set_writev(true);
+ ws.set_auto_close(true);
+ ws.set_auto_pong(true);
+
+ let ws_resource = ServerWebSocket {
+ ws: AsyncRefCell::new(FragmentCollector::new(ws)),
+ closed: Rc::new(Cell::new(false)),
+ };
+
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let rid = resource_table.add(ws_resource);
+ Ok(rid)
}
#[op]
@@ -386,9 +346,10 @@ pub async fn op_ws_send_binary(
let resource = state
.borrow_mut()
.resource_table
- .get::<WsStreamResource>(rid)?;
- resource.send(Message::Binary(data.to_vec())).await?;
- Ok(())
+ .get::<ServerWebSocket>(rid)?;
+ resource
+ .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
+ .await
}
#[op]
@@ -400,9 +361,10 @@ pub async fn op_ws_send_text(
let resource = state
.borrow_mut()
.resource_table
- .get::<WsStreamResource>(rid)?;
- resource.send(Message::Text(data)).await?;
- Ok(())
+ .get::<ServerWebSocket>(rid)?;
+ resource
+ .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ .await
}
#[op]
@@ -412,18 +374,21 @@ pub async fn op_ws_send(
value: SendValue,
) -> Result<(), AnyError> {
let msg = match value {
- SendValue::Text(text) => Message::Text(text),
- SendValue::Binary(buf) => Message::Binary(buf.to_vec()),
- SendValue::Pong => Message::Pong(vec![]),
- SendValue::Ping => Message::Ping(vec![]),
+ SendValue::Text(text) => {
+ Frame::new(true, OpCode::Text, None, text.into_bytes())
+ }
+ SendValue::Binary(buf) => {
+ Frame::new(true, OpCode::Binary, None, buf.to_vec())
+ }
+ SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]),
+ SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]),
};
let resource = state
.borrow_mut()
.resource_table
- .get::<WsStreamResource>(rid)?;
- resource.send(msg).await?;
- Ok(())
+ .get::<ServerWebSocket>(rid)?;
+ resource.write_frame(msg).await
}
#[op(deferred)]
@@ -433,34 +398,21 @@ pub async fn op_ws_close(
code: Option<u16>,
reason: Option<String>,
) -> Result<(), AnyError> {
- let rid = rid;
- let msg = Message::Close(code.map(|c| CloseFrame {
- code: CloseCode::from(c),
- reason: match reason {
- Some(reason) => Cow::from(reason),
- None => Default::default(),
- },
- }));
-
let resource = state
.borrow_mut()
.resource_table
- .get::<WsStreamResource>(rid)?;
- resource.send(msg).await?;
+ .get::<ServerWebSocket>(rid)?;
+ let frame = reason
+ .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
+ .unwrap_or_else(|| Frame::close_raw(vec![]));
+
+ let cell = Rc::clone(&resource.closed);
+ cell.set(true);
+ resource.write_frame(frame).await?;
Ok(())
}
-#[repr(u16)]
-pub enum MessageKind {
- Text = 0,
- Binary = 1,
- Pong = 2,
- Ping = 3,
- Error = 5,
- Closed = 6,
-}
-
-#[op]
+#[op(deferred)]
pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -468,45 +420,58 @@ pub async fn op_ws_next_event(
let resource = state
.borrow_mut()
.resource_table
- .get::<WsStreamResource>(rid)?;
+ .get::<ServerWebSocket>(rid)?;
- let cancel = RcRef::map(&resource, |r| &r.cancel);
- let val = resource.next_message(cancel).await?;
- let res = match val {
- Some(Ok(Message::Text(text))) => {
- (MessageKind::Text as u16, StringOrBuffer::String(text))
+ let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
+ let val = match ws.read_frame().await {
+ Ok(val) => val,
+ Err(err) => {
+ // No message was received, socket closed while we waited.
+ // Try close the stream, ignoring any errors, and report closed status to JavaScript.
+ if resource.closed.get() {
+ let _ = state.borrow_mut().resource_table.close(rid);
+ return Ok((
+ MessageKind::Closed as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ));
+ }
+
+ return Ok((
+ MessageKind::Error as u16,
+ StringOrBuffer::String(err.to_string()),
+ ));
}
- Some(Ok(Message::Binary(data))) => (
- MessageKind::Binary as u16,
- StringOrBuffer::Buffer(data.into()),
+ };
+
+ let res = match val.opcode {
+ OpCode::Text => (
+ MessageKind::Text as u16,
+ StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
),
- Some(Ok(Message::Close(Some(frame)))) => (
- frame.code.into(),
- StringOrBuffer::String(frame.reason.to_string()),
+ OpCode::Binary => (
+ MessageKind::Binary as u16,
+ StringOrBuffer::Buffer(val.payload.into()),
),
- Some(Ok(Message::Close(None))) => {
- (1005, StringOrBuffer::String("".to_string()))
+ OpCode::Close => {
+ if val.payload.len() < 2 {
+ return Ok((1005, StringOrBuffer::String("".to_string())));
+ }
+
+ let close_code =
+ CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
+ let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
+ (close_code.into(), StringOrBuffer::String(reason))
}
- Some(Ok(Message::Ping(_))) => (
+ OpCode::Ping => (
MessageKind::Ping as u16,
StringOrBuffer::Buffer(vec![].into()),
),
- Some(Ok(Message::Pong(_))) => (
+ OpCode::Pong => (
MessageKind::Pong as u16,
StringOrBuffer::Buffer(vec![].into()),
),
- Some(Err(e)) => (
- MessageKind::Error as u16,
- StringOrBuffer::String(e.to_string()),
- ),
- None => {
- // No message was received, presumably the socket closed while we waited.
- // Try close the stream, ignoring any errors, and report closed status to JavaScript.
- let _ = state.borrow_mut().resource_table.close(rid);
- (
- MessageKind::Closed as u16,
- StringOrBuffer::Buffer(vec![].into()),
- )
+ OpCode::Continuation => {
+ return Err(type_error("Unexpected continuation frame"))
}
};
Ok(res)
@@ -523,11 +488,6 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
- server::op_server_ws_send,
- server::op_server_ws_close,
- server::op_server_ws_next_event,
- server::op_server_ws_send_binary,
- server::op_server_ws_send_text,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {
diff --git a/ext/websocket/server.rs b/ext/websocket/server.rs
deleted file mode 100644
index 44bc07e59..000000000
--- a/ext/websocket/server.rs
+++ /dev/null
@@ -1,194 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use crate::MessageKind;
-use crate::SendValue;
-use crate::Upgraded;
-use deno_core::error::type_error;
-use deno_core::error::AnyError;
-use deno_core::op;
-use deno_core::AsyncRefCell;
-use deno_core::OpState;
-use deno_core::RcRef;
-use deno_core::Resource;
-use deno_core::ResourceId;
-use deno_core::StringOrBuffer;
-use deno_core::ZeroCopyBuf;
-use std::borrow::Cow;
-use std::cell::RefCell;
-use std::pin::Pin;
-use std::rc::Rc;
-
-use fastwebsockets::CloseCode;
-use fastwebsockets::FragmentCollector;
-use fastwebsockets::Frame;
-use fastwebsockets::OpCode;
-use fastwebsockets::WebSocket;
-
-pub struct ServerWebSocket {
- ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
-}
-
-impl ServerWebSocket {
- #[inline]
- pub async fn write_frame(
- self: Rc<Self>,
- frame: Frame,
- ) -> Result<(), AnyError> {
- // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
- // to populate the write buffer. We encounter an await point when writing
- // to the socket after the frame has already been written to the buffer.
- let ws = unsafe { &mut *self.ws.as_ptr() };
- ws.write_frame(frame)
- .await
- .map_err(|err| type_error(err.to_string()))?;
- Ok(())
- }
-}
-
-impl Resource for ServerWebSocket {
- fn name(&self) -> Cow<str> {
- "serverWebSocket".into()
- }
-}
-pub async fn ws_create_server_stream(
- state: &Rc<RefCell<OpState>>,
- transport: Pin<Box<dyn Upgraded>>,
-) -> Result<ResourceId, AnyError> {
- let mut ws = WebSocket::after_handshake(transport);
- ws.set_writev(false);
- ws.set_auto_close(true);
- ws.set_auto_pong(true);
-
- let ws_resource = ServerWebSocket {
- ws: AsyncRefCell::new(FragmentCollector::new(ws)),
- };
-
- let resource_table = &mut state.borrow_mut().resource_table;
- let rid = resource_table.add(ws_resource);
- Ok(rid)
-}
-
-#[op]
-pub async fn op_server_ws_send_binary(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: ZeroCopyBuf,
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<ServerWebSocket>(rid)?;
- resource
- .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
- .await
-}
-
-#[op]
-pub async fn op_server_ws_send_text(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- data: String,
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<ServerWebSocket>(rid)?;
- resource
- .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
- .await
-}
-
-#[op]
-pub async fn op_server_ws_send(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- value: SendValue,
-) -> Result<(), AnyError> {
- let msg = match value {
- SendValue::Text(text) => {
- Frame::new(true, OpCode::Text, None, text.into_bytes())
- }
- SendValue::Binary(buf) => {
- Frame::new(true, OpCode::Binary, None, buf.to_vec())
- }
- SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]),
- SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]),
- };
-
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<ServerWebSocket>(rid)?;
- resource.write_frame(msg).await
-}
-
-#[op(deferred)]
-pub async fn op_server_ws_close(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- code: Option<u16>,
- reason: Option<String>,
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<ServerWebSocket>(rid)?;
- let frame = reason
- .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
- .unwrap_or_else(|| Frame::close_raw(vec![]));
- resource.write_frame(frame).await
-}
-
-#[op(deferred)]
-pub async fn op_server_ws_next_event(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
-) -> Result<(u16, StringOrBuffer), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<ServerWebSocket>(rid)?;
- let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
- let val = match ws.read_frame().await {
- Ok(val) => val,
- Err(err) => {
- return Ok((
- MessageKind::Error as u16,
- StringOrBuffer::String(err.to_string()),
- ))
- }
- };
-
- let res = match val.opcode {
- OpCode::Text => (
- MessageKind::Text as u16,
- StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
- ),
- OpCode::Binary => (
- MessageKind::Binary as u16,
- StringOrBuffer::Buffer(val.payload.into()),
- ),
- OpCode::Close => {
- if val.payload.len() < 2 {
- return Ok((1005, StringOrBuffer::String("".to_string())));
- }
-
- let close_code =
- CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
- let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
- (close_code.into(), StringOrBuffer::String(reason))
- }
- OpCode::Ping => (
- MessageKind::Ping as u16,
- StringOrBuffer::Buffer(vec![].into()),
- ),
- OpCode::Pong => (
- MessageKind::Pong as u16,
- StringOrBuffer::Buffer(vec![].into()),
- ),
- OpCode::Continuation => {
- return Err(type_error("Unexpected continuation frame"))
- }
- };
- Ok(res)
-}