diff options
author | Kyle Kelley <rgbkrk@gmail.com> | 2024-05-21 13:35:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-21 22:35:21 +0200 |
commit | 8698e80304815353ec52be1b16f96483ebe559a0 (patch) | |
tree | 9abd53d5b656cd8cc0c1aa3940684f3ce1d9c8ef /cli/tools/jupyter | |
parent | cc8c0609ebec9f101a1739a0c42c91718ca2abba (diff) |
refactor(jupyter): use runtimelib for Jupyter structures and directory paths (#23826)
This brings in [`runtimelib`](https://github.com/runtimed/runtimed) to
use:
## Fully typed structs for Jupyter Messages
```rust
let msg = connection.read().await?;
self
.send_iopub(
runtimelib::Status::busy().as_child_of(msg),
)
.await?;
```
## Jupyter paths
Jupyter paths are implemented in Rust, allowing the Deno kernel to be
installed completely via Deno without a requirement on Python or
Jupyter. Deno users will be able to install and use the kernel with just
VS Code or other editors that support Jupyter.
```rust
pub fn status() -> Result<(), AnyError> {
let user_data_dir = user_data_dir()?;
let kernel_spec_dir_path = user_data_dir.join("kernels").join("deno");
let kernel_spec_path = kernel_spec_dir_path.join("kernel.json");
if kernel_spec_path.exists() {
log::info!("✅ Deno kernel already installed");
Ok(())
} else {
log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up");
Ok(())
}
}
```
Closes https://github.com/denoland/deno/issues/21619
Diffstat (limited to 'cli/tools/jupyter')
-rw-r--r-- | cli/tools/jupyter/install.rs | 88 | ||||
-rw-r--r-- | cli/tools/jupyter/jupyter_msg.rs | 305 | ||||
-rw-r--r-- | cli/tools/jupyter/mod.rs | 26 | ||||
-rw-r--r-- | cli/tools/jupyter/server.rs | 444 |
4 files changed, 308 insertions, 555 deletions
diff --git a/cli/tools/jupyter/install.rs b/cli/tools/jupyter/install.rs index 69a75837e..40f21d3c1 100644 --- a/cli/tools/jupyter/install.rs +++ b/cli/tools/jupyter/install.rs @@ -1,40 +1,31 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -use deno_core::anyhow::bail; -use deno_core::anyhow::Context; use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::serde_json::json; use std::env::current_exe; -use std::io::ErrorKind; use std::io::Write; use std::path::Path; -use tempfile::TempDir; + +use runtimelib::dirs::user_data_dir; const DENO_ICON_32: &[u8] = include_bytes!("./resources/deno-logo-32x32.png"); const DENO_ICON_64: &[u8] = include_bytes!("./resources/deno-logo-64x64.png"); const DENO_ICON_SVG: &[u8] = include_bytes!("./resources/deno-logo-svg.svg"); pub fn status() -> Result<(), AnyError> { - let output = std::process::Command::new("jupyter") - .args(["kernelspec", "list", "--json"]) - .output() - .context("Failed to get list of installed kernelspecs")?; - let json_output: serde_json::Value = - serde_json::from_slice(&output.stdout) - .context("Failed to parse JSON from kernelspec list")?; + let user_data_dir = user_data_dir()?; - if let Some(specs) = json_output.get("kernelspecs") { - if let Some(specs_obj) = specs.as_object() { - if specs_obj.contains_key("deno") { - log::info!("✅ Deno kernel already installed"); - return Ok(()); - } - } - } + let kernel_spec_dir_path = user_data_dir.join("kernels").join("deno"); + let kernel_spec_path = kernel_spec_dir_path.join("kernel.json"); - log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up"); - Ok(()) + if kernel_spec_path.exists() { + log::info!("✅ Deno kernel already installed"); + Ok(()) + } else { + log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up"); + Ok(()) + } } fn install_icon( @@ -49,8 +40,12 @@ fn install_icon( } pub fn install() -> Result<(), AnyError> { - let temp_dir = TempDir::new().unwrap(); - let kernel_json_path = temp_dir.path().join("kernel.json"); + let user_data_dir = user_data_dir()?; + let kernel_dir = user_data_dir.join("kernels").join("deno"); + + std::fs::create_dir_all(&kernel_dir)?; + + let kernel_json_path = kernel_dir.join("kernel.json"); // TODO(bartlomieju): add remaining fields as per // https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs @@ -63,51 +58,10 @@ pub fn install() -> Result<(), AnyError> { let f = std::fs::File::create(kernel_json_path)?; serde_json::to_writer_pretty(f, &json_data)?; - install_icon(temp_dir.path(), "logo-32x32.png", DENO_ICON_32)?; - install_icon(temp_dir.path(), "logo-64x64.png", DENO_ICON_64)?; - install_icon(temp_dir.path(), "logo-svg.svg", DENO_ICON_SVG)?; - - let child_result = std::process::Command::new("jupyter") - .args([ - "kernelspec", - "install", - "--user", - "--name", - "deno", - &temp_dir.path().to_string_lossy(), - ]) - .spawn(); - let mut child = match child_result { - Ok(child) => child, - Err(err) - if matches!( - err.kind(), - ErrorKind::NotFound | ErrorKind::PermissionDenied - ) => - { - return Err(err).context(concat!( - "Failed to spawn 'jupyter' command. Is JupyterLab installed ", - "(https://jupyter.org/install) and available on the PATH?" - )); - } - Err(err) => { - return Err(err).context("Failed to spawn 'jupyter' command."); - } - }; - - let wait_result = child.wait(); - match wait_result { - Ok(status) => { - if !status.success() { - bail!("Failed to install kernelspec, try again."); - } - } - Err(err) => { - bail!("Failed to install kernelspec: {}", err); - } - } + install_icon(&user_data_dir, "logo-32x32.png", DENO_ICON_32)?; + install_icon(&user_data_dir, "logo-64x64.png", DENO_ICON_64)?; + install_icon(&user_data_dir, "logo-svg.svg", DENO_ICON_SVG)?; - let _ = std::fs::remove_dir(temp_dir); log::info!("✅ Deno kernelspec installed successfully."); Ok(()) } diff --git a/cli/tools/jupyter/jupyter_msg.rs b/cli/tools/jupyter/jupyter_msg.rs deleted file mode 100644 index 233efcc8e..000000000 --- a/cli/tools/jupyter/jupyter_msg.rs +++ /dev/null @@ -1,305 +0,0 @@ -// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. - -// This file is forked/ported from <https://github.com/evcxr/evcxr> -// Copyright 2020 The Evcxr Authors. MIT license. - -use bytes::Bytes; -use data_encoding::HEXLOWER; -use deno_core::anyhow::anyhow; -use deno_core::anyhow::bail; -use deno_core::error::AnyError; -use deno_core::serde_json; -use deno_core::serde_json::json; -use ring::hmac; -use std::fmt; -use uuid::Uuid; - -use crate::util::time::utc_now; - -pub struct Connection<S> { - socket: S, - /// Will be None if our key was empty (digest authentication disabled). - mac: Option<hmac::Key>, -} - -impl<S: zeromq::Socket> Connection<S> { - pub fn new(socket: S, key: &str) -> Self { - let mac = if key.is_empty() { - None - } else { - Some(hmac::Key::new(hmac::HMAC_SHA256, key.as_bytes())) - }; - Connection { socket, mac } - } -} - -impl<S: zeromq::SocketSend + zeromq::SocketRecv> Connection<S> { - pub async fn single_heartbeat(&mut self) -> Result<(), AnyError> { - self.socket.recv().await?; - self - .socket - .send(zeromq::ZmqMessage::from(b"ping".to_vec())) - .await?; - Ok(()) - } -} - -impl<S: zeromq::SocketRecv> Connection<S> { - pub async fn read(&mut self) -> Result<JupyterMessage, AnyError> { - let multipart = self.socket.recv().await?; - let raw_message = RawMessage::from_multipart(multipart, self.mac.as_ref())?; - JupyterMessage::from_raw_message(raw_message) - } -} - -impl<S: zeromq::SocketSend> Connection<S> { - pub async fn send( - &mut self, - message: &JupyterMessage, - ) -> Result<(), AnyError> { - // If performance is a concern, we can probably avoid the clone and to_vec calls with a bit - // of refactoring. - let mut jparts: Vec<Bytes> = vec![ - serde_json::to_string(&message.header) - .unwrap() - .as_bytes() - .to_vec() - .into(), - serde_json::to_string(&message.parent_header) - .unwrap() - .as_bytes() - .to_vec() - .into(), - serde_json::to_string(&message.metadata) - .unwrap() - .as_bytes() - .to_vec() - .into(), - serde_json::to_string(&message.content) - .unwrap() - .as_bytes() - .to_vec() - .into(), - ]; - jparts.extend_from_slice(&message.buffers); - let raw_message = RawMessage { - zmq_identities: message.zmq_identities.clone(), - jparts, - }; - self.send_raw(raw_message).await - } - - async fn send_raw( - &mut self, - raw_message: RawMessage, - ) -> Result<(), AnyError> { - let hmac = if let Some(key) = &self.mac { - let ctx = digest(key, &raw_message.jparts); - let tag = ctx.sign(); - HEXLOWER.encode(tag.as_ref()) - } else { - String::new() - }; - let mut parts: Vec<bytes::Bytes> = Vec::new(); - for part in &raw_message.zmq_identities { - parts.push(part.to_vec().into()); - } - parts.push(DELIMITER.into()); - parts.push(hmac.as_bytes().to_vec().into()); - for part in &raw_message.jparts { - parts.push(part.to_vec().into()); - } - // ZmqMessage::try_from only fails if parts is empty, which it never - // will be here. - let message = zeromq::ZmqMessage::try_from(parts).unwrap(); - self.socket.send(message).await?; - Ok(()) - } -} - -fn digest(mac: &hmac::Key, jparts: &[Bytes]) -> hmac::Context { - let mut hmac_ctx = hmac::Context::with_key(mac); - for part in jparts { - hmac_ctx.update(part); - } - hmac_ctx -} - -struct RawMessage { - zmq_identities: Vec<Bytes>, - jparts: Vec<Bytes>, -} - -impl RawMessage { - pub fn from_multipart( - multipart: zeromq::ZmqMessage, - mac: Option<&hmac::Key>, - ) -> Result<RawMessage, AnyError> { - let delimiter_index = multipart - .iter() - .position(|part| &part[..] == DELIMITER) - .ok_or_else(|| anyhow!("Missing delimiter"))?; - let mut parts = multipart.into_vec(); - let jparts: Vec<_> = parts.drain(delimiter_index + 2..).collect(); - let expected_hmac = parts.pop().unwrap(); - // Remove delimiter, so that what's left is just the identities. - parts.pop(); - let zmq_identities = parts; - - let raw_message = RawMessage { - zmq_identities, - jparts, - }; - - if let Some(key) = mac { - let sig = HEXLOWER.decode(&expected_hmac)?; - let mut msg = Vec::new(); - for part in &raw_message.jparts { - msg.extend(part); - } - - if let Err(err) = hmac::verify(key, msg.as_ref(), sig.as_ref()) { - bail!("{}", err); - } - } - - Ok(raw_message) - } -} - -#[derive(Clone)] -pub struct JupyterMessage { - zmq_identities: Vec<Bytes>, - header: serde_json::Value, - parent_header: serde_json::Value, - metadata: serde_json::Value, - content: serde_json::Value, - buffers: Vec<Bytes>, -} - -const DELIMITER: &[u8] = b"<IDS|MSG>"; - -impl JupyterMessage { - fn from_raw_message( - raw_message: RawMessage, - ) -> Result<JupyterMessage, AnyError> { - if raw_message.jparts.len() < 4 { - bail!("Insufficient message parts {}", raw_message.jparts.len()); - } - - Ok(JupyterMessage { - zmq_identities: raw_message.zmq_identities, - header: serde_json::from_slice(&raw_message.jparts[0])?, - parent_header: serde_json::from_slice(&raw_message.jparts[1])?, - metadata: serde_json::from_slice(&raw_message.jparts[2])?, - content: serde_json::from_slice(&raw_message.jparts[3])?, - buffers: if raw_message.jparts.len() > 4 { - raw_message.jparts[4..].to_vec() - } else { - vec![] - }, - }) - } - - pub fn message_type(&self) -> &str { - self.header["msg_type"].as_str().unwrap_or("") - } - - pub fn store_history(&self) -> bool { - self.content["store_history"].as_bool().unwrap_or(true) - } - - pub fn silent(&self) -> bool { - self.content["silent"].as_bool().unwrap_or(false) - } - - pub fn code(&self) -> &str { - self.content["code"].as_str().unwrap_or("") - } - - pub fn cursor_pos(&self) -> usize { - self.content["cursor_pos"].as_u64().unwrap_or(0) as usize - } - - pub fn comm_id(&self) -> &str { - self.content["comm_id"].as_str().unwrap_or("") - } - - // Creates a new child message of this message. ZMQ identities are not transferred. - pub fn new_message(&self, msg_type: &str) -> JupyterMessage { - let mut header = self.header.clone(); - header["msg_type"] = serde_json::Value::String(msg_type.to_owned()); - header["username"] = serde_json::Value::String("kernel".to_owned()); - header["msg_id"] = serde_json::Value::String(Uuid::new_v4().to_string()); - header["date"] = serde_json::Value::String(utc_now().to_rfc3339()); - - JupyterMessage { - zmq_identities: Vec::new(), - header, - parent_header: self.header.clone(), - metadata: json!({}), - content: json!({}), - buffers: vec![], - } - } - - // Creates a reply to this message. This is a child with the message type determined - // automatically by replacing "request" with "reply". ZMQ identities are transferred. - pub fn new_reply(&self) -> JupyterMessage { - let mut reply = - self.new_message(&self.message_type().replace("_request", "_reply")); - reply.zmq_identities = self.zmq_identities.clone(); - reply - } - - #[must_use = "Need to send this message for it to have any effect"] - pub fn comm_close_message(&self) -> JupyterMessage { - self.new_message("comm_close").with_content(json!({ - "comm_id": self.comm_id() - })) - } - - pub fn with_content(mut self, content: serde_json::Value) -> JupyterMessage { - self.content = content; - self - } - - pub fn with_metadata( - mut self, - metadata: serde_json::Value, - ) -> JupyterMessage { - self.metadata = metadata; - self - } - - pub fn with_buffers(mut self, buffers: Vec<Bytes>) -> JupyterMessage { - self.buffers = buffers; - self - } -} - -impl fmt::Debug for JupyterMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - writeln!( - f, - "\nHeader: {}", - serde_json::to_string_pretty(&self.header).unwrap() - )?; - writeln!( - f, - "Parent header: {}", - serde_json::to_string_pretty(&self.parent_header).unwrap() - )?; - writeln!( - f, - "Metadata: {}", - serde_json::to_string_pretty(&self.metadata).unwrap() - )?; - writeln!( - f, - "Content: {}\n", - serde_json::to_string_pretty(&self.content).unwrap() - )?; - Ok(()) - } -} diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs index da1c4bc4d..a4d0bb27d 100644 --- a/cli/tools/jupyter/mod.rs +++ b/cli/tools/jupyter/mod.rs @@ -3,7 +3,6 @@ use crate::args::Flags; use crate::args::JupyterFlags; use crate::ops; -use crate::tools::jupyter::server::StdioMsg; use crate::tools::repl; use crate::tools::test::create_single_test_event_channel; use crate::tools::test::reporters::PrettyTestReporter; @@ -15,7 +14,6 @@ use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::located_script_name; use deno_core::resolve_url_or_path; -use deno_core::serde::Deserialize; use deno_core::serde_json; use deno_core::url::Url; use deno_runtime::deno_io::Stdio; @@ -24,11 +22,13 @@ use deno_runtime::permissions::Permissions; use deno_runtime::permissions::PermissionsContainer; use deno_runtime::WorkerExecutionMode; use deno_terminal::colors; + +use runtimelib::jupyter::ConnectionInfo; +use runtimelib::messaging::StreamContent; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; mod install; -pub mod jupyter_msg; pub mod server; pub async fn kernel( @@ -73,7 +73,7 @@ pub async fn kernel( std::fs::read_to_string(&connection_filepath).with_context(|| { format!("Couldn't read connection file: {:?}", connection_filepath) })?; - let spec: ConnectionSpec = + let spec: ConnectionInfo = serde_json::from_str(&conn_file).with_context(|| { format!( "Connection file is not a valid JSON: {:?}", @@ -119,12 +119,14 @@ pub async fn kernel( test_event_receiver, ) .await?; - struct TestWriter(UnboundedSender<StdioMsg>); + struct TestWriter(UnboundedSender<StreamContent>); impl std::io::Write for TestWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { self .0 - .send(StdioMsg::Stdout(String::from_utf8_lossy(buf).into_owned())) + .send(StreamContent::stdout( + String::from_utf8_lossy(buf).into_owned(), + )) .ok(); Ok(buf.len()) } @@ -150,15 +152,3 @@ pub async fn kernel( Ok(()) } - -#[derive(Debug, Deserialize)] -pub struct ConnectionSpec { - ip: String, - transport: String, - control_port: u32, - shell_port: u32, - stdin_port: u32, - hb_port: u32, - iopub_port: u32, - key: String, -} diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index 4021cf6a3..3d273ee74 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -19,48 +19,54 @@ use deno_core::CancelHandle; use tokio::sync::mpsc; use tokio::sync::Mutex; -use super::jupyter_msg::Connection; -use super::jupyter_msg::JupyterMessage; -use super::ConnectionSpec; - -pub enum StdioMsg { - Stdout(String), - Stderr(String), -} +use runtimelib::ConnectionInfo; +use runtimelib::KernelControlConnection; +use runtimelib::KernelHeartbeatConnection; +use runtimelib::KernelIoPubConnection; +use runtimelib::KernelShellConnection; + +use runtimelib::messaging; +use runtimelib::AsChildOf; +use runtimelib::JupyterMessage; +use runtimelib::JupyterMessageContent; +use runtimelib::ReplyError; +use runtimelib::ReplyStatus; +use runtimelib::StreamContent; pub struct JupyterServer { execution_count: usize, last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, // This is Arc<Mutex<>>, so we don't hold RefCell borrows across await // points. - iopub_socket: Arc<Mutex<Connection<zeromq::PubSocket>>>, + iopub_connection: Arc<Mutex<KernelIoPubConnection>>, repl_session: repl::ReplSession, } impl JupyterServer { pub async fn start( - spec: ConnectionSpec, - mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>, + connection_info: ConnectionInfo, + mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>, mut repl_session: repl::ReplSession, ) -> Result<(), AnyError> { let mut heartbeat = - bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?; - let shell_socket = - bind_socket::<zeromq::RouterSocket>(&spec, spec.shell_port).await?; - let control_socket = - bind_socket::<zeromq::RouterSocket>(&spec, spec.control_port).await?; - let _stdin_socket = - bind_socket::<zeromq::RouterSocket>(&spec, spec.stdin_port).await?; - let iopub_socket = - bind_socket::<zeromq::PubSocket>(&spec, spec.iopub_port).await?; - let iopub_socket = Arc::new(Mutex::new(iopub_socket)); + connection_info.create_kernel_heartbeat_connection().await?; + let shell_connection = + connection_info.create_kernel_shell_connection().await?; + let control_connection = + connection_info.create_kernel_control_connection().await?; + let _stdin_connection = + connection_info.create_kernel_stdin_connection().await?; + let iopub_connection = + connection_info.create_kernel_iopub_connection().await?; + + let iopub_connection = Arc::new(Mutex::new(iopub_connection)); let last_execution_request = Rc::new(RefCell::new(None)); - // Store `iopub_socket` in the op state so it's accessible to the runtime API. + // Store `iopub_connection` in the op state so it's accessible to the runtime API. { let op_state_rc = repl_session.worker.js_runtime.op_state(); let mut op_state = op_state_rc.borrow_mut(); - op_state.put(iopub_socket.clone()); + op_state.put(iopub_connection.clone()); op_state.put(last_execution_request.clone()); } @@ -68,14 +74,18 @@ impl JupyterServer { let mut server = Self { execution_count: 0, - iopub_socket: iopub_socket.clone(), + iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), repl_session, }; let handle1 = deno_core::unsync::spawn(async move { if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await { - log::error!("Heartbeat error: {}", err); + log::error!( + "Heartbeat error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); } }); @@ -83,23 +93,27 @@ impl JupyterServer { let cancel_handle = cancel_handle.clone(); async move { if let Err(err) = - Self::handle_control(control_socket, cancel_handle).await + Self::handle_control(control_connection, cancel_handle).await { - log::error!("Control error: {}", err); + log::error!( + "Control error: {}\nBacktrace:\n{}", + err, + err.backtrace() + ); } } }); let handle3 = deno_core::unsync::spawn(async move { - if let Err(err) = server.handle_shell(shell_socket).await { - log::error!("Shell error: {}", err); + if let Err(err) = server.handle_shell(shell_connection).await { + log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace()); } }); let handle4 = deno_core::unsync::spawn(async move { while let Some(stdio_msg) = stdio_rx.recv().await { Self::handle_stdio_msg( - iopub_socket.clone(), + iopub_connection.clone(), last_execution_request.clone(), stdio_msg, ) @@ -117,33 +131,25 @@ impl JupyterServer { Ok(()) } - async fn handle_stdio_msg<S: zeromq::SocketSend>( - iopub_socket: Arc<Mutex<Connection<S>>>, + async fn handle_stdio_msg( + iopub_connection: Arc<Mutex<KernelIoPubConnection>>, last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, - stdio_msg: StdioMsg, + stdio_msg: StreamContent, ) { let maybe_exec_result = last_execution_request.borrow().clone(); if let Some(exec_request) = maybe_exec_result { - let (name, text) = match stdio_msg { - StdioMsg::Stdout(text) => ("stdout", text), - StdioMsg::Stderr(text) => ("stderr", text), - }; - - let result = (*iopub_socket.lock().await) - .send(&exec_request.new_message("stream").with_content(json!({ - "name": name, - "text": text - }))) + let result = (iopub_connection.lock().await) + .send(stdio_msg.as_child_of(&exec_request)) .await; if let Err(err) = result { - log::error!("Output {} error: {}", name, err); + log::error!("Output error: {}", err); } } } async fn handle_heartbeat( - connection: &mut Connection<zeromq::RepSocket>, + connection: &mut KernelHeartbeatConnection, ) -> Result<(), AnyError> { loop { connection.single_heartbeat().await?; @@ -151,23 +157,30 @@ impl JupyterServer { } async fn handle_control( - mut connection: Connection<zeromq::RouterSocket>, + mut connection: KernelControlConnection, cancel_handle: Rc<CancelHandle>, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; - match msg.message_type() { - "kernel_info_request" => { - connection - .send(&msg.new_reply().with_content(kernel_info())) - .await?; + + match msg.content { + JupyterMessageContent::KernelInfoRequest(_) => { + // normally kernel info is sent from the shell channel + // however, some frontends will send it on the control channel + // and it's no harm to send a kernel info reply on control + connection.send(kernel_info().as_child_of(&msg)).await?; } - "shutdown_request" => { + JupyterMessageContent::ShutdownRequest(_) => { cancel_handle.cancel(); } - "interrupt_request" => { + JupyterMessageContent::InterruptRequest(_) => { log::error!("Interrupt request currently not supported"); } + JupyterMessageContent::DebugRequest(_) => { + log::error!("Debug request currently not supported"); + // See https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request + // and https://microsoft.github.io/debug-adapter-protocol/ + } _ => { log::error!( "Unrecognized control message type: {}", @@ -180,7 +193,7 @@ impl JupyterServer { async fn handle_shell( &mut self, - mut connection: Connection<zeromq::RouterSocket>, + mut connection: KernelShellConnection, ) -> Result<(), AnyError> { loop { let msg = connection.read().await?; @@ -191,43 +204,28 @@ impl JupyterServer { async fn handle_shell_message( &mut self, msg: JupyterMessage, - connection: &mut Connection<zeromq::RouterSocket>, + connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { + let parent = &msg.clone(); + self - .send_iopub( - &msg - .new_message("status") - .with_content(json!({"execution_state": "busy"})), - ) + .send_iopub(messaging::Status::busy().as_child_of(parent)) .await?; - match msg.message_type() { - "kernel_info_request" => { - connection - .send(&msg.new_reply().with_content(kernel_info())) - .await?; - } - "is_complete_request" => { - connection - .send(&msg.new_reply().with_content(json!({"status": "complete"}))) - .await?; - } - "execute_request" => { + match msg.content { + JupyterMessageContent::ExecuteRequest(execute_request) => { self - .handle_execution_request(msg.clone(), connection) + .handle_execution_request(execute_request, parent, connection) .await?; } - "comm_open" => { - self.send_iopub(&msg.comm_close_message()).await?; - } - "complete_request" => { - let user_code = msg.code(); - let cursor_pos = msg.cursor_pos(); + JupyterMessageContent::CompleteRequest(req) => { + let user_code = req.code; + let cursor_pos = req.cursor_pos; let lsp_completions = self .repl_session .language_server - .completions(user_code, cursor_pos) + .completions(&user_code, cursor_pos) .await; if !lsp_completions.is_empty() { @@ -247,16 +245,20 @@ impl JupyterServer { .unwrap_or(cursor_pos); connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "matches": matches, - "cursor_start": cursor_start, - "cursor_end": cursor_end, - "metadata": {}, - }))) + .send( + messaging::CompleteReply { + matches, + cursor_start, + cursor_end, + metadata: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) .await?; } else { - let expr = get_expr_from_line_at_pos(user_code, cursor_pos); + let expr = get_expr_from_line_at_pos(&user_code, cursor_pos); // check if the expression is in the form `obj.prop` let (completions, cursor_start) = if let Some(index) = expr.rfind('.') { @@ -292,72 +294,173 @@ impl JupyterServer { (candidates, cursor_pos - expr.len()) }; + connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "matches": completions, - "cursor_start": cursor_start, - "cursor_end": cursor_pos, - "metadata": {}, - }))) + .send( + messaging::CompleteReply { + matches: completions, + cursor_start, + cursor_end: cursor_pos, + metadata: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) .await?; } } - "comm_msg" | "comm_info_request" | "history_request" => { - // We don't handle these messages + + JupyterMessageContent::InspectRequest(_req) => { + // TODO(bartlomieju?): implement introspection request + // The inspect request is used to get information about an object at cursor position. + // There are two detail levels: 0 is typically documentation, 1 is typically source code + + // The response includes a MimeBundle to render the object: + // { + // "status": "ok", + // "found": true, + // "data": { + // "text/plain": "Plain documentation here", + // "text/html": "<div>Rich documentation here</div>", + // "application/json": { + // "key1": "value1", + // "key2": "value2" + // } + // }, + // } + + connection + .send( + messaging::InspectReply { + status: ReplyStatus::Ok, + found: false, + data: Default::default(), + metadata: Default::default(), + error: None, + } + .as_child_of(parent), + ) + .await?; + } + + JupyterMessageContent::IsCompleteRequest(_) => { + connection + .send(messaging::IsCompleteReply::complete().as_child_of(parent)) + .await?; + } + JupyterMessageContent::KernelInfoRequest(_) => { + connection.send(kernel_info().as_child_of(parent)).await?; } + JupyterMessageContent::CommOpen(comm) => { + connection + .send( + messaging::CommClose { + comm_id: comm.comm_id, + data: Default::default(), + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::HistoryRequest(_req) => { + connection + .send( + messaging::HistoryReply { + history: vec![], + error: None, + status: ReplyStatus::Ok, + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::InputReply(_rep) => { + // TODO(@zph): implement input reply from https://github.com/denoland/deno/pull/23592 + // NOTE: This will belong on the stdin channel, not the shell channel + } + JupyterMessageContent::CommInfoRequest(_req) => { + connection + .send( + messaging::CommInfoReply { + comms: Default::default(), + status: ReplyStatus::Ok, + error: None, + } + .as_child_of(parent), + ) + .await?; + } + JupyterMessageContent::CommMsg(_) + | JupyterMessageContent::CommClose(_) => { + // Do nothing with regular comm messages + } + // Any unknown message type is ignored _ => { - log::error!("Unrecognized shell message type: {}", msg.message_type()); + log::error!( + "Unrecognized shell message type: {}", + msg.content.message_type() + ); } } self - .send_iopub( - &msg - .new_message("status") - .with_content(json!({"execution_state": "idle"})), - ) + .send_iopub(messaging::Status::idle().as_child_of(parent)) .await?; + Ok(()) } async fn handle_execution_request( &mut self, - msg: JupyterMessage, - connection: &mut Connection<zeromq::RouterSocket>, + execute_request: messaging::ExecuteRequest, + parent_message: &JupyterMessage, + connection: &mut KernelShellConnection, ) -> Result<(), AnyError> { - if !msg.silent() && msg.store_history() { + if !execute_request.silent && execute_request.store_history { self.execution_count += 1; } - *self.last_execution_request.borrow_mut() = Some(msg.clone()); + *self.last_execution_request.borrow_mut() = Some(parent_message.clone()); self - .send_iopub(&msg.new_message("execute_input").with_content(json!({ - "execution_count": self.execution_count, - "code": msg.code() - }))) + .send_iopub( + messaging::ExecuteInput { + execution_count: self.execution_count, + code: execute_request.code.clone(), + } + .as_child_of(parent_message), + ) .await?; let result = self .repl_session - .evaluate_line_with_object_wrapping(msg.code()) + .evaluate_line_with_object_wrapping(&execute_request.code) .await; let evaluate_response = match result { Ok(eval_response) => eval_response, Err(err) => { self - .send_iopub(&msg.new_message("error").with_content(json!({ - "ename": err.to_string(), - "evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error - "traceback": [], - }))) + .send_iopub( + messaging::ErrorOutput { + ename: err.to_string(), + evalue: err.to_string(), + traceback: vec![], + } + .as_child_of(parent_message), + ) .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "error", - "execution_count": self.execution_count, - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Error, + payload: None, + user_expressions: None, + error: None, + } + .as_child_of(parent_message), + ) .await?; return Ok(()); } @@ -373,11 +476,16 @@ impl JupyterServer { .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "ok", - "execution_count": self.execution_count, - // FIXME: also include user_expressions - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Ok, + user_expressions: None, + payload: None, + error: None, + } + .as_child_of(parent_message), + ) .await?; // Let's sleep here for a few ms, so we give a chance to the task that is // handling stdout and stderr streams to receive and flush the content. @@ -458,17 +566,30 @@ impl JupyterServer { }; self - .send_iopub(&msg.new_message("error").with_content(json!({ - "ename": ename, - "evalue": evalue, - "traceback": traceback, - }))) + .send_iopub( + messaging::ErrorOutput { + ename: ename.clone(), + evalue: evalue.clone(), + traceback: traceback.clone(), + } + .as_child_of(parent_message), + ) .await?; connection - .send(&msg.new_reply().with_content(json!({ - "status": "error", - "execution_count": self.execution_count, - }))) + .send( + messaging::ExecuteReply { + execution_count: self.execution_count, + status: ReplyStatus::Error, + error: Some(ReplyError { + ename, + evalue, + traceback, + }), + user_expressions: None, + payload: None, + } + .as_child_of(parent_message), + ) .await?; } @@ -477,42 +598,35 @@ impl JupyterServer { async fn send_iopub( &mut self, - message: &JupyterMessage, + message: JupyterMessage, ) -> Result<(), AnyError> { - self.iopub_socket.lock().await.send(message).await + self.iopub_connection.lock().await.send(message).await } } -async fn bind_socket<S: zeromq::Socket>( - config: &ConnectionSpec, - port: u32, -) -> Result<Connection<S>, AnyError> { - let endpoint = format!("{}://{}:{}", config.transport, config.ip, port); - let mut socket = S::new(); - socket.bind(&endpoint).await?; - Ok(Connection::new(socket, &config.key)) -} - -fn kernel_info() -> serde_json::Value { - json!({ - "status": "ok", - "protocol_version": "5.3", - "implementation_version": crate::version::deno(), - "implementation": "Deno kernel", - "language_info": { - "name": "typescript", - "version": crate::version::TYPESCRIPT, - "mimetype": "text/x.typescript", - "file_extension": ".ts", - "pygments_lexer": "typescript", - "nb_converter": "script" +fn kernel_info() -> messaging::KernelInfoReply { + messaging::KernelInfoReply { + status: ReplyStatus::Ok, + protocol_version: "5.3".to_string(), + implementation: "Deno kernel".to_string(), + implementation_version: crate::version::deno().to_string(), + language_info: messaging::LanguageInfo { + name: "typescript".to_string(), + version: crate::version::TYPESCRIPT.to_string(), + mimetype: "text/x.typescript".to_string(), + file_extension: ".ts".to_string(), + pygments_lexer: "typescript".to_string(), + codemirror_mode: messaging::CodeMirrorMode::typescript(), + nbconvert_exporter: "script".to_string(), }, - "help_links": [{ - "text": "Visit Deno manual", - "url": "https://deno.land/manual" + banner: "Welcome to Deno kernel".to_string(), + help_links: vec![messaging::HelpLink { + text: "Visit Deno manual".to_string(), + url: "https://deno.land/manual".to_string(), }], - "banner": "Welcome to Deno kernel", - }) + debugger: false, + error: None, + } } async fn publish_result( |