Diffstat (limited to 'cli/lsp/diagnostics.rs')
-rw-r--r-- | cli/lsp/diagnostics.rs | 134
1 file changed, 121 insertions, 13 deletions
diff --git a/cli/lsp/diagnostics.rs b/cli/lsp/diagnostics.rs
index 1a57ad03b..a5e9d7bf8 100644
--- a/cli/lsp/diagnostics.rs
+++ b/cli/lsp/diagnostics.rs
@@ -16,6 +16,7 @@ use super::tsc::TsServer;
 use crate::args::LintOptions;
 use crate::graph_util;
 use crate::graph_util::enhanced_resolution_error_message;
+use crate::lsp::lsp_custom::DiagnosticBatchNotificationParams;
 use crate::tools::lint::get_configured_rules;
 use deno_ast::MediaType;
@@ -37,6 +38,7 @@ use deno_runtime::tokio_util::create_basic_runtime;
 use deno_semver::npm::NpmPackageReqReference;
 use log::error;
 use std::collections::HashMap;
+use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 use std::thread;
 use tokio::sync::mpsc;
@@ -45,8 +47,13 @@ use tokio::time::Duration;
 use tokio_util::sync::CancellationToken;
 use tower_lsp::lsp_types as lsp;
 
-pub type SnapshotForDiagnostics =
-  (Arc<StateSnapshot>, Arc<ConfigSnapshot>, LintOptions);
+#[derive(Debug)]
+pub struct DiagnosticServerUpdateMessage {
+  pub snapshot: Arc<StateSnapshot>,
+  pub config: Arc<ConfigSnapshot>,
+  pub lint_options: LintOptions,
+}
+
 pub type DiagnosticRecord =
   (ModuleSpecifier, Option<i32>, Vec<lsp::Diagnostic>);
 pub type DiagnosticVec = Vec<DiagnosticRecord>;
@@ -145,13 +152,55 @@ impl TsDiagnosticsStore {
   }
 }
 
+pub fn should_send_diagnostic_batch_index_notifications() -> bool {
+  crate::args::has_flag_env_var(
+    "DENO_DONT_USE_INTERNAL_LSP_DIAGNOSTIC_SYNC_FLAG",
+  )
+}
+
+#[derive(Clone, Debug)]
+struct DiagnosticBatchCounter(Option<Arc<AtomicUsize>>);
+
+impl Default for DiagnosticBatchCounter {
+  fn default() -> Self {
+    if should_send_diagnostic_batch_index_notifications() {
+      Self(Some(Default::default()))
+    } else {
+      Self(None)
+    }
+  }
+}
+
+impl DiagnosticBatchCounter {
+  pub fn inc(&self) -> Option<usize> {
+    self
+      .0
+      .as_ref()
+      .map(|value| value.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1)
+  }
+
+  pub fn get(&self) -> Option<usize> {
+    self
+      .0
+      .as_ref()
+      .map(|value| value.load(std::sync::atomic::Ordering::SeqCst))
+  }
+}
+
+#[derive(Debug)]
+struct ChannelMessage {
+  message: DiagnosticServerUpdateMessage,
+  batch_index: Option<usize>,
+}
+
 #[derive(Debug)]
 pub struct DiagnosticsServer {
-  channel: Option<mpsc::UnboundedSender<SnapshotForDiagnostics>>,
+  channel: Option<mpsc::UnboundedSender<ChannelMessage>>,
   ts_diagnostics: TsDiagnosticsStore,
   client: Client,
   performance: Arc<Performance>,
   ts_server: Arc<TsServer>,
+  batch_counter: DiagnosticBatchCounter,
 }
 
 impl DiagnosticsServer {
@@ -166,6 +215,7 @@ impl DiagnosticsServer {
       client,
       performance,
       ts_server,
+      batch_counter: Default::default(),
     }
   }
 
@@ -187,7 +237,7 @@ impl DiagnosticsServer {
 
   #[allow(unused_must_use)]
   pub fn start(&mut self) {
-    let (tx, mut rx) = mpsc::unbounded_channel::<SnapshotForDiagnostics>();
+    let (tx, mut rx) = mpsc::unbounded_channel::<ChannelMessage>();
     self.channel = Some(tx);
     let client = self.client.clone();
     let performance = self.performance.clone();
@@ -208,7 +258,17 @@ impl DiagnosticsServer {
          match rx.recv().await {
            // channel has closed
            None => break,
-            Some((snapshot, config, lint_options)) => {
+            Some(message) => {
+              let ChannelMessage {
+                message:
+                  DiagnosticServerUpdateMessage {
+                    snapshot,
+                    config,
+                    lint_options,
+                  },
+                batch_index,
+              } = message;
+
              // cancel the previous run
              token.cancel();
              token = CancellationToken::new();
@@ -255,6 +315,7 @@ impl DiagnosticsServer {
                    })
                    .unwrap_or_default();
 
+                  let messages_len = diagnostics.len();
                  if !token.is_cancelled() {
                    ts_diagnostics_store.update(&diagnostics);
                    diagnostics_publisher.publish(diagnostics, &token).await;
@@ -263,6 +324,17 @@
                      performance.measure(mark);
                    }
                  }
+
+                  if let Some(batch_index) = batch_index {
+                    diagnostics_publisher
+                      .client
+                      .send_diagnostic_batch_notification(
+                        DiagnosticBatchNotificationParams {
+                          batch_index,
+                          messages_len,
+                        },
+                      );
+                  }
                }
              }));
@@ -286,10 +358,24 @@
                  )
                  .await;
 
-                  diagnostics_publisher.publish(diagnostics, &token).await;
-
+                  let messages_len = diagnostics.len();
                  if !token.is_cancelled() {
-                    performance.measure(mark);
+                    diagnostics_publisher.publish(diagnostics, &token).await;
+
+                    if !token.is_cancelled() {
+                      performance.measure(mark);
+                    }
+                  }
+
+                  if let Some(batch_index) = batch_index {
+                    diagnostics_publisher
+                      .client
+                      .send_diagnostic_batch_notification(
+                        DiagnosticBatchNotificationParams {
+                          batch_index,
+                          messages_len,
+                        },
+                      );
                  }
                }
              }));
@@ -315,10 +401,24 @@
                  )
                  .await;
 
-                  diagnostics_publisher.publish(diagnostics, &token).await;
-
+                  let messages_len = diagnostics.len();
                  if !token.is_cancelled() {
-                    performance.measure(mark);
+                    diagnostics_publisher.publish(diagnostics, &token).await;
+
+                    if !token.is_cancelled() {
+                      performance.measure(mark);
+                    }
+                  }
+
+                  if let Some(batch_index) = batch_index {
+                    diagnostics_publisher
+                      .client
+                      .send_diagnostic_batch_notification(
+                        DiagnosticBatchNotificationParams {
+                          batch_index,
+                          messages_len,
+                        },
+                      );
                  }
                }
              }));
@@ -329,15 +429,23 @@
    });
  }
 
+  pub fn latest_batch_index(&self) -> Option<usize> {
+    self.batch_counter.get()
+  }
+
  pub fn update(
    &self,
-    message: SnapshotForDiagnostics,
+    message: DiagnosticServerUpdateMessage,
  ) -> Result<(), AnyError> {
    // todo(dsherret): instead of queuing up messages, it would be better to
    // instead only store the latest message (ex. maybe using a
    // tokio::sync::watch::channel)
    if let Some(tx) = &self.channel {
-      tx.send(message).map_err(|err| err.into())
+      tx.send(ChannelMessage {
+        message,
+        batch_index: self.batch_counter.inc(),
+      })
+      .map_err(|err| err.into())
    } else {
      Err(anyhow!("diagnostics server not started"))
    }
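
For context on the pattern above: DiagnosticBatchCounter wraps an Option<Arc<AtomicUsize>> that only allocates and counts when the opt-in env flag is present, so the normal (non-testing) path pays nothing beyond an Option check. Below is a minimal, self-contained sketch of that pattern, not code from this repository; the BatchCounter type and the ENABLE_BATCH_INDEX variable are hypothetical names used only for illustration.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for DiagnosticBatchCounter: counts update batches
// only when explicitly enabled, otherwise stays a cheap None.
#[derive(Clone, Debug, Default)]
struct BatchCounter(Option<Arc<AtomicUsize>>);

impl BatchCounter {
  fn new(enabled: bool) -> Self {
    Self(enabled.then(|| Arc::new(AtomicUsize::new(0))))
  }

  // Returns the 1-based index of the batch just enqueued, or None when disabled.
  fn inc(&self) -> Option<usize> {
    self.0.as_ref().map(|v| v.fetch_add(1, Ordering::SeqCst) + 1)
  }

  // Returns the index of the most recently enqueued batch, or None when disabled.
  fn get(&self) -> Option<usize> {
    self.0.as_ref().map(|v| v.load(Ordering::SeqCst))
  }
}

fn main() {
  // Hypothetical flag for this sketch; the real code checks its own env var
  // through crate::args::has_flag_env_var.
  let enabled = std::env::var("ENABLE_BATCH_INDEX").is_ok();
  let counter = BatchCounter::new(enabled);
  println!("first batch index: {:?}", counter.inc());  // Some(1) when enabled
  println!("latest batch index: {:?}", counter.get()); // Some(1) when enabled
}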
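
The batch_index plus messages_len pair lets an external harness detect when the diagnostics for a given update have finished publishing, with DiagnosticsServer::latest_batch_index() reporting the most recently enqueued batch. The sketch below shows one hypothetical way a test-side consumer could wait on such notifications; BatchNotification and wait_for_batch are illustration-only names and are not part of the client API added in this change.

use std::sync::mpsc;

// Illustration-only mirror of DiagnosticBatchNotificationParams.
#[derive(Debug)]
struct BatchNotification {
  batch_index: usize,
  messages_len: usize,
}

// Block until a notification for `target_index` (e.g. the value reported by
// latest_batch_index) has been observed, summing messages_len along the way.
fn wait_for_batch(
  rx: &mpsc::Receiver<BatchNotification>,
  target_index: usize,
) -> usize {
  let mut total = 0;
  loop {
    let n = rx.recv().expect("notification channel closed");
    total += n.messages_len;
    if n.batch_index >= target_index {
      return total;
    }
  }
}

fn main() {
  let (tx, rx) = mpsc::channel();
  // Simulate two diagnostic batches arriving from the server.
  tx.send(BatchNotification { batch_index: 1, messages_len: 3 }).unwrap();
  tx.send(BatchNotification { batch_index: 2, messages_len: 0 }).unwrap();
  println!("diagnostics seen: {}", wait_for_batch(&rx, 2));
}

Note that in the patch each of the three diagnostics sources (ts, deps, lint) sends its own notification carrying the same batch_index, so a real consumer would typically wait for all of them rather than the first match as this sketch does.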