diff options
author | Kyle Kelley <rgbkrk@gmail.com> | 2024-05-21 13:35:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-21 22:35:21 +0200 |
commit | 8698e80304815353ec52be1b16f96483ebe559a0 (patch) | |
tree | 9abd53d5b656cd8cc0c1aa3940684f3ce1d9c8ef /cli/tools/jupyter/server.rs | |
parent | cc8c0609ebec9f101a1739a0c42c91718ca2abba (diff) |
refactor(jupyter): use runtimelib for Jupyter structures and directory paths (#23826)
This brings in [`runtimelib`](https://github.com/runtimed/runtimed) to
use:
## Fully typed structs for Jupyter Messages
```rust
let msg = connection.read().await?;
self
.send_iopub(
runtimelib::Status::busy().as_child_of(msg),
)
.await?;
```
## Jupyter paths
Jupyter paths are implemented in Rust, allowing the Deno kernel to be
installed completely via Deno without a requirement on Python or
Jupyter. Deno users will be able to install and use the kernel with just
VS Code or other editors that support Jupyter.
```rust
pub fn status() -> Result<(), AnyError> {
let user_data_dir = user_data_dir()?;
let kernel_spec_dir_path = user_data_dir.join("kernels").join("deno");
let kernel_spec_path = kernel_spec_dir_path.join("kernel.json");
if kernel_spec_path.exists() {
log::info!("✅ Deno kernel already installed");
Ok(())
} else {
log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up");
Ok(())
}
}
```
Closes https://github.com/denoland/deno/issues/21619
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r-- | cli/tools/jupyter/server.rs | 444 |
1 files changed, 279 insertions, 165 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index 4021cf6a3..3d273ee74 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -19,48 +19,54 @@ use deno_core::CancelHandle; use tokio::sync::mpsc; use tokio::sync::Mutex; -use super::jupyter_msg::Connection; -use super::jupyter_msg::JupyterMessage; -use super::ConnectionSpec; - -pub enum StdioMsg { - Stdout(String), - Stderr(String), -} +use runtimelib::ConnectionInfo; +use runtimelib::KernelControlConnection; +use runtimelib::KernelHeartbeatConnection; +use runtimelib::KernelIoPubConnection; +use runtimelib::KernelShellConnection; + +use runtimelib::messaging; +use runtimelib::AsChildOf; +use runtimelib::JupyterMessage; +use runtimelib::JupyterMessageContent; +use runtimelib::ReplyError; +use runtimelib::ReplyStatus; +use runtimelib::StreamContent; pub struct JupyterServer { execution_count: usize, last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, // This is Arc<Mutex<>>, so we don't hold RefCell borrows across await // points. - iopub_socket: Arc<Mutex<Connection<zeromq::PubSocket>>>, + iopub_connection: Arc<Mutex<KernelIoPubConnection>>, repl_session: repl::ReplSession, } impl JupyterServer { pub async fn start( - spec: ConnectionSpec, - mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>, + connection_info: ConnectionInfo, + mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>, mut repl_session: repl::ReplSession, ) -> Result<(), AnyError> { let mut heartbeat = - bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?; - let shell_socket = - bind_socket::<zeromq::RouterSocket>(&spec, spec.shell_port).await?; - let control_socket = - bind_socket::<zeromq::RouterSocket>(&spec, spec.control_port).await?; - let _stdin_socket = - bind_socket::<zeromq::RouterSocket>(&spec, spec.stdin_port).await?; - let iopub_socket = - bind_socket::<zeromq::PubSocket>(&spec, spec.iopub_port).await?; - let iopub_socket = Arc::new(Mutex::new(iopub_socket)); + connection_info.create_kernel_heartbeat_connection().await?; + let shell_connection = + connection_info.create_kernel_shell_connection().await?; + let control_connection = + connection_info.create_kernel_control_connection().await?; + let _stdin_connection = + connection_info.create_kernel_stdin_connection().await?; + let iopub_connection = + connection_info.create_kernel_iopub_connection().await?; + + let iopub_connection = Arc::new(Mutex::new(iopub_connection)); let last_execution_request = Rc::new(RefCell::new(None)); - // Store `iopub_socket` in the op state so it's accessible to the runtime API. + // Store `iopub_connection` in the op state so it's accessible to the runtime API. { let op_state_rc = repl_session.worker.js_runtime.op_state(); let mut op_state = op_state_rc.borrow_mut(); - op_state.put(iopub_socket.clone()); + op_state.put(iopub_connection.clone()); op_state.put(last_execution_request.clone()); } @@ -68,14 +74,18 @@ impl JupyterServer { let mut server = Self { execution_count: 0, - iopub_socket: iopub_socket.clone(), + iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), repl_session, }; let handle1 = deno_core::unsync::spawn(async move { if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await { - log::error!("Heartbeat error: {}", err); + log::error!( + "Heartbeat error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); } }); @@ -83,23 +93,27 @@ impl JupyterServer { let cancel_handle = cancel_handle.clone(); async move { if let Err(err) = - Self::handle_control(control_socket, cancel_handle).await + Self::handle_control(control_connection, cancel_handle).await { - log::error!("Control error: {}", err); + log::error!( + "Control error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); } } }); let handle3 = deno_core::unsync::spawn(async move { - if let Err(err) = server.handle_shell(shell_socket).await { - log::error!("Shell error: {}", err); + if let Err(err) = server.handle_shell(shell_connection).await { + log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace()); } }); let handle4 = deno_core::unsync::spawn(async move { while let Some(stdio_msg) = stdio_rx.recv().await { Self::handle_stdio_msg( - iopub_socket.clone(), + iopub_connection.clone(), last_execution_request.clone(), stdio_msg, ) @@ -117,33 +131,25 @@ impl JupyterServer { Ok(()) } - async fn handle_stdio_msg<S: zeromq::SocketSend>( - iopub_socket: Arc<Mutex<Connection<S>>>, + async fn handle_stdio_msg( + iopub_connection: Arc<Mutex<KernelIoPubConnection>>, last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, - stdio_msg: StdioMsg, + stdio_msg: StreamContent, ) { let maybe_exec_result = last_execution_request.borrow().clone(); if let Some(exec_request) = maybe_exec_result { - let (name, text) = match stdio_msg { - StdioMsg::Stdout(text) => ("stdout", text), - StdioMsg::Stderr(text) => ("stderr", text), - }; - - let result = (*iopub_socket.lock().await) - .send(&exec_request.new_message("stream").with_content(json!({ - "name": name, - "text": text - }))) + let result = (iopub_connection.lock().await) + .send(stdio_msg.as_child_of(&exec_request)) .await; if let Err(err) = result { - log::error!("Output {} error: {}", name, err); + log::error!("Output error: {}", err); } } } async fn handle_heartbeat( - connection: &mut Connection<zeromq::RepSocket>, + connection: &mut KernelHeartbeatConnection, ) -> Result<(), AnyError> { loop { connection.single_heartbeat().await?; @@ -151,23 +157,30 @@ impl JupyterServer { } async fn handle_control( - mut connection: Connection<zeromq::RouterSocket>, + mut connection: KernelControlConnection, cancel_handle: Rc<CancelHandle>, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; - match msg.message_type() { - "kernel_info_request" => { - connection - .send(&msg.new_reply().with_content(kernel_info())) - .await?; + + match msg.content { + JupyterMessageContent::KernelInfoRequest(_) => { + // normally kernel info is sent from the shell channel + // however, some frontends will send it on the control channel + // and it's no harm to send a kernel info reply on control + connection.send(kernel_info().as_child_of(&msg)).await?; } - "shutdown_request" => { + JupyterMessageContent::ShutdownRequest(_) => { cancel_handle.cancel(); } - "interrupt_request" => { + JupyterMessageContent::InterruptRequest(_) => { log::error!("Interrupt request currently not supported"); } + JupyterMessageContent::DebugRequest(_) => { + log::error!("Debug request currently not supported"); + // See https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request + // and https://microsoft.github.io/debug-adapter-protocol/ + } _ => { log::error!( "Unrecognized control message type: {}", @@ -180,7 +193,7 @@ impl JupyterServer { async fn handle_shell( &mut self, - mut connection: Connection<zeromq::RouterSocket>, + mut connection: KernelShellConnection, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; @@ -191,43 +204,28 @@ impl JupyterServer { async fn handle_shell_message( &mut self, msg: JupyterMessage, - connection: &mut Connection<zeromq::RouterSocket>, + connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { + let parent = &msg.clone(); + self - .send_iopub( - &msg - .new_message("status") - .with_content(json!({"execution_state": "busy"})), - ) + .send_iopub(messaging::Status::busy().as_child_of(parent)) .await?; - match msg.message_type() { - "kernel_info_request" => { - connection - .send(&msg.new_reply().with_content(kernel_info())) - .await?; - } - "is_complete_request" => { - connection - .send(&msg.new_reply().with_content(json!({"status": "complete"}))) - .await?; - } - "execute_request" => { + match msg.content { + JupyterMessageContent::ExecuteRequest(execute_request) => { self - .handle_execution_request(msg.clone(), connection) + .handle_execution_request(execute_request, parent, connection) .await?; } - "comm_open" => { - self.send_iopub(&msg.comm_close_message()).await?; - } - "complete_request" => { - let user_code = msg.code(); - let cursor_pos = msg.cursor_pos(); + JupyterMessageContent::CompleteRequest(req) => { + let user_code = req.code; + let cursor_pos = req.cursor_pos; let lsp_completions = self .repl_session .language_server - .completions(user_code, cursor_pos) + .completions(&user_code, cursor_pos) .await; if !lsp_completions.is_empty() { @@ -247,16 +245,20 @@ impl JupyterServer { .unwrap_or(cursor_pos); connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "matches": matches, - "cursor_start": cursor_start, - "cursor_end": cursor_end, - "metadata": {}, - }))) + .send( + messaging::CompleteReply { + matches, + cursor_start, + cursor_end, + metadata: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) .await?; } else { - let expr = get_expr_from_line_at_pos(user_code, cursor_pos); + let expr = get_expr_from_line_at_pos(&user_code, cursor_pos); // check if the expression is in the form `obj.prop` let (completions, cursor_start) = if let Some(index) = expr.rfind('.') { @@ -292,72 +294,173 @@ impl JupyterServer { (candidates, cursor_pos - expr.len()) }; + connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "matches": completions, - "cursor_start": cursor_start, - "cursor_end": cursor_pos, - "metadata": {}, - }))) + .send( + messaging::CompleteReply { + matches: completions, + cursor_start, + cursor_end: cursor_pos, + metadata: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) .await?; } } - "comm_msg" | "comm_info_request" | "history_request" => { - // We don't handle these messages + + JupyterMessageContent::InspectRequest(_req) => { + // TODO(bartlomieju?): implement introspection request + // The inspect request is used to get information about an object at cursor position. + // There are two detail levels: 0 is typically documentation, 1 is typically source code + + // The response includes a MimeBundle to render the object: + // { + // "status": "ok", + // "found": true, + // "data": { + // "text/plain": "Plain documentation here", + // "text/html": "<div>Rich documentation here</div>", + // "application/json": { + // "key1": "value1", + // "key2": "value2" + // } + // }, + // } + + connection + .send( + messaging::InspectReply { + status: ReplyStatus::Ok, + found: false, + data: Default::default(), + metadata: Default::default(), + error: None, + } + .as_child_of(parent), + ) + .await?; + } + + JupyterMessageContent::IsCompleteRequest(_) => { + connection + .send(messaging::IsCompleteReply::complete().as_child_of(parent)) + .await?; + } + JupyterMessageContent::KernelInfoRequest(_) => { + connection.send(kernel_info().as_child_of(parent)).await?; } + JupyterMessageContent::CommOpen(comm) => { + connection + .send( + messaging::CommClose { + comm_id: comm.comm_id, + data: Default::default(), + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::HistoryRequest(_req) => { + connection + .send( + messaging::HistoryReply { + history: vec![], + error: None, + status: ReplyStatus::Ok, + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::InputReply(_rep) => { + // TODO(@zph): implement input reply from https://github.com/denoland/deno/pull/23592 + // NOTE: This will belong on the stdin channel, not the shell channel + } + JupyterMessageContent::CommInfoRequest(_req) => { + connection + .send( + messaging::CommInfoReply { + comms: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::CommMsg(_) + | JupyterMessageContent::CommClose(_) => { + // Do nothing with regular comm messages + } + // Any unknown message type is ignored _ => { - log::error!("Unrecognized shell message type: {}", msg.message_type()); + log::error!( + "Unrecognized shell message type: {}", + msg.content.message_type() + ); } } self - .send_iopub( - &msg - .new_message("status") - .with_content(json!({"execution_state": "idle"})), - ) + .send_iopub(messaging::Status::idle().as_child_of(parent)) .await?; + Ok(()) } async fn handle_execution_request( &mut self, - msg: JupyterMessage, - connection: &mut Connection<zeromq::RouterSocket>, + execute_request: messaging::ExecuteRequest, + parent_message: &JupyterMessage, + connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { - if !msg.silent() && msg.store_history() { + if !execute_request.silent && execute_request.store_history { self.execution_count += 1; } - *self.last_execution_request.borrow_mut() = Some(msg.clone()); + *self.last_execution_request.borrow_mut() = Some(parent_message.clone()); self - .send_iopub(&msg.new_message("execute_input").with_content(json!({ - "execution_count": self.execution_count, - "code": msg.code() - }))) + .send_iopub( + messaging::ExecuteInput { + execution_count: self.execution_count, + code: execute_request.code.clone(), + } + .as_child_of(parent_message), + ) .await?; let result = self .repl_session - .evaluate_line_with_object_wrapping(msg.code()) + .evaluate_line_with_object_wrapping(&execute_request.code) .await; let evaluate_response = match result { Ok(eval_response) => eval_response, Err(err) => { self - .send_iopub(&msg.new_message("error").with_content(json!({ - "ename": err.to_string(), - "evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error - "traceback": [], - }))) + .send_iopub( + messaging::ErrorOutput { + ename: err.to_string(), + evalue: err.to_string(), + traceback: vec![], + } + .as_child_of(parent_message), + ) .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "error", - "execution_count": self.execution_count, - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Error, + payload: None, + user_expressions: None, + error: None, + } + .as_child_of(parent_message), + ) .await?; return Ok(()); } @@ -373,11 +476,16 @@ impl JupyterServer { .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "execution_count": self.execution_count, - // FIXME: also include user_expressions - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Ok, + user_expressions: None, + payload: None, + error: None, + } + .as_child_of(parent_message), + ) .await?; // Let's sleep here for a few ms, so we give a chance to the task that is // handling stdout and stderr streams to receive and flush the content. @@ -458,17 +566,30 @@ impl JupyterServer { }; self - .send_iopub(&msg.new_message("error").with_content(json!({ - "ename": ename, - "evalue": evalue, - "traceback": traceback, - }))) + .send_iopub( + messaging::ErrorOutput { + ename: ename.clone(), + evalue: evalue.clone(), + traceback: traceback.clone(), + } + .as_child_of(parent_message), + ) .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "error", - "execution_count": self.execution_count, - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Error, + error: Some(ReplyError { + ename, + evalue, + traceback, + }), + user_expressions: None, + payload: None, + } + .as_child_of(parent_message), + ) .await?; } @@ -477,42 +598,35 @@ impl JupyterServer { async fn send_iopub( &mut self, - message: &JupyterMessage, + message: JupyterMessage, ) -> Result<(), AnyError> { - self.iopub_socket.lock().await.send(message).await + self.iopub_connection.lock().await.send(message).await } } -async fn bind_socket<S: zeromq::Socket>( - config: &ConnectionSpec, - port: u32, -) -> Result<Connection<S>, AnyError> { - let endpoint = format!("{}://{}:{}", config.transport, config.ip, port); - let mut socket = S::new(); - socket.bind(&endpoint).await?; - Ok(Connection::new(socket, &config.key)) -} - -fn kernel_info() -> serde_json::Value { - json!({ - "status": "ok", - "protocol_version": "5.3", - "implementation_version": crate::version::deno(), - "implementation": "Deno kernel", - "language_info": { - "name": "typescript", - "version": crate::version::TYPESCRIPT, - "mimetype": "text/x.typescript", - "file_extension": ".ts", - "pygments_lexer": "typescript", - "nb_converter": "script" +fn kernel_info() -> messaging::KernelInfoReply { + messaging::KernelInfoReply { + status: ReplyStatus::Ok, + protocol_version: "5.3".to_string(), + implementation: "Deno kernel".to_string(), + implementation_version: crate::version::deno().to_string(), + language_info: messaging::LanguageInfo { + name: "typescript".to_string(), + version: crate::version::TYPESCRIPT.to_string(), + mimetype: "text/x.typescript".to_string(), + file_extension: ".ts".to_string(), + pygments_lexer: "typescript".to_string(), + codemirror_mode: messaging::CodeMirrorMode::typescript(), + nbconvert_exporter: "script".to_string(), }, - "help_links": [{ - "text": "Visit Deno manual", - "url": "https://deno.land/manual" + banner: "Welcome to Deno kernel".to_string(), + help_links: vec![messaging::HelpLink { + text: "Visit Deno manual".to_string(), + url: "https://deno.land/manual".to_string(), }], - "banner": "Welcome to Deno kernel", - }) + debugger: false, + error: None, + } } async fn publish_result( |