summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/server.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2024-07-02 23:37:54 +0100
committerGitHub <noreply@github.com>2024-07-03 00:37:54 +0200
commit7d919f6fd980ed54785e86892a518f0bdf68f475 (patch)
treedda1342e996b4d8d19ff0aa17892590822d2b56f /cli/tools/jupyter/server.rs
parentc13b6d1413859d03b41b97d4c671fccfd388b2cc (diff)
refactor(jupyter): move ZeroMQ server to a separate thread (#24373)
Moves the ZeroMQ messaging server to a separate thread. This will allow to run blocking JS code and maintain communication with the notebook frontend. Towards https://github.com/denoland/deno/pull/23592 Towards https://github.com/denoland/deno/pull/24250 Closes https://github.com/denoland/deno/issues/23617
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r--cli/tools/jupyter/server.rs251
1 files changed, 104 insertions, 147 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs
index 36f4d5c18..6a3831c49 100644
--- a/cli/tools/jupyter/server.rs
+++ b/cli/tools/jupyter/server.rs
@@ -3,20 +3,20 @@
// This file is forked/ported from <https://github.com/evcxr/evcxr>
// Copyright 2020 The Evcxr Authors. MIT license.
-use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use crate::cdp;
use crate::tools::repl;
+use deno_core::anyhow::bail;
use deno_core::error::AnyError;
use deno_core::futures;
use deno_core::serde_json;
-use deno_core::serde_json::json;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use tokio::sync::mpsc;
+use tokio::sync::oneshot;
use tokio::sync::Mutex;
use jupyter_runtime::messaging;
@@ -25,27 +25,32 @@ use jupyter_runtime::ConnectionInfo;
use jupyter_runtime::JupyterMessage;
use jupyter_runtime::JupyterMessageContent;
use jupyter_runtime::KernelControlConnection;
-use jupyter_runtime::KernelHeartbeatConnection;
use jupyter_runtime::KernelIoPubConnection;
use jupyter_runtime::KernelShellConnection;
use jupyter_runtime::ReplyError;
use jupyter_runtime::ReplyStatus;
use jupyter_runtime::StreamContent;
+use super::JupyterReplProxy;
+
pub struct JupyterServer {
execution_count: usize,
- last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
- // This is Arc<Mutex<>>, so we don't hold RefCell borrows across await
- // points.
+ last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
- repl_session: repl::ReplSession,
+ repl_session_proxy: JupyterReplProxy,
+}
+
+pub struct StartupData {
+ pub iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
+ pub last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
}
impl JupyterServer {
pub async fn start(
connection_info: ConnectionInfo,
mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>,
- mut repl_session: repl::ReplSession,
+ repl_session_proxy: JupyterReplProxy,
+ setup_tx: oneshot::Sender<StartupData>,
) -> Result<(), AnyError> {
let mut heartbeat =
connection_info.create_kernel_heartbeat_connection().await?;
@@ -59,15 +64,14 @@ impl JupyterServer {
connection_info.create_kernel_iopub_connection().await?;
let iopub_connection = Arc::new(Mutex::new(iopub_connection));
- let last_execution_request = Rc::new(RefCell::new(None));
-
- // Store `iopub_connection` in the op state so it's accessible to the runtime API.
- {
- let op_state_rc = repl_session.worker.js_runtime.op_state();
- let mut op_state = op_state_rc.borrow_mut();
- op_state.put(iopub_connection.clone());
- op_state.put(last_execution_request.clone());
- }
+ let last_execution_request = Arc::new(Mutex::new(None));
+
+ let Ok(()) = setup_tx.send(StartupData {
+ iopub_connection: iopub_connection.clone(),
+ last_execution_request: last_execution_request.clone(),
+ }) else {
+ bail!("Failed to send startup data");
+ };
let cancel_handle = CancelHandle::new_rc();
@@ -75,20 +79,22 @@ impl JupyterServer {
execution_count: 0,
iopub_connection: iopub_connection.clone(),
last_execution_request: last_execution_request.clone(),
- repl_session,
+ repl_session_proxy,
};
- let handle1 = deno_core::unsync::spawn(async move {
- if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await {
- log::error!(
- "Heartbeat error: {}\nBacktrace:\n{}",
- err,
- err.backtrace()
- );
+ let hearbeat_fut = deno_core::unsync::spawn(async move {
+ loop {
+ if let Err(err) = heartbeat.single_heartbeat().await {
+ log::error!(
+ "Heartbeat error: {}\nBacktrace:\n{}",
+ err,
+ err.backtrace()
+ );
+ }
}
});
- let handle2 = deno_core::unsync::spawn({
+ let control_fut = deno_core::unsync::spawn({
let cancel_handle = cancel_handle.clone();
async move {
if let Err(err) =
@@ -103,13 +109,13 @@ impl JupyterServer {
}
});
- let handle3 = deno_core::unsync::spawn(async move {
+ let shell_fut = deno_core::unsync::spawn(async move {
if let Err(err) = server.handle_shell(shell_connection).await {
log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace());
}
});
- let handle4 = deno_core::unsync::spawn(async move {
+ let stdio_fut = deno_core::unsync::spawn(async move {
while let Some(stdio_msg) = stdio_rx.recv().await {
Self::handle_stdio_msg(
iopub_connection.clone(),
@@ -120,8 +126,15 @@ impl JupyterServer {
}
});
- let join_fut =
- futures::future::try_join_all(vec![handle1, handle2, handle3, handle4]);
+ let repl_session_fut = deno_core::unsync::spawn(async move {});
+
+ let join_fut = futures::future::try_join_all(vec![
+ hearbeat_fut,
+ control_fut,
+ shell_fut,
+ stdio_fut,
+ repl_session_fut,
+ ]);
if let Ok(result) = join_fut.or_cancel(cancel_handle).await {
result?;
@@ -132,26 +145,19 @@ impl JupyterServer {
async fn handle_stdio_msg(
iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
- last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
+ last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
stdio_msg: StreamContent,
) {
- let maybe_exec_result = last_execution_request.borrow().clone();
- if let Some(exec_request) = maybe_exec_result {
- let result = (iopub_connection.lock().await)
- .send(stdio_msg.as_child_of(&exec_request))
- .await;
+ let maybe_exec_result = last_execution_request.lock().await.clone();
+ let Some(exec_request) = maybe_exec_result else {
+ return;
+ };
- if let Err(err) = result {
- log::error!("Output error: {}", err);
- }
- }
- }
+ let mut iopub_conn = iopub_connection.lock().await;
+ let result = iopub_conn.send(stdio_msg.as_child_of(&exec_request)).await;
- async fn handle_heartbeat(
- connection: &mut KernelHeartbeatConnection,
- ) -> Result<(), AnyError> {
- loop {
- connection.single_heartbeat().await?;
+ if let Err(err) = result {
+ log::error!("Output error: {}", err);
}
}
@@ -222,9 +228,8 @@ impl JupyterServer {
let cursor_pos = req.cursor_pos;
let lsp_completions = self
- .repl_session
- .language_server
- .completions(&user_code, cursor_pos)
+ .repl_session_proxy
+ .lsp_completions(user_code.clone(), cursor_pos)
.await;
if !lsp_completions.is_empty() {
@@ -263,27 +268,32 @@ impl JupyterServer {
{
let sub_expr = &expr[..index];
let prop_name = &expr[index + 1..];
- let candidates =
- get_expression_property_names(&mut self.repl_session, sub_expr)
- .await
- .into_iter()
- .filter(|n| {
- !n.starts_with("Symbol(")
- && n.starts_with(prop_name)
- && n != &*repl::REPL_INTERNALS_NAME
- })
- .collect();
+ let candidates = get_expression_property_names(
+ &mut self.repl_session_proxy,
+ sub_expr,
+ )
+ .await
+ .into_iter()
+ .filter(|n| {
+ !n.starts_with("Symbol(")
+ && n.starts_with(prop_name)
+ && n != &*repl::REPL_INTERNALS_NAME
+ })
+ .collect();
(candidates, cursor_pos - prop_name.len())
} else {
// combine results of declarations and globalThis properties
let mut candidates = get_expression_property_names(
- &mut self.repl_session,
+ &mut self.repl_session_proxy,
"globalThis",
)
.await
.into_iter()
- .chain(get_global_lexical_scope_names(&mut self.repl_session).await)
+ .chain(
+ get_global_lexical_scope_names(&mut self.repl_session_proxy)
+ .await,
+ )
.filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME)
.collect::<Vec<_>>();
@@ -419,7 +429,7 @@ impl JupyterServer {
if !execute_request.silent && execute_request.store_history {
self.execution_count += 1;
}
- *self.last_execution_request.borrow_mut() = Some(parent_message.clone());
+ *self.last_execution_request.lock().await = Some(parent_message.clone());
self
.send_iopub(
@@ -432,8 +442,8 @@ impl JupyterServer {
.await?;
let result = self
- .repl_session
- .evaluate_line_with_object_wrapping(&execute_request.code)
+ .repl_session_proxy
+ .evaluate_line_with_object_wrapping(execute_request.code)
.await;
let evaluate_response = match result {
@@ -471,8 +481,12 @@ impl JupyterServer {
} = evaluate_response.value;
if exception_details.is_none() {
- publish_result(&mut self.repl_session, &result, self.execution_count)
- .await?;
+ publish_result(
+ &mut self.repl_session_proxy,
+ &result,
+ self.execution_count,
+ )
+ .await?;
connection
.send(
@@ -497,7 +511,7 @@ impl JupyterServer {
exception_details.exception
{
let result = self
- .repl_session
+ .repl_session_proxy
.call_function_on_args(
r#"
function(object) {
@@ -513,7 +527,7 @@ impl JupyterServer {
}
"#
.into(),
- &[exception],
+ vec![exception],
)
.await?;
@@ -629,7 +643,7 @@ fn kernel_info() -> messaging::KernelInfoReply {
}
async fn publish_result(
- session: &mut repl::ReplSession,
+ repl_session_proxy: &mut JupyterReplProxy,
evaluate_result: &cdp::RemoteObject,
execution_count: usize,
) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> {
@@ -641,21 +655,10 @@ async fn publish_result(
let arg1 = cdp::CallArgument::from(evaluate_result);
- let response = session
- .post_message_with_event_loop(
- "Runtime.callFunctionOn",
- Some(json!({
- "functionDeclaration": r#"async function (execution_count, result) {
- await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result);
- }"#,
- "arguments": [arg0, arg1],
- "executionContextId": session.context_id,
- "awaitPromise": true,
- })),
- )
- .await?;
-
- let response: cdp::CallFunctionOnResponse = serde_json::from_value(response)?;
+ let Some(response) = repl_session_proxy.call_function_on(arg0, arg1).await
+ else {
+ return Ok(None);
+ };
if let Some(exception_details) = &response.exception_details {
// If the object doesn't have a Jupyter.display method or it throws an
@@ -693,34 +696,25 @@ fn is_word_boundary(c: char) -> bool {
// TODO(bartlomieju): dedup with repl::editor
async fn get_global_lexical_scope_names(
- session: &mut repl::ReplSession,
+ repl_session_proxy: &mut JupyterReplProxy,
) -> Vec<String> {
- let evaluate_response = session
- .post_message_with_event_loop(
- "Runtime.globalLexicalScopeNames",
- Some(cdp::GlobalLexicalScopeNamesArgs {
- execution_context_id: Some(session.context_id),
- }),
- )
- .await
- .unwrap();
- let evaluate_response: cdp::GlobalLexicalScopeNamesResponse =
- serde_json::from_value(evaluate_response).unwrap();
- evaluate_response.names
+ repl_session_proxy.global_lexical_scope_names().await.names
}
// TODO(bartlomieju): dedup with repl::editor
async fn get_expression_property_names(
- session: &mut repl::ReplSession,
+ repl_session_proxy: &mut JupyterReplProxy,
expr: &str,
) -> Vec<String> {
// try to get the properties from the expression
- if let Some(properties) = get_object_expr_properties(session, expr).await {
+ if let Some(properties) =
+ get_object_expr_properties(repl_session_proxy, expr).await
+ {
return properties;
}
// otherwise fall back to the prototype
- let expr_type = get_expression_type(session, expr).await;
+ let expr_type = get_expression_type(repl_session_proxy, expr).await;
let object_expr = match expr_type.as_deref() {
// possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject
Some("object") => "Object.prototype",
@@ -732,44 +726,32 @@ async fn get_expression_property_names(
_ => return Vec::new(), // undefined, symbol, and unhandled
};
- get_object_expr_properties(session, object_expr)
+ get_object_expr_properties(repl_session_proxy, object_expr)
.await
.unwrap_or_default()
}
// TODO(bartlomieju): dedup with repl::editor
async fn get_expression_type(
- session: &mut repl::ReplSession,
+ repl_session_proxy: &mut JupyterReplProxy,
expr: &str,
) -> Option<String> {
- evaluate_expression(session, expr)
+ evaluate_expression(repl_session_proxy, expr)
.await
.map(|res| res.result.kind)
}
// TODO(bartlomieju): dedup with repl::editor
async fn get_object_expr_properties(
- session: &mut repl::ReplSession,
+ repl_session_proxy: &mut JupyterReplProxy,
object_expr: &str,
) -> Option<Vec<String>> {
- let evaluate_result = evaluate_expression(session, object_expr).await?;
+ let evaluate_result =
+ evaluate_expression(repl_session_proxy, object_expr).await?;
let object_id = evaluate_result.result.object_id?;
- let get_properties_response = session
- .post_message_with_event_loop(
- "Runtime.getProperties",
- Some(cdp::GetPropertiesArgs {
- object_id,
- own_properties: None,
- accessor_properties_only: None,
- generate_preview: None,
- non_indexed_properties_only: Some(true),
- }),
- )
- .await
- .ok()?;
- let get_properties_response: cdp::GetPropertiesResponse =
- serde_json::from_value(get_properties_response).ok()?;
+ let get_properties_response =
+ repl_session_proxy.get_properties(object_id.clone()).await?;
Some(
get_properties_response
.result
@@ -781,35 +763,10 @@ async fn get_object_expr_properties(
// TODO(bartlomieju): dedup with repl::editor
async fn evaluate_expression(
- session: &mut repl::ReplSession,
+ repl_session_proxy: &mut JupyterReplProxy,
expr: &str,
) -> Option<cdp::EvaluateResponse> {
- let evaluate_response = session
- .post_message_with_event_loop(
- "Runtime.evaluate",
- Some(cdp::EvaluateArgs {
- expression: expr.to_string(),
- object_group: None,
- include_command_line_api: None,
- silent: None,
- context_id: Some(session.context_id),
- return_by_value: None,
- generate_preview: None,
- user_gesture: None,
- await_promise: None,
- throw_on_side_effect: Some(true),
- timeout: Some(200),
- disable_breaks: None,
- repl_mode: None,
- allow_unsafe_eval_blocked_by_csp: None,
- unique_context_id: None,
- }),
- )
- .await
- .ok()?;
- let evaluate_response: cdp::EvaluateResponse =
- serde_json::from_value(evaluate_response).ok()?;
-
+ let evaluate_response = repl_session_proxy.evaluate(expr.to_string()).await?;
if evaluate_response.exception_details.is_some() {
None
} else {