diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2024-07-02 23:37:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-03 00:37:54 +0200 |
commit | 7d919f6fd980ed54785e86892a518f0bdf68f475 (patch) | |
tree | dda1342e996b4d8d19ff0aa17892590822d2b56f /cli/tools/jupyter/mod.rs | |
parent | c13b6d1413859d03b41b97d4c671fccfd388b2cc (diff) |
refactor(jupyter): move ZeroMQ server to a separate thread (#24373)
Moves the ZeroMQ messaging server to a separate thread.
This will allow to run blocking JS code and maintain communication
with the notebook frontend.
Towards https://github.com/denoland/deno/pull/23592
Towards https://github.com/denoland/deno/pull/24250
Closes https://github.com/denoland/deno/issues/23617
Diffstat (limited to 'cli/tools/jupyter/mod.rs')
-rw-r--r-- | cli/tools/jupyter/mod.rs | 379 |
1 files changed, 377 insertions, 2 deletions
diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs index 0dbcfe9ef..a5139044f 100644 --- a/cli/tools/jupyter/mod.rs +++ b/cli/tools/jupyter/mod.rs @@ -2,18 +2,23 @@ use crate::args::Flags; use crate::args::JupyterFlags; +use crate::cdp; +use crate::lsp::ReplCompletionItem; use crate::ops; use crate::tools::repl; use crate::tools::test::create_single_test_event_channel; use crate::tools::test::reporters::PrettyTestReporter; use crate::tools::test::TestEventWorkerSender; use crate::CliFactory; +use deno_core::anyhow::bail; use deno_core::anyhow::Context; use deno_core::error::generic_error; use deno_core::error::AnyError; +use deno_core::futures::FutureExt; use deno_core::located_script_name; use deno_core::resolve_url_or_path; use deno_core::serde_json; +use deno_core::serde_json::json; use deno_core::url::Url; use deno_runtime::deno_io::Stdio; use deno_runtime::deno_io::StdioPipe; @@ -21,11 +26,11 @@ use deno_runtime::deno_permissions::Permissions; use deno_runtime::deno_permissions::PermissionsContainer; use deno_runtime::WorkerExecutionMode; use deno_terminal::colors; - use jupyter_runtime::jupyter::ConnectionInfo; use jupyter_runtime::messaging::StreamContent; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; mod install; pub mod server; @@ -142,7 +147,377 @@ pub async fn kernel( ) })); - server::JupyterServer::start(spec, stdio_rx, repl_session).await?; + let (tx1, rx1) = mpsc::unbounded_channel(); + let (tx2, rx2) = mpsc::unbounded_channel(); + let (startup_data_tx, startup_data_rx) = + oneshot::channel::<server::StartupData>(); + + let mut repl_session_proxy = JupyterReplSession { + repl_session, + rx: rx1, + tx: tx2, + }; + let repl_session_proxy_channels = JupyterReplProxy { tx: tx1, rx: rx2 }; + + let join_handle = std::thread::spawn(move || { + let fut = server::JupyterServer::start( + spec, + stdio_rx, + repl_session_proxy_channels, + startup_data_tx, + ) + .boxed_local(); + deno_runtime::tokio_util::create_and_run_current_thread(fut) + }); + + let Ok(startup_data) = startup_data_rx.await else { + bail!("Failed to acquire startup data"); + }; + { + let op_state_rc = + repl_session_proxy.repl_session.worker.js_runtime.op_state(); + let mut op_state = op_state_rc.borrow_mut(); + op_state.put(startup_data.iopub_connection.clone()); + op_state.put(startup_data.last_execution_request.clone()); + } + + repl_session_proxy.start().await; + let server_result = join_handle.join(); + match server_result { + Ok(result) => { + result?; + } + Err(e) => { + bail!("Jupyter kernel error: {:?}", e); + } + }; Ok(()) } + +pub enum JupyterReplRequest { + LspCompletions { + line_text: String, + position: usize, + }, + JsGetProperties { + object_id: String, + }, + JsEvaluate { + expr: String, + }, + JsGlobalLexicalScopeNames, + JsEvaluateLineWithObjectWrapping { + line: String, + }, + JsCallFunctionOnArgs { + function_declaration: String, + args: Vec<cdp::RemoteObject>, + }, + JsCallFunctionOn { + arg0: cdp::CallArgument, + arg1: cdp::CallArgument, + }, +} + +pub enum JupyterReplResponse { + LspCompletions(Vec<ReplCompletionItem>), + JsGetProperties(Option<cdp::GetPropertiesResponse>), + JsEvaluate(Option<cdp::EvaluateResponse>), + JsGlobalLexicalScopeNames(cdp::GlobalLexicalScopeNamesResponse), + JsEvaluateLineWithObjectWrapping(Result<repl::TsEvaluateResponse, AnyError>), + JsCallFunctionOnArgs(Result<cdp::CallFunctionOnResponse, AnyError>), + JsCallFunctionOn(Option<cdp::CallFunctionOnResponse>), +} + +pub struct JupyterReplProxy { + tx: mpsc::UnboundedSender<JupyterReplRequest>, + rx: mpsc::UnboundedReceiver<JupyterReplResponse>, +} + +impl JupyterReplProxy { + pub async fn lsp_completions( + &mut self, + line_text: String, + position: usize, + ) -> Vec<ReplCompletionItem> { + let _ = self.tx.send(JupyterReplRequest::LspCompletions { + line_text, + position, + }); + let Some(JupyterReplResponse::LspCompletions(resp)) = self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn get_properties( + &mut self, + object_id: String, + ) -> Option<cdp::GetPropertiesResponse> { + let _ = self + .tx + .send(JupyterReplRequest::JsGetProperties { object_id }); + let Some(JupyterReplResponse::JsGetProperties(resp)) = self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn evaluate( + &mut self, + expr: String, + ) -> Option<cdp::EvaluateResponse> { + let _ = self.tx.send(JupyterReplRequest::JsEvaluate { expr }); + let Some(JupyterReplResponse::JsEvaluate(resp)) = self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn global_lexical_scope_names( + &mut self, + ) -> cdp::GlobalLexicalScopeNamesResponse { + let _ = self.tx.send(JupyterReplRequest::JsGlobalLexicalScopeNames); + let Some(JupyterReplResponse::JsGlobalLexicalScopeNames(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn evaluate_line_with_object_wrapping( + &mut self, + line: String, + ) -> Result<repl::TsEvaluateResponse, AnyError> { + let _ = self + .tx + .send(JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line }); + let Some(JupyterReplResponse::JsEvaluateLineWithObjectWrapping(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } + + pub async fn call_function_on_args( + &mut self, + function_declaration: String, + args: Vec<cdp::RemoteObject>, + ) -> Result<cdp::CallFunctionOnResponse, AnyError> { + let _ = self.tx.send(JupyterReplRequest::JsCallFunctionOnArgs { + function_declaration, + args, + }); + let Some(JupyterReplResponse::JsCallFunctionOnArgs(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } + + // TODO(bartlomieju): rename to "broadcast_result"? + pub async fn call_function_on( + &mut self, + arg0: cdp::CallArgument, + arg1: cdp::CallArgument, + ) -> Option<cdp::CallFunctionOnResponse> { + let _ = self + .tx + .send(JupyterReplRequest::JsCallFunctionOn { arg0, arg1 }); + let Some(JupyterReplResponse::JsCallFunctionOn(resp)) = + self.rx.recv().await + else { + unreachable!() + }; + resp + } +} + +pub struct JupyterReplSession { + repl_session: repl::ReplSession, + rx: mpsc::UnboundedReceiver<JupyterReplRequest>, + tx: mpsc::UnboundedSender<JupyterReplResponse>, +} + +impl JupyterReplSession { + pub async fn start(&mut self) { + loop { + let Some(msg) = self.rx.recv().await else { + break; + }; + let resp = match msg { + JupyterReplRequest::LspCompletions { + line_text, + position, + } => JupyterReplResponse::LspCompletions( + self.lsp_completions(&line_text, position).await, + ), + JupyterReplRequest::JsGetProperties { object_id } => { + JupyterReplResponse::JsGetProperties( + self.get_properties(object_id).await, + ) + } + JupyterReplRequest::JsEvaluate { expr } => { + JupyterReplResponse::JsEvaluate(self.evaluate(expr).await) + } + JupyterReplRequest::JsGlobalLexicalScopeNames => { + JupyterReplResponse::JsGlobalLexicalScopeNames( + self.global_lexical_scope_names().await, + ) + } + JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line } => { + JupyterReplResponse::JsEvaluateLineWithObjectWrapping( + self.evaluate_line_with_object_wrapping(&line).await, + ) + } + JupyterReplRequest::JsCallFunctionOnArgs { + function_declaration, + args, + } => JupyterReplResponse::JsCallFunctionOnArgs( + self + .call_function_on_args(function_declaration, &args) + .await, + ), + JupyterReplRequest::JsCallFunctionOn { arg0, arg1 } => { + JupyterReplResponse::JsCallFunctionOn( + self.call_function_on(arg0, arg1).await, + ) + } + }; + + let Ok(()) = self.tx.send(resp) else { + break; + }; + } + } + + pub async fn lsp_completions( + &mut self, + line_text: &str, + position: usize, + ) -> Vec<ReplCompletionItem> { + self + .repl_session + .language_server + .completions(line_text, position) + .await + } + + pub async fn get_properties( + &mut self, + object_id: String, + ) -> Option<cdp::GetPropertiesResponse> { + let get_properties_response = self + .repl_session + .post_message_with_event_loop( + "Runtime.getProperties", + Some(cdp::GetPropertiesArgs { + object_id, + own_properties: None, + accessor_properties_only: None, + generate_preview: None, + non_indexed_properties_only: Some(true), + }), + ) + .await + .ok()?; + serde_json::from_value(get_properties_response).ok() + } + + pub async fn evaluate( + &mut self, + expr: String, + ) -> Option<cdp::EvaluateResponse> { + let evaluate_response: serde_json::Value = self + .repl_session + .post_message_with_event_loop( + "Runtime.evaluate", + Some(cdp::EvaluateArgs { + expression: expr, + object_group: None, + include_command_line_api: None, + silent: None, + context_id: Some(self.repl_session.context_id), + return_by_value: None, + generate_preview: None, + user_gesture: None, + await_promise: None, + throw_on_side_effect: Some(true), + timeout: Some(200), + disable_breaks: None, + repl_mode: None, + allow_unsafe_eval_blocked_by_csp: None, + unique_context_id: None, + }), + ) + .await + .ok()?; + serde_json::from_value(evaluate_response).ok() + } + + pub async fn global_lexical_scope_names( + &mut self, + ) -> cdp::GlobalLexicalScopeNamesResponse { + let evaluate_response = self + .repl_session + .post_message_with_event_loop( + "Runtime.globalLexicalScopeNames", + Some(cdp::GlobalLexicalScopeNamesArgs { + execution_context_id: Some(self.repl_session.context_id), + }), + ) + .await + .unwrap(); + serde_json::from_value(evaluate_response).unwrap() + } + + pub async fn evaluate_line_with_object_wrapping( + &mut self, + line: &str, + ) -> Result<repl::TsEvaluateResponse, AnyError> { + self + .repl_session + .evaluate_line_with_object_wrapping(line) + .await + } + + pub async fn call_function_on_args( + &mut self, + function_declaration: String, + args: &[cdp::RemoteObject], + ) -> Result<cdp::CallFunctionOnResponse, AnyError> { + self + .repl_session + .call_function_on_args(function_declaration, args) + .await + } + + // TODO(bartlomieju): rename to "broadcast_result"? + pub async fn call_function_on( + &mut self, + arg0: cdp::CallArgument, + arg1: cdp::CallArgument, + ) -> Option<cdp::CallFunctionOnResponse> { + let response = self.repl_session + .post_message_with_event_loop( + "Runtime.callFunctionOn", + Some(json!({ + "functionDeclaration": r#"async function (execution_count, result) { + await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result); + }"#, + "arguments": [arg0, arg1], + "executionContextId": self.repl_session.context_id, + "awaitPromise": true, + })), + ) + .await.ok()?; + serde_json::from_value(response).ok() + } +} |