diff options
author | Kitson Kelly <me@kitsonkelly.com> | 2021-03-10 13:41:35 +1100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-10 13:41:35 +1100 |
commit | a020ebde2d9c69a1e2616c96f907866d417f2e0f (patch) | |
tree | c323920e7a3d071b59a248898f92100182eec31e /cli/lsp/diagnostics.rs | |
parent | 8d3baa7777b6bd2a2631e1b87a4676b520f2b447 (diff) |
fix(lsp): diagnostics use own thread and debounce (#9572)
Diffstat (limited to 'cli/lsp/diagnostics.rs')
-rw-r--r-- | cli/lsp/diagnostics.rs | 344 |
1 files changed, 330 insertions, 14 deletions
diff --git a/cli/lsp/diagnostics.rs b/cli/lsp/diagnostics.rs index 705f0866d..1bfb19867 100644 --- a/cli/lsp/diagnostics.rs +++ b/cli/lsp/diagnostics.rs @@ -3,20 +3,33 @@ use super::analysis::get_lint_references; use super::analysis::references_to_diagnostics; use super::analysis::ResolvedDependency; -use super::language_server::StateSnapshot; +use super::language_server; use super::tsc; use crate::diagnostics; use crate::media_type::MediaType; +use crate::tokio_util::create_basic_runtime; +use deno_core::error::anyhow; use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::ModuleSpecifier; use lspower::lsp; +use lspower::Client; use std::collections::HashMap; use std::collections::HashSet; use std::mem; +use std::sync::Arc; +use std::thread; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::time; + +// 150ms between keystrokes is about 45 WPM, so we want something that is longer +// than that, but not too long to introduce detectable UI delay. 200ms is a +// decent compromise. +const DIAGNOSTIC_DEBOUNCE_MS: u64 = 200; #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub enum DiagnosticSource { @@ -25,8 +38,311 @@ pub enum DiagnosticSource { TypeScript, } +#[derive(Debug)] +enum DiagnosticRequest { + Get( + ModuleSpecifier, + DiagnosticSource, + oneshot::Sender<Vec<lsp::Diagnostic>>, + ), + Invalidate(ModuleSpecifier), + Update, +} + +/// Given a client and a diagnostics collection, publish the appropriate changes +/// to the client. +async fn publish_diagnostics( + client: &Client, + collection: &mut DiagnosticCollection, + snapshot: &language_server::StateSnapshot, +) { + let mark = snapshot.performance.mark("publish_diagnostics"); + let maybe_changes = collection.take_changes(); + if let Some(diagnostic_changes) = maybe_changes { + for specifier in diagnostic_changes { + // TODO(@kitsonk) not totally happy with the way we collect and store + // different types of diagnostics and offer them up to the client, we + // do need to send "empty" vectors though when a particular feature is + // disabled, otherwise the client will not clear down previous + // diagnostics + let mut diagnostics: Vec<lsp::Diagnostic> = + if snapshot.config.settings.lint { + collection + .diagnostics_for(&specifier, &DiagnosticSource::Lint) + .cloned() + .collect() + } else { + vec![] + }; + if snapshot.config.settings.enable { + diagnostics.extend( + collection + .diagnostics_for(&specifier, &DiagnosticSource::TypeScript) + .cloned(), + ); + diagnostics.extend( + collection + .diagnostics_for(&specifier, &DiagnosticSource::Deno) + .cloned(), + ); + } + let uri = specifier.clone(); + let version = snapshot.documents.version(&specifier); + client.publish_diagnostics(uri, diagnostics, version).await; + } + } + + snapshot.performance.measure(mark); +} + +async fn update_diagnostics( + client: &Client, + collection: &mut DiagnosticCollection, + snapshot: &language_server::StateSnapshot, + ts_server: &tsc::TsServer, +) { + let (enabled, lint_enabled) = { + let config = &snapshot.config; + (config.settings.enable, config.settings.lint) + }; + + let mark = snapshot.performance.mark("update_diagnostics"); + let lint = async { + let mut diagnostics = None; + if lint_enabled { + let mark = snapshot.performance.mark("prepare_diagnostics_lint"); + diagnostics = Some( + generate_lint_diagnostics(snapshot.clone(), collection.clone()).await, + ); + snapshot.performance.measure(mark); + }; + Ok::<_, AnyError>(diagnostics) + }; + + let ts = async { + let mut diagnostics = None; + if enabled { + let mark = snapshot.performance.mark("prepare_diagnostics_ts"); + diagnostics = Some( + generate_ts_diagnostics( + snapshot.clone(), + collection.clone(), + ts_server, + ) + .await?, + ); + snapshot.performance.measure(mark); + }; + Ok::<_, AnyError>(diagnostics) + }; + + let deps = async { + let mut diagnostics = None; + if enabled { + let mark = snapshot.performance.mark("prepare_diagnostics_deps"); + diagnostics = Some( + generate_dependency_diagnostics(snapshot.clone(), collection.clone()) + .await?, + ); + snapshot.performance.measure(mark); + }; + Ok::<_, AnyError>(diagnostics) + }; + + let (lint_res, ts_res, deps_res) = tokio::join!(lint, ts, deps); + let mut disturbed = false; + + match lint_res { + Ok(Some(diagnostics)) => { + for (specifier, version, diagnostics) in diagnostics { + collection.set(specifier, DiagnosticSource::Lint, version, diagnostics); + disturbed = true; + } + } + Err(err) => { + error!("Internal error: {}", err); + } + _ => (), + } + + match ts_res { + Ok(Some(diagnostics)) => { + for (specifier, version, diagnostics) in diagnostics { + collection.set( + specifier, + DiagnosticSource::TypeScript, + version, + diagnostics, + ); + disturbed = true; + } + } + Err(err) => { + error!("Internal error: {}", err); + } + _ => (), + } + + match deps_res { + Ok(Some(diagnostics)) => { + for (specifier, version, diagnostics) in diagnostics { + collection.set(specifier, DiagnosticSource::Deno, version, diagnostics); + disturbed = true; + } + } + Err(err) => { + error!("Internal error: {}", err); + } + _ => (), + } + snapshot.performance.measure(mark); + + if disturbed { + publish_diagnostics(client, collection, snapshot).await + } +} + +fn handle_request( + maybe_request: Option<DiagnosticRequest>, + collection: &mut DiagnosticCollection, + dirty: &mut bool, +) -> bool { + match maybe_request { + Some(request) => { + match request { + DiagnosticRequest::Get(specifier, source, tx) => { + let diagnostics = collection + .diagnostics_for(&specifier, &source) + .cloned() + .collect(); + if tx.send(diagnostics).is_err() { + error!("DiagnosticServer unable to send response on channel."); + } + } + DiagnosticRequest::Invalidate(specifier) => { + collection.invalidate(&specifier) + } + DiagnosticRequest::Update => *dirty = true, + } + true + } + _ => false, + } +} + +/// A server which calculates diagnostics in its own thread and publishes them +/// to an LSP client. +#[derive(Debug)] +pub(crate) struct DiagnosticsServer( + Option<mpsc::UnboundedSender<DiagnosticRequest>>, +); + +impl DiagnosticsServer { + pub(crate) fn new() -> Self { + Self(None) + } + + pub(crate) fn start( + &mut self, + language_server: Arc<tokio::sync::Mutex<language_server::Inner>>, + client: Client, + ts_server: Arc<tsc::TsServer>, + ) { + let (tx, mut rx) = mpsc::unbounded_channel::<DiagnosticRequest>(); + self.0 = Some(tx); + + let _join_handle = thread::spawn(move || { + let runtime = create_basic_runtime(); + let mut collection = DiagnosticCollection::default(); + + runtime.block_on(async { + // Some(snapshot) is the flag we use to determine if something has + // changed where we will wait for the timeout of changes or a request + // that forces us to update diagnostics + let mut dirty = false; + + loop { + let next = rx.recv(); + tokio::pin!(next); + + let duration = if dirty { + time::Duration::from_millis(DIAGNOSTIC_DEBOUNCE_MS) + } else { + // we need to await an arbitrary silly amount of time, so this is + // 1 year in seconds + time::Duration::new(31_622_400, 0) + }; + + // "race" the next message off the rx queue or the debounce timer. + // if the next message comes off the queue, the next iteration of the + // loop will reset the debounce future. When the debounce future + // occurs, the diagnostics will be updated based on the snapshot that + // is retrieved, thereby "skipping" all the interim state changes. + tokio::select! { + _ = time::sleep(duration) => { + if dirty { + dirty = false; + let snapshot = { + // make sure the lock drops + language_server.lock().await.snapshot() + }; + update_diagnostics( + &client, + &mut collection, + &snapshot, + &ts_server + ).await; + } + let maybe_request = next.await; + if !handle_request(maybe_request, &mut collection, &mut dirty) { + break; + } + } + maybe_request = &mut next => { + if !handle_request(maybe_request, &mut collection, &mut dirty) { + break; + } + } + } + } + }) + }); + } + + pub async fn get( + &self, + specifier: ModuleSpecifier, + source: DiagnosticSource, + ) -> Result<Vec<lsp::Diagnostic>, AnyError> { + let (tx, rx) = oneshot::channel::<Vec<lsp::Diagnostic>>(); + if let Some(self_tx) = &self.0 { + self_tx.send(DiagnosticRequest::Get(specifier, source, tx))?; + rx.await.map_err(|err| err.into()) + } else { + Err(anyhow!("diagnostic server not started")) + } + } + + pub fn invalidate(&self, specifier: ModuleSpecifier) -> Result<(), AnyError> { + if let Some(tx) = &self.0 { + tx.send(DiagnosticRequest::Invalidate(specifier)) + .map_err(|err| err.into()) + } else { + Err(anyhow!("diagnostic server not started")) + } + } + + pub fn update(&self) -> Result<(), AnyError> { + if let Some(tx) = &self.0 { + tx.send(DiagnosticRequest::Update).map_err(|err| err.into()) + } else { + Err(anyhow!("diagnostic server not started")) + } + } +} + #[derive(Debug, Default, Clone)] -pub struct DiagnosticCollection { +struct DiagnosticCollection { map: HashMap<(ModuleSpecifier, DiagnosticSource), Vec<lsp::Diagnostic>>, versions: HashMap<ModuleSpecifier, i32>, changes: HashSet<ModuleSpecifier>, @@ -78,16 +394,16 @@ impl DiagnosticCollection { pub type DiagnosticVec = Vec<(ModuleSpecifier, Option<i32>, Vec<lsp::Diagnostic>)>; -pub async fn generate_lint_diagnostics( - state_snapshot: StateSnapshot, - diagnostic_collection: DiagnosticCollection, +async fn generate_lint_diagnostics( + state_snapshot: language_server::StateSnapshot, + collection: DiagnosticCollection, ) -> DiagnosticVec { tokio::task::spawn_blocking(move || { let mut diagnostic_list = Vec::new(); for specifier in state_snapshot.documents.open_specifiers() { let version = state_snapshot.documents.version(specifier); - let current_version = diagnostic_collection.get_version(specifier); + let current_version = collection.get_version(specifier); if version != current_version { let media_type = MediaType::from(specifier); if let Ok(Some(source_code)) = @@ -229,16 +545,16 @@ fn ts_json_to_diagnostics( .collect() } -pub async fn generate_ts_diagnostics( - state_snapshot: StateSnapshot, - diagnostic_collection: DiagnosticCollection, +async fn generate_ts_diagnostics( + state_snapshot: language_server::StateSnapshot, + collection: DiagnosticCollection, ts_server: &tsc::TsServer, ) -> Result<DiagnosticVec, AnyError> { let mut diagnostics = Vec::new(); let mut specifiers = Vec::new(); for specifier in state_snapshot.documents.open_specifiers() { let version = state_snapshot.documents.version(specifier); - let current_version = diagnostic_collection.get_version(specifier); + let current_version = collection.get_version(specifier); if version != current_version { specifiers.push(specifier.clone()); } @@ -260,9 +576,9 @@ pub async fn generate_ts_diagnostics( Ok(diagnostics) } -pub async fn generate_dependency_diagnostics( - mut state_snapshot: StateSnapshot, - diagnostic_collection: DiagnosticCollection, +async fn generate_dependency_diagnostics( + mut state_snapshot: language_server::StateSnapshot, + collection: DiagnosticCollection, ) -> Result<DiagnosticVec, AnyError> { tokio::task::spawn_blocking(move || { let mut diagnostics = Vec::new(); @@ -270,7 +586,7 @@ pub async fn generate_dependency_diagnostics( let sources = &mut state_snapshot.sources; for specifier in state_snapshot.documents.open_specifiers() { let version = state_snapshot.documents.version(specifier); - let current_version = diagnostic_collection.get_version(specifier); + let current_version = collection.get_version(specifier); if version != current_version { let mut diagnostic_list = Vec::new(); if let Some(dependencies) = state_snapshot.documents.dependencies(specifier) { |