summaryrefslogtreecommitdiff
path: root/runtime/inspector_server.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-04-22 11:17:31 +0200
committerGitHub <noreply@github.com>2023-04-22 11:17:31 +0200
commit068228cb454d14a6f5943061a5a6569b9e395e23 (patch)
tree4b62ba7df771ac6dc32c70273e397e863f84dc05 /runtime/inspector_server.rs
parenta615eb3b56545960ec9684991442dd34a8b2abfc (diff)
refactor: rewrite tests to "fastwebsockets" crate (#18781)
Migrating off of `tokio-tungstenite` crate. --------- Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Diffstat (limited to 'runtime/inspector_server.rs')
-rw-r--r--runtime/inspector_server.rs94
1 files changed, 43 insertions, 51 deletions
diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs
index d65e813cb..25d0d796c 100644
--- a/runtime/inspector_server.rs
+++ b/runtime/inspector_server.rs
@@ -1,7 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use core::convert::Infallible as Never; // Alias for the future `!` type.
-use deno_core::error::AnyError;
+// Alias for the future `!` type.
+use core::convert::Infallible as Never;
use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
@@ -18,8 +18,9 @@ use deno_core::serde_json::Value;
use deno_core::InspectorMsg;
use deno_core::InspectorSessionProxy;
use deno_core::JsRuntime;
-use deno_websocket::tokio_tungstenite::tungstenite;
-use deno_websocket::tokio_tungstenite::WebSocketStream;
+use fastwebsockets::Frame;
+use fastwebsockets::OpCode;
+use fastwebsockets::WebSocket;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::Infallible;
@@ -145,35 +146,27 @@ fn handle_ws_request(
let info = maybe_inspector_info.unwrap();
info.new_session_tx.clone()
};
+ let (parts, _) = req.into_parts();
+ let mut req = http::Request::from_parts(parts, body);
- let resp = tungstenite::handshake::server::create_response(&req)
- .map(|resp| resp.map(|_| hyper::Body::empty()))
- .or_else(|e| match e {
- tungstenite::error::Error::HttpFormat(http_error) => Err(http_error),
- _ => http::Response::builder()
+ let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
+ Ok(e) => e,
+ _ => {
+ return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
- .body("Not a valid Websocket Request".into()),
- })?;
-
- let (parts, _) = req.into_parts();
- let req = http::Request::from_parts(parts, body);
+ .body("Not a valid Websocket Request".into());
+ }
+ };
// spawn a task that will wait for websocket connection and then pump messages between
// the socket and inspector proxy
tokio::task::spawn_local(async move {
- let upgrade_result = hyper::upgrade::on(req).await;
- let upgraded = if let Ok(u) = upgrade_result {
- u
+ let websocket = if let Ok(w) = fut.await {
+ w
} else {
eprintln!("Inspector server failed to upgrade to WS connection");
return;
};
- let websocket = WebSocketStream::from_raw_socket(
- upgraded,
- tungstenite::protocol::Role::Server,
- None,
- )
- .await;
// The 'outbound' channel carries messages sent to the websocket.
let (outbound_tx, outbound_rx) = mpsc::unbounded();
@@ -324,37 +317,36 @@ async fn server(
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
async fn pump_websocket_messages(
- websocket: WebSocketStream<hyper::upgrade::Upgraded>,
+ mut websocket: WebSocket<hyper::upgrade::Upgraded>,
inbound_tx: UnboundedSender<String>,
- outbound_rx: UnboundedReceiver<InspectorMsg>,
+ mut outbound_rx: UnboundedReceiver<InspectorMsg>,
) {
- let (websocket_tx, websocket_rx) = websocket.split();
-
- let outbound_pump = outbound_rx
- .map(|msg| tungstenite::Message::text(msg.content))
- .map(Ok)
- .forward(websocket_tx)
- .map_err(|_| ());
-
- let inbound_pump = async move {
- let _result = websocket_rx
- .map_err(AnyError::from)
- .map_ok(|msg| {
- // Messages that cannot be converted to strings are ignored.
- if let Ok(msg_text) = msg.into_text() {
- let _ = inbound_tx.unbounded_send(msg_text);
+ 'pump: loop {
+ tokio::select! {
+ Some(msg) = outbound_rx.next() => {
+ let msg = Frame::text(msg.content.into_bytes());
+ let _ = websocket.write_frame(msg).await;
}
- })
- .try_collect::<()>()
- .await;
-
- // Users don't care if there was an error coming from debugger,
- // just about the fact that debugger did disconnect.
- eprintln!("Debugger session ended");
-
- Ok(())
- };
- let _ = future::try_join(outbound_pump, inbound_pump).await;
+ Ok(msg) = websocket.read_frame() => {
+ match msg.opcode {
+ OpCode::Text => {
+ if let Ok(s) = String::from_utf8(msg.payload) {
+ let _ = inbound_tx.unbounded_send(s);
+ }
+ }
+ OpCode::Close => {
+ // Users don't care if there was an error coming from debugger,
+ // just about the fact that debugger did disconnect.
+ eprintln!("Debugger session ended");
+ break 'pump;
+ }
+ _ => {
+ // Ignore other messages.
+ }
+ }
+ }
+ }
+ }
}
/// Inspector information that is sent from the isolate thread to the server