summaryrefslogtreecommitdiff
path: root/core/inspector.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/inspector.rs')
-rw-r--r--core/inspector.rs299
1 files changed, 167 insertions, 132 deletions
diff --git a/core/inspector.rs b/core/inspector.rs
index 8a9136091..7450e46b8 100644
--- a/core/inspector.rs
+++ b/core/inspector.rs
@@ -23,10 +23,10 @@ use crate::serde_json::json;
use crate::serde_json::Value;
use anyhow::Error;
use parking_lot::Mutex;
-use std::cell::BorrowMutError;
use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::c_void;
+use std::mem::replace;
use std::mem::take;
use std::mem::MaybeUninit;
use std::pin::Pin;
@@ -34,7 +34,6 @@ use std::ptr;
use std::ptr::NonNull;
use std::rc::Rc;
use std::sync::Arc;
-use std::thread;
use v8::HandleScope;
pub enum InspectorMsgKind {
@@ -55,12 +54,21 @@ pub struct InspectorSessionProxy {
pub rx: SessionProxyReceiver,
}
-#[derive(Clone, Copy)]
+#[derive(Clone, Copy, Debug, PartialEq)]
enum PollState {
+ // Inspector is not being polled at this moment, it's waiting for more events
+ // from the inspector.
Idle,
+ // `InspectorWaker` has been called - either explicitly by outside code
+ // (like WS server), or from one of the futures we were polling.
Woken,
+ // Inspector is being polled asynchronously from the owning runtime.
Polling,
- Parked,
+ // Inspector is being polled synchronously, possibly in a reentrant way
+ // (e.g. from a callback invoked by V8).
+ SyncPolling,
+ // Inspector has been dropped already, but wakers might outlive the inspector
+ // so make sure nothing gets woken at this point.
Dropped,
}
@@ -115,14 +123,24 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector {
&mut self.v8_inspector_client
}
+ /// This method id called when a breakpoint is triggered, eg. using `debugger` statement. In that case
+ /// inspector sends `Debugger.paused` notification. Nested message loop should be run and process all
+ /// sent protocol commands until `quit_message_loop_on_pause` is called. After that execution will
+ /// return to inspector and then JavaScript execution will resume.
fn run_message_loop_on_pause(&mut self, context_group_id: i32) {
assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
self.flags.borrow_mut().on_pause = true;
- let _ = self.poll_sessions(None);
+ self.poll_sessions_sync();
+ assert!(
+ !self.flags.borrow().on_pause,
+ "V8InspectorClientImpl::run_message_loop_on_pause returned before quit_message_loop_on_pause was called"
+ );
}
fn quit_message_loop_on_pause(&mut self) {
- self.flags.borrow_mut().on_pause = false;
+ let mut flags = self.flags.borrow_mut();
+ assert!(flags.on_pause);
+ flags.on_pause = false;
}
fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) {
@@ -139,7 +157,10 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector {
impl Future for JsRuntimeInspector {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
- self.poll_sessions(Some(cx)).unwrap()
+ // Here we actually want to set up waker so we are notified when new
+ // messages arrive. Note that other call sites might want to reenter
+ // and pump sessions synchronously.
+ self.poll_sessions(cx)
}
}
@@ -206,9 +227,7 @@ impl JsRuntimeInspector {
aux_data_view,
);
- // Poll the session handler so we will get notified whenever there is
- // new incoming debugger activity.
- let _ = self_.poll_sessions(None).unwrap();
+ self_.poll_sessions_sync();
drop(self_);
self__
@@ -236,20 +255,55 @@ impl JsRuntimeInspector {
self.sessions.borrow().has_blocking_sessions()
}
- fn poll_sessions(
- &self,
- mut invoker_cx: Option<&mut Context>,
- ) -> Result<Poll<()>, BorrowMutError> {
- // The futures this function uses do not have re-entrant poll() functions.
- // However it is can happpen that poll_sessions() gets re-entered, e.g.
- // when an interrupt request is honored while the inspector future is polled
- // by the task executor. We let the caller know by returning some error.
- let mut sessions = self.sessions.try_borrow_mut()?;
+ fn poll_sessions_sync(&self) {
+ let (prev_poll_state, mut prev_task_waker) = self.waker.update(|w| {
+ let prev_poll_state = replace(&mut w.poll_state, PollState::SyncPolling);
+ assert!(prev_poll_state != PollState::SyncPolling);
+
+ let prev_task_waker = w.task_waker.take();
+ (prev_poll_state, prev_task_waker)
+ });
+
+ futures::executor::block_on(futures::future::poll_fn(|cx| {
+ self.poll_sessions_inner(cx);
+
+ // Block the thread if either the `on_pause` or the `waiting_for_session`.
+ // is set. Otherwise, return `Ready(_)` to make `block_on()` return.
+ let flags = self.flags.borrow();
+ if flags.on_pause || flags.waiting_for_session {
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+ }));
+
+ // Restore the previous poll state.
+ self.waker.update(|w| {
+ let replaced = replace(&mut w.poll_state, prev_poll_state);
+ assert_eq!(replaced, PollState::SyncPolling);
+ });
+
+ // The `block_on(...)` call above must have created a new `Waker` that will
+ // now be registered with `sessions.session_rx` and `sessions.established`.
+ // This has the consequence that when either of those streams transitions
+ // from `Pending` to `Ready`, they'll wake that (stale) waker, and the
+ // inspector task won't get notified. To avoid a hang, explicitly wake the
+ // inspector task here; when it gets polled, it will re-register the right
+ // waker (the `InspectorWaker`) with those streams.
+ if let Some(waker) = prev_task_waker.take() {
+ waker.wake();
+ }
+ }
+
+ fn poll_sessions(&self, invoker_cx: &mut Context) -> Poll<()> {
self.waker.update(|w| {
match w.poll_state {
- PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling,
- _ => unreachable!(),
+ PollState::Idle | PollState::Woken => {
+ w.poll_state = PollState::Polling;
+ w.inspector_ptr = Some(NonNull::from(self));
+ }
+ s => unreachable!("state in poll_sessions {:#?}", s),
};
});
@@ -259,94 +313,83 @@ impl JsRuntimeInspector {
let cx = &mut Context::from_waker(&waker_ref);
loop {
- loop {
- // Do one "handshake" with a newly connected session at a time.
- if let Some(mut session) = sessions.handshake.take() {
- let poll_result = session.poll_next_unpin(cx);
- match poll_result {
- Poll::Pending => {
- sessions.established.push(session);
- continue;
- }
- Poll::Ready(Some(session_stream_item)) => {
- let (v8_session_ptr, msg) = session_stream_item;
- InspectorSession::dispatch_message(v8_session_ptr, msg);
- sessions.established.push(session);
- continue;
- }
- Poll::Ready(None) => {}
- }
- }
-
- // Accept new connections.
- let poll_result = sessions.session_rx.poll_next_unpin(cx);
- if let Poll::Ready(Some(session_proxy)) = poll_result {
- let session = InspectorSession::new(
- sessions.v8_inspector.clone(),
- session_proxy,
- false,
- );
- let prev = sessions.handshake.replace(session);
- assert!(prev.is_none());
- }
+ self.poll_sessions_inner(cx);
- // Poll established sessions.
- match sessions.established.poll_next_unpin(cx) {
- Poll::Ready(Some(session_stream_item)) => {
- let (v8_session_ptr, msg) = session_stream_item;
- InspectorSession::dispatch_message(v8_session_ptr, msg);
- continue;
- }
- Poll::Ready(None) => break,
- Poll::Pending => break,
- };
+ {
+ let flags = self.flags.borrow();
+ assert!(!flags.on_pause);
+ assert!(!flags.waiting_for_session);
}
- let should_block =
- self.flags.borrow().on_pause || self.flags.borrow().waiting_for_session;
-
- let new_state = self.waker.update(|w| {
+ let new_poll_state = self.waker.update(|w| {
match w.poll_state {
PollState::Woken => {
- // The inspector was woken while the session handler was being
- // polled, so we poll it another time.
+ // The inspector got woken up before the last round of polling was
+ // even over, so we need to do another round.
w.poll_state = PollState::Polling;
}
- PollState::Polling if !should_block => {
- // The session handler doesn't need to be polled any longer, and
- // there's no reason to block (execution is not paused), so this
- // function is about to return.
+ PollState::Polling => {
+ // Since all streams were polled until they all yielded `Pending`,
+ // there's nothing else we can do right now.
w.poll_state = PollState::Idle;
- // Register the task waker that can be used to wake the parent
- // task that will poll the inspector future.
- if let Some(cx) = invoker_cx.take() {
- w.task_waker.replace(cx.waker().clone());
- }
- // Register the address of the inspector, which allows the waker
- // to request an interrupt from the isolate.
- w.inspector_ptr = NonNull::new(self as *const _ as *mut Self);
- }
- PollState::Polling if should_block => {
- // Isolate execution has been paused but there are no more
- // events to process, so this thread will be parked. Therefore,
- // store the current thread handle in the waker so it knows
- // which thread to unpark when new events arrive.
- w.poll_state = PollState::Parked;
- w.parked_thread.replace(thread::current());
+ // Capture the waker that, when used, will get the inspector polled.
+ w.task_waker.replace(invoker_cx.waker().clone());
}
_ => unreachable!(),
};
w.poll_state
});
- match new_state {
- PollState::Idle => break Ok(Poll::Pending), // Yield to task.
- PollState::Polling => {} // Poll the session handler again.
- PollState::Parked => thread::park(), // Park the thread.
+
+ match new_poll_state {
+ PollState::Idle => break Poll::Pending,
+ PollState::Polling => continue, // Poll the session handler again.
_ => unreachable!(),
};
}
}
+ /// Accepts incoming connections from inspector clients, and polls established
+ /// inspector sessions for messages that need to be dispatched to V8. This
+ /// function will repeatedly poll its innner streams and will not return until
+ /// they all yield `Pending` or have ended.
+ fn poll_sessions_inner(&self, cx: &mut Context) {
+ loop {
+ let mut sessions = self.sessions.borrow_mut();
+
+ // Accept new connections.
+ let poll_result = sessions.session_rx.poll_next_unpin(cx);
+ match poll_result {
+ Poll::Ready(Some(session_proxy)) => {
+ let session = InspectorSession::new(
+ self.v8_inspector.clone(),
+ session_proxy,
+ false,
+ );
+ sessions.established.push(session);
+ // `session_rx` needs to be polled repeatedly until it is `Pending`.
+ continue;
+ }
+ Poll::Ready(None) => unreachable!(), // `session_rx` should never end.
+ Poll::Pending => {}
+ }
+
+ // Poll established inspector sessions.
+ let poll_result = sessions.established.poll_next_unpin(cx);
+ if let Poll::Ready(Some(session_stream_item)) = poll_result {
+ let (v8_session_ptr, msg) = session_stream_item;
+ // Don't hold the borrow on sessions while dispatching a message, as it
+ // might result in a call to `poll_sessions_sync`.
+ drop(sessions);
+ InspectorSession::dispatch_message(v8_session_ptr, msg);
+ // Loop around. We need to keep polling established sessions and
+ // accepting new ones until eventually everything is `Pending`.
+ continue;
+ }
+
+ break;
+ }
+ }
+
/// This function blocks the thread until at least one inspector client has
/// established a websocket connection.
///
@@ -356,10 +399,12 @@ impl JsRuntimeInspector {
pub fn wait_for_session_and_break_on_next_statement(&mut self) {
loop {
match self.sessions.get_mut().established.iter_mut().next() {
- Some(session) => break session.break_on_next_statement(),
+ Some(session) => {
+ break session.break_on_next_statement();
+ }
None => {
self.flags.get_mut().waiting_for_session = true;
- let _ = self.poll_sessions(None).unwrap();
+ self.poll_sessions_sync();
}
};
}
@@ -423,7 +468,6 @@ struct InspectorFlags {
struct SessionContainer {
v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>,
session_rx: UnboundedReceiver<InspectorSessionProxy>,
- handshake: Option<Box<InspectorSession>>,
established: SelectAll<Box<InspectorSession>>,
}
@@ -435,7 +479,6 @@ impl SessionContainer {
Self {
v8_inspector,
session_rx: new_session_rx,
- handshake: None,
established: SelectAll::new(),
}
}
@@ -446,12 +489,11 @@ impl SessionContainer {
/// all sessions before dropping the inspector instance.
fn drop_sessions(&mut self) {
self.v8_inspector = Default::default();
- self.handshake.take();
self.established.clear();
}
fn has_active_sessions(&self) -> bool {
- !self.established.is_empty() || self.handshake.is_some()
+ !self.established.is_empty()
}
fn has_blocking_sessions(&self) -> bool {
@@ -467,7 +509,6 @@ impl SessionContainer {
Self {
v8_inspector: Default::default(),
session_rx: rx,
- handshake: None,
established: SelectAll::new(),
}
}
@@ -476,7 +517,6 @@ impl SessionContainer {
struct InspectorWakerInner {
poll_state: PollState,
task_waker: Option<task::Waker>,
- parked_thread: Option<thread::Thread>,
inspector_ptr: Option<NonNull<JsRuntimeInspector>>,
isolate_handle: v8::IsolateHandle,
}
@@ -491,7 +531,6 @@ impl InspectorWaker {
let inner = InspectorWakerInner {
poll_state: PollState::Idle,
task_waker: None,
- parked_thread: None,
inspector_ptr: None,
isolate_handle,
};
@@ -507,43 +546,38 @@ impl InspectorWaker {
}
}
+extern "C" fn handle_interrupt(_isolate: &mut v8::Isolate, arg: *mut c_void) {
+ // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the
+ // pointer to the latter is valid as long as waker is alive.
+ let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) };
+ inspector.poll_sessions_sync();
+}
+
impl task::ArcWake for InspectorWaker {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.update(|w| {
+ // Determine whether, given the current poll state, waking up is possible
+ // and necessary. If it is, change the poll state to `Woken`.
match w.poll_state {
- PollState::Idle => {
- // Wake the task, if any, that has polled the Inspector future last.
- if let Some(waker) = w.task_waker.take() {
- waker.wake()
- }
- // Request an interrupt from the isolate if it's running and there's
- // not unhandled interrupt request in flight.
- if let Some(arg) = w
- .inspector_ptr
- .take()
- .map(|ptr| ptr.as_ptr() as *mut c_void)
- {
- w.isolate_handle.request_interrupt(handle_interrupt, arg);
- }
- extern "C" fn handle_interrupt(
- _isolate: &mut v8::Isolate,
- arg: *mut c_void,
- ) {
- // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the
- // pointer to the latter is valid as long as waker is alive.
- let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) };
- let _ = inspector.poll_sessions(None);
- }
- }
- PollState::Parked => {
- // Unpark the isolate thread.
- let parked_thread = w.parked_thread.take().unwrap();
- assert_ne!(parked_thread.id(), thread::current().id());
- parked_thread.unpark();
- }
- _ => {}
+ PollState::Idle | PollState::Polling => w.poll_state = PollState::Woken,
+ PollState::Woken | PollState::Dropped => return, // Nothing to do.
+ PollState::SyncPolling => panic!("wake() called while sync polling"),
};
- w.poll_state = PollState::Woken;
+
+ // Wake the task, if any, that has polled the Inspector future last.
+ if let Some(waker) = w.task_waker.take() {
+ waker.wake()
+ }
+
+ // Request an interrupt from the isolate, if the isolate is currently
+ // running and there isn't already an interrupt request in flight.
+ if let Some(arg) = w
+ .inspector_ptr
+ .take()
+ .map(|ptr| ptr.cast::<c_void>().as_ptr())
+ {
+ w.isolate_handle.request_interrupt(handle_interrupt, arg);
+ }
});
}
}
@@ -568,6 +602,7 @@ impl InspectorSession {
blocking: bool,
) -> Box<Self> {
new_box_with(move |self_ptr| {
+ // TODO(bartlomieju): channel should probably be a separate struct
let v8_channel = v8::inspector::ChannelBase::new::<Self>();
let mut v8_inspector = v8_inspector_rc.borrow_mut();
let v8_inspector_ptr = v8_inspector.as_mut().unwrap();