summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-10-30 11:49:19 -0600
committerGitHub <noreply@github.com>2023-10-30 11:49:19 -0600
commitb75f3b5ca0952db8b50cf417c107f3f14fe582d5 (patch)
treed430e93e2e5a1618995a7042b357c077073162fc
parent09204107d85351dae07a45f6a9684b5b6e573652 (diff)
feat(ext/websocket): split websocket read/write halves (#20579)
Fixes some UB when sending and receiving at the same time.
-rw-r--r--Cargo.lock4
-rw-r--r--Cargo.toml2
-rw-r--r--cli/tests/integration/js_unit_tests.rs14
-rw-r--r--cli/tests/unit/websocket_test.ts61
-rw-r--r--ext/websocket/Cargo.toml2
-rw-r--r--ext/websocket/lib.rs46
6 files changed, 103 insertions, 26 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ea7e774f3..d5c1f5558 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2259,9 +2259,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "fastwebsockets"
-version = "0.4.4"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9e6185b6dc9dddc4db0dedd2e213047e93bcbf7a0fb092abc4c4e4f3195efdb4"
+checksum = "17c35f166afb94b7f8e9449d0ad866daca111ba4053f3b1960bb480ca4382c63"
dependencies = [
"base64 0.21.4",
"hyper 0.14.27",
diff --git a/Cargo.toml b/Cargo.toml
index c2765ab66..e859641dd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -87,7 +87,7 @@ data-encoding = "2.3.3"
dlopen = "0.1.8"
encoding_rs = "=0.8.33"
ecb = "=0.1.2"
-fastwebsockets = "=0.4.4"
+fastwebsockets = "=0.5.0"
filetime = "0.2.16"
flate2 = { version = "1.0.26", features = ["zlib-ng"], default-features = false }
fs3 = "0.5.0"
diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs
index a13db02d7..f110f8aa6 100644
--- a/cli/tests/integration/js_unit_tests.rs
+++ b/cli/tests/integration/js_unit_tests.rs
@@ -112,12 +112,22 @@ util::unit_test_factory!(
fn js_unit_test(test: String) {
let _g = util::http_server();
- let mut deno = util::deno_cmd()
+ let mut deno = util::deno_cmd();
+ let deno = deno
.current_dir(util::root_path())
.arg("test")
.arg("--unstable")
.arg("--location=http://js-unit-tests/foo/bar")
- .arg("--no-prompt")
+ .arg("--no-prompt");
+
+ // TODO(mmastrac): it would be better to just load a test CA for all tests
+ let deno = if test == "websocket_test" {
+ deno.arg("--unsafely-ignore-certificate-errors")
+ } else {
+ deno
+ };
+
+ let mut deno = deno
.arg("-A")
.arg(util::tests_path().join("unit").join(format!("{test}.ts")))
.stderr(Stdio::piped())
diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts
index 11f0fd7dc..b761cd118 100644
--- a/cli/tests/unit/websocket_test.ts
+++ b/cli/tests/unit/websocket_test.ts
@@ -21,7 +21,7 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() {
const promise = deferred();
const ws = new WebSocket(new URL("ws://localhost:4242/"));
assertEquals(ws.url, "ws://localhost:4242/");
- ws.onerror = () => fail();
+ ws.onerror = (e) => promise.reject(e);
ws.onopen = () => ws.close();
ws.onclose = () => {
promise.resolve();
@@ -29,13 +29,66 @@ Deno.test(async function websocketConstructorTakeURLObjectAsParameter() {
await promise;
});
+Deno.test(async function websocketSendLargePacket() {
+ const promise = deferred();
+ const ws = new WebSocket(new URL("wss://localhost:4243/"));
+ assertEquals(ws.url, "wss://localhost:4243/");
+ ws.onerror = (e) => promise.reject(e);
+ ws.onopen = () => {
+ ws.send("a".repeat(65000));
+ };
+ ws.onmessage = () => {
+ ws.close();
+ };
+ ws.onclose = () => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test(async function websocketSendLargeBinaryPacket() {
+ const promise = deferred();
+ const ws = new WebSocket(new URL("wss://localhost:4243/"));
+ assertEquals(ws.url, "wss://localhost:4243/");
+ ws.onerror = (e) => promise.reject(e);
+ ws.onopen = () => {
+ ws.send(new Uint8Array(65000));
+ };
+ ws.onmessage = (msg) => {
+ console.log(msg);
+ ws.close();
+ };
+ ws.onclose = () => {
+ promise.resolve();
+ };
+ await promise;
+});
+
+Deno.test(async function websocketSendLargeBlobPacket() {
+ const promise = deferred();
+ const ws = new WebSocket(new URL("wss://localhost:4243/"));
+ assertEquals(ws.url, "wss://localhost:4243/");
+ ws.onerror = (e) => promise.reject(e);
+ ws.onopen = () => {
+ ws.send(new Blob(["a".repeat(65000)]));
+ };
+ ws.onmessage = (msg) => {
+ console.log(msg);
+ ws.close();
+ };
+ ws.onclose = () => {
+ promise.resolve();
+ };
+ await promise;
+});
+
// https://github.com/denoland/deno/pull/17762
// https://github.com/denoland/deno/issues/17761
Deno.test(async function websocketPingPong() {
const promise = deferred();
const ws = new WebSocket("ws://localhost:4245/");
assertEquals(ws.url, "ws://localhost:4245/");
- ws.onerror = () => fail();
+ ws.onerror = (e) => promise.reject(e);
ws.onmessage = (e) => {
ws.send(e.data);
};
@@ -144,7 +197,9 @@ Deno.test({
const ws = new WebSocket(serveUrl);
assertEquals(ws.url, serveUrl);
ws.onerror = () => fail();
- ws.onmessage = () => ws.send("bye");
+ ws.onmessage = (m: MessageEvent) => {
+ if (m.data == "Hello") ws.send("bye");
+ };
ws.onclose = () => {
promise.resolve();
};
diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml
index a144d0163..7dd7a9afe 100644
--- a/ext/websocket/Cargo.toml
+++ b/ext/websocket/Cargo.toml
@@ -18,7 +18,7 @@ bytes.workspace = true
deno_core.workspace = true
deno_net.workspace = true
deno_tls.workspace = true
-fastwebsockets = { workspace = true, features = ["upgrade"] }
+fastwebsockets = { workspace = true, features = ["upgrade", "unstable-split"] }
http.workspace = true
hyper = { workspace = true, features = ["backports"] }
once_cell.workspace = true
diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs
index 83d553eeb..0f3456eef 100644
--- a/ext/websocket/lib.rs
+++ b/ext/websocket/lib.rs
@@ -41,17 +41,21 @@ use std::rc::Rc;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
+use tokio::io::ReadHalf;
+use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::rustls::ServerName;
use tokio_rustls::TlsConnector;
use fastwebsockets::CloseCode;
-use fastwebsockets::FragmentCollector;
+use fastwebsockets::FragmentCollectorRead;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::Role;
use fastwebsockets::WebSocket;
+use fastwebsockets::WebSocketWrite;
+
mod stream;
static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
@@ -332,12 +336,13 @@ pub struct ServerWebSocket {
closed: Cell<bool>,
buffer: Cell<Option<Vec<u8>>>,
string: Cell<Option<String>>,
- ws: AsyncRefCell<FragmentCollector<WebSocketStream>>,
- tx_lock: AsyncRefCell<()>,
+ ws_read: AsyncRefCell<FragmentCollectorRead<ReadHalf<WebSocketStream>>>,
+ ws_write: AsyncRefCell<WebSocketWrite<WriteHalf<WebSocketStream>>>,
}
impl ServerWebSocket {
fn new(ws: WebSocket<WebSocketStream>) -> Self {
+ let (ws_read, ws_write) = ws.split(tokio::io::split);
Self {
buffered: Cell::new(0),
error: Cell::new(None),
@@ -345,8 +350,8 @@ impl ServerWebSocket {
closed: Cell::new(false),
buffer: Cell::new(None),
string: Cell::new(None),
- ws: AsyncRefCell::new(FragmentCollector::new(ws)),
- tx_lock: AsyncRefCell::new(()),
+ ws_read: AsyncRefCell::new(FragmentCollectorRead::new(ws_read)),
+ ws_write: AsyncRefCell::new(ws_write),
}
}
@@ -361,22 +366,22 @@ impl ServerWebSocket {
}
/// Reserve a lock, but don't wait on it. This gets us our place in line.
- pub fn reserve_lock(self: &Rc<Self>) -> AsyncMutFuture<()> {
- RcRef::map(self, |r| &r.tx_lock).borrow_mut()
+ fn reserve_lock(
+ self: &Rc<Self>,
+ ) -> AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>> {
+ RcRef::map(self, |r| &r.ws_write).borrow_mut()
}
#[inline]
- pub async fn write_frame(
+ async fn write_frame(
self: &Rc<Self>,
- lock: AsyncMutFuture<()>,
+ lock: AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>>,
frame: Frame<'_>,
) -> Result<(), AnyError> {
- lock.await;
-
- // SAFETY: fastwebsockets only needs a mutable reference to the WebSocket
- // to populate the write buffer. We encounter an await point when writing
- // to the socket after the frame has already been written to the buffer.
- let ws = unsafe { &mut *self.ws.as_ptr() };
+ let mut ws = lock.await;
+ if ws.is_closed() {
+ return Ok(());
+ }
ws.write_frame(frame)
.await
.map_err(|err| type_error(err.to_string()))?;
@@ -405,6 +410,7 @@ pub fn ws_create_server_stream(
ws.set_writev(*USE_WRITEV);
ws.set_auto_close(true);
ws.set_auto_pong(true);
+
let rid = state.resource_table.add(ServerWebSocket::new(ws));
Ok(rid)
}
@@ -627,9 +633,15 @@ pub async fn op_ws_next_event(
return MessageKind::Error as u16;
}
- let mut ws = RcRef::map(&resource, |r| &r.ws).borrow_mut().await;
+ let mut ws = RcRef::map(&resource, |r| &r.ws_read).borrow_mut().await;
+ let writer = RcRef::map(&resource, |r| &r.ws_write);
+ let mut sender = move |frame| {
+ let writer = writer.clone();
+ async move { writer.borrow_mut().await.write_frame(frame).await }
+ };
loop {
- let val = match ws.read_frame().await {
+ let res = ws.read_frame(&mut sender).await;
+ let val = match res {
Ok(val) => val,
Err(err) => {
// No message was received, socket closed while we waited.