summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/websocket_test.ts54
-rw-r--r--ext/websocket/lib.rs37
2 files changed, 81 insertions, 10 deletions
diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts
index 16384da40..ac33f9d69 100644
--- a/cli/tests/unit/websocket_test.ts
+++ b/cli/tests/unit/websocket_test.ts
@@ -100,7 +100,7 @@ Deno.test(
promise.resolve();
};
- await Promise.all([promise, server]);
+ await Promise.all([promise, server.finished]);
ws.close();
},
);
@@ -145,7 +145,57 @@ Deno.test({
ws.onclose = () => {
promise.resolve();
};
- await Promise.all([promise, server]);
+ await Promise.all([promise, server.finished]);
+});
+
+// https://github.com/denoland/deno/issues/19483
+Deno.test({
+ sanitizeOps: false,
+ sanitizeResources: false,
+}, async function websocketCloseFlushes() {
+ const promise = deferred();
+
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+
+ const server = Deno.serve({
+ handler: (req) => {
+ const { response, socket } = Deno.upgradeWebSocket(req);
+ socket.onopen = () => socket.send("Hello");
+ socket.onmessage = () => {
+ socket.send("Bye");
+ socket.close();
+ };
+ socket.onclose = () => ac.abort();
+ socket.onerror = () => fail();
+ return response;
+ },
+ signal: ac.signal,
+ onListen: () => listeningPromise.resolve(),
+ hostname: "localhost",
+ port: 4247,
+ });
+
+ await listeningPromise;
+
+ const ws = new WebSocket("ws://localhost:4247/");
+ assertEquals(ws.url, "ws://localhost:4247/");
+ let seenBye = false;
+ ws.onerror = () => fail();
+ ws.onmessage = ({ data }) => {
+ if (data == "Hello") {
+ ws.send("Hello!");
+ } else {
+ assertEquals(data, "Bye");
+ seenBye = true;
+ }
+ };
+ ws.onclose = () => {
+ promise.resolve();
+ };
+ await Promise.all([promise, server.finished]);
+
+ assert(seenBye);
});
Deno.test(
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index bcbec2c5e..cbf9f8ff1 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -6,6 +6,7 @@ use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::url;
+use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
@@ -354,12 +355,19 @@ impl ServerWebSocket {
}
}
+ /// Reserve a lock, but don't wait on it. This gets us our place in line.
+ pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> {
+ RcRef::map(self, |r| &r.tx_lock).borrow_mut()
+ }
+
#[inline]
pub async fn write_frame(
self: &Rc<Self>,
+ lock: AsyncMutFuture<()>,
frame: Frame,
) -> Result<(), AnyError> {
- let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await;
+ lock.await;
+
// SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
// to populate the write buffer. We encounter an await point when writing
// to the socket after the frame has already been written to the buffer.
@@ -407,9 +415,10 @@ pub fn op_ws_send_binary(
let data = data.to_vec();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
+ let lock = resource.reserve_lock();
deno_core::task::spawn(async move {
if let Err(err) = resource
- .write_frame(Frame::new(true, OpCode::Binary, None, data))
+ .write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
.await
{
resource.set_error(Some(err.to_string()));
@@ -424,9 +433,13 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) {
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
+ let lock = resource.reserve_lock();
deno_core::task::spawn(async move {
if let Err(err) = resource
- .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ .write_frame(
+ lock,
+ Frame::new(true, OpCode::Text, None, data.into_bytes()),
+ )
.await
{
resource.set_error(Some(err.to_string()));
@@ -448,8 +461,9 @@ pub async fn op_ws_send_binary_async(
.resource_table
.get::<ServerWebSocket>(rid)?;
let data = data.to_vec();
+ let lock = resource.reserve_lock();
resource
- .write_frame(Frame::new(true, OpCode::Binary, None, data))
+ .write_frame(lock, Frame::new(true, OpCode::Binary, None, data))
.await
}
@@ -464,8 +478,12 @@ pub async fn op_ws_send_text_async(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
+ let lock = resource.reserve_lock();
resource
- .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ .write_frame(
+ lock,
+ Frame::new(true, OpCode::Text, None, data.into_bytes()),
+ )
.await
}
@@ -488,7 +506,8 @@ pub async fn op_ws_send_pong(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
- resource.write_frame(Frame::pong(vec![])).await
+ let lock = resource.reserve_lock();
+ resource.write_frame(lock, Frame::pong(vec![])).await
}
#[op]
@@ -500,8 +519,9 @@ pub async fn op_ws_send_ping(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
+ let lock = resource.reserve_lock();
resource
- .write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
+ .write_frame(lock, Frame::new(true, OpCode::Ping, None, vec![]))
.await
}
@@ -521,7 +541,8 @@ pub async fn op_ws_close(
.unwrap_or_else(|| Frame::close_raw(vec![]));
resource.closed.set(true);
- resource.write_frame(frame).await?;
+ let lock = resource.reserve_lock();
+ resource.write_frame(lock, frame).await?;
Ok(())
}