diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-05-05 05:39:42 +0200 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2020-05-05 16:12:45 +0200 |
commit | e574437922db0693e7be7a5df7c474f306e55f7b (patch) | |
tree | b0c798efa2de29efc3555e18c807a3d0e4abfda1 /cli/inspector.rs | |
parent | 6e287d951853ff38fb7002d31b9677c184ae6ffa (diff) |
Fix inspector hanging when task budget is exceeded (#5083)
The issue is solved by proxying websocket messages over a pair of
`futures::mpsc::unbounded` channels. As these are are implemented in
the 'futures' crate, they can't participate in Tokio's cooperative
task yielding.
Diffstat (limited to 'cli/inspector.rs')
-rw-r--r-- | cli/inspector.rs | 152 |
1 files changed, 102 insertions, 50 deletions
diff --git a/cli/inspector.rs b/cli/inspector.rs index a637e980f..b6f653f4c 100644 --- a/cli/inspector.rs +++ b/cli/inspector.rs @@ -37,7 +37,6 @@ use std::sync::Once; use std::thread; use uuid::Uuid; use warp::filters::ws; -use warp::filters::ws::WebSocket; use warp::Filter; struct InspectorServer { @@ -91,7 +90,7 @@ struct InspectorInfo { host: SocketAddr, uuid: Uuid, thread_name: Option<String>, - new_websocket_tx: UnboundedSender<WebSocket>, + new_websocket_tx: UnboundedSender<WebSocketProxy>, canary_rx: oneshot::Receiver<Never>, } @@ -178,7 +177,9 @@ async fn server( g.get(&uuid).map(|info| info.new_websocket_tx.clone()).map( |new_websocket_tx| { ws.on_upgrade(move |websocket| async move { - let _ = new_websocket_tx.unbounded_send(websocket); + let (proxy, pump) = create_websocket_proxy(websocket); + let _ = new_websocket_tx.unbounded_send(proxy); + pump.await; }) }, ) @@ -223,6 +224,69 @@ async fn server( } } +type WebSocketProxySender = UnboundedSender<ws::Message>; +type WebSocketProxyReceiver = + UnboundedReceiver<Result<ws::Message, warp::Error>>; + +/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form +/// a duplex channel for sending/receiving websocket messages. +struct WebSocketProxy { + tx: WebSocketProxySender, + rx: WebSocketProxyReceiver, +} + +impl WebSocketProxy { + pub fn split(self) -> (WebSocketProxySender, WebSocketProxyReceiver) { + (self.tx, self.rx) + } +} + +/// Creates a future that proxies messages sent and received on a warp WebSocket +/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep +/// Tokio's task budget, which causes issues when DenoInspector::poll_sessions() +/// needs to block the thread because JavaScript execution is paused. +/// +/// This works because UnboundedSender/UnboundedReceiver are implemented in the +/// 'futures' crate, therefore they can't participate in Tokio's cooperative +/// task yielding. +/// +/// A tuple is returned, where the first element is a duplex channel that can +/// be used to send/receive messages on the websocket, and the second element +/// is a future that does the forwarding. +fn create_websocket_proxy( + websocket: ws::WebSocket, +) -> (WebSocketProxy, impl Future<Output = ()> + Send) { + // The 'outbound' channel carries messages sent to the websocket. + let (outbound_tx, outbound_rx) = mpsc::unbounded(); + + // The 'inbound' channel carries messages received from the websocket. + let (inbound_tx, inbound_rx) = mpsc::unbounded(); + + let proxy = WebSocketProxy { + tx: outbound_tx, + rx: inbound_rx, + }; + + // The pump future takes care of forwarding messages between the websocket + // and channels. It resolves to () when either side disconnects, ignoring any + // errors. + let pump = async move { + let (websocket_tx, websocket_rx) = websocket.split(); + + let outbound_pump = + outbound_rx.map(Ok).forward(websocket_tx).map_err(|_| ()); + + let inbound_pump = websocket_rx + .map(|msg| inbound_tx.unbounded_send(msg)) + .map_err(|_| ()) + .try_collect::<()>(); + + let _ = future::try_join(outbound_pump, inbound_pump).await; + }; + + (proxy, pump) +} + #[derive(Clone, Copy)] enum PollState { Idle, @@ -322,7 +386,8 @@ impl DenoInspector { let mut hs = v8::HandleScope::new(v8_isolate); let scope = hs.enter(); - let (new_websocket_tx, new_websocket_rx) = mpsc::unbounded::<WebSocket>(); + let (new_websocket_tx, new_websocket_rx) = + mpsc::unbounded::<WebSocketProxy>(); let (canary_tx, canary_rx) = oneshot::channel::<Never>(); let info = InspectorInfo { @@ -511,7 +576,7 @@ struct InspectorSessions { impl InspectorSessions { fn new( inspector_ptr: *mut DenoInspector, - new_websocket_rx: UnboundedReceiver<WebSocket>, + new_websocket_rx: UnboundedReceiver<WebSocketProxy>, ) -> RefCell<Self> { let new_incoming = new_websocket_rx .map(move |websocket| DenoInspectorSession::new(inspector_ptr, websocket)) @@ -609,11 +674,8 @@ impl task::ArcWake for InspectorWaker { struct DenoInspectorSession { v8_channel: v8::inspector::ChannelBase, v8_session: v8::UniqueRef<v8::inspector::V8InspectorSession>, - message_handler: Pin<Box<dyn Future<Output = ()> + 'static>>, - // Internal channel/queue that temporarily stores messages sent by V8 to - // the front-end, before they are sent over the websocket. - outbound_queue_tx: - UnboundedSender<v8::UniquePtr<v8::inspector::StringBuffer>>, + websocket_tx: WebSocketProxySender, + websocket_rx_handler: Pin<Box<dyn Future<Output = ()> + 'static>>, } impl Deref for DenoInspectorSession { @@ -634,7 +696,7 @@ impl DenoInspectorSession { pub fn new( inspector_ptr: *mut DenoInspector, - websocket: WebSocket, + websocket: WebSocketProxy, ) -> Box<Self> { new_box_with(move |self_ptr| { let v8_channel = v8::inspector::ChannelBase::new::<Self>(); @@ -648,54 +710,38 @@ impl DenoInspectorSession { &empty_view, ); - let (outbound_queue_tx, outbound_queue_rx) = - mpsc::unbounded::<v8::UniquePtr<v8::inspector::StringBuffer>>(); - - let message_handler = - Self::create_message_handler(self_ptr, websocket, outbound_queue_rx); + let (websocket_tx, websocket_rx) = websocket.split(); + let websocket_rx_handler = + Self::receive_from_websocket(self_ptr, websocket_rx); Self { v8_channel, v8_session, - message_handler, - outbound_queue_tx, + websocket_tx, + websocket_rx_handler, } }) } - fn create_message_handler( + /// Returns a future that receives messages from the websocket and dispatches + /// them to the V8 session. + fn receive_from_websocket( self_ptr: *mut Self, - websocket: WebSocket, - outbound_queue_rx: UnboundedReceiver< - v8::UniquePtr<v8::inspector::StringBuffer>, - >, + websocket_rx: WebSocketProxyReceiver, ) -> Pin<Box<dyn Future<Output = ()> + 'static>> { - let (websocket_tx, websocket_rx) = websocket.split(); - - // Receive messages from the websocket and dispatch them to the V8 session. - let inbound_pump = websocket_rx - .map_ok(move |msg| { - let msg = msg.as_bytes(); - let msg = v8::inspector::StringView::from(msg); - unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg); - }) - .try_collect::<()>(); - - // Convert and forward messages from the outbound message queue to the - // websocket. - let outbound_pump = outbound_queue_rx - .map(move |msg| { - let msg = msg.unwrap().string().to_string(); - let msg = ws::Message::text(msg); - Ok(msg) - }) - .forward(websocket_tx); - - let disconnect_future = future::try_join(inbound_pump, outbound_pump); - async move { eprintln!("Debugger session started."); - match disconnect_future.await { + + let result = websocket_rx + .map_ok(move |msg| { + let msg = msg.as_bytes(); + let msg = v8::inspector::StringView::from(msg); + unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg); + }) + .try_collect::<()>() + .await; + + match result { Ok(_) => eprintln!("Debugger session ended."), Err(err) => eprintln!("Debugger session ended: {}.", err), }; @@ -703,6 +749,12 @@ impl DenoInspectorSession { .boxed_local() } + fn send_to_websocket(&self, msg: v8::UniquePtr<v8::inspector::StringBuffer>) { + let msg = msg.unwrap().string().to_string(); + let msg = ws::Message::text(msg); + let _ = self.websocket_tx.unbounded_send(msg); + } + pub fn break_on_first_statement(&mut self) { let reason = v8::inspector::StringView::from(&b"debugCommand"[..]); let detail = v8::inspector::StringView::empty(); @@ -724,14 +776,14 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession { _call_id: i32, message: v8::UniquePtr<v8::inspector::StringBuffer>, ) { - let _ = self.outbound_queue_tx.unbounded_send(message); + self.send_to_websocket(message); } fn send_notification( &mut self, message: v8::UniquePtr<v8::inspector::StringBuffer>, ) { - let _ = self.outbound_queue_tx.unbounded_send(message); + self.send_to_websocket(message); } fn flush_protocol_notifications(&mut self) {} @@ -740,7 +792,7 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession { impl Future for DenoInspectorSession { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - self.message_handler.poll_unpin(cx) + self.websocket_rx_handler.poll_unpin(cx) } } |