diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2021-12-28 17:40:42 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-28 17:40:42 +0100 |
commit | ee7ab81768c593e99774b629c82c1784204a1cb0 (patch) | |
tree | c37e58f26df4178ce8bdde3e7a51bdb691852a64 /core/inspector.rs | |
parent | 07618c861e27e284cae3a309783ef78dcef611dd (diff) |
refactor(core): cleanup Inspector implementation (#12962)
Diffstat (limited to 'core/inspector.rs')
-rw-r--r-- | core/inspector.rs | 172 |
1 files changed, 95 insertions, 77 deletions
diff --git a/core/inspector.rs b/core/inspector.rs index 8a686dc63..5dd3b0055 100644 --- a/core/inspector.rs +++ b/core/inspector.rs @@ -37,10 +37,16 @@ use std::rc::Rc; use std::sync::Arc; use std::thread; -/// If first argument is `None` then it's a notification, otherwise -/// it's a message. -pub type SessionProxySender = UnboundedSender<(Option<i32>, String)>; -pub type SessionProxyReceiver = UnboundedReceiver<Vec<u8>>; +pub enum InspectorMsgKind { + Notification, + Message(i32), +} +pub struct InspectorMsg { + pub kind: InspectorMsgKind, + pub content: String, +} +pub type SessionProxySender = UnboundedSender<InspectorMsg>; +pub type SessionProxyReceiver = UnboundedReceiver<String>; /// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form /// a duplex channel for sending/receiving messages in V8 session. @@ -89,7 +95,7 @@ impl Drop for JsRuntimeInspector { // deleted, however InspectorSession also has a drop handler that cleans // up after itself. To avoid a double free, make sure the inspector is // dropped last. - take(&mut *self.sessions.borrow_mut()); + self.sessions.borrow_mut().drop_sessions(); // Notify counterparty that this instance is being destroyed. Ignoring // result because counterparty waiting for the signal might have already @@ -154,24 +160,25 @@ impl JsRuntimeInspector { let v8_inspector_client = v8::inspector::V8InspectorClientBase::new::<Self>(); - let flags = InspectorFlags::new(); let waker = InspectorWaker::new(scope.thread_safe_handle()); // Create JsRuntimeInspector instance. let mut self_ = Box::new(Self { v8_inspector_client, v8_inspector: Default::default(), - sessions: Default::default(), + sessions: RefCell::new(SessionContainer::temporary_placeholder()), new_session_tx, - flags, + flags: Default::default(), waker, deregister_tx: None, }); self_.v8_inspector = Rc::new(RefCell::new( v8::inspector::V8Inspector::create(scope, &mut *self_).into(), )); - self_.sessions = - SessionContainer::new(self_.v8_inspector.clone(), new_session_rx); + self_.sessions = RefCell::new(SessionContainer::new( + self_.v8_inspector.clone(), + new_session_rx, + )); // Tell the inspector about the global context. let context = v8::Local::new(scope, context); @@ -184,15 +191,14 @@ impl JsRuntimeInspector { .context_created(context, Self::CONTEXT_GROUP_ID, context_name); // Poll the session handler so we will get notified whenever there is - // new_incoming debugger activity. + // new incoming debugger activity. let _ = self_.poll_sessions(None).unwrap(); self_ } pub fn has_active_sessions(&self) -> bool { - let sessions = self.sessions.borrow(); - !sessions.established.is_empty() || sessions.handshake.is_some() + self.sessions.borrow().has_active_sessions() } fn poll_sessions( @@ -236,8 +242,12 @@ impl JsRuntimeInspector { } // Accept new connections. - match sessions.new_incoming.poll_next_unpin(cx) { - Poll::Ready(Some(session)) => { + match sessions.session_rx.poll_next_unpin(cx) { + Poll::Ready(Some(session_proxy)) => { + let session = InspectorSession::new( + sessions.v8_inspector.clone(), + session_proxy, + ); let prev = sessions.handshake.replace(session); assert!(prev.is_none()); continue; @@ -374,17 +384,11 @@ struct InspectorFlags { on_pause: bool, } -impl InspectorFlags { - fn new() -> RefCell<Self> { - let self_ = Self::default(); - RefCell::new(self_) - } -} - /// A helper structure that helps coordinate sessions during different /// parts of their lifecycle. struct SessionContainer { - new_incoming: Pin<Box<dyn Stream<Item = Box<InspectorSession>> + 'static>>, + v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>, + session_rx: UnboundedReceiver<InspectorSessionProxy>, handshake: Option<Box<InspectorSession>>, established: FuturesUnordered<Box<InspectorSession>>, } @@ -393,24 +397,38 @@ impl SessionContainer { fn new( v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>, new_session_rx: UnboundedReceiver<InspectorSessionProxy>, - ) -> RefCell<Self> { - let new_incoming = new_session_rx - .map(move |session_proxy| { - InspectorSession::new(v8_inspector.clone(), session_proxy) - }) - .boxed_local(); - let self_ = Self { - new_incoming, - ..Default::default() - }; - RefCell::new(self_) + ) -> Self { + Self { + v8_inspector, + session_rx: new_session_rx, + handshake: None, + established: FuturesUnordered::new(), + } + } + + /// V8 automatically deletes all sessions when an `V8Inspector` instance is + /// deleted, however InspectorSession also has a drop handler that cleans + /// up after itself. To avoid a double free, we need to manually drop + /// all sessions before dropping the inspector instance. + fn drop_sessions(&mut self) { + self.v8_inspector = Default::default(); + self.handshake.take(); + self.established.clear(); + } + + fn has_active_sessions(&self) -> bool { + !self.established.is_empty() || self.handshake.is_some() } -} -impl Default for SessionContainer { - fn default() -> Self { + /// A temporary placeholder that should be used before actual + /// instance of V8Inspector is created. It's used in favor + /// of `Default` implementation to signal that it's not meant + /// for actual use. + fn temporary_placeholder() -> Self { + let (_tx, rx) = mpsc::unbounded::<InspectorSessionProxy>(); Self { - new_incoming: stream::empty().boxed_local(), + v8_inspector: Default::default(), + session_rx: rx, handshake: None, established: FuturesUnordered::new(), } @@ -535,11 +553,14 @@ impl InspectorSession { fn send_message( &self, - maybe_call_id: Option<i32>, + msg_kind: InspectorMsgKind, msg: v8::UniquePtr<v8::inspector::StringBuffer>, ) { let msg = msg.unwrap().string().to_string(); - let _ = self.proxy.tx.unbounded_send((maybe_call_id, msg)); + let _ = self.proxy.tx.unbounded_send(InspectorMsg { + kind: msg_kind, + content: msg, + }); } pub fn break_on_next_statement(&mut self) { @@ -567,14 +588,14 @@ impl v8::inspector::ChannelImpl for InspectorSession { call_id: i32, message: v8::UniquePtr<v8::inspector::StringBuffer>, ) { - self.send_message(Some(call_id), message); + self.send_message(InspectorMsgKind::Message(call_id), message); } fn send_notification( &mut self, message: v8::UniquePtr<v8::inspector::StringBuffer>, ) { - self.send_message(None, message); + self.send_message(InspectorMsgKind::Notification, message); } fn flush_protocol_notifications(&mut self) {} @@ -587,7 +608,7 @@ impl Future for InspectorSession { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) { if let Some(msg) = maybe_msg { - let msg = v8::inspector::StringView::from(msg.as_slice()); + let msg = v8::inspector::StringView::from(msg.as_bytes()); let mut v8_session = self.v8_session.borrow_mut(); let v8_session_ptr = v8_session.as_mut(); v8_session_ptr.dispatch_protocol_message(msg); @@ -603,8 +624,8 @@ impl Future for InspectorSession { /// A local inspector session that can be used to send and receive protocol messages directly on /// the same thread as an isolate. pub struct LocalInspectorSession { - v8_session_tx: UnboundedSender<Vec<u8>>, - v8_session_rx: UnboundedReceiver<(Option<i32>, String)>, + v8_session_tx: UnboundedSender<String>, + v8_session_rx: UnboundedReceiver<InspectorMsg>, response_tx_map: HashMap<i32, oneshot::Sender<serde_json::Value>>, next_message_id: i32, notification_queue: Vec<Value>, @@ -612,8 +633,8 @@ pub struct LocalInspectorSession { impl LocalInspectorSession { pub fn new( - v8_session_tx: UnboundedSender<Vec<u8>>, - v8_session_rx: UnboundedReceiver<(Option<i32>, String)>, + v8_session_tx: UnboundedSender<String>, + v8_session_rx: UnboundedReceiver<InspectorMsg>, ) -> Self { let response_tx_map = HashMap::new(); let next_message_id = 0; @@ -651,11 +672,8 @@ impl LocalInspectorSession { "params": params, }); - let raw_message = serde_json::to_string(&message).unwrap(); - self - .v8_session_tx - .unbounded_send(raw_message.as_bytes().to_vec()) - .unwrap(); + let stringified_msg = serde_json::to_string(&message).unwrap(); + self.v8_session_tx.unbounded_send(stringified_msg).unwrap(); loop { let receive_fut = self.receive_from_v8_session().boxed_local(); @@ -675,40 +693,40 @@ impl LocalInspectorSession { } async fn receive_from_v8_session(&mut self) { - let (maybe_call_id, message) = self.v8_session_rx.next().await.unwrap(); - // If there's no call_id then it's a notification - if let Some(call_id) = maybe_call_id { - let message: serde_json::Value = match serde_json::from_str(&message) { - Ok(v) => v, - Err(error) => match error.classify() { - serde_json::error::Category::Syntax => json!({ - "id": call_id, - "result": { + let inspector_msg = self.v8_session_rx.next().await.unwrap(); + if let InspectorMsgKind::Message(msg_id) = inspector_msg.kind { + let message: serde_json::Value = + match serde_json::from_str(&inspector_msg.content) { + Ok(v) => v, + Err(error) => match error.classify() { + serde_json::error::Category::Syntax => json!({ + "id": msg_id, "result": { - "type": "error", - "description": "Unterminated string literal", - "value": "Unterminated string literal", + "result": { + "type": "error", + "description": "Unterminated string literal", + "value": "Unterminated string literal", + }, + "exceptionDetails": { + "exceptionId": 0, + "text": "Unterminated string literal", + "lineNumber": 0, + "columnNumber": 0 + }, }, - "exceptionDetails": { - "exceptionId": 0, - "text": "Unterminated string literal", - "lineNumber": 0, - "columnNumber": 0 - }, - }, - }), - _ => panic!("Could not parse inspector message"), - }, - }; + }), + _ => panic!("Could not parse inspector message"), + }, + }; self .response_tx_map - .remove(&call_id) + .remove(&msg_id) .unwrap() .send(message) .unwrap(); } else { - let message = serde_json::from_str(&message).unwrap(); + let message = serde_json::from_str(&inspector_msg.content).unwrap(); self.notification_queue.push(message); } } |