diff options
Diffstat (limited to 'core/inspector.rs')
-rw-r--r-- | core/inspector.rs | 299 |
1 files changed, 167 insertions, 132 deletions
diff --git a/core/inspector.rs b/core/inspector.rs index 8a9136091..7450e46b8 100644 --- a/core/inspector.rs +++ b/core/inspector.rs @@ -23,10 +23,10 @@ use crate::serde_json::json; use crate::serde_json::Value; use anyhow::Error; use parking_lot::Mutex; -use std::cell::BorrowMutError; use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; +use std::mem::replace; use std::mem::take; use std::mem::MaybeUninit; use std::pin::Pin; @@ -34,7 +34,6 @@ use std::ptr; use std::ptr::NonNull; use std::rc::Rc; use std::sync::Arc; -use std::thread; use v8::HandleScope; pub enum InspectorMsgKind { @@ -55,12 +54,21 @@ pub struct InspectorSessionProxy { pub rx: SessionProxyReceiver, } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug, PartialEq)] enum PollState { + // Inspector is not being polled at this moment, it's waiting for more events + // from the inspector. Idle, + // `InspectorWaker` has been called - either explicitly by outside code + // (like WS server), or from one of the futures we were polling. Woken, + // Inspector is being polled asynchronously from the owning runtime. Polling, - Parked, + // Inspector is being polled synchronously, possibly in a reentrant way + // (e.g. from a callback invoked by V8). + SyncPolling, + // Inspector has been dropped already, but wakers might outlive the inspector + // so make sure nothing gets woken at this point. Dropped, } @@ -115,14 +123,24 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector { &mut self.v8_inspector_client } + /// This method id called when a breakpoint is triggered, eg. using `debugger` statement. In that case + /// inspector sends `Debugger.paused` notification. Nested message loop should be run and process all + /// sent protocol commands until `quit_message_loop_on_pause` is called. After that execution will + /// return to inspector and then JavaScript execution will resume. fn run_message_loop_on_pause(&mut self, context_group_id: i32) { assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID); self.flags.borrow_mut().on_pause = true; - let _ = self.poll_sessions(None); + self.poll_sessions_sync(); + assert!( + !self.flags.borrow().on_pause, + "V8InspectorClientImpl::run_message_loop_on_pause returned before quit_message_loop_on_pause was called" + ); } fn quit_message_loop_on_pause(&mut self) { - self.flags.borrow_mut().on_pause = false; + let mut flags = self.flags.borrow_mut(); + assert!(flags.on_pause); + flags.on_pause = false; } fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) { @@ -139,7 +157,10 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector { impl Future for JsRuntimeInspector { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { - self.poll_sessions(Some(cx)).unwrap() + // Here we actually want to set up waker so we are notified when new + // messages arrive. Note that other call sites might want to reenter + // and pump sessions synchronously. + self.poll_sessions(cx) } } @@ -206,9 +227,7 @@ impl JsRuntimeInspector { aux_data_view, ); - // Poll the session handler so we will get notified whenever there is - // new incoming debugger activity. - let _ = self_.poll_sessions(None).unwrap(); + self_.poll_sessions_sync(); drop(self_); self__ @@ -236,20 +255,55 @@ impl JsRuntimeInspector { self.sessions.borrow().has_blocking_sessions() } - fn poll_sessions( - &self, - mut invoker_cx: Option<&mut Context>, - ) -> Result<Poll<()>, BorrowMutError> { - // The futures this function uses do not have re-entrant poll() functions. - // However it is can happpen that poll_sessions() gets re-entered, e.g. - // when an interrupt request is honored while the inspector future is polled - // by the task executor. We let the caller know by returning some error. - let mut sessions = self.sessions.try_borrow_mut()?; + fn poll_sessions_sync(&self) { + let (prev_poll_state, mut prev_task_waker) = self.waker.update(|w| { + let prev_poll_state = replace(&mut w.poll_state, PollState::SyncPolling); + assert!(prev_poll_state != PollState::SyncPolling); + + let prev_task_waker = w.task_waker.take(); + (prev_poll_state, prev_task_waker) + }); + + futures::executor::block_on(futures::future::poll_fn(|cx| { + self.poll_sessions_inner(cx); + + // Block the thread if either the `on_pause` or the `waiting_for_session`. + // is set. Otherwise, return `Ready(_)` to make `block_on()` return. + let flags = self.flags.borrow(); + if flags.on_pause || flags.waiting_for_session { + Poll::Pending + } else { + Poll::Ready(()) + } + })); + + // Restore the previous poll state. + self.waker.update(|w| { + let replaced = replace(&mut w.poll_state, prev_poll_state); + assert_eq!(replaced, PollState::SyncPolling); + }); + + // The `block_on(...)` call above must have created a new `Waker` that will + // now be registered with `sessions.session_rx` and `sessions.established`. + // This has the consequence that when either of those streams transitions + // from `Pending` to `Ready`, they'll wake that (stale) waker, and the + // inspector task won't get notified. To avoid a hang, explicitly wake the + // inspector task here; when it gets polled, it will re-register the right + // waker (the `InspectorWaker`) with those streams. + if let Some(waker) = prev_task_waker.take() { + waker.wake(); + } + } + + fn poll_sessions(&self, invoker_cx: &mut Context) -> Poll<()> { self.waker.update(|w| { match w.poll_state { - PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling, - _ => unreachable!(), + PollState::Idle | PollState::Woken => { + w.poll_state = PollState::Polling; + w.inspector_ptr = Some(NonNull::from(self)); + } + s => unreachable!("state in poll_sessions {:#?}", s), }; }); @@ -259,94 +313,83 @@ impl JsRuntimeInspector { let cx = &mut Context::from_waker(&waker_ref); loop { - loop { - // Do one "handshake" with a newly connected session at a time. - if let Some(mut session) = sessions.handshake.take() { - let poll_result = session.poll_next_unpin(cx); - match poll_result { - Poll::Pending => { - sessions.established.push(session); - continue; - } - Poll::Ready(Some(session_stream_item)) => { - let (v8_session_ptr, msg) = session_stream_item; - InspectorSession::dispatch_message(v8_session_ptr, msg); - sessions.established.push(session); - continue; - } - Poll::Ready(None) => {} - } - } - - // Accept new connections. - let poll_result = sessions.session_rx.poll_next_unpin(cx); - if let Poll::Ready(Some(session_proxy)) = poll_result { - let session = InspectorSession::new( - sessions.v8_inspector.clone(), - session_proxy, - false, - ); - let prev = sessions.handshake.replace(session); - assert!(prev.is_none()); - } + self.poll_sessions_inner(cx); - // Poll established sessions. - match sessions.established.poll_next_unpin(cx) { - Poll::Ready(Some(session_stream_item)) => { - let (v8_session_ptr, msg) = session_stream_item; - InspectorSession::dispatch_message(v8_session_ptr, msg); - continue; - } - Poll::Ready(None) => break, - Poll::Pending => break, - }; + { + let flags = self.flags.borrow(); + assert!(!flags.on_pause); + assert!(!flags.waiting_for_session); } - let should_block = - self.flags.borrow().on_pause || self.flags.borrow().waiting_for_session; - - let new_state = self.waker.update(|w| { + let new_poll_state = self.waker.update(|w| { match w.poll_state { PollState::Woken => { - // The inspector was woken while the session handler was being - // polled, so we poll it another time. + // The inspector got woken up before the last round of polling was + // even over, so we need to do another round. w.poll_state = PollState::Polling; } - PollState::Polling if !should_block => { - // The session handler doesn't need to be polled any longer, and - // there's no reason to block (execution is not paused), so this - // function is about to return. + PollState::Polling => { + // Since all streams were polled until they all yielded `Pending`, + // there's nothing else we can do right now. w.poll_state = PollState::Idle; - // Register the task waker that can be used to wake the parent - // task that will poll the inspector future. - if let Some(cx) = invoker_cx.take() { - w.task_waker.replace(cx.waker().clone()); - } - // Register the address of the inspector, which allows the waker - // to request an interrupt from the isolate. - w.inspector_ptr = NonNull::new(self as *const _ as *mut Self); - } - PollState::Polling if should_block => { - // Isolate execution has been paused but there are no more - // events to process, so this thread will be parked. Therefore, - // store the current thread handle in the waker so it knows - // which thread to unpark when new events arrive. - w.poll_state = PollState::Parked; - w.parked_thread.replace(thread::current()); + // Capture the waker that, when used, will get the inspector polled. + w.task_waker.replace(invoker_cx.waker().clone()); } _ => unreachable!(), }; w.poll_state }); - match new_state { - PollState::Idle => break Ok(Poll::Pending), // Yield to task. - PollState::Polling => {} // Poll the session handler again. - PollState::Parked => thread::park(), // Park the thread. + + match new_poll_state { + PollState::Idle => break Poll::Pending, + PollState::Polling => continue, // Poll the session handler again. _ => unreachable!(), }; } } + /// Accepts incoming connections from inspector clients, and polls established + /// inspector sessions for messages that need to be dispatched to V8. This + /// function will repeatedly poll its innner streams and will not return until + /// they all yield `Pending` or have ended. + fn poll_sessions_inner(&self, cx: &mut Context) { + loop { + let mut sessions = self.sessions.borrow_mut(); + + // Accept new connections. + let poll_result = sessions.session_rx.poll_next_unpin(cx); + match poll_result { + Poll::Ready(Some(session_proxy)) => { + let session = InspectorSession::new( + self.v8_inspector.clone(), + session_proxy, + false, + ); + sessions.established.push(session); + // `session_rx` needs to be polled repeatedly until it is `Pending`. + continue; + } + Poll::Ready(None) => unreachable!(), // `session_rx` should never end. + Poll::Pending => {} + } + + // Poll established inspector sessions. + let poll_result = sessions.established.poll_next_unpin(cx); + if let Poll::Ready(Some(session_stream_item)) = poll_result { + let (v8_session_ptr, msg) = session_stream_item; + // Don't hold the borrow on sessions while dispatching a message, as it + // might result in a call to `poll_sessions_sync`. + drop(sessions); + InspectorSession::dispatch_message(v8_session_ptr, msg); + // Loop around. We need to keep polling established sessions and + // accepting new ones until eventually everything is `Pending`. + continue; + } + + break; + } + } + /// This function blocks the thread until at least one inspector client has /// established a websocket connection. /// @@ -356,10 +399,12 @@ impl JsRuntimeInspector { pub fn wait_for_session_and_break_on_next_statement(&mut self) { loop { match self.sessions.get_mut().established.iter_mut().next() { - Some(session) => break session.break_on_next_statement(), + Some(session) => { + break session.break_on_next_statement(); + } None => { self.flags.get_mut().waiting_for_session = true; - let _ = self.poll_sessions(None).unwrap(); + self.poll_sessions_sync(); } }; } @@ -423,7 +468,6 @@ struct InspectorFlags { struct SessionContainer { v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>, session_rx: UnboundedReceiver<InspectorSessionProxy>, - handshake: Option<Box<InspectorSession>>, established: SelectAll<Box<InspectorSession>>, } @@ -435,7 +479,6 @@ impl SessionContainer { Self { v8_inspector, session_rx: new_session_rx, - handshake: None, established: SelectAll::new(), } } @@ -446,12 +489,11 @@ impl SessionContainer { /// all sessions before dropping the inspector instance. fn drop_sessions(&mut self) { self.v8_inspector = Default::default(); - self.handshake.take(); self.established.clear(); } fn has_active_sessions(&self) -> bool { - !self.established.is_empty() || self.handshake.is_some() + !self.established.is_empty() } fn has_blocking_sessions(&self) -> bool { @@ -467,7 +509,6 @@ impl SessionContainer { Self { v8_inspector: Default::default(), session_rx: rx, - handshake: None, established: SelectAll::new(), } } @@ -476,7 +517,6 @@ impl SessionContainer { struct InspectorWakerInner { poll_state: PollState, task_waker: Option<task::Waker>, - parked_thread: Option<thread::Thread>, inspector_ptr: Option<NonNull<JsRuntimeInspector>>, isolate_handle: v8::IsolateHandle, } @@ -491,7 +531,6 @@ impl InspectorWaker { let inner = InspectorWakerInner { poll_state: PollState::Idle, task_waker: None, - parked_thread: None, inspector_ptr: None, isolate_handle, }; @@ -507,43 +546,38 @@ impl InspectorWaker { } } +extern "C" fn handle_interrupt(_isolate: &mut v8::Isolate, arg: *mut c_void) { + // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the + // pointer to the latter is valid as long as waker is alive. + let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) }; + inspector.poll_sessions_sync(); +} + impl task::ArcWake for InspectorWaker { fn wake_by_ref(arc_self: &Arc<Self>) { arc_self.update(|w| { + // Determine whether, given the current poll state, waking up is possible + // and necessary. If it is, change the poll state to `Woken`. match w.poll_state { - PollState::Idle => { - // Wake the task, if any, that has polled the Inspector future last. - if let Some(waker) = w.task_waker.take() { - waker.wake() - } - // Request an interrupt from the isolate if it's running and there's - // not unhandled interrupt request in flight. - if let Some(arg) = w - .inspector_ptr - .take() - .map(|ptr| ptr.as_ptr() as *mut c_void) - { - w.isolate_handle.request_interrupt(handle_interrupt, arg); - } - extern "C" fn handle_interrupt( - _isolate: &mut v8::Isolate, - arg: *mut c_void, - ) { - // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the - // pointer to the latter is valid as long as waker is alive. - let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) }; - let _ = inspector.poll_sessions(None); - } - } - PollState::Parked => { - // Unpark the isolate thread. - let parked_thread = w.parked_thread.take().unwrap(); - assert_ne!(parked_thread.id(), thread::current().id()); - parked_thread.unpark(); - } - _ => {} + PollState::Idle | PollState::Polling => w.poll_state = PollState::Woken, + PollState::Woken | PollState::Dropped => return, // Nothing to do. + PollState::SyncPolling => panic!("wake() called while sync polling"), }; - w.poll_state = PollState::Woken; + + // Wake the task, if any, that has polled the Inspector future last. + if let Some(waker) = w.task_waker.take() { + waker.wake() + } + + // Request an interrupt from the isolate, if the isolate is currently + // running and there isn't already an interrupt request in flight. + if let Some(arg) = w + .inspector_ptr + .take() + .map(|ptr| ptr.cast::<c_void>().as_ptr()) + { + w.isolate_handle.request_interrupt(handle_interrupt, arg); + } }); } } @@ -568,6 +602,7 @@ impl InspectorSession { blocking: bool, ) -> Box<Self> { new_box_with(move |self_ptr| { + // TODO(bartlomieju): channel should probably be a separate struct let v8_channel = v8::inspector::ChannelBase::new::<Self>(); let mut v8_inspector = v8_inspector_rc.borrow_mut(); let v8_inspector_ptr = v8_inspector.as_mut().unwrap(); |