diff options
Diffstat (limited to 'ext/websocket/server.rs')
-rw-r--r-- | ext/websocket/server.rs | 194 |
1 files changed, 0 insertions, 194 deletions
diff --git a/ext/websocket/server.rs b/ext/websocket/server.rs deleted file mode 100644 index 44bc07e59..000000000 --- a/ext/websocket/server.rs +++ /dev/null @@ -1,194 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use crate::MessageKind; -use crate::SendValue; -use crate::Upgraded; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::op; -use deno_core::AsyncRefCell; -use deno_core::OpState; -use deno_core::RcRef; -use deno_core::Resource; -use deno_core::ResourceId; -use deno_core::StringOrBuffer; -use deno_core::ZeroCopyBuf; -use std::borrow::Cow; -use std::cell::RefCell; -use std::pin::Pin; -use std::rc::Rc; - -use fastwebsockets::CloseCode; -use fastwebsockets::FragmentCollector; -use fastwebsockets::Frame; -use fastwebsockets::OpCode; -use fastwebsockets::WebSocket; - -pub struct ServerWebSocket { - ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>, -} - -impl ServerWebSocket { - #[inline] - pub async fn write_frame( - self: Rc<Self>, - frame: Frame, - ) -> Result<(), AnyError> { - // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket - // to populate the write buffer. We encounter an await point when writing - // to the socket after the frame has already been written to the buffer. - let ws = unsafe { &mut *self.ws.as_ptr() }; - ws.write_frame(frame) - .await - .map_err(|err| type_error(err.to_string()))?; - Ok(()) - } -} - -impl Resource for ServerWebSocket { - fn name(&self) -> Cow<str> { - "serverWebSocket".into() - } -} -pub async fn ws_create_server_stream( - state: &Rc<RefCell<OpState>>, - transport: Pin<Box<dyn Upgraded>>, -) -> Result<ResourceId, AnyError> { - let mut ws = WebSocket::after_handshake(transport); - ws.set_writev(false); - ws.set_auto_close(true); - ws.set_auto_pong(true); - - let ws_resource = ServerWebSocket { - ws: AsyncRefCell::new(FragmentCollector::new(ws)), - }; - - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add(ws_resource); - Ok(rid) -} - -#[op] -pub async fn op_server_ws_send_binary( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: ZeroCopyBuf, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::<ServerWebSocket>(rid)?; - resource - .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) - .await -} - -#[op] -pub async fn op_server_ws_send_text( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - data: String, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::<ServerWebSocket>(rid)?; - resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) - .await -} - -#[op] -pub async fn op_server_ws_send( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - value: SendValue, -) -> Result<(), AnyError> { - let msg = match value { - SendValue::Text(text) => { - Frame::new(true, OpCode::Text, None, text.into_bytes()) - } - SendValue::Binary(buf) => { - Frame::new(true, OpCode::Binary, None, buf.to_vec()) - } - SendValue::Pong => Frame::new(true, OpCode::Pong, None, vec![]), - SendValue::Ping => Frame::new(true, OpCode::Ping, None, vec![]), - }; - - let resource = state - .borrow_mut() - .resource_table - .get::<ServerWebSocket>(rid)?; - resource.write_frame(msg).await -} - -#[op(deferred)] -pub async fn op_server_ws_close( - state: Rc<RefCell<OpState>>, - rid: ResourceId, - code: Option<u16>, - reason: Option<String>, -) -> Result<(), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::<ServerWebSocket>(rid)?; - let frame = reason - .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes())) - .unwrap_or_else(|| Frame::close_raw(vec![])); - resource.write_frame(frame).await -} - -#[op(deferred)] -pub async fn op_server_ws_next_event( - state: Rc<RefCell<OpState>>, - rid: ResourceId, -) -> Result<(u16, StringOrBuffer), AnyError> { - let resource = state - .borrow_mut() - .resource_table - .get::<ServerWebSocket>(rid)?; - let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; - let val = match ws.read_frame().await { - Ok(val) => val, - Err(err) => { - return Ok(( - MessageKind::Error as u16, - StringOrBuffer::String(err.to_string()), - )) - } - }; - - let res = match val.opcode { - OpCode::Text => ( - MessageKind::Text as u16, - StringOrBuffer::String(String::from_utf8(val.payload).unwrap()), - ), - OpCode::Binary => ( - MessageKind::Binary as u16, - StringOrBuffer::Buffer(val.payload.into()), - ), - OpCode::Close => { - if val.payload.len() < 2 { - return Ok((1005, StringOrBuffer::String("".to_string()))); - } - - let close_code = - CloseCode::from(u16::from_be_bytes([val.payload[0], val.payload[1]])); - let reason = String::from_utf8(val.payload[2..].to_vec()).unwrap(); - (close_code.into(), StringOrBuffer::String(reason)) - } - OpCode::Ping => ( - MessageKind::Ping as u16, - StringOrBuffer::Buffer(vec![].into()), - ), - OpCode::Pong => ( - MessageKind::Pong as u16, - StringOrBuffer::Buffer(vec![].into()), - ), - OpCode::Continuation => { - return Err(type_error("Unexpected continuation frame")) - } - }; - Ok(res) -} |