diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2024-07-02 23:37:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-03 00:37:54 +0200 |
commit | 7d919f6fd980ed54785e86892a518f0bdf68f475 (patch) | |
tree | dda1342e996b4d8d19ff0aa17892590822d2b56f /cli/tools/jupyter/server.rs | |
parent | c13b6d1413859d03b41b97d4c671fccfd388b2cc (diff) |
refactor(jupyter): move ZeroMQ server to a separate thread (#24373)
Moves the ZeroMQ messaging server to a separate thread.
This will allow to run blocking JS code and maintain communication
with the notebook frontend.
Towards https://github.com/denoland/deno/pull/23592
Towards https://github.com/denoland/deno/pull/24250
Closes https://github.com/denoland/deno/issues/23617
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r-- | cli/tools/jupyter/server.rs | 251 |
1 files changed, 104 insertions, 147 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index 36f4d5c18..6a3831c49 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -3,20 +3,20 @@ // This file is forked/ported from <https://github.com/evcxr/evcxr> // Copyright 2020 The Evcxr Authors. MIT license. -use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; use crate::cdp; use crate::tools::repl; +use deno_core::anyhow::bail; use deno_core::error::AnyError; use deno_core::futures; use deno_core::serde_json; -use deno_core::serde_json::json; use deno_core::CancelFuture; use deno_core::CancelHandle; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::sync::Mutex; use jupyter_runtime::messaging; @@ -25,27 +25,32 @@ use jupyter_runtime::ConnectionInfo; use jupyter_runtime::JupyterMessage; use jupyter_runtime::JupyterMessageContent; use jupyter_runtime::KernelControlConnection; -use jupyter_runtime::KernelHeartbeatConnection; use jupyter_runtime::KernelIoPubConnection; use jupyter_runtime::KernelShellConnection; use jupyter_runtime::ReplyError; use jupyter_runtime::ReplyStatus; use jupyter_runtime::StreamContent; +use super::JupyterReplProxy; + pub struct JupyterServer { execution_count: usize, - last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, - // This is Arc<Mutex<>>, so we don't hold RefCell borrows across await - // points. + last_execution_request: Arc<Mutex<Option<JupyterMessage>>>, iopub_connection: Arc<Mutex<KernelIoPubConnection>>, - repl_session: repl::ReplSession, + repl_session_proxy: JupyterReplProxy, +} + +pub struct StartupData { + pub iopub_connection: Arc<Mutex<KernelIoPubConnection>>, + pub last_execution_request: Arc<Mutex<Option<JupyterMessage>>>, } impl JupyterServer { pub async fn start( connection_info: ConnectionInfo, mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>, - mut repl_session: repl::ReplSession, + repl_session_proxy: JupyterReplProxy, + setup_tx: oneshot::Sender<StartupData>, ) -> Result<(), AnyError> { let mut heartbeat = connection_info.create_kernel_heartbeat_connection().await?; @@ -59,15 +64,14 @@ impl JupyterServer { connection_info.create_kernel_iopub_connection().await?; let iopub_connection = Arc::new(Mutex::new(iopub_connection)); - let last_execution_request = Rc::new(RefCell::new(None)); - - // Store `iopub_connection` in the op state so it's accessible to the runtime API. - { - let op_state_rc = repl_session.worker.js_runtime.op_state(); - let mut op_state = op_state_rc.borrow_mut(); - op_state.put(iopub_connection.clone()); - op_state.put(last_execution_request.clone()); - } + let last_execution_request = Arc::new(Mutex::new(None)); + + let Ok(()) = setup_tx.send(StartupData { + iopub_connection: iopub_connection.clone(), + last_execution_request: last_execution_request.clone(), + }) else { + bail!("Failed to send startup data"); + }; let cancel_handle = CancelHandle::new_rc(); @@ -75,20 +79,22 @@ impl JupyterServer { execution_count: 0, iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), - repl_session, + repl_session_proxy, }; - let handle1 = deno_core::unsync::spawn(async move { - if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await { - log::error!( - "Heartbeat error: {}\nBacktrace:\n{}", - err, - err.backtrace() - ); + let hearbeat_fut = deno_core::unsync::spawn(async move { + loop { + if let Err(err) = heartbeat.single_heartbeat().await { + log::error!( + "Heartbeat error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); + } } }); - let handle2 = deno_core::unsync::spawn({ + let control_fut = deno_core::unsync::spawn({ let cancel_handle = cancel_handle.clone(); async move { if let Err(err) = @@ -103,13 +109,13 @@ impl JupyterServer { } }); - let handle3 = deno_core::unsync::spawn(async move { + let shell_fut = deno_core::unsync::spawn(async move { if let Err(err) = server.handle_shell(shell_connection).await { log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace()); } }); - let handle4 = deno_core::unsync::spawn(async move { + let stdio_fut = deno_core::unsync::spawn(async move { while let Some(stdio_msg) = stdio_rx.recv().await { Self::handle_stdio_msg( iopub_connection.clone(), @@ -120,8 +126,15 @@ impl JupyterServer { } }); - let join_fut = - futures::future::try_join_all(vec![handle1, handle2, handle3, handle4]); + let repl_session_fut = deno_core::unsync::spawn(async move {}); + + let join_fut = futures::future::try_join_all(vec![ + hearbeat_fut, + control_fut, + shell_fut, + stdio_fut, + repl_session_fut, + ]); if let Ok(result) = join_fut.or_cancel(cancel_handle).await { result?; @@ -132,26 +145,19 @@ impl JupyterServer { async fn handle_stdio_msg( iopub_connection: Arc<Mutex<KernelIoPubConnection>>, - last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, + last_execution_request: Arc<Mutex<Option<JupyterMessage>>>, stdio_msg: StreamContent, ) { - let maybe_exec_result = last_execution_request.borrow().clone(); - if let Some(exec_request) = maybe_exec_result { - let result = (iopub_connection.lock().await) - .send(stdio_msg.as_child_of(&exec_request)) - .await; + let maybe_exec_result = last_execution_request.lock().await.clone(); + let Some(exec_request) = maybe_exec_result else { + return; + }; - if let Err(err) = result { - log::error!("Output error: {}", err); - } - } - } + let mut iopub_conn = iopub_connection.lock().await; + let result = iopub_conn.send(stdio_msg.as_child_of(&exec_request)).await; - async fn handle_heartbeat( - connection: &mut KernelHeartbeatConnection, - ) -> Result<(), AnyError> { - loop { - connection.single_heartbeat().await?; + if let Err(err) = result { + log::error!("Output error: {}", err); } } @@ -222,9 +228,8 @@ impl JupyterServer { let cursor_pos = req.cursor_pos; let lsp_completions = self - .repl_session - .language_server - .completions(&user_code, cursor_pos) + .repl_session_proxy + .lsp_completions(user_code.clone(), cursor_pos) .await; if !lsp_completions.is_empty() { @@ -263,27 +268,32 @@ impl JupyterServer { { let sub_expr = &expr[..index]; let prop_name = &expr[index + 1..]; - let candidates = - get_expression_property_names(&mut self.repl_session, sub_expr) - .await - .into_iter() - .filter(|n| { - !n.starts_with("Symbol(") - && n.starts_with(prop_name) - && n != &*repl::REPL_INTERNALS_NAME - }) - .collect(); + let candidates = get_expression_property_names( + &mut self.repl_session_proxy, + sub_expr, + ) + .await + .into_iter() + .filter(|n| { + !n.starts_with("Symbol(") + && n.starts_with(prop_name) + && n != &*repl::REPL_INTERNALS_NAME + }) + .collect(); (candidates, cursor_pos - prop_name.len()) } else { // combine results of declarations and globalThis properties let mut candidates = get_expression_property_names( - &mut self.repl_session, + &mut self.repl_session_proxy, "globalThis", ) .await .into_iter() - .chain(get_global_lexical_scope_names(&mut self.repl_session).await) + .chain( + get_global_lexical_scope_names(&mut self.repl_session_proxy) + .await, + ) .filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME) .collect::<Vec<_>>(); @@ -419,7 +429,7 @@ impl JupyterServer { if !execute_request.silent && execute_request.store_history { self.execution_count += 1; } - *self.last_execution_request.borrow_mut() = Some(parent_message.clone()); + *self.last_execution_request.lock().await = Some(parent_message.clone()); self .send_iopub( @@ -432,8 +442,8 @@ impl JupyterServer { .await?; let result = self - .repl_session - .evaluate_line_with_object_wrapping(&execute_request.code) + .repl_session_proxy + .evaluate_line_with_object_wrapping(execute_request.code) .await; let evaluate_response = match result { @@ -471,8 +481,12 @@ impl JupyterServer { } = evaluate_response.value; if exception_details.is_none() { - publish_result(&mut self.repl_session, &result, self.execution_count) - .await?; + publish_result( + &mut self.repl_session_proxy, + &result, + self.execution_count, + ) + .await?; connection .send( @@ -497,7 +511,7 @@ impl JupyterServer { exception_details.exception { let result = self - .repl_session + .repl_session_proxy .call_function_on_args( r#" function(object) { @@ -513,7 +527,7 @@ impl JupyterServer { } "# .into(), - &[exception], + vec![exception], ) .await?; @@ -629,7 +643,7 @@ fn kernel_info() -> messaging::KernelInfoReply { } async fn publish_result( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, evaluate_result: &cdp::RemoteObject, execution_count: usize, ) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> { @@ -641,21 +655,10 @@ async fn publish_result( let arg1 = cdp::CallArgument::from(evaluate_result); - let response = session - .post_message_with_event_loop( - "Runtime.callFunctionOn", - Some(json!({ - "functionDeclaration": r#"async function (execution_count, result) { - await Deno[Deno.internal].jupyter.broadcastResult(execution_count, result); - }"#, - "arguments": [arg0, arg1], - "executionContextId": session.context_id, - "awaitPromise": true, - })), - ) - .await?; - - let response: cdp::CallFunctionOnResponse = serde_json::from_value(response)?; + let Some(response) = repl_session_proxy.call_function_on(arg0, arg1).await + else { + return Ok(None); + }; if let Some(exception_details) = &response.exception_details { // If the object doesn't have a Jupyter.display method or it throws an @@ -693,34 +696,25 @@ fn is_word_boundary(c: char) -> bool { // TODO(bartlomieju): dedup with repl::editor async fn get_global_lexical_scope_names( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, ) -> Vec<String> { - let evaluate_response = session - .post_message_with_event_loop( - "Runtime.globalLexicalScopeNames", - Some(cdp::GlobalLexicalScopeNamesArgs { - execution_context_id: Some(session.context_id), - }), - ) - .await - .unwrap(); - let evaluate_response: cdp::GlobalLexicalScopeNamesResponse = - serde_json::from_value(evaluate_response).unwrap(); - evaluate_response.names + repl_session_proxy.global_lexical_scope_names().await.names } // TODO(bartlomieju): dedup with repl::editor async fn get_expression_property_names( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Vec<String> { // try to get the properties from the expression - if let Some(properties) = get_object_expr_properties(session, expr).await { + if let Some(properties) = + get_object_expr_properties(repl_session_proxy, expr).await + { return properties; } // otherwise fall back to the prototype - let expr_type = get_expression_type(session, expr).await; + let expr_type = get_expression_type(repl_session_proxy, expr).await; let object_expr = match expr_type.as_deref() { // possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject Some("object") => "Object.prototype", @@ -732,44 +726,32 @@ async fn get_expression_property_names( _ => return Vec::new(), // undefined, symbol, and unhandled }; - get_object_expr_properties(session, object_expr) + get_object_expr_properties(repl_session_proxy, object_expr) .await .unwrap_or_default() } // TODO(bartlomieju): dedup with repl::editor async fn get_expression_type( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Option<String> { - evaluate_expression(session, expr) + evaluate_expression(repl_session_proxy, expr) .await .map(|res| res.result.kind) } // TODO(bartlomieju): dedup with repl::editor async fn get_object_expr_properties( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, object_expr: &str, ) -> Option<Vec<String>> { - let evaluate_result = evaluate_expression(session, object_expr).await?; + let evaluate_result = + evaluate_expression(repl_session_proxy, object_expr).await?; let object_id = evaluate_result.result.object_id?; - let get_properties_response = session - .post_message_with_event_loop( - "Runtime.getProperties", - Some(cdp::GetPropertiesArgs { - object_id, - own_properties: None, - accessor_properties_only: None, - generate_preview: None, - non_indexed_properties_only: Some(true), - }), - ) - .await - .ok()?; - let get_properties_response: cdp::GetPropertiesResponse = - serde_json::from_value(get_properties_response).ok()?; + let get_properties_response = + repl_session_proxy.get_properties(object_id.clone()).await?; Some( get_properties_response .result @@ -781,35 +763,10 @@ async fn get_object_expr_properties( // TODO(bartlomieju): dedup with repl::editor async fn evaluate_expression( - session: &mut repl::ReplSession, + repl_session_proxy: &mut JupyterReplProxy, expr: &str, ) -> Option<cdp::EvaluateResponse> { - let evaluate_response = session - .post_message_with_event_loop( - "Runtime.evaluate", - Some(cdp::EvaluateArgs { - expression: expr.to_string(), - object_group: None, - include_command_line_api: None, - silent: None, - context_id: Some(session.context_id), - return_by_value: None, - generate_preview: None, - user_gesture: None, - await_promise: None, - throw_on_side_effect: Some(true), - timeout: Some(200), - disable_breaks: None, - repl_mode: None, - allow_unsafe_eval_blocked_by_csp: None, - unique_context_id: None, - }), - ) - .await - .ok()?; - let evaluate_response: cdp::EvaluateResponse = - serde_json::from_value(evaluate_response).ok()?; - + let evaluate_response = repl_session_proxy.evaluate(expr.to_string()).await?; if evaluate_response.exception_details.is_some() { None } else { |