summaryrefslogtreecommitdiff
path: root/core/inspector.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2022-12-04 21:08:53 +0100
committerGitHub <noreply@github.com>2022-12-04 21:08:53 +0100
commit83b6085604c28124597f6e668ca4a93941999d6a (patch)
treea8ff78e1df3dd9162dbc541057e85c4b6bc41448 /core/inspector.rs
parent8b5b327b18b1cc6b0519e632d0e9b709af18820e (diff)
revert: Inspector changes (#16939)
Reverts 66dc54a7f and e2a0c3f0 Closes https://github.com/denoland/deno/issues/16926
Diffstat (limited to 'core/inspector.rs')
-rw-r--r--core/inspector.rs300
1 files changed, 132 insertions, 168 deletions
diff --git a/core/inspector.rs b/core/inspector.rs
index 30129f94b..8a9136091 100644
--- a/core/inspector.rs
+++ b/core/inspector.rs
@@ -23,10 +23,10 @@ use crate::serde_json::json;
use crate::serde_json::Value;
use anyhow::Error;
use parking_lot::Mutex;
+use std::cell::BorrowMutError;
use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::c_void;
-use std::mem::replace;
use std::mem::take;
use std::mem::MaybeUninit;
use std::pin::Pin;
@@ -34,6 +34,7 @@ use std::ptr;
use std::ptr::NonNull;
use std::rc::Rc;
use std::sync::Arc;
+use std::thread;
use v8::HandleScope;
pub enum InspectorMsgKind {
@@ -54,21 +55,12 @@ pub struct InspectorSessionProxy {
pub rx: SessionProxyReceiver,
}
-#[derive(Clone, Copy, Debug, PartialEq)]
+#[derive(Clone, Copy)]
enum PollState {
- // Inspector is not being polled at this moment, it's waiting for more events
- // from the inspector.
Idle,
- // `InspectorWaker` has been called - either explicitly by outside code
- // (like WS server), or from one of the futures we were polling.
Woken,
- // Inspector is being polled asynchronously from the owning runtime.
Polling,
- // Inspector is being polled synchronously, possibly in a reentrant way
- // (e.g. from a callback invoked by V8).
- SyncPolling,
- // Inspector has been dropped already, but wakers might outlive the inspector
- // so make sure nothing gets woken at this point.
+ Parked,
Dropped,
}
@@ -123,24 +115,14 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector {
&mut self.v8_inspector_client
}
- /// This method id called when a breakpoint is triggered, eg. using `debugger` statement. In that case
- /// inspector sends `Debugger.paused` notification. Nested message loop should be run and process all
- /// sent protocol commands until `quit_message_loop_on_pause` is called. After that execution will
- /// return to inspector and then JavaScript execution will resume.
fn run_message_loop_on_pause(&mut self, context_group_id: i32) {
assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID);
self.flags.borrow_mut().on_pause = true;
- self.poll_sessions_sync();
- assert!(
- !self.flags.borrow().on_pause,
- "V8InspectorClientImpl::run_message_loop_on_pause returned before quit_message_loop_on_pause was called"
- );
+ let _ = self.poll_sessions(None);
}
fn quit_message_loop_on_pause(&mut self) {
- let mut flags = self.flags.borrow_mut();
- assert!(flags.on_pause);
- flags.on_pause = false;
+ self.flags.borrow_mut().on_pause = false;
}
fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) {
@@ -157,10 +139,7 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector {
impl Future for JsRuntimeInspector {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
- // Here we actually want to set up waker so we are notified when new
- // messages arrive. Note that other call sites might want to reenter
- // and pump sessions synchronously.
- self.poll_sessions(cx)
+ self.poll_sessions(Some(cx)).unwrap()
}
}
@@ -227,7 +206,9 @@ impl JsRuntimeInspector {
aux_data_view,
);
- self_.poll_sessions_sync();
+ // Poll the session handler so we will get notified whenever there is
+ // new incoming debugger activity.
+ let _ = self_.poll_sessions(None).unwrap();
drop(self_);
self__
@@ -255,55 +236,20 @@ impl JsRuntimeInspector {
self.sessions.borrow().has_blocking_sessions()
}
- fn poll_sessions_sync(&self) {
- let (prev_poll_state, mut prev_task_waker) = self.waker.update(|w| {
- let prev_poll_state = replace(&mut w.poll_state, PollState::SyncPolling);
- assert!(prev_poll_state != PollState::SyncPolling);
-
- let prev_task_waker = w.task_waker.take();
-
- (prev_poll_state, prev_task_waker)
- });
-
- futures::executor::block_on(futures::future::poll_fn(|cx| {
- self.poll_sessions_inner(cx);
-
- // Block the thread if either the `on_pause` or the `waiting_for_session`.
- // is set. Otherwise, return `Ready(_)` to make `block_on()` return.
- let flags = self.flags.borrow();
- if flags.on_pause || flags.waiting_for_session {
- Poll::Pending
- } else {
- Poll::Ready(())
- }
- }));
-
- // Restore the previous poll state.
- self.waker.update(|w| {
- let replaced = replace(&mut w.poll_state, prev_poll_state);
- assert_eq!(replaced, PollState::SyncPolling);
- });
-
- // The `block_on(...)` call above must have created a new `Waker` that will
- // now be registered with `sessions.session_rx` and `sessions.established`.
- // This has the consequence that when either of those streams transitions
- // from `Pending` to `Ready`, they'll wake that (stale) waker, and the
- // inspector task won't get notified. To avoid a hang, explicitly wake the
- // inspector task here; when it gets polled, it will re-register the right
- // waker (the `InspectorWaker`) with those streams.
- if let Some(waker) = prev_task_waker.take() {
- waker.wake();
- }
- }
+ fn poll_sessions(
+ &self,
+ mut invoker_cx: Option<&mut Context>,
+ ) -> Result<Poll<()>, BorrowMutError> {
+ // The futures this function uses do not have re-entrant poll() functions.
+ // However it is can happpen that poll_sessions() gets re-entered, e.g.
+ // when an interrupt request is honored while the inspector future is polled
+ // by the task executor. We let the caller know by returning some error.
+ let mut sessions = self.sessions.try_borrow_mut()?;
- fn poll_sessions(&self, invoker_cx: &mut Context) -> Poll<()> {
self.waker.update(|w| {
match w.poll_state {
- PollState::Idle | PollState::Woken => {
- w.poll_state = PollState::Polling;
- w.inspector_ptr = Some(NonNull::from(self));
- }
- s => unreachable!("state in poll_sessions {:#?}", s),
+ PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling,
+ _ => unreachable!(),
};
});
@@ -313,83 +259,94 @@ impl JsRuntimeInspector {
let cx = &mut Context::from_waker(&waker_ref);
loop {
- self.poll_sessions_inner(cx);
+ loop {
+ // Do one "handshake" with a newly connected session at a time.
+ if let Some(mut session) = sessions.handshake.take() {
+ let poll_result = session.poll_next_unpin(cx);
+ match poll_result {
+ Poll::Pending => {
+ sessions.established.push(session);
+ continue;
+ }
+ Poll::Ready(Some(session_stream_item)) => {
+ let (v8_session_ptr, msg) = session_stream_item;
+ InspectorSession::dispatch_message(v8_session_ptr, msg);
+ sessions.established.push(session);
+ continue;
+ }
+ Poll::Ready(None) => {}
+ }
+ }
+
+ // Accept new connections.
+ let poll_result = sessions.session_rx.poll_next_unpin(cx);
+ if let Poll::Ready(Some(session_proxy)) = poll_result {
+ let session = InspectorSession::new(
+ sessions.v8_inspector.clone(),
+ session_proxy,
+ false,
+ );
+ let prev = sessions.handshake.replace(session);
+ assert!(prev.is_none());
+ }
- {
- let flags = self.flags.borrow();
- assert!(!flags.on_pause);
- assert!(!flags.waiting_for_session);
+ // Poll established sessions.
+ match sessions.established.poll_next_unpin(cx) {
+ Poll::Ready(Some(session_stream_item)) => {
+ let (v8_session_ptr, msg) = session_stream_item;
+ InspectorSession::dispatch_message(v8_session_ptr, msg);
+ continue;
+ }
+ Poll::Ready(None) => break,
+ Poll::Pending => break,
+ };
}
- let new_poll_state = self.waker.update(|w| {
+ let should_block =
+ self.flags.borrow().on_pause || self.flags.borrow().waiting_for_session;
+
+ let new_state = self.waker.update(|w| {
match w.poll_state {
PollState::Woken => {
- // The inspector got woken up before the last round of polling was
- // even over, so we need to do another round.
+ // The inspector was woken while the session handler was being
+ // polled, so we poll it another time.
w.poll_state = PollState::Polling;
}
- PollState::Polling => {
- // Since all streams were polled until they all yielded `Pending`,
- // there's nothing else we can do right now.
+ PollState::Polling if !should_block => {
+ // The session handler doesn't need to be polled any longer, and
+ // there's no reason to block (execution is not paused), so this
+ // function is about to return.
w.poll_state = PollState::Idle;
- // Capture the waker that, when used, will get the inspector polled.
- w.task_waker.replace(invoker_cx.waker().clone());
+ // Register the task waker that can be used to wake the parent
+ // task that will poll the inspector future.
+ if let Some(cx) = invoker_cx.take() {
+ w.task_waker.replace(cx.waker().clone());
+ }
+ // Register the address of the inspector, which allows the waker
+ // to request an interrupt from the isolate.
+ w.inspector_ptr = NonNull::new(self as *const _ as *mut Self);
+ }
+ PollState::Polling if should_block => {
+ // Isolate execution has been paused but there are no more
+ // events to process, so this thread will be parked. Therefore,
+ // store the current thread handle in the waker so it knows
+ // which thread to unpark when new events arrive.
+ w.poll_state = PollState::Parked;
+ w.parked_thread.replace(thread::current());
}
_ => unreachable!(),
};
w.poll_state
});
-
- match new_poll_state {
- PollState::Idle => break Poll::Pending,
- PollState::Polling => continue, // Poll the session handler again.
+ match new_state {
+ PollState::Idle => break Ok(Poll::Pending), // Yield to task.
+ PollState::Polling => {} // Poll the session handler again.
+ PollState::Parked => thread::park(), // Park the thread.
_ => unreachable!(),
};
}
}
- /// Accepts incoming connections from inspector clients, and polls established
- /// inspector sessions for messages that need to be dispatched to V8. This
- /// function will repeatedly poll its innner streams and will not return until
- /// they all yield `Pending` or have ended.
- fn poll_sessions_inner(&self, cx: &mut Context) {
- loop {
- let mut sessions = self.sessions.borrow_mut();
-
- // Accept new connections.
- let poll_result = sessions.session_rx.poll_next_unpin(cx);
- match poll_result {
- Poll::Ready(Some(session_proxy)) => {
- let session = InspectorSession::new(
- self.v8_inspector.clone(),
- session_proxy,
- false,
- );
- sessions.established.push(session);
- // `session_rx` needs to be polled repeatedly until it is `Pending`.
- continue;
- }
- Poll::Ready(None) => unreachable!(), // `session_rx` should never end.
- Poll::Pending => {}
- }
-
- // Poll established inspector sessions.
- let poll_result = sessions.established.poll_next_unpin(cx);
- if let Poll::Ready(Some(session_stream_item)) = poll_result {
- let (v8_session_ptr, msg) = session_stream_item;
- // Don't hold the borrow on sessions while dispatching a message, as it
- // might result in a call to `poll_sessions_sync`.
- drop(sessions);
- InspectorSession::dispatch_message(v8_session_ptr, msg);
- // Loop around. We need to keep polling established sessions and
- // accepting new ones until eventually everything is `Pending`.
- continue;
- }
-
- break;
- }
- }
-
/// This function blocks the thread until at least one inspector client has
/// established a websocket connection.
///
@@ -399,12 +356,10 @@ impl JsRuntimeInspector {
pub fn wait_for_session_and_break_on_next_statement(&mut self) {
loop {
match self.sessions.get_mut().established.iter_mut().next() {
- Some(session) => {
- break session.break_on_next_statement();
- }
+ Some(session) => break session.break_on_next_statement(),
None => {
self.flags.get_mut().waiting_for_session = true;
- self.poll_sessions_sync();
+ let _ = self.poll_sessions(None).unwrap();
}
};
}
@@ -468,6 +423,7 @@ struct InspectorFlags {
struct SessionContainer {
v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>,
session_rx: UnboundedReceiver<InspectorSessionProxy>,
+ handshake: Option<Box<InspectorSession>>,
established: SelectAll<Box<InspectorSession>>,
}
@@ -479,6 +435,7 @@ impl SessionContainer {
Self {
v8_inspector,
session_rx: new_session_rx,
+ handshake: None,
established: SelectAll::new(),
}
}
@@ -489,11 +446,12 @@ impl SessionContainer {
/// all sessions before dropping the inspector instance.
fn drop_sessions(&mut self) {
self.v8_inspector = Default::default();
+ self.handshake.take();
self.established.clear();
}
fn has_active_sessions(&self) -> bool {
- !self.established.is_empty()
+ !self.established.is_empty() || self.handshake.is_some()
}
fn has_blocking_sessions(&self) -> bool {
@@ -509,6 +467,7 @@ impl SessionContainer {
Self {
v8_inspector: Default::default(),
session_rx: rx,
+ handshake: None,
established: SelectAll::new(),
}
}
@@ -517,6 +476,7 @@ impl SessionContainer {
struct InspectorWakerInner {
poll_state: PollState,
task_waker: Option<task::Waker>,
+ parked_thread: Option<thread::Thread>,
inspector_ptr: Option<NonNull<JsRuntimeInspector>>,
isolate_handle: v8::IsolateHandle,
}
@@ -531,6 +491,7 @@ impl InspectorWaker {
let inner = InspectorWakerInner {
poll_state: PollState::Idle,
task_waker: None,
+ parked_thread: None,
inspector_ptr: None,
isolate_handle,
};
@@ -546,39 +507,43 @@ impl InspectorWaker {
}
}
-extern "C" fn handle_interrupt(_isolate: &mut v8::Isolate, arg: *mut c_void) {
- // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the
- // pointer to the latter is valid as long as waker is alive.
- let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) };
- inspector.poll_sessions_sync();
-}
-
impl task::ArcWake for InspectorWaker {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.update(|w| {
- // Determine whether, given the current poll state, waking up is possible
- // and necessary. If it is, change the poll state to `Woken`.
match w.poll_state {
- PollState::Idle | PollState::Polling => w.poll_state = PollState::Woken,
- PollState::Woken => {} // Even if already woken, schedule an interrupt.
- PollState::Dropped => return, // Don't do anything.
- PollState::SyncPolling => panic!("wake() called while sync polling"),
+ PollState::Idle => {
+ // Wake the task, if any, that has polled the Inspector future last.
+ if let Some(waker) = w.task_waker.take() {
+ waker.wake()
+ }
+ // Request an interrupt from the isolate if it's running and there's
+ // not unhandled interrupt request in flight.
+ if let Some(arg) = w
+ .inspector_ptr
+ .take()
+ .map(|ptr| ptr.as_ptr() as *mut c_void)
+ {
+ w.isolate_handle.request_interrupt(handle_interrupt, arg);
+ }
+ extern "C" fn handle_interrupt(
+ _isolate: &mut v8::Isolate,
+ arg: *mut c_void,
+ ) {
+ // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the
+ // pointer to the latter is valid as long as waker is alive.
+ let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) };
+ let _ = inspector.poll_sessions(None);
+ }
+ }
+ PollState::Parked => {
+ // Unpark the isolate thread.
+ let parked_thread = w.parked_thread.take().unwrap();
+ assert_ne!(parked_thread.id(), thread::current().id());
+ parked_thread.unpark();
+ }
+ _ => {}
};
-
- // Wake the task, if any, that has polled the Inspector future last.
- if let Some(waker) = w.task_waker.take() {
- waker.wake()
- }
-
- // Request an interrupt from the isolate, if the isolate is currently
- // running and there isn't already an interrupt request in flight.
- if let Some(arg) = w
- .inspector_ptr
- .take()
- .map(|ptr| ptr.cast::<c_void>().as_ptr())
- {
- w.isolate_handle.request_interrupt(handle_interrupt, arg);
- }
+ w.poll_state = PollState::Woken;
});
}
}
@@ -603,7 +568,6 @@ impl InspectorSession {
blocking: bool,
) -> Box<Self> {
new_box_with(move |self_ptr| {
- // TODO(bartlomieju): channel should probably be a separate struct
let v8_channel = v8::inspector::ChannelBase::new::<Self>();
let mut v8_inspector = v8_inspector_rc.borrow_mut();
let v8_inspector_ptr = v8_inspector.as_mut().unwrap();