summaryrefslogtreecommitdiff
path: root/ext/websocket/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r--ext/websocket/lib.rs123
1 files changed, 47 insertions, 76 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 74898a471..df4127d27 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -27,7 +27,6 @@ use http::Method;
use http::Request;
use http::Uri;
use hyper::Body;
-use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::Cell;
@@ -85,15 +84,6 @@ impl Resource for WsCancelResource {
}
}
-#[derive(Deserialize)]
-#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
-pub enum SendValue {
- Text(String),
- Binary(ZeroCopyBuf),
- Pong,
- Ping,
-}
-
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
@@ -301,9 +291,8 @@ pub enum MessageKind {
Text = 0,
Binary = 1,
Pong = 2,
- Ping = 3,
- Error = 5,
- Closed = 6,
+ Error = 3,
+ Closed = 4,
}
pub struct ServerWebSocket {
@@ -406,20 +395,6 @@ pub async fn op_ws_send_text(
}
#[op]
-pub async fn op_ws_send_ping(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<ServerWebSocket>(rid)?;
- resource
- .write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
- .await
-}
-
-#[op]
pub async fn op_ws_send_pong(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -463,58 +438,55 @@ pub async fn op_ws_next_event(
.get::<ServerWebSocket>(rid)?;
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
- let val = match ws.read_frame().await {
- Ok(val) => val,
- Err(err) => {
- // No message was received, socket closed while we waited.
- // Try close the stream, ignoring any errors, and report closed status to JavaScript.
- if resource.closed.get() {
- let _ = state.borrow_mut().resource_table.close(rid);
+ loop {
+ let val = match ws.read_frame().await {
+ Ok(val) => val,
+ Err(err) => {
+ // No message was received, socket closed while we waited.
+ // Try close the stream, ignoring any errors, and report closed status to JavaScript.
+ if resource.closed.get() {
+ let _ = state.borrow_mut().resource_table.close(rid);
+ return Ok((
+ MessageKind::Closed as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ));
+ }
+
return Ok((
- MessageKind::Closed as u16,
- StringOrBuffer::Buffer(vec![].into()),
+ MessageKind::Error as u16,
+ StringOrBuffer::String(err.to_string()),
));
}
-
- return Ok((
- MessageKind::Error as u16,
- StringOrBuffer::String(err.to_string()),
- ));
- }
- };
-
- let res = match val.opcode {
- OpCode::Text => (
- MessageKind::Text as u16,
- StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
- ),
- OpCode::Binary => (
- MessageKind::Binary as u16,
- StringOrBuffer::Buffer(val.payload.into()),
- ),
- OpCode::Close => {
- if val.payload.len() < 2 {
- return Ok((1005, StringOrBuffer::String("".to_string())));
+ };
+
+ break Ok(match val.opcode {
+ OpCode::Text => (
+ MessageKind::Text as u16,
+ StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
+ ),
+ OpCode::Binary => (
+ MessageKind::Binary as u16,
+ StringOrBuffer::Buffer(val.payload.into()),
+ ),
+ OpCode::Close => {
+ if val.payload.len() < 2 {
+ return Ok((1005, StringOrBuffer::String("".to_string())));
+ }
+
+ let close_code =
+ CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
+ let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
+ (close_code.into(), StringOrBuffer::String(reason))
}
-
- let close_code =
- CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
- let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
- (close_code.into(), StringOrBuffer::String(reason))
- }
- OpCode::Ping => (
- MessageKind::Ping as u16,
- StringOrBuffer::Buffer(vec![].into()),
- ),
- OpCode::Pong => (
- MessageKind::Pong as u16,
- StringOrBuffer::Buffer(vec![].into()),
- ),
- OpCode::Continuation => {
- return Err(type_error("Unexpected continuation frame"))
- }
- };
- Ok(res)
+ OpCode::Pong => (
+ MessageKind::Pong as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ),
+ OpCode::Continuation | OpCode::Ping => {
+ continue;
+ }
+ });
+ }
}
deno_core::extension!(deno_websocket,
@@ -527,7 +499,6 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
- op_ws_send_ping,
op_ws_send_pong,
op_ws_server_create,
],