summaryrefslogtreecommitdiff
path: root/cli/lsp/diagnostics.rs
diff options
context:
space:
mode:
authorKitson Kelly <me@kitsonkelly.com>2021-03-10 13:41:35 +1100
committerGitHub <noreply@github.com>2021-03-10 13:41:35 +1100
commita020ebde2d9c69a1e2616c96f907866d417f2e0f (patch)
treec323920e7a3d071b59a248898f92100182eec31e /cli/lsp/diagnostics.rs
parent8d3baa7777b6bd2a2631e1b87a4676b520f2b447 (diff)
fix(lsp): diagnostics use own thread and debounce (#9572)
Diffstat (limited to 'cli/lsp/diagnostics.rs')
-rw-r--r--cli/lsp/diagnostics.rs344
1 files changed, 330 insertions, 14 deletions
diff --git a/cli/lsp/diagnostics.rs b/cli/lsp/diagnostics.rs
index 705f0866d..1bfb19867 100644
--- a/cli/lsp/diagnostics.rs
+++ b/cli/lsp/diagnostics.rs
@@ -3,20 +3,33 @@
use super::analysis::get_lint_references;
use super::analysis::references_to_diagnostics;
use super::analysis::ResolvedDependency;
-use super::language_server::StateSnapshot;
+use super::language_server;
use super::tsc;
use crate::diagnostics;
use crate::media_type::MediaType;
+use crate::tokio_util::create_basic_runtime;
+use deno_core::error::anyhow;
use deno_core::error::AnyError;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::ModuleSpecifier;
use lspower::lsp;
+use lspower::Client;
use std::collections::HashMap;
use std::collections::HashSet;
use std::mem;
+use std::sync::Arc;
+use std::thread;
+use tokio::sync::mpsc;
+use tokio::sync::oneshot;
+use tokio::time;
+
+// 150ms between keystrokes is about 45 WPM, so we want something that is longer
+// than that, but not too long to introduce detectable UI delay. 200ms is a
+// decent compromise.
+const DIAGNOSTIC_DEBOUNCE_MS: u64 = 200;
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum DiagnosticSource {
@@ -25,8 +38,311 @@ pub enum DiagnosticSource {
TypeScript,
}
+#[derive(Debug)]
+enum DiagnosticRequest {
+ Get(
+ ModuleSpecifier,
+ DiagnosticSource,
+ oneshot::Sender<Vec<lsp::Diagnostic>>,
+ ),
+ Invalidate(ModuleSpecifier),
+ Update,
+}
+
+/// Given a client and a diagnostics collection, publish the appropriate changes
+/// to the client.
+async fn publish_diagnostics(
+ client: &Client,
+ collection: &mut DiagnosticCollection,
+ snapshot: &language_server::StateSnapshot,
+) {
+ let mark = snapshot.performance.mark("publish_diagnostics");
+ let maybe_changes = collection.take_changes();
+ if let Some(diagnostic_changes) = maybe_changes {
+ for specifier in diagnostic_changes {
+ // TODO(@kitsonk) not totally happy with the way we collect and store
+ // different types of diagnostics and offer them up to the client, we
+ // do need to send "empty" vectors though when a particular feature is
+ // disabled, otherwise the client will not clear down previous
+ // diagnostics
+ let mut diagnostics: Vec<lsp::Diagnostic> =
+ if snapshot.config.settings.lint {
+ collection
+ .diagnostics_for(&specifier, &DiagnosticSource::Lint)
+ .cloned()
+ .collect()
+ } else {
+ vec![]
+ };
+ if snapshot.config.settings.enable {
+ diagnostics.extend(
+ collection
+ .diagnostics_for(&specifier, &DiagnosticSource::TypeScript)
+ .cloned(),
+ );
+ diagnostics.extend(
+ collection
+ .diagnostics_for(&specifier, &DiagnosticSource::Deno)
+ .cloned(),
+ );
+ }
+ let uri = specifier.clone();
+ let version = snapshot.documents.version(&specifier);
+ client.publish_diagnostics(uri, diagnostics, version).await;
+ }
+ }
+
+ snapshot.performance.measure(mark);
+}
+
+async fn update_diagnostics(
+ client: &Client,
+ collection: &mut DiagnosticCollection,
+ snapshot: &language_server::StateSnapshot,
+ ts_server: &tsc::TsServer,
+) {
+ let (enabled, lint_enabled) = {
+ let config = &snapshot.config;
+ (config.settings.enable, config.settings.lint)
+ };
+
+ let mark = snapshot.performance.mark("update_diagnostics");
+ let lint = async {
+ let mut diagnostics = None;
+ if lint_enabled {
+ let mark = snapshot.performance.mark("prepare_diagnostics_lint");
+ diagnostics = Some(
+ generate_lint_diagnostics(snapshot.clone(), collection.clone()).await,
+ );
+ snapshot.performance.measure(mark);
+ };
+ Ok::<_, AnyError>(diagnostics)
+ };
+
+ let ts = async {
+ let mut diagnostics = None;
+ if enabled {
+ let mark = snapshot.performance.mark("prepare_diagnostics_ts");
+ diagnostics = Some(
+ generate_ts_diagnostics(
+ snapshot.clone(),
+ collection.clone(),
+ ts_server,
+ )
+ .await?,
+ );
+ snapshot.performance.measure(mark);
+ };
+ Ok::<_, AnyError>(diagnostics)
+ };
+
+ let deps = async {
+ let mut diagnostics = None;
+ if enabled {
+ let mark = snapshot.performance.mark("prepare_diagnostics_deps");
+ diagnostics = Some(
+ generate_dependency_diagnostics(snapshot.clone(), collection.clone())
+ .await?,
+ );
+ snapshot.performance.measure(mark);
+ };
+ Ok::<_, AnyError>(diagnostics)
+ };
+
+ let (lint_res, ts_res, deps_res) = tokio::join!(lint, ts, deps);
+ let mut disturbed = false;
+
+ match lint_res {
+ Ok(Some(diagnostics)) => {
+ for (specifier, version, diagnostics) in diagnostics {
+ collection.set(specifier, DiagnosticSource::Lint, version, diagnostics);
+ disturbed = true;
+ }
+ }
+ Err(err) => {
+ error!("Internal error: {}", err);
+ }
+ _ => (),
+ }
+
+ match ts_res {
+ Ok(Some(diagnostics)) => {
+ for (specifier, version, diagnostics) in diagnostics {
+ collection.set(
+ specifier,
+ DiagnosticSource::TypeScript,
+ version,
+ diagnostics,
+ );
+ disturbed = true;
+ }
+ }
+ Err(err) => {
+ error!("Internal error: {}", err);
+ }
+ _ => (),
+ }
+
+ match deps_res {
+ Ok(Some(diagnostics)) => {
+ for (specifier, version, diagnostics) in diagnostics {
+ collection.set(specifier, DiagnosticSource::Deno, version, diagnostics);
+ disturbed = true;
+ }
+ }
+ Err(err) => {
+ error!("Internal error: {}", err);
+ }
+ _ => (),
+ }
+ snapshot.performance.measure(mark);
+
+ if disturbed {
+ publish_diagnostics(client, collection, snapshot).await
+ }
+}
+
+fn handle_request(
+ maybe_request: Option<DiagnosticRequest>,
+ collection: &mut DiagnosticCollection,
+ dirty: &mut bool,
+) -> bool {
+ match maybe_request {
+ Some(request) => {
+ match request {
+ DiagnosticRequest::Get(specifier, source, tx) => {
+ let diagnostics = collection
+ .diagnostics_for(&specifier, &source)
+ .cloned()
+ .collect();
+ if tx.send(diagnostics).is_err() {
+ error!("DiagnosticServer unable to send response on channel.");
+ }
+ }
+ DiagnosticRequest::Invalidate(specifier) => {
+ collection.invalidate(&specifier)
+ }
+ DiagnosticRequest::Update => *dirty = true,
+ }
+ true
+ }
+ _ => false,
+ }
+}
+
+/// A server which calculates diagnostics in its own thread and publishes them
+/// to an LSP client.
+#[derive(Debug)]
+pub(crate) struct DiagnosticsServer(
+ Option<mpsc::UnboundedSender<DiagnosticRequest>>,
+);
+
+impl DiagnosticsServer {
+ pub(crate) fn new() -> Self {
+ Self(None)
+ }
+
+ pub(crate) fn start(
+ &mut self,
+ language_server: Arc<tokio::sync::Mutex<language_server::Inner>>,
+ client: Client,
+ ts_server: Arc<tsc::TsServer>,
+ ) {
+ let (tx, mut rx) = mpsc::unbounded_channel::<DiagnosticRequest>();
+ self.0 = Some(tx);
+
+ let _join_handle = thread::spawn(move || {
+ let runtime = create_basic_runtime();
+ let mut collection = DiagnosticCollection::default();
+
+ runtime.block_on(async {
+ // Some(snapshot) is the flag we use to determine if something has
+ // changed where we will wait for the timeout of changes or a request
+ // that forces us to update diagnostics
+ let mut dirty = false;
+
+ loop {
+ let next = rx.recv();
+ tokio::pin!(next);
+
+ let duration = if dirty {
+ time::Duration::from_millis(DIAGNOSTIC_DEBOUNCE_MS)
+ } else {
+ // we need to await an arbitrary silly amount of time, so this is
+ // 1 year in seconds
+ time::Duration::new(31_622_400, 0)
+ };
+
+ // "race" the next message off the rx queue or the debounce timer.
+ // if the next message comes off the queue, the next iteration of the
+ // loop will reset the debounce future. When the debounce future
+ // occurs, the diagnostics will be updated based on the snapshot that
+ // is retrieved, thereby "skipping" all the interim state changes.
+ tokio::select! {
+ _ = time::sleep(duration) => {
+ if dirty {
+ dirty = false;
+ let snapshot = {
+ // make sure the lock drops
+ language_server.lock().await.snapshot()
+ };
+ update_diagnostics(
+ &client,
+ &mut collection,
+ &snapshot,
+ &ts_server
+ ).await;
+ }
+ let maybe_request = next.await;
+ if !handle_request(maybe_request, &mut collection, &mut dirty) {
+ break;
+ }
+ }
+ maybe_request = &mut next => {
+ if !handle_request(maybe_request, &mut collection, &mut dirty) {
+ break;
+ }
+ }
+ }
+ }
+ })
+ });
+ }
+
+ pub async fn get(
+ &self,
+ specifier: ModuleSpecifier,
+ source: DiagnosticSource,
+ ) -> Result<Vec<lsp::Diagnostic>, AnyError> {
+ let (tx, rx) = oneshot::channel::<Vec<lsp::Diagnostic>>();
+ if let Some(self_tx) = &self.0 {
+ self_tx.send(DiagnosticRequest::Get(specifier, source, tx))?;
+ rx.await.map_err(|err| err.into())
+ } else {
+ Err(anyhow!("diagnostic server not started"))
+ }
+ }
+
+ pub fn invalidate(&self, specifier: ModuleSpecifier) -> Result<(), AnyError> {
+ if let Some(tx) = &self.0 {
+ tx.send(DiagnosticRequest::Invalidate(specifier))
+ .map_err(|err| err.into())
+ } else {
+ Err(anyhow!("diagnostic server not started"))
+ }
+ }
+
+ pub fn update(&self) -> Result<(), AnyError> {
+ if let Some(tx) = &self.0 {
+ tx.send(DiagnosticRequest::Update).map_err(|err| err.into())
+ } else {
+ Err(anyhow!("diagnostic server not started"))
+ }
+ }
+}
+
#[derive(Debug, Default, Clone)]
-pub struct DiagnosticCollection {
+struct DiagnosticCollection {
map: HashMap<(ModuleSpecifier, DiagnosticSource), Vec<lsp::Diagnostic>>,
versions: HashMap<ModuleSpecifier, i32>,
changes: HashSet<ModuleSpecifier>,
@@ -78,16 +394,16 @@ impl DiagnosticCollection {
pub type DiagnosticVec =
Vec<(ModuleSpecifier, Option<i32>, Vec<lsp::Diagnostic>)>;
-pub async fn generate_lint_diagnostics(
- state_snapshot: StateSnapshot,
- diagnostic_collection: DiagnosticCollection,
+async fn generate_lint_diagnostics(
+ state_snapshot: language_server::StateSnapshot,
+ collection: DiagnosticCollection,
) -> DiagnosticVec {
tokio::task::spawn_blocking(move || {
let mut diagnostic_list = Vec::new();
for specifier in state_snapshot.documents.open_specifiers() {
let version = state_snapshot.documents.version(specifier);
- let current_version = diagnostic_collection.get_version(specifier);
+ let current_version = collection.get_version(specifier);
if version != current_version {
let media_type = MediaType::from(specifier);
if let Ok(Some(source_code)) =
@@ -229,16 +545,16 @@ fn ts_json_to_diagnostics(
.collect()
}
-pub async fn generate_ts_diagnostics(
- state_snapshot: StateSnapshot,
- diagnostic_collection: DiagnosticCollection,
+async fn generate_ts_diagnostics(
+ state_snapshot: language_server::StateSnapshot,
+ collection: DiagnosticCollection,
ts_server: &tsc::TsServer,
) -> Result<DiagnosticVec, AnyError> {
let mut diagnostics = Vec::new();
let mut specifiers = Vec::new();
for specifier in state_snapshot.documents.open_specifiers() {
let version = state_snapshot.documents.version(specifier);
- let current_version = diagnostic_collection.get_version(specifier);
+ let current_version = collection.get_version(specifier);
if version != current_version {
specifiers.push(specifier.clone());
}
@@ -260,9 +576,9 @@ pub async fn generate_ts_diagnostics(
Ok(diagnostics)
}
-pub async fn generate_dependency_diagnostics(
- mut state_snapshot: StateSnapshot,
- diagnostic_collection: DiagnosticCollection,
+async fn generate_dependency_diagnostics(
+ mut state_snapshot: language_server::StateSnapshot,
+ collection: DiagnosticCollection,
) -> Result<DiagnosticVec, AnyError> {
tokio::task::spawn_blocking(move || {
let mut diagnostics = Vec::new();
@@ -270,7 +586,7 @@ pub async fn generate_dependency_diagnostics(
let sources = &mut state_snapshot.sources;
for specifier in state_snapshot.documents.open_specifiers() {
let version = state_snapshot.documents.version(specifier);
- let current_version = diagnostic_collection.get_version(specifier);
+ let current_version = collection.get_version(specifier);
if version != current_version {
let mut diagnostic_list = Vec::new();
if let Some(dependencies) = state_snapshot.documents.dependencies(specifier) {