summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/websocket/01_websocket.js6
-rw-r--r--ext/websocket/02_websocketstream.js20
-rw-r--r--ext/websocket/lib.rs123
3 files changed, 55 insertions, 94 deletions
diff --git a/ext/websocket/01_websocket.js b/ext/websocket/01_websocket.js
index 1b7a45ce0..cb9f756d2 100644
--- a/ext/websocket/01_websocket.js
+++ b/ext/websocket/01_websocket.js
@@ -478,7 +478,7 @@ class WebSocket extends EventTarget {
this[_serverHandleIdleTimeout]();
break;
}
- case 5: {
+ case 3: {
/* error */
this[_readyState] = CLOSED;
@@ -492,10 +492,6 @@ class WebSocket extends EventTarget {
core.tryClose(this[_rid]);
break;
}
- case 3: {
- /* ping */
- break;
- }
default: {
/* close */
const code = kind;
diff --git a/ext/websocket/02_websocketstream.js b/ext/websocket/02_websocketstream.js
index f545d7a99..8e7100cdb 100644
--- a/ext/websocket/02_websocketstream.js
+++ b/ext/websocket/02_websocketstream.js
@@ -236,7 +236,7 @@ class WebSocketStream {
},
});
const pull = async (controller) => {
- const { 0: kind, 1: value } = await core.opAsync(
+ const { 0: kind, 1: value } = await core.opAsync2(
"op_ws_next_event",
this[_rid],
);
@@ -249,7 +249,11 @@ class WebSocketStream {
controller.enqueue(value);
break;
}
- case 5: {
+ case 2: {
+ /* pong */
+ break;
+ }
+ case 3: {
/* error */
const err = new Error(value);
this[_closed].reject(err);
@@ -257,17 +261,7 @@ class WebSocketStream {
core.tryClose(this[_rid]);
break;
}
- case 3: {
- /* ping */
- await core.opAsync("op_ws_send_pong", this[_rid]);
- await pull(controller);
- break;
- }
- case 2: {
- /* pong */
- break;
- }
- case 6: {
+ case 4: {
/* closed */
this[_closed].resolve(undefined);
core.tryClose(this[_rid]);
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 74898a471..df4127d27 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -27,7 +27,6 @@ use http::Method;
use http::Request;
use http::Uri;
use hyper::Body;
-use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::Cell;
@@ -85,15 +84,6 @@ impl Resource for WsCancelResource {
}
}
-#[derive(Deserialize)]
-#[serde(tag = "kind", content = "value", rename_all = "camelCase")]
-pub enum SendValue {
- Text(String),
- Binary(ZeroCopyBuf),
- Pong,
- Ping,
-}
-
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
@@ -301,9 +291,8 @@ pub enum MessageKind {
Text = 0,
Binary = 1,
Pong = 2,
- Ping = 3,
- Error = 5,
- Closed = 6,
+ Error = 3,
+ Closed = 4,
}
pub struct ServerWebSocket {
@@ -406,20 +395,6 @@ pub async fn op_ws_send_text(
}
#[op]
-pub async fn op_ws_send_ping(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
-) -> Result<(), AnyError> {
- let resource = state
- .borrow_mut()
- .resource_table
- .get::<ServerWebSocket>(rid)?;
- resource
- .write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
- .await
-}
-
-#[op]
pub async fn op_ws_send_pong(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -463,58 +438,55 @@ pub async fn op_ws_next_event(
.get::<ServerWebSocket>(rid)?;
let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
- let val = match ws.read_frame().await {
- Ok(val) => val,
- Err(err) => {
- // No message was received, socket closed while we waited.
- // Try close the stream, ignoring any errors, and report closed status to JavaScript.
- if resource.closed.get() {
- let _ = state.borrow_mut().resource_table.close(rid);
+ loop {
+ let val = match ws.read_frame().await {
+ Ok(val) => val,
+ Err(err) => {
+ // No message was received, socket closed while we waited.
+ // Try close the stream, ignoring any errors, and report closed status to JavaScript.
+ if resource.closed.get() {
+ let _ = state.borrow_mut().resource_table.close(rid);
+ return Ok((
+ MessageKind::Closed as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ));
+ }
+
return Ok((
- MessageKind::Closed as u16,
- StringOrBuffer::Buffer(vec![].into()),
+ MessageKind::Error as u16,
+ StringOrBuffer::String(err.to_string()),
));
}
-
- return Ok((
- MessageKind::Error as u16,
- StringOrBuffer::String(err.to_string()),
- ));
- }
- };
-
- let res = match val.opcode {
- OpCode::Text => (
- MessageKind::Text as u16,
- StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
- ),
- OpCode::Binary => (
- MessageKind::Binary as u16,
- StringOrBuffer::Buffer(val.payload.into()),
- ),
- OpCode::Close => {
- if val.payload.len() < 2 {
- return Ok((1005, StringOrBuffer::String("".to_string())));
+ };
+
+ break Ok(match val.opcode {
+ OpCode::Text => (
+ MessageKind::Text as u16,
+ StringOrBuffer::String(String::from_utf8(val.payload).unwrap()),
+ ),
+ OpCode::Binary => (
+ MessageKind::Binary as u16,
+ StringOrBuffer::Buffer(val.payload.into()),
+ ),
+ OpCode::Close => {
+ if val.payload.len() < 2 {
+ return Ok((1005, StringOrBuffer::String("".to_string())));
+ }
+
+ let close_code =
+ CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
+ let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
+ (close_code.into(), StringOrBuffer::String(reason))
}
-
- let close_code =
- CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]]));
- let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap();
- (close_code.into(), StringOrBuffer::String(reason))
- }
- OpCode::Ping => (
- MessageKind::Ping as u16,
- StringOrBuffer::Buffer(vec![].into()),
- ),
- OpCode::Pong => (
- MessageKind::Pong as u16,
- StringOrBuffer::Buffer(vec![].into()),
- ),
- OpCode::Continuation => {
- return Err(type_error("Unexpected continuation frame"))
- }
- };
- Ok(res)
+ OpCode::Pong => (
+ MessageKind::Pong as u16,
+ StringOrBuffer::Buffer(vec![].into()),
+ ),
+ OpCode::Continuation | OpCode::Ping => {
+ continue;
+ }
+ });
+ }
}
deno_core::extension!(deno_websocket,
@@ -527,7 +499,6 @@ deno_core::extension!(deno_websocket,
op_ws_next_event,
op_ws_send_binary,
op_ws_send_text,
- op_ws_send_ping,
op_ws_send_pong,
op_ws_server_create,
],