summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/websocket_test.ts46
-rw-r--r--ext/websocket/server.rs43
2 files changed, 69 insertions, 20 deletions
diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts
index 6737edc11..948e2add2 100644
--- a/cli/tests/unit/websocket_test.ts
+++ b/cli/tests/unit/websocket_test.ts
@@ -36,3 +36,49 @@ Deno.test(async function websocketPingPong() {
await promise;
ws.close();
});
+
+// https://github.com/denoland/deno/issues/18700
+Deno.test(
+ { sanitizeOps: false, sanitizeResources: false },
+ async function websocketWriteLock() {
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+
+ const server = Deno.serve({
+ handler: (req) => {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onopen = function () {
+ setTimeout(() => socket.send("Hello"), 500);
+ };
+ socket.onmessage = function (e) {
+ assertEquals(e.data, "Hello");
+ ac.abort();
+ };
+ return response;
+ },
+ signal: ac.signal,
+ onListen: () => listeningPromise.resolve(),
+ hostname: "localhost",
+ port: 4246,
+ });
+
+ await listeningPromise;
+ const promise = deferred();
+ const ws = new WebSocket("ws://localhost:4246/");
+ assertEquals(ws.url, "ws://localhost:4246/");
+ ws.onerror = () => fail();
+ ws.onmessage = (e) => {
+ assertEquals(e.data, "Hello");
+ setTimeout(() => {
+ ws.send(e.data);
+ }, 1000);
+ promise.resolve();
+ };
+ ws.onclose = () => {
+ promise.resolve();
+ };
+
+ await Promise.all([promise, server]);
+ ws.close();
+ },
+);
diff --git a/ext/websocket/server.rs b/ext/websocket/server.rs
index eb8737b19..44bc07e59 100644
--- a/ext/websocket/server.rs
+++ b/ext/websocket/server.rs
@@ -28,6 +28,23 @@ pub struct ServerWebSocket {
ws: AsyncRefCell<FragmentCollector<Pin<Box<dyn Upgraded>>>>,
}
+impl ServerWebSocket {
+ #[inline]
+ pub async fn write_frame(
+ self: Rc<Self>,
+ frame: Frame,
+ ) -> Result<(), AnyError> {
+ // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
+ // to populate the write buffer. We encounter an await point when writing
+ // to the socket after the frame has already been written to the buffer.
+ let ws = unsafe { &mut *self.ws.as_ptr() };
+ ws.write_frame(frame)
+ .await
+ .map_err(|err| type_error(err.to_string()))?;
+ Ok(())
+ }
+}
+
impl Resource for ServerWebSocket {
fn name(&self) -> Cow<str> {
"serverWebSocket".into()
@@ -61,12 +78,9 @@ pub async fn op_server_ws_send_binary(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
-
- let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
- ws.write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
+ resource
+ .write_frame(Frame::new(true, OpCode::Binary, None, data.to_vec()))
.await
- .map_err(|err| type_error(err.to_string()))?;
- Ok(())
}
#[op]
@@ -79,11 +93,9 @@ pub async fn op_server_ws_send_text(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
- let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
- ws.write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
+ resource
+ .write_frame(Frame::new(true, OpCode::Text, None, data.into_bytes()))
.await
- .map_err(|err| type_error(err.to_string()))?;
- Ok(())
}
#[op]
@@ -107,12 +119,7 @@ pub async fn op_server_ws_send(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
- let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
-
- ws.write_frame(msg)
- .await
- .map_err(|err| type_error(err.to_string()))?;
- Ok(())
+ resource.write_frame(msg).await
}
#[op(deferred)]
@@ -126,14 +133,10 @@ pub async fn op_server_ws_close(
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)?;
- let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
let frame = reason
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
.unwrap_or_else(|| Frame::close_raw(vec![]));
- ws.write_frame(frame)
- .await
- .map_err(|err| type_error(err.to_string()))?;
- Ok(())
+ resource.write_frame(frame).await
}
#[op(deferred)]