diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-04-22 11:17:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-22 11:17:31 +0200 |
commit | 068228cb454d14a6f5943061a5a6569b9e395e23 (patch) | |
tree | 4b62ba7df771ac6dc32c70273e397e863f84dc05 /runtime/inspector_server.rs | |
parent | a615eb3b56545960ec9684991442dd34a8b2abfc (diff) |
refactor: rewrite tests to "fastwebsockets" crate (#18781)
Migrating off of `tokio-tungstenite` crate.
---------
Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Diffstat (limited to 'runtime/inspector_server.rs')
-rw-r--r-- | runtime/inspector_server.rs | 94 |
1 files changed, 43 insertions, 51 deletions
diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs index d65e813cb..25d0d796c 100644 --- a/runtime/inspector_server.rs +++ b/runtime/inspector_server.rs @@ -1,7 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use core::convert::Infallible as Never; // Alias for the future `!` type. -use deno_core::error::AnyError; +// Alias for the future `!` type. +use core::convert::Infallible as Never; use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc::UnboundedReceiver; use deno_core::futures::channel::mpsc::UnboundedSender; @@ -18,8 +18,9 @@ use deno_core::serde_json::Value; use deno_core::InspectorMsg; use deno_core::InspectorSessionProxy; use deno_core::JsRuntime; -use deno_websocket::tokio_tungstenite::tungstenite; -use deno_websocket::tokio_tungstenite::WebSocketStream; +use fastwebsockets::Frame; +use fastwebsockets::OpCode; +use fastwebsockets::WebSocket; use std::cell::RefCell; use std::collections::HashMap; use std::convert::Infallible; @@ -145,35 +146,27 @@ fn handle_ws_request( let info = maybe_inspector_info.unwrap(); info.new_session_tx.clone() }; + let (parts, _) = req.into_parts(); + let mut req = http::Request::from_parts(parts, body); - let resp = tungstenite::handshake::server::create_response(&req) - .map(|resp| resp.map(|_| hyper::Body::empty())) - .or_else(|e| match e { - tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), - _ => http::Response::builder() + let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) { + Ok(e) => e, + _ => { + return http::Response::builder() .status(http::StatusCode::BAD_REQUEST) - .body("Not a valid Websocket Request".into()), - })?; - - let (parts, _) = req.into_parts(); - let req = http::Request::from_parts(parts, body); + .body("Not a valid Websocket Request".into()); + } + }; // spawn a task that will wait for websocket connection and then pump messages between // the socket and inspector proxy tokio::task::spawn_local(async move { - let upgrade_result = hyper::upgrade::on(req).await; - let upgraded = if let Ok(u) = upgrade_result { - u + let websocket = if let Ok(w) = fut.await { + w } else { eprintln!("Inspector server failed to upgrade to WS connection"); return; }; - let websocket = WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ) - .await; // The 'outbound' channel carries messages sent to the websocket. let (outbound_tx, outbound_rx) = mpsc::unbounded(); @@ -324,37 +317,36 @@ async fn server( /// 'futures' crate, therefore they can't participate in Tokio's cooperative /// task yielding. async fn pump_websocket_messages( - websocket: WebSocketStream<hyper::upgrade::Upgraded>, + mut websocket: WebSocket<hyper::upgrade::Upgraded>, inbound_tx: UnboundedSender<String>, - outbound_rx: UnboundedReceiver<InspectorMsg>, + mut outbound_rx: UnboundedReceiver<InspectorMsg>, ) { - let (websocket_tx, websocket_rx) = websocket.split(); - - let outbound_pump = outbound_rx - .map(|msg| tungstenite::Message::text(msg.content)) - .map(Ok) - .forward(websocket_tx) - .map_err(|_| ()); - - let inbound_pump = async move { - let _result = websocket_rx - .map_err(AnyError::from) - .map_ok(|msg| { - // Messages that cannot be converted to strings are ignored. - if let Ok(msg_text) = msg.into_text() { - let _ = inbound_tx.unbounded_send(msg_text); + 'pump: loop { + tokio::select! { + Some(msg) = outbound_rx.next() => { + let msg = Frame::text(msg.content.into_bytes()); + let _ = websocket.write_frame(msg).await; } - }) - .try_collect::<()>() - .await; - - // Users don't care if there was an error coming from debugger, - // just about the fact that debugger did disconnect. - eprintln!("Debugger session ended"); - - Ok(()) - }; - let _ = future::try_join(outbound_pump, inbound_pump).await; + Ok(msg) = websocket.read_frame() => { + match msg.opcode { + OpCode::Text => { + if let Ok(s) = String::from_utf8(msg.payload) { + let _ = inbound_tx.unbounded_send(s); + } + } + OpCode::Close => { + // Users don't care if there was an error coming from debugger, + // just about the fact that debugger did disconnect. + eprintln!("Debugger session ended"); + break 'pump; + } + _ => { + // Ignore other messages. + } + } + } + } + } } /// Inspector information that is sent from the isolate thread to the server |