diff options
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r-- | ext/websocket/lib.rs | 88 |
1 files changed, 76 insertions, 12 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index f2101b413..af987c1e4 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -281,8 +281,10 @@ where } let resource = ServerWebSocket { + buffered: Cell::new(0), + errored: Cell::new(None), ws: AsyncRefCell::new(FragmentCollector::new(stream)), - closed: Rc::new(Cell::new(false)), + closed: Cell::new(false), tx_lock: AsyncRefCell::new(()), }; let mut state = state.borrow_mut(); @@ -315,18 +317,20 @@ pub enum MessageKind { } pub struct ServerWebSocket { + buffered: Cell<usize>, + errored: Cell<Option<AnyError>>, ws: AsyncRefCell<FragmentCollector<WebSocketStream>>, - closed: Rc<Cell<bool>>, + closed: Cell<bool>, tx_lock: AsyncRefCell<()>, } impl ServerWebSocket { #[inline] pub async fn write_frame( - self: Rc<Self>, + self: &Rc<Self>, frame: Frame, ) -> Result<(), AnyError> { - let _lock = RcRef::map(&self, |r| &r.tx_lock).borrow_mut().await; + let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await; // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket // to populate the write buffer. We encounter an await point when writing // to the socket after the frame has already been written to the buffer. @@ -361,8 +365,10 @@ pub fn ws_create_server_stream( ws.set_auto_pong(true); let ws_resource = ServerWebSocket { + buffered: Cell::new(0), + errored: Cell::new(None), ws: AsyncRefCell::new(FragmentCollector::new(ws)), - closed: Rc::new(Cell::new(false)), + closed: Cell::new(false), tx_lock: AsyncRefCell::new(()), }; @@ -370,8 +376,48 @@ pub fn ws_create_server_stream( Ok(rid) } -#[op] -pub async fn op_ws_send_binary( +#[op(fast)] +pub fn op_ws_send_binary( + state: &mut OpState, + rid: ResourceId, + data: ZeroCopyBuf, +) { + let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap(); + let data = data.to_vec(); + let len = data.len(); + resource.buffered.set(resource.buffered.get() + len); + deno_core::task::spawn(async move { + if let Err(err) = resource + .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .await + { + resource.errored.set(Some(err)); + } else { + resource.buffered.set(resource.buffered.get() - len); + } + }); +} + +#[op(fast)] +pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) { + let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap(); + let len = data.len(); + resource.buffered.set(resource.buffered.get() + len); + deno_core::task::spawn(async move { + if let Err(err) = resource + .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .await + { + resource.errored.set(Some(err)); + } else { + resource.buffered.set(resource.buffered.get() - len); + } + }); +} + +/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure. +#[op(fast)] +pub async fn op_ws_send_binary_async( state: Rc<RefCell<OpState>>, rid: ResourceId, data: ZeroCopyBuf, @@ -380,13 +426,15 @@ pub async fn op_ws_send_binary( .borrow_mut() .resource_table .get::<ServerWebSocket>(rid)?; + let data = data.to_vec(); resource - .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec())) + .write_frame(Frame::new(true, OpCode::Binary, None, data)) .await } -#[op] -pub async fn op_ws_send_text( +/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure. +#[op(fast)] +pub async fn op_ws_send_text_async( state: Rc<RefCell<OpState>>, rid: ResourceId, data: String, @@ -400,6 +448,16 @@ pub async fn op_ws_send_text( .await } +#[op(fast)] +pub fn op_ws_get_buffered_amount(state: &mut OpState, rid: ResourceId) -> u32 { + state + .resource_table + .get::<ServerWebSocket>(rid) + .unwrap() + .buffered + .get() as u32 +} + #[op] pub async fn op_ws_send_pong( state: Rc<RefCell<OpState>>, @@ -441,8 +499,7 @@ pub async fn op_ws_close( .map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes())) .unwrap_or_else(|| Frame::close_raw(vec![])); - let cell = Rc::clone(&resource.closed); - cell.set(true); + resource.closed.set(true); resource.write_frame(frame).await?; Ok(()) } @@ -457,6 +514,10 @@ pub async fn op_ws_next_event( .resource_table .get::<ServerWebSocket>(rid)?; + if let Some(err) = resource.errored.take() { + return Err(err); + } + let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await; loop { let val = match ws.read_frame().await { @@ -519,8 +580,11 @@ deno_core::extension!(deno_websocket, op_ws_next_event, op_ws_send_binary, op_ws_send_text, + op_ws_send_binary_async, + op_ws_send_text_async, op_ws_send_ping, op_ws_send_pong, + op_ws_get_buffered_amount, ], esm = [ "01_websocket.js", "02_websocketstream.js" ], options = { |