summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock8
-rw-r--r--Cargo.toml1
-rw-r--r--cli/Cargo.toml2
-rw-r--r--cli/tests/integration/inspector_tests.rs90
-rw-r--r--cli/tests/testdata/run/websocket_test.ts2
-rw-r--r--ext/websocket/Cargo.toml2
-rw-r--r--runtime/Cargo.toml1
-rw-r--r--runtime/inspector_server.rs94
-rw-r--r--test_util/Cargo.toml3
-rw-r--r--test_util/src/lib.rs169
10 files changed, 228 insertions, 144 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ceeb2cf2a..114a6e0e8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -721,12 +721,14 @@ dependencies = [
"env_logger",
"eszip",
"fancy-regex",
+ "fastwebsockets",
"flaky_test",
"flate2",
"fs3",
"fwdansi",
"glibc_version",
"http",
+ "hyper",
"import_map 0.15.0",
"indexmap",
"jsonc-parser",
@@ -1235,6 +1237,7 @@ dependencies = [
"deno_webstorage",
"dlopen",
"encoding_rs",
+ "fastwebsockets",
"filetime",
"fs3",
"fwdansi",
@@ -1791,9 +1794,9 @@ dependencies = [
[[package]]
name = "fastwebsockets"
-version = "0.2.4"
+version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fcf2f933f24f45831bd66580a8f9394e440f1f5a23806cf0d4d8b6649e1a01e9"
+checksum = "a9e973e2bd2dbd77cc9e929ede2ce65984a35ac5481976afbfbd509cb40dc965"
dependencies = [
"base64 0.21.0",
"cc",
@@ -4864,6 +4867,7 @@ dependencies = [
"atty",
"base64 0.13.1",
"console_static_text",
+ "fastwebsockets",
"flate2",
"futures",
"hyper",
diff --git a/Cargo.toml b/Cargo.toml
index 6b49de231..aa12e1629 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -91,6 +91,7 @@ data-url = "=0.2.0"
dlopen = "0.1.8"
encoding_rs = "=0.8.31"
ecb = "=0.1.1"
+fastwebsockets = "=0.2.5"
flate2 = "=1.0.24"
fs3 = "0.5.0"
futures = "0.3.21"
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 96fe458ae..ebd858330 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -72,9 +72,11 @@ encoding_rs.workspace = true
env_logger = "=0.9.0"
eszip = "=0.41.0"
fancy-regex = "=0.10.0"
+fastwebsockets.workspace = true
flate2.workspace = true
fs3.workspace = true
http.workspace = true
+hyper.workspace = true
import_map = "=0.15.0"
indexmap.workspace = true
jsonc-parser = { version = "=0.21.0", features = ["serde"] }
diff --git a/cli/tests/integration/inspector_tests.rs b/cli/tests/integration/inspector_tests.rs
index 067963786..35ff01403 100644
--- a/cli/tests/integration/inspector_tests.rs
+++ b/cli/tests/integration/inspector_tests.rs
@@ -1,15 +1,16 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use deno_core::anyhow::anyhow;
use deno_core::error::AnyError;
-use deno_core::futures::prelude::*;
-use deno_core::futures::stream::SplitSink;
-use deno_core::futures::stream::SplitStream;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::url;
use deno_runtime::deno_fetch::reqwest;
use deno_runtime::deno_websocket::tokio_tungstenite;
-use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite;
+use fastwebsockets::FragmentCollector;
+use fastwebsockets::Frame;
+use hyper::upgrade::Upgraded;
+use hyper::Request;
use std::io::BufRead;
use test_util as util;
use test_util::TempDir;
@@ -17,18 +18,20 @@ use tokio::net::TcpStream;
use util::http_server;
use util::DenoChild;
+struct SpawnExecutor;
+
+impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
+where
+ Fut: std::future::Future + Send + 'static,
+ Fut::Output: Send + 'static,
+{
+ fn execute(&self, fut: Fut) {
+ tokio::task::spawn(fut);
+ }
+}
+
struct InspectorTester {
- socket_tx: SplitSink<
- tokio_tungstenite::WebSocketStream<
- tokio_tungstenite::MaybeTlsStream<TcpStream>,
- >,
- tungstenite::Message,
- >,
- socket_rx: SplitStream<
- tokio_tungstenite::WebSocketStream<
- tokio_tungstenite::MaybeTlsStream<TcpStream>,
- >,
- >,
+ socket: FragmentCollector<Upgraded>,
notification_filter: Box<dyn FnMut(&str) -> bool + 'static>,
child: DenoChild,
stderr_lines: Box<dyn Iterator<Item = String>>,
@@ -52,17 +55,42 @@ impl InspectorTester {
let mut stderr_lines =
std::io::BufReader::new(stderr).lines().map(|r| r.unwrap());
- let ws_url = extract_ws_url_from_stderr(&mut stderr_lines);
+ let uri = extract_ws_url_from_stderr(&mut stderr_lines);
+
+ let domain = &uri.host().unwrap().to_string();
+ let port = &uri.port().unwrap_or(match uri.scheme() {
+ "wss" | "https" => 443,
+ _ => 80,
+ });
+ let addr = format!("{domain}:{port}");
+
+ let stream = TcpStream::connect(addr).await.unwrap();
+
+ let host = uri.host_str().unwrap();
+
+ let req = Request::builder()
+ .method("GET")
+ .uri(uri.path())
+ .header("Host", host)
+ .header(hyper::header::UPGRADE, "websocket")
+ .header(hyper::header::CONNECTION, "Upgrade")
+ .header(
+ "Sec-WebSocket-Key",
+ fastwebsockets::handshake::generate_key(),
+ )
+ .header("Sec-WebSocket-Version", "13")
+ .body(hyper::Body::empty())
+ .unwrap();
let (socket, response) =
- tokio_tungstenite::connect_async(ws_url).await.unwrap();
- assert_eq!(response.status(), 101); // Switching protocols.
+ fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
+ .await
+ .unwrap();
- let (socket_tx, socket_rx) = socket.split();
+ assert_eq!(response.status(), 101); // Switching protocols.
Self {
- socket_tx,
- socket_rx,
+ socket: FragmentCollector::new(socket),
notification_filter: Box::new(notification_filter),
child,
stderr_lines: Box::new(stderr_lines),
@@ -74,10 +102,10 @@ impl InspectorTester {
// TODO(bartlomieju): add graceful error handling
for msg in messages {
let result = self
- .socket_tx
- .send(msg.to_string().into())
+ .socket
+ .write_frame(Frame::text(msg.to_string().into_bytes()))
.await
- .map_err(|e| e.into());
+ .map_err(|e| anyhow!(e));
self.handle_error(result);
}
}
@@ -111,8 +139,9 @@ impl InspectorTester {
async fn recv(&mut self) -> String {
loop {
- let result = self.socket_rx.next().await.unwrap().map_err(|e| e.into());
- let message = self.handle_error(result).to_string();
+ let result = self.socket.read_frame().await.map_err(|e| anyhow!(e));
+ let message =
+ String::from_utf8(self.handle_error(result).payload).unwrap();
if (self.notification_filter)(&message) {
return message;
}
@@ -236,7 +265,7 @@ fn skip_check_line(
let mut line = stderr_lines.next().unwrap();
line = util::strip_ansi_codes(&line).to_string();
- if line.starts_with("Check") {
+ if line.starts_with("Check") || line.starts_with("Download") {
continue;
}
@@ -514,8 +543,11 @@ async fn inspector_does_not_hang() {
}
// Check that we can gracefully close the websocket connection.
- tester.socket_tx.close().await.unwrap();
- tester.socket_rx.for_each(|_| async {}).await;
+ tester
+ .socket
+ .write_frame(Frame::close_raw(vec![]))
+ .await
+ .unwrap();
assert_eq!(&tester.stdout_lines.next().unwrap(), "done");
assert!(tester.child.wait().unwrap().success());
diff --git a/cli/tests/testdata/run/websocket_test.ts b/cli/tests/testdata/run/websocket_test.ts
index a9dc34ad1..27bc5adf9 100644
--- a/cli/tests/testdata/run/websocket_test.ts
+++ b/cli/tests/testdata/run/websocket_test.ts
@@ -161,7 +161,7 @@ Deno.test("websocket error", async () => {
assert(err instanceof ErrorEvent);
// Error message got changed because we don't use warp in test_util
- assertEquals(err.message, "UnexpectedEof: tls handshake eof");
+ assertEquals(err.message, "InvalidData: received corrupt message");
promise1.resolve();
};
await promise1;
diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml
index a96b6cceb..53e184e1e 100644
--- a/ext/websocket/Cargo.toml
+++ b/ext/websocket/Cargo.toml
@@ -16,7 +16,7 @@ path = "lib.rs"
[dependencies]
deno_core.workspace = true
deno_tls.workspace = true
-fastwebsockets = { version = "0.2.4", features = ["upgrade"] }
+fastwebsockets = { workspace = true, features = ["upgrade"] }
http.workspace = true
hyper.workspace = true
serde.workspace = true
diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml
index 58f292e8f..20cbda0bf 100644
--- a/runtime/Cargo.toml
+++ b/runtime/Cargo.toml
@@ -80,6 +80,7 @@ deno_web.workspace = true
deno_webidl.workspace = true
deno_websocket.workspace = true
deno_webstorage.workspace = true
+fastwebsockets.workspace = true
atty.workspace = true
console_static_text.workspace = true
diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs
index d65e813cb..25d0d796c 100644
--- a/runtime/inspector_server.rs
+++ b/runtime/inspector_server.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use core::convert::Infallible as Never; // Alias for the future `!` type.
-use deno_core::error::AnyError;
+// Alias for the future `!` type.
+use core::convert::Infallible as Never;
use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
@@ -18,8 +18,9 @@ use deno_core::serde_json::Value;
use deno_core::InspectorMsg;
use deno_core::InspectorSessionProxy;
use deno_core::JsRuntime;
-use deno_websocket::tokio_tungstenite::tungstenite;
-use deno_websocket::tokio_tungstenite::WebSocketStream;
+use fastwebsockets::Frame;
+use fastwebsockets::OpCode;
+use fastwebsockets::WebSocket;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::Infallible;
@@ -145,35 +146,27 @@ fn handle_ws_request(
let info = maybe_inspector_info.unwrap();
info.new_session_tx.clone()
};
+ let (parts, _) = req.into_parts();
+ let mut req = http::Request::from_parts(parts, body);
- let resp = tungstenite::handshake::server::create_response(&req)
- .map(|resp| resp.map(|_| hyper::Body::empty()))
- .or_else(|e| match e {
- tungstenite::error::Error::HttpFormat(http_error) => Err(http_error),
- _ => http::Response::builder()
+ let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
+ Ok(e) => e,
+ _ => {
+ return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
- .body("Not a valid Websocket Request".into()),
- })?;
-
- let (parts, _) = req.into_parts();
- let req = http::Request::from_parts(parts, body);
+ .body("Not a valid Websocket Request".into());
+ }
+ };
// spawn a task that will wait for websocket connection and then pump messages between
// the socket and inspector proxy
tokio::task::spawn_local(async move {
- let upgrade_result = hyper::upgrade::on(req).await;
- let upgraded = if let Ok(u) = upgrade_result {
- u
+ let websocket = if let Ok(w) = fut.await {
+ w
} else {
eprintln!("Inspector server failed to upgrade to WS connection");
return;
};
- let websocket = WebSocketStream::from_raw_socket(
- upgraded,
- tungstenite::protocol::Role::Server,
- None,
- )
- .await;
// The 'outbound' channel carries messages sent to the websocket.
let (outbound_tx, outbound_rx) = mpsc::unbounded();
@@ -324,37 +317,36 @@ async fn server(
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
async fn pump_websocket_messages(
- websocket: WebSocketStream<hyper::upgrade::Upgraded>,
+ mut websocket: WebSocket<hyper::upgrade::Upgraded>,
inbound_tx: UnboundedSender<String>,
- outbound_rx: UnboundedReceiver<InspectorMsg>,
+ mut outbound_rx: UnboundedReceiver<InspectorMsg>,
) {
- let (websocket_tx, websocket_rx) = websocket.split();
-
- let outbound_pump = outbound_rx
- .map(|msg| tungstenite::Message::text(msg.content))
- .map(Ok)
- .forward(websocket_tx)
- .map_err(|_| ());
-
- let inbound_pump = async move {
- let _result = websocket_rx
- .map_err(AnyError::from)
- .map_ok(|msg| {
- // Messages that cannot be converted to strings are ignored.
- if let Ok(msg_text) = msg.into_text() {
- let _ = inbound_tx.unbounded_send(msg_text);
+ 'pump: loop {
+ tokio::select! {
+ Some(msg) = outbound_rx.next() => {
+ let msg = Frame::text(msg.content.into_bytes());
+ let _ = websocket.write_frame(msg).await;
}
- })
- .try_collect::<()>()
- .await;
-
- // Users don't care if there was an error coming from debugger,
- // just about the fact that debugger did disconnect.
- eprintln!("Debugger session ended");
-
- Ok(())
- };
- let _ = future::try_join(outbound_pump, inbound_pump).await;
+ Ok(msg) = websocket.read_frame() => {
+ match msg.opcode {
+ OpCode::Text => {
+ if let Ok(s) = String::from_utf8(msg.payload) {
+ let _ = inbound_tx.unbounded_send(s);
+ }
+ }
+ OpCode::Close => {
+ // Users don't care if there was an error coming from debugger,
+ // just about the fact that debugger did disconnect.
+ eprintln!("Debugger session ended");
+ break 'pump;
+ }
+ _ => {
+ // Ignore other messages.
+ }
+ }
+ }
+ }
+ }
}
/// Inspector information that is sent from the isolate thread to the server
diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml
index cb1ea46cc..593491311 100644
--- a/test_util/Cargo.toml
+++ b/test_util/Cargo.toml
@@ -19,6 +19,7 @@ async-stream = "0.3.3"
atty.workspace = true
base64.workspace = true
console_static_text.workspace = true
+fastwebsockets = { workspace = true, features = ["upgrade"] }
flate2.workspace = true
futures.workspace = true
hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] }
@@ -40,7 +41,7 @@ tar.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
-tokio-tungstenite.workspace = true
+tokio-tungstenite = { workspace = true, features = ["rustls-tls-webpki-roots"] }
url.workspace = true
[target.'cfg(unix)'.dependencies]
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs
index 6a6614ad0..e647c0a4c 100644
--- a/test_util/src/lib.rs
+++ b/test_util/src/lib.rs
@@ -2,6 +2,7 @@
// Usage: provide a port as argument to run hyper_hello benchmark server
// otherwise this starts multiple servers on many ports for test endpoints.
use anyhow::anyhow;
+use futures::Future;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
@@ -9,6 +10,7 @@ use hyper::header::HeaderValue;
use hyper::server::Server;
use hyper::service::make_service_fn;
use hyper::service::service_fn;
+use hyper::upgrade::Upgraded;
use hyper::Body;
use hyper::Request;
use hyper::Response;
@@ -49,7 +51,6 @@ use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::rustls;
use tokio_rustls::TlsAcceptor;
-use tokio_tungstenite::accept_async;
use url::Url;
pub mod assertions;
@@ -302,69 +303,128 @@ async fn basic_auth_redirect(
Ok(resp)
}
+async fn echo_websocket_handler(
+ ws: fastwebsockets::WebSocket<Upgraded>,
+) -> Result<(), anyhow::Error> {
+ let mut ws = fastwebsockets::FragmentCollector::new(ws);
+
+ loop {
+ let frame = ws.read_frame().await.unwrap();
+ match frame.opcode {
+ fastwebsockets::OpCode::Close => break,
+ fastwebsockets::OpCode::Text | fastwebsockets::OpCode::Binary => {
+ ws.write_frame(frame).await.unwrap();
+ }
+ _ => {}
+ }
+ }
+
+ Ok(())
+}
+
+type WsHandler =
+ fn(
+ fastwebsockets::WebSocket<Upgraded>,
+ ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send>>;
+
+fn spawn_ws_server<S>(stream: S, handler: WsHandler)
+where
+ S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
+{
+ let srv_fn = service_fn(move |mut req: Request<Body>| async move {
+ let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req)
+ .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?;
+
+ tokio::spawn(async move {
+ let ws = upgrade_fut
+ .await
+ .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))
+ .unwrap();
+
+ if let Err(e) = handler(ws).await {
+ eprintln!("Error in websocket connection: {}", e);
+ }
+ });
+
+ Ok::<_, anyhow::Error>(response)
+ });
+
+ tokio::spawn(async move {
+ let conn_fut = hyper::server::conn::Http::new()
+ .serve_connection(stream, srv_fn)
+ .with_upgrades();
+
+ if let Err(e) = conn_fut.await {
+ eprintln!("websocket server error: {e:?}");
+ }
+ });
+}
+
async fn run_ws_server(addr: &SocketAddr) {
let listener = TcpListener::bind(addr).await.unwrap();
println!("ready: ws"); // Eye catcher for HttpServerCount
while let Ok((stream, _addr)) = listener.accept().await {
- tokio::spawn(async move {
- let ws_stream_fut = accept_async(stream);
-
- let ws_stream = ws_stream_fut.await;
- if let Ok(ws_stream) = ws_stream {
- let (tx, rx) = ws_stream.split();
- rx.forward(tx)
- .map(|result| {
- if let Err(e) = result {
- println!("websocket server error: {e:?}");
- }
- })
- .await;
- }
- });
+ spawn_ws_server(stream, |ws| Box::pin(echo_websocket_handler(ws)));
}
}
+async fn ping_websocket_handler(
+ ws: fastwebsockets::WebSocket<Upgraded>,
+) -> Result<(), anyhow::Error> {
+ use fastwebsockets::Frame;
+ use fastwebsockets::OpCode;
+
+ let mut ws = fastwebsockets::FragmentCollector::new(ws);
+
+ for i in 0..9 {
+ ws.write_frame(Frame::new(true, OpCode::Ping, None, vec![]))
+ .await
+ .unwrap();
+
+ let frame = ws.read_frame().await.unwrap();
+ assert_eq!(frame.opcode, OpCode::Pong);
+ assert!(frame.payload.is_empty());
+
+ ws.write_frame(Frame::text(format!("hello {}", i).as_bytes().to_vec()))
+ .await
+ .unwrap();
+
+ let frame = ws.read_frame().await.unwrap();
+ assert_eq!(frame.opcode, OpCode::Text);
+ assert_eq!(frame.payload, format!("hello {}", i).as_bytes());
+ }
+
+ ws.write_frame(fastwebsockets::Frame::close(1000, b""))
+ .await
+ .unwrap();
+
+ Ok(())
+}
+
async fn run_ws_ping_server(addr: &SocketAddr) {
let listener = TcpListener::bind(addr).await.unwrap();
println!("ready: ws"); // Eye catcher for HttpServerCount
while let Ok((stream, _addr)) = listener.accept().await {
- tokio::spawn(async move {
- let ws_stream = accept_async(stream).await;
- use futures::SinkExt;
- use tokio_tungstenite::tungstenite::Message;
- if let Ok(mut ws_stream) = ws_stream {
- for i in 0..9 {
- ws_stream.send(Message::Ping(vec![])).await.unwrap();
-
- let msg = ws_stream.next().await.unwrap().unwrap();
- assert_eq!(msg, Message::Pong(vec![]));
-
- ws_stream
- .send(Message::Text(format!("hello {}", i)))
- .await
- .unwrap();
-
- let msg = ws_stream.next().await.unwrap().unwrap();
- assert_eq!(msg, Message::Text(format!("hello {}", i)));
- }
-
- ws_stream.close(None).await.unwrap();
- }
- });
+ spawn_ws_server(stream, |ws| Box::pin(ping_websocket_handler(ws)));
}
}
+async fn close_websocket_handler(
+ ws: fastwebsockets::WebSocket<Upgraded>,
+) -> Result<(), anyhow::Error> {
+ let mut ws = fastwebsockets::FragmentCollector::new(ws);
+
+ ws.write_frame(fastwebsockets::Frame::close_raw(vec![]))
+ .await
+ .unwrap();
+
+ Ok(())
+}
+
async fn run_ws_close_server(addr: &SocketAddr) {
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, _addr)) = listener.accept().await {
- tokio::spawn(async move {
- let ws_stream_fut = accept_async(stream);
-
- let ws_stream = ws_stream_fut.await;
- if let Ok(mut ws_stream) = ws_stream {
- ws_stream.close(None).await.unwrap();
- }
- });
+ spawn_ws_server(stream, |ws| Box::pin(close_websocket_handler(ws)));
}
}
@@ -471,18 +531,9 @@ async fn run_wss_server(addr: &SocketAddr) {
tokio::spawn(async move {
match acceptor.accept(stream).await {
Ok(tls_stream) => {
- let ws_stream_fut = accept_async(tls_stream);
- let ws_stream = ws_stream_fut.await;
- if let Ok(ws_stream) = ws_stream {
- let (tx, rx) = ws_stream.split();
- rx.forward(tx)
- .map(|result| {
- if let Err(e) = result {
- println!("Websocket server error: {e:?}");
- }
- })
- .await;
- }
+ spawn_ws_server(tls_stream, |ws| {
+ Box::pin(echo_websocket_handler(ws))
+ });
}
Err(e) => {
eprintln!("TLS accept error: {e:?}");