summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2020-05-05 05:39:42 +0200
committerBert Belder <bertbelder@gmail.com>2020-05-05 16:12:45 +0200
commite574437922db0693e7be7a5df7c474f306e55f7b (patch)
treeb0c798efa2de29efc3555e18c807a3d0e4abfda1
parent6e287d951853ff38fb7002d31b9677c184ae6ffa (diff)
Fix inspector hanging when task budget is exceeded (#5083)
The issue is solved by proxying websocket messages over a pair of `futures::mpsc::unbounded` channels. As these are are implemented in the 'futures' crate, they can't participate in Tokio's cooperative task yielding.
-rw-r--r--cli/inspector.rs152
-rw-r--r--cli/tests/inspector3.js6
-rw-r--r--cli/tests/integration_tests.rs117
3 files changed, 204 insertions, 71 deletions
diff --git a/cli/inspector.rs b/cli/inspector.rs
index a637e980f..b6f653f4c 100644
--- a/cli/inspector.rs
+++ b/cli/inspector.rs
@@ -37,7 +37,6 @@ use std::sync::Once;
use std::thread;
use uuid::Uuid;
use warp::filters::ws;
-use warp::filters::ws::WebSocket;
use warp::Filter;
struct InspectorServer {
@@ -91,7 +90,7 @@ struct InspectorInfo {
host: SocketAddr,
uuid: Uuid,
thread_name: Option<String>,
- new_websocket_tx: UnboundedSender<WebSocket>,
+ new_websocket_tx: UnboundedSender<WebSocketProxy>,
canary_rx: oneshot::Receiver<Never>,
}
@@ -178,7 +177,9 @@ async fn server(
g.get(&uuid).map(|info| info.new_websocket_tx.clone()).map(
|new_websocket_tx| {
ws.on_upgrade(move |websocket| async move {
- let _ = new_websocket_tx.unbounded_send(websocket);
+ let (proxy, pump) = create_websocket_proxy(websocket);
+ let _ = new_websocket_tx.unbounded_send(proxy);
+ pump.await;
})
},
)
@@ -223,6 +224,69 @@ async fn server(
}
}
+type WebSocketProxySender = UnboundedSender<ws::Message>;
+type WebSocketProxyReceiver =
+ UnboundedReceiver<Result<ws::Message, warp::Error>>;
+
+/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form
+/// a duplex channel for sending/receiving websocket messages.
+struct WebSocketProxy {
+ tx: WebSocketProxySender,
+ rx: WebSocketProxyReceiver,
+}
+
+impl WebSocketProxy {
+ pub fn split(self) -> (WebSocketProxySender, WebSocketProxyReceiver) {
+ (self.tx, self.rx)
+ }
+}
+
+/// Creates a future that proxies messages sent and received on a warp WebSocket
+/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep
+/// Tokio's task budget, which causes issues when DenoInspector::poll_sessions()
+/// needs to block the thread because JavaScript execution is paused.
+///
+/// This works because UnboundedSender/UnboundedReceiver are implemented in the
+/// 'futures' crate, therefore they can't participate in Tokio's cooperative
+/// task yielding.
+///
+/// A tuple is returned, where the first element is a duplex channel that can
+/// be used to send/receive messages on the websocket, and the second element
+/// is a future that does the forwarding.
+fn create_websocket_proxy(
+ websocket: ws::WebSocket,
+) -> (WebSocketProxy, impl Future<Output = ()> + Send) {
+ // The 'outbound' channel carries messages sent to the websocket.
+ let (outbound_tx, outbound_rx) = mpsc::unbounded();
+
+ // The 'inbound' channel carries messages received from the websocket.
+ let (inbound_tx, inbound_rx) = mpsc::unbounded();
+
+ let proxy = WebSocketProxy {
+ tx: outbound_tx,
+ rx: inbound_rx,
+ };
+
+ // The pump future takes care of forwarding messages between the websocket
+ // and channels. It resolves to () when either side disconnects, ignoring any
+ // errors.
+ let pump = async move {
+ let (websocket_tx, websocket_rx) = websocket.split();
+
+ let outbound_pump =
+ outbound_rx.map(Ok).forward(websocket_tx).map_err(|_| ());
+
+ let inbound_pump = websocket_rx
+ .map(|msg| inbound_tx.unbounded_send(msg))
+ .map_err(|_| ())
+ .try_collect::<()>();
+
+ let _ = future::try_join(outbound_pump, inbound_pump).await;
+ };
+
+ (proxy, pump)
+}
+
#[derive(Clone, Copy)]
enum PollState {
Idle,
@@ -322,7 +386,8 @@ impl DenoInspector {
let mut hs = v8::HandleScope::new(v8_isolate);
let scope = hs.enter();
- let (new_websocket_tx, new_websocket_rx) = mpsc::unbounded::<WebSocket>();
+ let (new_websocket_tx, new_websocket_rx) =
+ mpsc::unbounded::<WebSocketProxy>();
let (canary_tx, canary_rx) = oneshot::channel::<Never>();
let info = InspectorInfo {
@@ -511,7 +576,7 @@ struct InspectorSessions {
impl InspectorSessions {
fn new(
inspector_ptr: *mut DenoInspector,
- new_websocket_rx: UnboundedReceiver<WebSocket>,
+ new_websocket_rx: UnboundedReceiver<WebSocketProxy>,
) -> RefCell<Self> {
let new_incoming = new_websocket_rx
.map(move |websocket| DenoInspectorSession::new(inspector_ptr, websocket))
@@ -609,11 +674,8 @@ impl task::ArcWake for InspectorWaker {
struct DenoInspectorSession {
v8_channel: v8::inspector::ChannelBase,
v8_session: v8::UniqueRef<v8::inspector::V8InspectorSession>,
- message_handler: Pin<Box<dyn Future<Output = ()> + 'static>>,
- // Internal channel/queue that temporarily stores messages sent by V8 to
- // the front-end, before they are sent over the websocket.
- outbound_queue_tx:
- UnboundedSender<v8::UniquePtr<v8::inspector::StringBuffer>>,
+ websocket_tx: WebSocketProxySender,
+ websocket_rx_handler: Pin<Box<dyn Future<Output = ()> + 'static>>,
}
impl Deref for DenoInspectorSession {
@@ -634,7 +696,7 @@ impl DenoInspectorSession {
pub fn new(
inspector_ptr: *mut DenoInspector,
- websocket: WebSocket,
+ websocket: WebSocketProxy,
) -> Box<Self> {
new_box_with(move |self_ptr| {
let v8_channel = v8::inspector::ChannelBase::new::<Self>();
@@ -648,54 +710,38 @@ impl DenoInspectorSession {
&empty_view,
);
- let (outbound_queue_tx, outbound_queue_rx) =
- mpsc::unbounded::<v8::UniquePtr<v8::inspector::StringBuffer>>();
-
- let message_handler =
- Self::create_message_handler(self_ptr, websocket, outbound_queue_rx);
+ let (websocket_tx, websocket_rx) = websocket.split();
+ let websocket_rx_handler =
+ Self::receive_from_websocket(self_ptr, websocket_rx);
Self {
v8_channel,
v8_session,
- message_handler,
- outbound_queue_tx,
+ websocket_tx,
+ websocket_rx_handler,
}
})
}
- fn create_message_handler(
+ /// Returns a future that receives messages from the websocket and dispatches
+ /// them to the V8 session.
+ fn receive_from_websocket(
self_ptr: *mut Self,
- websocket: WebSocket,
- outbound_queue_rx: UnboundedReceiver<
- v8::UniquePtr<v8::inspector::StringBuffer>,
- >,
+ websocket_rx: WebSocketProxyReceiver,
) -> Pin<Box<dyn Future<Output = ()> + 'static>> {
- let (websocket_tx, websocket_rx) = websocket.split();
-
- // Receive messages from the websocket and dispatch them to the V8 session.
- let inbound_pump = websocket_rx
- .map_ok(move |msg| {
- let msg = msg.as_bytes();
- let msg = v8::inspector::StringView::from(msg);
- unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg);
- })
- .try_collect::<()>();
-
- // Convert and forward messages from the outbound message queue to the
- // websocket.
- let outbound_pump = outbound_queue_rx
- .map(move |msg| {
- let msg = msg.unwrap().string().to_string();
- let msg = ws::Message::text(msg);
- Ok(msg)
- })
- .forward(websocket_tx);
-
- let disconnect_future = future::try_join(inbound_pump, outbound_pump);
-
async move {
eprintln!("Debugger session started.");
- match disconnect_future.await {
+
+ let result = websocket_rx
+ .map_ok(move |msg| {
+ let msg = msg.as_bytes();
+ let msg = v8::inspector::StringView::from(msg);
+ unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg);
+ })
+ .try_collect::<()>()
+ .await;
+
+ match result {
Ok(_) => eprintln!("Debugger session ended."),
Err(err) => eprintln!("Debugger session ended: {}.", err),
};
@@ -703,6 +749,12 @@ impl DenoInspectorSession {
.boxed_local()
}
+ fn send_to_websocket(&self, msg: v8::UniquePtr<v8::inspector::StringBuffer>) {
+ let msg = msg.unwrap().string().to_string();
+ let msg = ws::Message::text(msg);
+ let _ = self.websocket_tx.unbounded_send(msg);
+ }
+
pub fn break_on_first_statement(&mut self) {
let reason = v8::inspector::StringView::from(&b"debugCommand"[..]);
let detail = v8::inspector::StringView::empty();
@@ -724,14 +776,14 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession {
_call_id: i32,
message: v8::UniquePtr<v8::inspector::StringBuffer>,
) {
- let _ = self.outbound_queue_tx.unbounded_send(message);
+ self.send_to_websocket(message);
}
fn send_notification(
&mut self,
message: v8::UniquePtr<v8::inspector::StringBuffer>,
) {
- let _ = self.outbound_queue_tx.unbounded_send(message);
+ self.send_to_websocket(message);
}
fn flush_protocol_notifications(&mut self) {}
@@ -740,7 +792,7 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession {
impl Future for DenoInspectorSession {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- self.message_handler.poll_unpin(cx)
+ self.websocket_rx_handler.poll_unpin(cx)
}
}
diff --git a/cli/tests/inspector3.js b/cli/tests/inspector3.js
new file mode 100644
index 000000000..de079a1bc
--- /dev/null
+++ b/cli/tests/inspector3.js
@@ -0,0 +1,6 @@
+for (let i = 0; i < 128; i++) {
+ console.log(i);
+ debugger;
+}
+await new Promise((res, _) => setTimeout(res, 100));
+console.log("done");
diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs
index 0847c1806..d4c58a685 100644
--- a/cli/tests/integration_tests.rs
+++ b/cli/tests/integration_tests.rs
@@ -2152,10 +2152,7 @@ fn extract_ws_url_from_stderr(
#[tokio::test]
async fn inspector_connect() {
- let script = deno::test_util::root_path()
- .join("cli")
- .join("tests")
- .join("inspector1.js");
+ let script = util::tests_path().join("inspector1.js");
let mut child = util::deno_cmd()
.arg("run")
// Warning: each inspector test should be on its own port to avoid
@@ -2174,6 +2171,7 @@ async fn inspector_connect() {
.expect("Can't connect");
assert_eq!("101 Switching Protocols", response.status().to_string());
child.kill().unwrap();
+ child.wait().unwrap();
}
enum TestStep {
@@ -2184,10 +2182,7 @@ enum TestStep {
#[tokio::test]
async fn inspector_break_on_first_line() {
- let script = deno::test_util::root_path()
- .join("cli")
- .join("tests")
- .join("inspector2.js");
+ let script = util::tests_path().join("inspector2.js");
let mut child = util::deno_cmd()
.arg("run")
// Warning: each inspector test should be on its own port to avoid
@@ -2224,12 +2219,12 @@ async fn inspector_break_on_first_line() {
WsRecv(r#"{"id":3,"result":{}}"#),
WsRecv(r#"{"method":"Debugger.paused","#),
WsSend(
- r#"{"id":5,"method":"Runtime.evaluate","params":{"expression":"Deno.core.print(\"hello from the inspector\\n\")","contextId":1,"includeCommandLineAPI":true,"silent":false,"returnByValue":true}}"#,
+ r#"{"id":4,"method":"Runtime.evaluate","params":{"expression":"Deno.core.print(\"hello from the inspector\\n\")","contextId":1,"includeCommandLineAPI":true,"silent":false,"returnByValue":true}}"#,
),
- WsRecv(r#"{"id":5,"result":{"result":{"type":"undefined"}}}"#),
+ WsRecv(r#"{"id":4,"result":{"result":{"type":"undefined"}}}"#),
StdOut("hello from the inspector"),
- WsSend(r#"{"id":6,"method":"Debugger.resume"}"#),
- WsRecv(r#"{"id":6,"result":{}}"#),
+ WsSend(r#"{"id":5,"method":"Debugger.resume"}"#),
+ WsRecv(r#"{"id":5,"result":{}}"#),
StdOut("hello from the script"),
];
@@ -2254,14 +2249,12 @@ async fn inspector_break_on_first_line() {
}
child.kill().unwrap();
+ child.wait().unwrap();
}
#[tokio::test]
async fn inspector_pause() {
- let script = deno::test_util::root_path()
- .join("cli")
- .join("tests")
- .join("inspector1.js");
+ let script = util::tests_path().join("inspector1.js");
let mut child = util::deno_cmd()
.arg("run")
// Warning: each inspector test should be on its own port to avoid
@@ -2317,10 +2310,7 @@ async fn inspector_pause() {
#[tokio::test]
async fn inspector_port_collision() {
- let script = deno::test_util::root_path()
- .join("cli")
- .join("tests")
- .join("inspector1.js");
+ let script = util::tests_path().join("inspector1.js");
let mut child1 = util::deno_cmd()
.arg("run")
.arg("--inspect=127.0.0.1:9231")
@@ -2348,8 +2338,93 @@ async fn inspector_port_collision() {
.read_to_string(&mut stderr_str_2)
.unwrap();
assert!(stderr_str_2.contains("Cannot start inspector server"));
+
child1.kill().unwrap();
- let _ = child2.kill();
+ child1.wait().unwrap();
+ child2.wait().unwrap();
+}
+
+#[tokio::test]
+async fn inspector_does_not_hang() {
+ let script = util::tests_path().join("inspector3.js");
+ let mut child = util::deno_cmd()
+ .arg("run")
+ // Warning: each inspector test should be on its own port to avoid
+ // conflicting with another inspector test.
+ .arg("--inspect-brk=127.0.0.1:9232")
+ .arg(script)
+ .stdout(std::process::Stdio::piped())
+ .stderr(std::process::Stdio::piped())
+ .spawn()
+ .unwrap();
+
+ let stderr = child.stderr.as_mut().unwrap();
+ let ws_url = extract_ws_url_from_stderr(stderr);
+ let (socket, response) = tokio_tungstenite::connect_async(ws_url)
+ .await
+ .expect("Can't connect");
+ assert_eq!(response.status(), 101); // Switching protocols.
+
+ let (mut socket_tx, socket_rx) = socket.split();
+ let mut socket_rx =
+ socket_rx.map(|msg| msg.unwrap().to_string()).filter(|msg| {
+ let pass = !msg.starts_with(r#"{"method":"Debugger.scriptParsed","#);
+ futures::future::ready(pass)
+ });
+
+ let stdout = child.stdout.as_mut().unwrap();
+ let mut stdout_lines =
+ std::io::BufReader::new(stdout).lines().map(|r| r.unwrap());
+
+ use TestStep::*;
+ let test_steps = vec![
+ WsSend(r#"{"id":1,"method":"Runtime.enable"}"#),
+ WsSend(r#"{"id":2,"method":"Debugger.enable"}"#),
+ WsRecv(
+ r#"{"method":"Runtime.executionContextCreated","params":{"context":{"id":1,"#,
+ ),
+ WsRecv(r#"{"id":1,"result":{}}"#),
+ WsRecv(r#"{"id":2,"result":{"debuggerId":"#),
+ WsSend(r#"{"id":3,"method":"Runtime.runIfWaitingForDebugger"}"#),
+ WsRecv(r#"{"id":3,"result":{}}"#),
+ WsRecv(r#"{"method":"Debugger.paused","#),
+ WsSend(r#"{"id":4,"method":"Debugger.resume"}"#),
+ WsRecv(r#"{"id":4,"result":{}}"#),
+ WsRecv(r#"{"method":"Debugger.resumed","params":{}}"#),
+ ];
+
+ for step in test_steps {
+ match step {
+ WsRecv(s) => assert!(socket_rx.next().await.unwrap().starts_with(s)),
+ WsSend(s) => socket_tx.send(s.into()).await.unwrap(),
+ _ => unreachable!(),
+ }
+ }
+
+ for i in 0..128u32 {
+ let request_id = i + 10;
+ // Expect the number {i} on stdout.
+ let s = format!("{}", i);
+ assert_eq!(stdout_lines.next().unwrap(), s);
+ // Expect hitting the `debugger` statement.
+ let s = r#"{"method":"Debugger.paused","#;
+ assert!(socket_rx.next().await.unwrap().starts_with(s));
+ // Send the 'Debugger.resume' request.
+ let s = format!(r#"{{"id":{},"method":"Debugger.resume"}}"#, request_id);
+ socket_tx.send(s.into()).await.unwrap();
+ // Expect confirmation of the 'Debugger.resume' request.
+ let s = format!(r#"{{"id":{},"result":{{}}}}"#, request_id);
+ assert_eq!(socket_rx.next().await.unwrap(), s);
+ let s = r#"{"method":"Debugger.resumed","params":{}}"#;
+ assert_eq!(socket_rx.next().await.unwrap(), s);
+ }
+
+ // Check that we can gracefully close the websocket connection.
+ socket_tx.close().await.unwrap();
+ socket_rx.for_each(|_| async {}).await;
+
+ assert_eq!(&stdout_lines.next().unwrap(), "done");
+ assert!(child.wait().unwrap().success());
}
mod util {