summaryrefslogtreecommitdiff
path: root/runtime/inspector_server.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-08-25 13:39:23 +0200
committerGitHub <noreply@github.com>2021-08-25 13:39:23 +0200
commitf84cd9403db3545c8058a9c28474b3c99d4c2dd4 (patch)
treebeafd75f56f6650f68e92ab2d2cfa724b50ef43c /runtime/inspector_server.rs
parent873cce27b8f1b7900ea08c85b2d563ddd478a38a (diff)
refactor: cleanup Inspector and InspectorServer implementations (#11837)
Diffstat (limited to 'runtime/inspector_server.rs')
-rw-r--r--runtime/inspector_server.rs202
1 files changed, 112 insertions, 90 deletions
diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs
index 9f631ed2f..6ba4d088d 100644
--- a/runtime/inspector_server.rs
+++ b/runtime/inspector_server.rs
@@ -17,6 +17,7 @@ use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::InspectorSessionProxy;
+use deno_core::JsRuntime;
use deno_websocket::tokio_tungstenite::tungstenite;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -62,10 +63,13 @@ impl InspectorServer {
pub fn register_inspector(
&self,
- session_sender: UnboundedSender<InspectorSessionProxy>,
- deregister_rx: oneshot::Receiver<()>,
module_url: String,
+ js_runtime: &mut JsRuntime,
) {
+ let inspector = js_runtime.inspector();
+ let session_sender = inspector.get_session_sender();
+ let deregister_rx = inspector.add_deregister_handler();
+ // TODO(bartlomieju): simplify
let info =
InspectorInfo::new(self.host, session_sender, deregister_rx, module_url);
self.register_inspector_tx.unbounded_send(info).unwrap();
@@ -102,63 +106,88 @@ where
fn handle_ws_request(
req: http::Request<hyper::Body>,
- inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
+ inspector_map_rc: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
) -> http::Result<http::Response<hyper::Body>> {
let (parts, body) = req.into_parts();
let req = http::Request::from_parts(parts, ());
- if let Some(new_session_tx) = req
+ let maybe_uuid = req
.uri()
.path()
.strip_prefix("/ws/")
- .and_then(|s| Uuid::parse_str(s).ok())
- .and_then(|uuid| {
- inspector_map
- .borrow()
- .get(&uuid)
- .map(|info| info.new_session_tx.clone())
- })
- {
- let resp = tungstenite::handshake::server::create_response(&req)
- .map(|resp| resp.map(|_| hyper::Body::empty()))
- .or_else(|e| match e {
- tungstenite::error::Error::HttpFormat(http_error) => Err(http_error),
- _ => http::Response::builder()
- .status(http::StatusCode::BAD_REQUEST)
- .body("Not a valid Websocket Request".into()),
- });
-
- let (parts, _) = req.into_parts();
- let req = http::Request::from_parts(parts, body);
-
- if resp.is_ok() {
- tokio::task::spawn_local(async move {
- let upgrade_result = hyper::upgrade::on(req).await;
- let upgraded = if let Ok(u) = upgrade_result {
- u
- } else {
- eprintln!("Inspector server failed to upgrade to WS connection");
- return;
- };
- let websocket =
- deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
- upgraded,
- tungstenite::protocol::Role::Server,
- None,
- )
- .await;
- let (proxy, pump) = create_websocket_proxy(websocket);
- eprintln!("Debugger session started.");
- let _ = new_session_tx.unbounded_send(proxy);
- pump.await;
- });
+ .and_then(|s| Uuid::parse_str(s).ok());
+
+ if maybe_uuid.is_none() {
+ return http::Response::builder()
+ .status(http::StatusCode::BAD_REQUEST)
+ .body("Malformed inspector UUID".into());
+ }
+
+ // run in a block to not hold borrow to `inspector_map` for too long
+ let new_session_tx = {
+ let inspector_map = inspector_map_rc.borrow();
+ let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap());
+
+ if maybe_inspector_info.is_none() {
+ return http::Response::builder()
+ .status(http::StatusCode::NOT_FOUND)
+ .body("Invalid inspector UUID".into());
}
- resp
- } else {
- http::Response::builder()
- .status(http::StatusCode::NOT_FOUND)
- .body("No Valid inspector".into())
+
+ let info = maybe_inspector_info.unwrap();
+ info.new_session_tx.clone()
+ };
+
+ let resp = tungstenite::handshake::server::create_response(&req)
+ .map(|resp| resp.map(|_| hyper::Body::empty()))
+ .or_else(|e| match e {
+ tungstenite::error::Error::HttpFormat(http_error) => Err(http_error),
+ _ => http::Response::builder()
+ .status(http::StatusCode::BAD_REQUEST)
+ .body("Not a valid Websocket Request".into()),
+ });
+
+ if resp.is_err() {
+ return resp;
}
+
+ let (parts, _) = req.into_parts();
+ let req = http::Request::from_parts(parts, body);
+
+ // spawn a task that will wait for websocket connection and then pump messages between
+ // the socket and inspector proxy
+ tokio::task::spawn_local(async move {
+ let upgrade_result = hyper::upgrade::on(req).await;
+ let upgraded = if let Ok(u) = upgrade_result {
+ u
+ } else {
+ eprintln!("Inspector server failed to upgrade to WS connection");
+ return;
+ };
+ let websocket =
+ deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
+ upgraded,
+ tungstenite::protocol::Role::Server,
+ None,
+ )
+ .await;
+
+ // The 'outbound' channel carries messages sent to the websocket.
+ let (outbound_tx, outbound_rx) = mpsc::unbounded();
+ // The 'inbound' channel carries messages received from the websocket.
+ let (inbound_tx, inbound_rx) = mpsc::unbounded();
+
+ let inspector_session_proxy = InspectorSessionProxy {
+ tx: outbound_tx,
+ rx: inbound_rx,
+ };
+
+ eprintln!("Debugger session started.");
+ let _ = new_session_tx.unbounded_send(inspector_session_proxy);
+ pump_websocket_messages(websocket, inbound_tx, outbound_rx).await;
+ });
+
+ resp
}
fn handle_json_request(
@@ -279,58 +308,51 @@ async fn server(
}
}
-/// Creates a future that proxies messages sent and received on a warp WebSocket
-/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep
+/// The pump future takes care of forwarding messages between the websocket
+/// and channels. It resolves when either side disconnects, ignoring any
+/// errors.
+///
+/// The future proxies messages sent and received on a warp WebSocket
+/// to a UnboundedSender/UnboundedReceiver pair. We need these "unbounded" channel ends to sidestep
/// Tokio's task budget, which causes issues when JsRuntimeInspector::poll_sessions()
/// needs to block the thread because JavaScript execution is paused.
///
/// This works because UnboundedSender/UnboundedReceiver are implemented in the
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
-///
-/// A tuple is returned, where the first element is a duplex channel that can
-/// be used to send/receive messages on the websocket, and the second element
-/// is a future that does the forwarding.
-fn create_websocket_proxy(
+async fn pump_websocket_messages(
websocket: deno_websocket::tokio_tungstenite::WebSocketStream<
hyper::upgrade::Upgraded,
>,
-) -> (InspectorSessionProxy, impl Future<Output = ()> + Send) {
- // The 'outbound' channel carries messages sent to the websocket.
- let (outbound_tx, outbound_rx) = mpsc::unbounded();
-
- // The 'inbound' channel carries messages received from the websocket.
- let (inbound_tx, inbound_rx) = mpsc::unbounded();
-
- let proxy = InspectorSessionProxy {
- tx: outbound_tx,
- rx: inbound_rx,
- };
-
- // The pump future takes care of forwarding messages between the websocket
- // and channels. It resolves to () when either side disconnects, ignoring any
- // errors.
- let pump = async move {
- let (websocket_tx, websocket_rx) = websocket.split();
-
- let outbound_pump = outbound_rx
- .map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg))
- .map(Ok)
- .forward(websocket_tx)
- .map_err(|_| ());
-
- let inbound_pump = websocket_rx
- .map(|result| {
- let result = result.map(|msg| msg.into_data()).map_err(AnyError::from);
- inbound_tx.unbounded_send(result)
+ inbound_tx: UnboundedSender<Vec<u8>>,
+ outbound_rx: UnboundedReceiver<(Option<i32>, String)>,
+) {
+ let (websocket_tx, websocket_rx) = websocket.split();
+
+ let outbound_pump = outbound_rx
+ .map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg))
+ .map(Ok)
+ .forward(websocket_tx)
+ .map_err(|_| ());
+
+ let inbound_pump = async move {
+ let result = websocket_rx
+ .map_ok(|msg| msg.into_data())
+ .map_err(AnyError::from)
+ .map_ok(|msg| {
+ let _ = inbound_tx.unbounded_send(msg);
})
- .map_err(|_| ())
- .try_collect::<()>();
+ .try_collect::<()>()
+ .await;
- let _ = future::try_join(outbound_pump, inbound_pump).await;
- };
+ match result {
+ Ok(_) => eprintln!("Debugger session ended"),
+ Err(err) => eprintln!("Debugger session ended: {}.", err),
+ };
- (proxy, pump)
+ Ok(())
+ };
+ let _ = future::try_join(outbound_pump, inbound_pump).await;
}
/// Inspector information that is sent from the isolate thread to the server