diff options
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r-- | ext/websocket/lib.rs | 123 |
1 files changed, 47 insertions, 76 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 74898a471..df4127d27 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -27,7 +27,6 @@ use http::Method; use http::Request; use http::Uri; use hyper::Body; -use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::Cell; @@ -85,15 +84,6 @@ impl Resource for WsCancelResource { } } -#[derive(Deserialize)] -#[serde(tag = "kind", content = "value", rename_all = "camelCase")] -pub enum SendValue { - Text(String), - Binary(ZeroCopyBuf), - Pong, - Ping, -} - // This op is needed because creating a WS instance in JavaScript is a sync // operation and should throw error when permissions are not fulfilled, // but actual op that connects WS is async. @@ -301,9 +291,8 @@ pub enum MessageKind { Text = 0, Binary = 1, Pong = 2, - Ping = 3, - Error = 5, - Closed = 6, + Error = 3, + Closed = 4, } pub struct ServerWebSocket { @@ -406,20 +395,6 @@ pub async fn op_ws_send_text( } #[op] -pub async fn op_ws_send_ping( - state: Rc<RefCell<OpState>>, - rid: ResourceId, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::<ServerWebSocket>(rid)?; - resource - .write_frame(Frame::new(true, OpCode::Ping, None, vec![])) - .await -} - -#[op] pub async fn op_ws_send_pong( state: Rc<RefCell<OpState>>, rid: ResourceId, @@ -463,58 +438,55 @@ pub async fn op_ws_next_event( .get::<ServerWebSocket>(rid)?; let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; - let val = match ws.read_frame().await { - Ok(val) => val, - Err(err) => { - // No message was received, socket closed while we waited. - // Try close the stream, ignoring any errors, and report closed status to JavaScript. - if resource.closed.get() { - let _ = state.borrow_mut().resource_table.close(rid); + loop { + let val = match ws.read_frame().await { + Ok(val) => val, + Err(err) => { + // No message was received, socket closed while we waited. + // Try close the stream, ignoring any errors, and report closed status to JavaScript. + if resource.closed.get() { + let _ = state.borrow_mut().resource_table.close(rid); + return Ok(( + MessageKind::Closed as u16, + StringOrBuffer::Buffer(vec![].into()), + )); + } + return Ok(( - MessageKind::Closed as u16, - StringOrBuffer::Buffer(vec![].into()), + MessageKind::Error as u16, + StringOrBuffer::String(err.to_string()), )); } - - return Ok(( - MessageKind::Error as u16, - StringOrBuffer::String(err.to_string()), - )); - } - }; - - let res = match val.opcode { - OpCode::Text => ( - MessageKind::Text as u16, - StringOrBuffer::String(String::from_utf8(val.payload).unwrap()), - ), - OpCode::Binary => ( - MessageKind::Binary as u16, - StringOrBuffer::Buffer(val.payload.into()), - ), - OpCode::Close => { - if val.payload.len() < 2 { - return Ok((1005, StringOrBuffer::String("".to_string()))); + }; + + break Ok(match val.opcode { + OpCode::Text => ( + MessageKind::Text as u16, + StringOrBuffer::String(String::from_utf8(val.payload).unwrap()), + ), + OpCode::Binary => ( + MessageKind::Binary as u16, + StringOrBuffer::Buffer(val.payload.into()), + ), + OpCode::Close => { + if val.payload.len() < 2 { + return Ok((1005, StringOrBuffer::String("".to_string()))); + } + + let close_code = + CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]])); + let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap(); + (close_code.into(), StringOrBuffer::String(reason)) } - - let close_code = - CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]])); - let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap(); - (close_code.into(), StringOrBuffer::String(reason)) - } - OpCode::Ping => ( - MessageKind::Ping as u16, - StringOrBuffer::Buffer(vec![].into()), - ), - OpCode::Pong => ( - MessageKind::Pong as u16, - StringOrBuffer::Buffer(vec![].into()), - ), - OpCode::Continuation => { - return Err(type_error("Unexpected continuation frame")) - } - }; - Ok(res) + OpCode::Pong => ( + MessageKind::Pong as u16, + StringOrBuffer::Buffer(vec![].into()), + ), + OpCode::Continuation | OpCode::Ping => { + continue; + } + }); + } } deno_core::extension!(deno_websocket, @@ -527,7 +499,6 @@ deno_core::extension!(deno_websocket, op_ws_next_event, op_ws_send_binary, op_ws_send_text, - op_ws_send_ping, op_ws_send_pong, op_ws_server_create, ], |