diff options
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r-- | ext/websocket/lib.rs | 37 |
1 files changed, 29 insertions, 8 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index bcbec2c5e..cbf9f8ff1 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -6,6 +6,7 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; use deno_core::url; +use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; @@ -354,12 +355,19 @@ impl ServerWebSocket { } } + /// Reserve a lock, but don't wait on it. This gets us our place in line. + pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> { + RcRef::map(self, |r| &r.tx_lock).borrow_mut() + } + #[inline] pub async fn write_frame( self: &Rc<Self>, + lock: AsyncMutFuture<()>, frame: Frame, ) -> Result<(), AnyError> { - let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await; + lock.await; + // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket // to populate the write buffer. We encounter an await point when writing // to the socket after the frame has already been written to the buffer. @@ -407,9 +415,10 @@ pub fn op_ws_send_binary( let data = data.to_vec(); let len = data.len(); resource.buffered.set(resource.buffered.get() + len); + let lock = resource.reserve_lock(); deno_core::task::spawn(async move { if let Err(err) = resource - .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .write_frame(lock, Frame::new(true, OpCode::Binary, None, data)) .await { resource.set_error(Some(err.to_string())); @@ -424,9 +433,13 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) { let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap(); let len = data.len(); resource.buffered.set(resource.buffered.get() + len); + let lock = resource.reserve_lock(); deno_core::task::spawn(async move { if let Err(err) = resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .write_frame( + lock, + Frame::new(true, OpCode::Text, None, data.into_bytes()), + ) .await { resource.set_error(Some(err.to_string())); @@ -448,8 +461,9 @@ pub async fn op_ws_send_binary_async( .resource_table .get::<ServerWebSocket>(rid)?; let data = data.to_vec(); + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .write_frame(lock, Frame::new(true, OpCode::Binary, None, data)) .await } @@ -464,8 +478,12 @@ pub async fn op_ws_send_text_async( .borrow_mut() .resource_table .get::<ServerWebSocket>(rid)?; + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .write_frame( + lock, + Frame::new(true, OpCode::Text, None, data.into_bytes()), + ) .await } @@ -488,7 +506,8 @@ pub async fn op_ws_send_pong( .borrow_mut() .resource_table .get::<ServerWebSocket>(rid)?; - resource.write_frame(Frame::pong(vec![])).await + let lock = resource.reserve_lock(); + resource.write_frame(lock, Frame::pong(vec![])).await } #[op] @@ -500,8 +519,9 @@ pub async fn op_ws_send_ping( .borrow_mut() .resource_table .get::<ServerWebSocket>(rid)?; + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Ping, None, vec![])) + .write_frame(lock, Frame::new(true, OpCode::Ping, None, vec![])) .await } @@ -521,7 +541,8 @@ pub async fn op_ws_close( .unwrap_or_else(|| Frame::close_raw(vec![])); resource.closed.set(true); - resource.write_frame(frame).await?; + let lock = resource.reserve_lock(); + resource.write_frame(lock, frame).await?; Ok(()) } |