diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-06-13 11:16:17 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-13 17:16:17 +0000 |
commit | 72da18dd476e269a8677e698c01b08e683fac4a5 (patch) | |
tree | beb286de172982ac62fcfad8216dda583bb44681 | |
parent | ceb03cfb037cf7024a5048b17b508ddda59cfa05 (diff) |
fix(ext/websockets): ensure we fully send frames before close (#19484)
Fixes #19483
-rw-r--r-- | cli/tests/unit/websocket_test.ts | 54 | ||||
-rw-r--r-- | ext/websocket/lib.rs | 37 |
2 files changed, 81 insertions, 10 deletions
diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts index 16384da40..ac33f9d69 100644 --- a/cli/tests/unit/websocket_test.ts +++ b/cli/tests/unit/websocket_test.ts @@ -100,7 +100,7 @@ Deno.test( promise.resolve(); }; - await Promise.all([promise, server]); + await Promise.all([promise, server.finished]); ws.close(); }, ); @@ -145,7 +145,57 @@ Deno.test({ ws.onclose = () => { promise.resolve(); }; - await Promise.all([promise, server]); + await Promise.all([promise, server.finished]); +}); + +// https://github.com/denoland/deno/issues/19483 +Deno.test({ + sanitizeOps: false, + sanitizeResources: false, +}, async function websocketCloseFlushes() { + const promise = deferred(); + + const ac = new AbortController(); + const listeningPromise = deferred(); + + const server = Deno.serve({ + handler: (req) => { + const { response, socket } = Deno.upgradeWebSocket(req); + socket.onopen = () => socket.send("Hello"); + socket.onmessage = () => { + socket.send("Bye"); + socket.close(); + }; + socket.onclose = () => ac.abort(); + socket.onerror = () => fail(); + return response; + }, + signal: ac.signal, + onListen: () => listeningPromise.resolve(), + hostname: "localhost", + port: 4247, + }); + + await listeningPromise; + + const ws = new WebSocket("ws://localhost:4247/"); + assertEquals(ws.url, "ws://localhost:4247/"); + let seenBye = false; + ws.onerror = () => fail(); + ws.onmessage = ({ data }) => { + if (data == "Hello") { + ws.send("Hello!"); + } else { + assertEquals(data, "Bye"); + seenBye = true; + } + }; + ws.onclose = () => { + promise.resolve(); + }; + await Promise.all([promise, server.finished]); + + assert(seenBye); }); Deno.test( diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index bcbec2c5e..cbf9f8ff1 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -6,6 +6,7 @@ use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; use deno_core::url; +use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; @@ -354,12 +355,19 @@ impl ServerWebSocket { } } + /// Reserve a lock, but don't wait on it. This gets us our place in line. + pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> { + RcRef::map(self, |r| &r.tx_lock).borrow_mut() + } + #[inline] pub async fn write_frame( self: &Rc<Self>, + lock: AsyncMutFuture<()>, frame: Frame, ) -> Result<(), AnyError> { - let _lock = RcRef::map(self, |r| &r.tx_lock).borrow_mut().await; + lock.await; + // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket // to populate the write buffer. We encounter an await point when writing // to the socket after the frame has already been written to the buffer. @@ -407,9 +415,10 @@ pub fn op_ws_send_binary( let data = data.to_vec(); let len = data.len(); resource.buffered.set(resource.buffered.get() + len); + let lock = resource.reserve_lock(); deno_core::task::spawn(async move { if let Err(err) = resource - .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .write_frame(lock, Frame::new(true, OpCode::Binary, None, data)) .await { resource.set_error(Some(err.to_string())); @@ -424,9 +433,13 @@ pub fn op_ws_send_text(state: &mut OpState, rid: ResourceId, data: String) { let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap(); let len = data.len(); resource.buffered.set(resource.buffered.get() + len); + let lock = resource.reserve_lock(); deno_core::task::spawn(async move { if let Err(err) = resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .write_frame( + lock, + Frame::new(true, OpCode::Text, None, data.into_bytes()), + ) .await { resource.set_error(Some(err.to_string())); @@ -448,8 +461,9 @@ pub async fn op_ws_send_binary_async( .resource_table .get::<ServerWebSocket>(rid)?; let data = data.to_vec(); + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Binary, None, data)) + .write_frame(lock, Frame::new(true, OpCode::Binary, None, data)) .await } @@ -464,8 +478,12 @@ pub async fn op_ws_send_text_async( .borrow_mut() .resource_table .get::<ServerWebSocket>(rid)?; + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes())) + .write_frame( + lock, + Frame::new(true, OpCode::Text, None, data.into_bytes()), + ) .await } @@ -488,7 +506,8 @@ pub async fn op_ws_send_pong( .borrow_mut() .resource_table .get::<ServerWebSocket>(rid)?; - resource.write_frame(Frame::pong(vec![])).await + let lock = resource.reserve_lock(); + resource.write_frame(lock, Frame::pong(vec![])).await } #[op] @@ -500,8 +519,9 @@ pub async fn op_ws_send_ping( .borrow_mut() .resource_table .get::<ServerWebSocket>(rid)?; + let lock = resource.reserve_lock(); resource - .write_frame(Frame::new(true, OpCode::Ping, None, vec![])) + .write_frame(lock, Frame::new(true, OpCode::Ping, None, vec![])) .await } @@ -521,7 +541,8 @@ pub async fn op_ws_close( .unwrap_or_else(|| Frame::close_raw(vec![])); resource.closed.set(true); - resource.write_frame(frame).await?; + let lock = resource.reserve_lock(); + resource.write_frame(lock, frame).await?; Ok(()) } |