diff options
-rw-r--r-- | core/inspector.rs | 82 | ||||
-rw-r--r-- | runtime/inspector_server.rs | 202 | ||||
-rw-r--r-- | runtime/web_worker.rs | 9 | ||||
-rw-r--r-- | runtime/worker.rs | 9 |
4 files changed, 142 insertions, 160 deletions
diff --git a/core/inspector.rs b/core/inspector.rs index d8a51e601..ef59cf413 100644 --- a/core/inspector.rs +++ b/core/inspector.rs @@ -41,9 +41,7 @@ use std::thread; /// If first argument is `None` then it's a notification, otherwise /// it's a message. pub type SessionProxySender = UnboundedSender<(Option<i32>, String)>; -// TODO(bartlomieju): does it even need to send a Result? -// It seems `Vec<u8>` would be enough -pub type SessionProxyReceiver = UnboundedReceiver<Result<Vec<u8>, AnyError>>; +pub type SessionProxyReceiver = UnboundedReceiver<Vec<u8>>; /// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form /// a duplex channel for sending/receiving messages in V8 session. @@ -89,11 +87,11 @@ pub struct JsRuntimeInspector { impl Drop for JsRuntimeInspector { fn drop(&mut self) { - // Since the waker is cloneable, it might outlive the inspector itself. + // Since the waker is cloneable, it might outlive the inspector itself. // Set the poll state to 'dropped' so it doesn't attempt to request an // interrupt from the isolate. self.waker.update(|w| w.poll_state = PollState::Dropped); - // TODO(bartlomieju): this comment is out of date + // V8 automatically deletes all sessions when an `V8Inspector` instance is // deleted, however InspectorSession also has a drop handler that cleans // up after itself. To avoid a double free, make sure the inspector is @@ -134,9 +132,11 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector { } } -/// `JsRuntimeInspector` implements a Future so that it can poll for new incoming -/// connections and messages from the WebSocket server. The Worker that owns -/// this `JsRuntimeInspector` will call this function from `Worker::poll()`. +/// Polling `JsRuntimeInspector` allows inspector to accept new incoming +/// connections and "pump" messages in different sessions. +/// +/// It should be polled on tick of event loop, ie. in `JsRuntime::poll_event_loop` +/// function. impl Future for JsRuntimeInspector { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { @@ -501,8 +501,7 @@ impl task::ArcWake for InspectorWaker { struct InspectorSession { v8_channel: v8::inspector::ChannelBase, v8_session: Rc<RefCell<v8::UniqueRef<v8::inspector::V8InspectorSession>>>, - proxy_tx: SessionProxySender, - proxy_rx_handler: Pin<Box<dyn Future<Output = ()> + 'static>>, + proxy: InspectorSessionProxy, } impl InspectorSession { @@ -524,15 +523,10 @@ impl InspectorSession { v8::inspector::StringView::empty(), ))); - let (proxy_tx, proxy_rx) = session_proxy.split(); - let proxy_rx_handler = - Self::receive_from_proxy(v8_session.clone(), proxy_rx); - Self { v8_channel, v8_session, - proxy_tx, - proxy_rx_handler, + proxy: session_proxy, } }) } @@ -546,46 +540,13 @@ impl InspectorSession { v8_session_ptr.dispatch_protocol_message(msg); } - // TODO(bartlomieju): this function should be reworked into `impl Future` - // or `impl Stream` - /// Returns a future that receives messages from the proxy and dispatches - /// them to the V8 session. - fn receive_from_proxy( - v8_session_rc: Rc< - RefCell<v8::UniqueRef<v8::inspector::V8InspectorSession>>, - >, - proxy_rx: SessionProxyReceiver, - ) -> Pin<Box<dyn Future<Output = ()> + 'static>> { - async move { - let result = proxy_rx - .map_ok(move |msg| { - let msg = v8::inspector::StringView::from(msg.as_slice()); - let mut v8_session = v8_session_rc.borrow_mut(); - let v8_session_ptr = v8_session.as_mut(); - v8_session_ptr.dispatch_protocol_message(msg); - }) - .try_collect::<()>() - .await; - - // TODO(bartlomieju): ideally these prints should be moved - // to `server.rs` as they are unwanted in context of REPL/coverage collection - // but right now they do not pose a huge problem. Investigate how to - // move them to `server.rs`. - match result { - Ok(_) => eprintln!("Debugger session ended."), - Err(err) => eprintln!("Debugger session ended: {}.", err), - }; - } - .boxed_local() - } - fn send_message( &self, maybe_call_id: Option<i32>, msg: v8::UniquePtr<v8::inspector::StringBuffer>, ) { let msg = msg.unwrap().string().to_string(); - let _ = self.proxy_tx.unbounded_send((maybe_call_id, msg)); + let _ = self.proxy.tx.unbounded_send((maybe_call_id, msg)); } pub fn break_on_next_statement(&mut self) { @@ -626,17 +587,30 @@ impl v8::inspector::ChannelImpl for InspectorSession { fn flush_protocol_notifications(&mut self) {} } +/// This is a "pump" future takes care of receiving messages and dispatching +/// them to the inspector. It resolves when receiver closes. impl Future for InspectorSession { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - self.proxy_rx_handler.poll_unpin(cx) + while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) { + if let Some(msg) = maybe_msg { + let msg = v8::inspector::StringView::from(msg.as_slice()); + let mut v8_session = self.v8_session.borrow_mut(); + let v8_session_ptr = v8_session.as_mut(); + v8_session_ptr.dispatch_protocol_message(msg); + } else { + return Poll::Ready(()); + } + } + + Poll::Pending } } /// A local inspector session that can be used to send and receive protocol messages directly on /// the same thread as an isolate. pub struct LocalInspectorSession { - v8_session_tx: UnboundedSender<Result<Vec<u8>, AnyError>>, + v8_session_tx: UnboundedSender<Vec<u8>>, v8_session_rx: UnboundedReceiver<(Option<i32>, String)>, response_tx_map: HashMap<i32, oneshot::Sender<serde_json::Value>>, next_message_id: i32, @@ -645,7 +619,7 @@ pub struct LocalInspectorSession { impl LocalInspectorSession { pub fn new( - v8_session_tx: UnboundedSender<Result<Vec<u8>, AnyError>>, + v8_session_tx: UnboundedSender<Vec<u8>>, v8_session_rx: UnboundedReceiver<(Option<i32>, String)>, ) -> Self { let response_tx_map = HashMap::new(); @@ -687,7 +661,7 @@ impl LocalInspectorSession { let raw_message = serde_json::to_string(&message).unwrap(); self .v8_session_tx - .unbounded_send(Ok(raw_message.as_bytes().to_vec())) + .unbounded_send(raw_message.as_bytes().to_vec()) .unwrap(); loop { diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs index 9f631ed2f..6ba4d088d 100644 --- a/runtime/inspector_server.rs +++ b/runtime/inspector_server.rs @@ -17,6 +17,7 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::InspectorSessionProxy; +use deno_core::JsRuntime; use deno_websocket::tokio_tungstenite::tungstenite; use std::cell::RefCell; use std::collections::HashMap; @@ -62,10 +63,13 @@ impl InspectorServer { pub fn register_inspector( &self, - session_sender: UnboundedSender<InspectorSessionProxy>, - deregister_rx: oneshot::Receiver<()>, module_url: String, + js_runtime: &mut JsRuntime, ) { + let inspector = js_runtime.inspector(); + let session_sender = inspector.get_session_sender(); + let deregister_rx = inspector.add_deregister_handler(); + // TODO(bartlomieju): simplify let info = InspectorInfo::new(self.host, session_sender, deregister_rx, module_url); self.register_inspector_tx.unbounded_send(info).unwrap(); @@ -102,63 +106,88 @@ where fn handle_ws_request( req: http::Request<hyper::Body>, - inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>, + inspector_map_rc: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>, ) -> http::Result<http::Response<hyper::Body>> { let (parts, body) = req.into_parts(); let req = http::Request::from_parts(parts, ()); - if let Some(new_session_tx) = req + let maybe_uuid = req .uri() .path() .strip_prefix("/ws/") - .and_then(|s| Uuid::parse_str(s).ok()) - .and_then(|uuid| { - inspector_map - .borrow() - .get(&uuid) - .map(|info| info.new_session_tx.clone()) - }) - { - let resp = tungstenite::handshake::server::create_response(&req) - .map(|resp| resp.map(|_| hyper::Body::empty())) - .or_else(|e| match e { - tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), - _ => http::Response::builder() - .status(http::StatusCode::BAD_REQUEST) - .body("Not a valid Websocket Request".into()), - }); - - let (parts, _) = req.into_parts(); - let req = http::Request::from_parts(parts, body); - - if resp.is_ok() { - tokio::task::spawn_local(async move { - let upgrade_result = hyper::upgrade::on(req).await; - let upgraded = if let Ok(u) = upgrade_result { - u - } else { - eprintln!("Inspector server failed to upgrade to WS connection"); - return; - }; - let websocket = - deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ) - .await; - let (proxy, pump) = create_websocket_proxy(websocket); - eprintln!("Debugger session started."); - let _ = new_session_tx.unbounded_send(proxy); - pump.await; - }); + .and_then(|s| Uuid::parse_str(s).ok()); + + if maybe_uuid.is_none() { + return http::Response::builder() + .status(http::StatusCode::BAD_REQUEST) + .body("Malformed inspector UUID".into()); + } + + // run in a block to not hold borrow to `inspector_map` for too long + let new_session_tx = { + let inspector_map = inspector_map_rc.borrow(); + let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap()); + + if maybe_inspector_info.is_none() { + return http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("Invalid inspector UUID".into()); } - resp - } else { - http::Response::builder() - .status(http::StatusCode::NOT_FOUND) - .body("No Valid inspector".into()) + + let info = maybe_inspector_info.unwrap(); + info.new_session_tx.clone() + }; + + let resp = tungstenite::handshake::server::create_response(&req) + .map(|resp| resp.map(|_| hyper::Body::empty())) + .or_else(|e| match e { + tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), + _ => http::Response::builder() + .status(http::StatusCode::BAD_REQUEST) + .body("Not a valid Websocket Request".into()), + }); + + if resp.is_err() { + return resp; } + + let (parts, _) = req.into_parts(); + let req = http::Request::from_parts(parts, body); + + // spawn a task that will wait for websocket connection and then pump messages between + // the socket and inspector proxy + tokio::task::spawn_local(async move { + let upgrade_result = hyper::upgrade::on(req).await; + let upgraded = if let Ok(u) = upgrade_result { + u + } else { + eprintln!("Inspector server failed to upgrade to WS connection"); + return; + }; + let websocket = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ) + .await; + + // The 'outbound' channel carries messages sent to the websocket. + let (outbound_tx, outbound_rx) = mpsc::unbounded(); + // The 'inbound' channel carries messages received from the websocket. + let (inbound_tx, inbound_rx) = mpsc::unbounded(); + + let inspector_session_proxy = InspectorSessionProxy { + tx: outbound_tx, + rx: inbound_rx, + }; + + eprintln!("Debugger session started."); + let _ = new_session_tx.unbounded_send(inspector_session_proxy); + pump_websocket_messages(websocket, inbound_tx, outbound_rx).await; + }); + + resp } fn handle_json_request( @@ -279,58 +308,51 @@ async fn server( } } -/// Creates a future that proxies messages sent and received on a warp WebSocket -/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep +/// The pump future takes care of forwarding messages between the websocket +/// and channels. It resolves when either side disconnects, ignoring any +/// errors. +/// +/// The future proxies messages sent and received on a warp WebSocket +/// to a UnboundedSender/UnboundedReceiver pair. We need these "unbounded" channel ends to sidestep /// Tokio's task budget, which causes issues when JsRuntimeInspector::poll_sessions() /// needs to block the thread because JavaScript execution is paused. /// /// This works because UnboundedSender/UnboundedReceiver are implemented in the /// 'futures' crate, therefore they can't participate in Tokio's cooperative /// task yielding. -/// -/// A tuple is returned, where the first element is a duplex channel that can -/// be used to send/receive messages on the websocket, and the second element -/// is a future that does the forwarding. -fn create_websocket_proxy( +async fn pump_websocket_messages( websocket: deno_websocket::tokio_tungstenite::WebSocketStream< hyper::upgrade::Upgraded, >, -) -> (InspectorSessionProxy, impl Future<Output = ()> + Send) { - // The 'outbound' channel carries messages sent to the websocket. - let (outbound_tx, outbound_rx) = mpsc::unbounded(); - - // The 'inbound' channel carries messages received from the websocket. - let (inbound_tx, inbound_rx) = mpsc::unbounded(); - - let proxy = InspectorSessionProxy { - tx: outbound_tx, - rx: inbound_rx, - }; - - // The pump future takes care of forwarding messages between the websocket - // and channels. It resolves to () when either side disconnects, ignoring any - // errors. - let pump = async move { - let (websocket_tx, websocket_rx) = websocket.split(); - - let outbound_pump = outbound_rx - .map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg)) - .map(Ok) - .forward(websocket_tx) - .map_err(|_| ()); - - let inbound_pump = websocket_rx - .map(|result| { - let result = result.map(|msg| msg.into_data()).map_err(AnyError::from); - inbound_tx.unbounded_send(result) + inbound_tx: UnboundedSender<Vec<u8>>, + outbound_rx: UnboundedReceiver<(Option<i32>, String)>, +) { + let (websocket_tx, websocket_rx) = websocket.split(); + + let outbound_pump = outbound_rx + .map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg)) + .map(Ok) + .forward(websocket_tx) + .map_err(|_| ()); + + let inbound_pump = async move { + let result = websocket_rx + .map_ok(|msg| msg.into_data()) + .map_err(AnyError::from) + .map_ok(|msg| { + let _ = inbound_tx.unbounded_send(msg); }) - .map_err(|_| ()) - .try_collect::<()>(); + .try_collect::<()>() + .await; - let _ = future::try_join(outbound_pump, inbound_pump).await; - }; + match result { + Ok(_) => eprintln!("Debugger session ended"), + Err(err) => eprintln!("Debugger session ended: {}.", err), + }; - (proxy, pump) + Ok(()) + }; + let _ = future::try_join(outbound_pump, inbound_pump).await; } /// Inspector information that is sent from the isolate thread to the server diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 06e5d2e3b..dede48027 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -388,14 +388,7 @@ impl WebWorker { }); if let Some(server) = options.maybe_inspector_server.clone() { - let inspector = js_runtime.inspector(); - let session_sender = inspector.get_session_sender(); - let deregister_rx = inspector.add_deregister_handler(); - server.register_inspector( - session_sender, - deregister_rx, - main_module.to_string(), - ); + server.register_inspector(main_module.to_string(), &mut js_runtime); } let (internal_handle, external_handle) = { diff --git a/runtime/worker.rs b/runtime/worker.rs index b1096024b..979d024d4 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -161,14 +161,7 @@ impl MainWorker { }); if let Some(server) = options.maybe_inspector_server.clone() { - let inspector = js_runtime.inspector(); - let session_sender = inspector.get_session_sender(); - let deregister_rx = inspector.add_deregister_handler(); - server.register_inspector( - session_sender, - deregister_rx, - main_module.to_string(), - ); + server.register_inspector(main_module.to_string(), &mut js_runtime); } Self { |