summaryrefslogtreecommitdiff
path: root/ext/websocket/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/websocket/lib.rs')
-rw-r--r--ext/websocket/lib.rs37
1 files changed, 29 insertions, 8 deletions
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index bcbec2c5e..cbf9f8ff1 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -6,6 +6,7 @@ use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::url;
+use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
@@ -354,12 +355,19 @@ impl ServerWebSocket {
}
}
+ /// Reserve a lock, but don't wait on it. This gets us our place in line.
+ pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> {
+ RcRef::map(self, |r| &r.tx_lock).borrow_mut()
+ }
+
#[inline]
pub async fn write_frame(
self: &Rc<Self>,
+ lock: AsyncMutFuture<()>,
frame: Frame,
) -> Result<(), AnyError> {
- let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await;
+ lock.await;
+
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
// to populate the write buffer. We encounter an await point when writing
// to the socket after the frame has already been written to the buffer.
@@ -407,9 +415,10 @@ pub fn op_ws_send_binary(
let data = data.to_vec();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
+ let lock = resource.reserve_lock();
deno_core::task::spawn(async move {
if let Err(err) = resource
- .write_frame(Frame::new(true, OpCode::Binary, None, data))
+ .write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
.await
{
resource.set_error(Some(err.to_string()));
@@ -424,9 +433,13 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
+ let lock = resource.reserve_lock();
deno_core::task::spawn(async move {
if let Err(err) = resource
- .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ .write_frame(
+ lock,
+ Frame::new(true, OpCode::Text, None, data.into_bytes()),
+ )
.await
{
resource.set_error(Some(err.to_string()));
@@ -448,8 +461,9 @@ pub async fn op_ws_send_binary_async(
.resource_table
.get::<ServerWebSocket>(rid)?;
let data = data.to_vec();
+ let lock = resource.reserve_lock();
resource
- .write_frame(Frame::new(true, OpCode::Binary, None, data))
+ .write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
.await
}
@@ -464,8 +478,12 @@ pub async fn op_ws_send_text_async(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
+ let lock = resource.reserve_lock();
resource
- .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ .write_frame(
+ lock,
+ Frame::new(true, OpCode::Text, None, data.into_bytes()),
+ )
.await
}
@@ -488,7 +506,8 @@ pub async fn op_ws_send_pong(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
- resource.write_frame(Frame::pong(vec![])).await
+ let lock = resource.reserve_lock();
+ resource.write_frame(lock, Frame::pong(vec![])).await
}
#[op]
@@ -500,8 +519,9 @@ pub async fn op_ws_send_ping(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
+ let lock = resource.reserve_lock();
resource
- .write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
+ .write_frame(lock, Frame::new(true, OpCode::Ping, None, vec![]))
.await
}
@@ -521,7 +541,8 @@ pub async fn op_ws_close(
.unwrap_or_else(|| Frame::close_raw(vec![]));
resource.closed.set(true);
- resource.write_frame(frame).await?;
+ let lock = resource.reserve_lock();
+ resource.write_frame(lock, frame).await?;
Ok(())
}