summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/mod.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2024-07-02 23:37:54 +0100
committerGitHub <noreply@github.com>2024-07-03 00:37:54 +0200
commit7d919f6fd980ed54785e86892a518f0bdf68f475 (patch)
treedda1342e996b4d8d19ff0aa17892590822d2b56f /cli/tools/jupyter/mod.rs
parentc13b6d1413859d03b41b97d4c671fccfd388b2cc (diff)
refactor(jupyter): move ZeroMQ server to a separate thread (#24373)
Moves the ZeroMQ messaging server to a separate thread. This will allow to run blocking JS code and maintain communication with the notebook frontend. Towards https://github.com/denoland/deno/pull/23592 Towards https://github.com/denoland/deno/pull/24250 Closes https://github.com/denoland/deno/issues/23617
Diffstat (limited to 'cli/tools/jupyter/mod.rs')
-rw-r--r--cli/tools/jupyter/mod.rs379
1 files changed, 377 insertions, 2 deletions
diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs
index 0dbcfe9ef..a5139044f 100644
--- a/cli/tools/jupyter/mod.rs
+++ b/cli/tools/jupyter/mod.rs
@@ -2,18 +2,23 @@
use crate::args::Flags;
use crate::args::JupyterFlags;
+use crate::cdp;
+use crate::lsp::ReplCompletionItem;
use crate::ops;
use crate::tools::repl;
use crate::tools::test::create_single_test_event_channel;
use crate::tools::test::reporters::PrettyTestReporter;
use crate::tools::test::TestEventWorkerSender;
use crate::CliFactory;
+use deno_core::anyhow::bail;
use deno_core::anyhow::Context;
use deno_core::error::generic_error;
use deno_core::error::AnyError;
+use deno_core::futures::FutureExt;
use deno_core::located_script_name;
use deno_core::resolve_url_or_path;
use deno_core::serde_json;
+use deno_core::serde_json::json;
use deno_core::url::Url;
use deno_runtime::deno_io::Stdio;
use deno_runtime::deno_io::StdioPipe;
@@ -21,11 +26,11 @@ use deno_runtime::deno_permissions::Permissions;
use deno_runtime::deno_permissions::PermissionsContainer;
use deno_runtime::WorkerExecutionMode;
use deno_terminal::colors;
-
use jupyter_runtime::jupyter::ConnectionInfo;
use jupyter_runtime::messaging::StreamContent;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
+use tokio::sync::oneshot;
mod install;
pub mod server;
@@ -142,7 +147,377 @@ pub async fn kernel(
)
}));
- server::JupyterServer::start(spec, stdio_rx, repl_session).await?;
+ let (tx1, rx1) = mpsc::unbounded_channel();
+ let (tx2, rx2) = mpsc::unbounded_channel();
+ let (startup_data_tx, startup_data_rx) =
+ oneshot::channel::<server::StartupData>();
+
+ let mut repl_session_proxy = JupyterReplSession {
+ repl_session,
+ rx: rx1,
+ tx: tx2,
+ };
+ let repl_session_proxy_channels = JupyterReplProxy { tx: tx1, rx: rx2 };
+
+ let join_handle = std::thread::spawn(move || {
+ let fut = server::JupyterServer::start(
+ spec,
+ stdio_rx,
+ repl_session_proxy_channels,
+ startup_data_tx,
+ )
+ .boxed_local();
+ deno_runtime::tokio_util::create_and_run_current_thread(fut)
+ });
+
+ let Ok(startup_data) = startup_data_rx.await else {
+ bail!("Failed to acquire startup data");
+ };
+ {
+ let op_state_rc =
+ repl_session_proxy.repl_session.worker.js_runtime.op_state();
+ let mut op_state = op_state_rc.borrow_mut();
+ op_state.put(startup_data.iopub_connection.clone());
+ op_state.put(startup_data.last_execution_request.clone());
+ }
+
+ repl_session_proxy.start().await;
+ let server_result = join_handle.join();
+ match server_result {
+ Ok(result) => {
+ result?;
+ }
+ Err(e) => {
+ bail!("Jupyter kernel error: {:?}", e);
+ }
+ };
Ok(())
}
+
+pub enum JupyterReplRequest {
+ LspCompletions {
+ line_text: String,
+ position: usize,
+ },
+ JsGetProperties {
+ object_id: String,
+ },
+ JsEvaluate {
+ expr: String,
+ },
+ JsGlobalLexicalScopeNames,
+ JsEvaluateLineWithObjectWrapping {
+ line: String,
+ },
+ JsCallFunctionOnArgs {
+ function_declaration: String,
+ args: Vec<cdp::RemoteObject>,
+ },
+ JsCallFunctionOn {
+ arg0: cdp::CallArgument,
+ arg1: cdp::CallArgument,
+ },
+}
+
+pub enum JupyterReplResponse {
+ LspCompletions(Vec<ReplCompletionItem>),
+ JsGetProperties(Option<cdp::GetPropertiesResponse>),
+ JsEvaluate(Option<cdp::EvaluateResponse>),
+ JsGlobalLexicalScopeNames(cdp::GlobalLexicalScopeNamesResponse),
+ JsEvaluateLineWithObjectWrapping(Result<repl::TsEvaluateResponse, AnyError>),
+ JsCallFunctionOnArgs(Result<cdp::CallFunctionOnResponse, AnyError>),
+ JsCallFunctionOn(Option<cdp::CallFunctionOnResponse>),
+}
+
+pub struct JupyterReplProxy {
+ tx: mpsc::UnboundedSender<JupyterReplRequest>,
+ rx: mpsc::UnboundedReceiver<JupyterReplResponse>,
+}
+
+impl JupyterReplProxy {
+ pub async fn lsp_completions(
+ &mut self,
+ line_text: String,
+ position: usize,
+ ) -> Vec<ReplCompletionItem> {
+ let _ = self.tx.send(JupyterReplRequest::LspCompletions {
+ line_text,
+ position,
+ });
+ let Some(JupyterReplResponse::LspCompletions(resp)) = self.rx.recv().await
+ else {
+ unreachable!()
+ };
+ resp
+ }
+
+ pub async fn get_properties(
+ &mut self,
+ object_id: String,
+ ) -> Option<cdp::GetPropertiesResponse> {
+ let _ = self
+ .tx
+ .send(JupyterReplRequest::JsGetProperties { object_id });
+ let Some(JupyterReplResponse::JsGetProperties(resp)) = self.rx.recv().await
+ else {
+ unreachable!()
+ };
+ resp
+ }
+
+ pub async fn evaluate(
+ &mut self,
+ expr: String,
+ ) -> Option<cdp::EvaluateResponse> {
+ let _ = self.tx.send(JupyterReplRequest::JsEvaluate { expr });
+ let Some(JupyterReplResponse::JsEvaluate(resp)) = self.rx.recv().await
+ else {
+ unreachable!()
+ };
+ resp
+ }
+
+ pub async fn global_lexical_scope_names(
+ &mut self,
+ ) -> cdp::GlobalLexicalScopeNamesResponse {
+ let _ = self.tx.send(JupyterReplRequest::JsGlobalLexicalScopeNames);
+ let Some(JupyterReplResponse::JsGlobalLexicalScopeNames(resp)) =
+ self.rx.recv().await
+ else {
+ unreachable!()
+ };
+ resp
+ }
+
+ pub async fn evaluate_line_with_object_wrapping(
+ &mut self,
+ line: String,
+ ) -> Result<repl::TsEvaluateResponse, AnyError> {
+ let _ = self
+ .tx
+ .send(JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line });
+ let Some(JupyterReplResponse::JsEvaluateLineWithObjectWrapping(resp)) =
+ self.rx.recv().await
+ else {
+ unreachable!()
+ };
+ resp
+ }
+
+ pub async fn call_function_on_args(
+ &mut self,
+ function_declaration: String,
+ args: Vec<cdp::RemoteObject>,
+ ) -> Result<cdp::CallFunctionOnResponse, AnyError> {
+ let _ = self.tx.send(JupyterReplRequest::JsCallFunctionOnArgs {
+ function_declaration,
+ args,
+ });
+ let Some(JupyterReplResponse::JsCallFunctionOnArgs(resp)) =
+ self.rx.recv().await
+ else {
+ unreachable!()
+ };
+ resp
+ }
+
+ // TODO(bartlomieju): rename to "broadcast_result"?
+ pub async fn call_function_on(
+ &mut self,
+ arg0: cdp::CallArgument,
+ arg1: cdp::CallArgument,
+ ) -> Option<cdp::CallFunctionOnResponse> {
+ let _ = self
+ .tx
+ .send(JupyterReplRequest::JsCallFunctionOn { arg0, arg1 });
+ let Some(JupyterReplResponse::JsCallFunctionOn(resp)) =
+ self.rx.recv().await
+ else {
+ unreachable!()
+ };
+ resp
+ }
+}
+
+pub struct JupyterReplSession {
+ repl_session: repl::ReplSession,
+ rx: mpsc::UnboundedReceiver<JupyterReplRequest>,
+ tx: mpsc::UnboundedSender<JupyterReplResponse>,
+}
+
+impl JupyterReplSession {
+ pub async fn start(&mut self) {
+ loop {
+ let Some(msg) = self.rx.recv().await else {
+ break;
+ };
+ let resp = match msg {
+ JupyterReplRequest::LspCompletions {
+ line_text,
+ position,
+ } => JupyterReplResponse::LspCompletions(
+ self.lsp_completions(&line_text, position).await,
+ ),
+ JupyterReplRequest::JsGetProperties { object_id } => {
+ JupyterReplResponse::JsGetProperties(
+ self.get_properties(object_id).await,
+ )
+ }
+ JupyterReplRequest::JsEvaluate { expr } => {
+ JupyterReplResponse::JsEvaluate(self.evaluate(expr).await)
+ }
+ JupyterReplRequest::JsGlobalLexicalScopeNames => {
+ JupyterReplResponse::JsGlobalLexicalScopeNames(
+ self.global_lexical_scope_names().await,
+ )
+ }
+ JupyterReplRequest::JsEvaluateLineWithObjectWrapping { line } => {
+ JupyterReplResponse::JsEvaluateLineWithObjectWrapping(
+ self.evaluate_line_with_object_wrapping(&line).await,
+ )
+ }
+ JupyterReplRequest::JsCallFunctionOnArgs {
+ function_declaration,
+ args,
+ } => JupyterReplResponse::JsCallFunctionOnArgs(
+ self
+ .call_function_on_args(function_declaration, &args)
+ .await,
+ ),
+ JupyterReplRequest::JsCallFunctionOn { arg0, arg1 } => {
+ JupyterReplResponse::JsCallFunctionOn(
+ self.call_function_on(arg0, arg1).await,
+ )
+ }
+ };
+
+ let Ok(()) = self.tx.send(resp) else {
+ break;
+ };
+ }
+ }
+
+ pub async fn lsp_completions(
+ &mut self,
+ line_text: &str,
+ position: usize,
+ ) -> Vec<ReplCompletionItem> {
+ self
+ .repl_session
+ .language_server
+ .completions(line_text, position)
+ .await
+ }
+
+ pub async fn get_properties(
+ &mut self,
+ object_id: String,
+ ) -> Option<cdp::GetPropertiesResponse> {
+ let get_properties_response = self
+ .repl_session
+ .post_message_with_event_loop(
+ "Runtime.getProperties",
+ Some(cdp::GetPropertiesArgs {
+ object_id,
+ own_properties: None,
+ accessor_properties_only: None,
+ generate_preview: None,
+ non_indexed_properties_only: Some(true),
+ }),
+ )
+ .await
+ .ok()?;
+ serde_json::from_value(get_properties_response).ok()
+ }
+
+ pub async fn evaluate(
+ &mut self,
+ expr: String,
+ ) -> Option<cdp::EvaluateResponse> {
+ let evaluate_response: serde_json::Value = self
+ .repl_session
+ .post_message_with_event_loop(
+ "Runtime.evaluate",
+ Some(cdp::EvaluateArgs {
+ expression: expr,
+ object_group: None,
+ include_command_line_api: None,
+ silent: None,
+ context_id: Some(self.repl_session.context_id),
+ return_by_value: None,
+ generate_preview: None,
+ user_gesture: None,
+ await_promise: None,
+ throw_on_side_effect: Some(true),
+ timeout: Some(200),
+ disable_breaks: None,
+ repl_mode: None,
+ allow_unsafe_eval_blocked_by_csp: None,
+ unique_context_id: None,
+ }),
+ )
+ .await
+ .ok()?;
+ serde_json::from_value(evaluate_response).ok()
+ }
+
+ pub async fn global_lexical_scope_names(
+ &mut self,
+ ) -> cdp::GlobalLexicalScopeNamesResponse {
+ let evaluate_response = self
+ .repl_session
+ .post_message_with_event_loop(
+ "Runtime.globalLexicalScopeNames",
+ Some(cdp::GlobalLexicalScopeNamesArgs {
+ execution_context_id: Some(self.repl_session.context_id),
+ }),
+ )
+ .await
+ .unwrap();
+ serde_json::from_value(evaluate_response).unwrap()
+ }
+
+ pub async fn evaluate_line_with_object_wrapping(
+ &mut self,
+ line: &str,
+ ) -> Result<repl::TsEvaluateResponse, AnyError> {
+ self
+ .repl_session
+ .evaluate_line_with_object_wrapping(line)
+ .await
+ }
+
+ pub async fn call_function_on_args(
+ &mut self,
+ function_declaration: String,
+ args: &[cdp::RemoteObject],
+ ) -> Result<cdp::CallFunctionOnResponse, AnyError> {
+ self
+ .repl_session
+ .call_function_on_args(function_declaration, args)
+ .await
+ }
+
+ // TODO(bartlomieju): rename to "broadcast_result"?
+ pub async fn call_function_on(
+ &mut self,
+ arg0: cdp::CallArgument,
+ arg1: cdp::CallArgument,
+ ) -> Option<cdp::CallFunctionOnResponse> {
+ let response = self.repl_session
+ .post_message_with_event_loop(
+ "Runtime.callFunctionOn",
+ Some(json!({
+ "functionDeclaration": r#"async function (execution_count, result) {
+ await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result);
+ }"#,
+ "arguments": [arg0, arg1],
+ "executionContextId": self.repl_session.context_id,
+ "awaitPromise": true,
+ })),
+ )
+ .await.ok()?;
+ serde_json::from_value(response).ok()
+ }
+}