summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKyle Kelley <rgbkrk@gmail.com>2024-05-21 13:35:21 -0700
committerGitHub <noreply@github.com>2024-05-21 22:35:21 +0200
commit8698e80304815353ec52be1b16f96483ebe559a0 (patch)
tree9abd53d5b656cd8cc0c1aa3940684f3ce1d9c8ef
parentcc8c0609ebec9f101a1739a0c42c91718ca2abba (diff)
refactor(jupyter): use runtimelib for Jupyter structures and directory paths (#23826)
This brings in [`runtimelib`](https://github.com/runtimed/runtimed) to use: ## Fully typed structs for Jupyter Messages ```rust let msg = connection.read().await?; self .send_iopub( runtimelib::Status::busy().as_child_of(msg), ) .await?; ``` ## Jupyter paths Jupyter paths are implemented in Rust, allowing the Deno kernel to be installed completely via Deno without a requirement on Python or Jupyter. Deno users will be able to install and use the kernel with just VS Code or other editors that support Jupyter. ```rust pub fn status() -> Result<(), AnyError> { let user_data_dir = user_data_dir()?; let kernel_spec_dir_path = user_data_dir.join("kernels").join("deno"); let kernel_spec_path = kernel_spec_dir_path.join("kernel.json"); if kernel_spec_path.exists() { log::info!("✅ Deno kernel already installed"); Ok(()) } else { log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up"); Ok(()) } } ``` Closes https://github.com/denoland/deno/issues/21619
-rw-r--r--Cargo.lock120
-rw-r--r--cli/Cargo.toml1
-rw-r--r--cli/ops/jupyter.rs54
-rw-r--r--cli/tools/jupyter/install.rs88
-rw-r--r--cli/tools/jupyter/jupyter_msg.rs305
-rw-r--r--cli/tools/jupyter/mod.rs26
-rw-r--r--cli/tools/jupyter/server.rs444
-rw-r--r--tests/integration/jupyter_tests.rs10
-rw-r--r--tests/specs/jupyter/install_command/__test__.jsonc (renamed from tests/specs/jupyter/install_command_not_exists/__test__.jsonc)4
-rw-r--r--tests/specs/jupyter/install_command/install_command.out2
-rw-r--r--tests/specs/jupyter/install_command_not_exists/install_command_not_exists.out5
11 files changed, 460 insertions, 599 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f571d1a64..37b3f500c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -384,6 +384,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
+name = "base64"
+version = "0.22.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
+
+[[package]]
name = "base64-simd"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1071,7 +1077,7 @@ version = "1.43.5"
dependencies = [
"async-trait",
"base32",
- "base64",
+ "base64 0.21.7",
"bincode",
"bytes",
"cache_control",
@@ -1138,6 +1144,7 @@ dependencies = [
"regex",
"reqwest",
"ring",
+ "runtimelib",
"rustyline",
"rustyline-derive",
"serde",
@@ -1172,7 +1179,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "584547d27786a734536fde7088f8429d355569c39410427be44695c300618408"
dependencies = [
"anyhow",
- "base64",
+ "base64 0.21.7",
"deno_media_type",
"deno_terminal",
"dprint-swc-ext",
@@ -1350,7 +1357,7 @@ dependencies = [
"aes",
"aes-gcm",
"aes-kw",
- "base64",
+ "base64 0.21.7",
"cbc",
"const-oid",
"ctr",
@@ -1408,7 +1415,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80b80fef2bf1b6e14712633975f7f39a3b29b95a5769cafcb959ffa1a84b7680"
dependencies = [
"anyhow",
- "base64",
+ "base64 0.21.7",
"deno_ast",
"deno_graph",
"escape8259",
@@ -1504,7 +1511,7 @@ version = "0.150.0"
dependencies = [
"async-compression",
"async-trait",
- "base64",
+ "base64 0.21.7",
"bencher",
"brotli 3.5.0",
"bytes",
@@ -1559,7 +1566,7 @@ version = "0.60.0"
dependencies = [
"anyhow",
"async-trait",
- "base64",
+ "base64 0.21.7",
"chrono",
"deno_core",
"deno_fetch",
@@ -2157,6 +2164,27 @@ dependencies = [
]
[[package]]
+name = "dirs"
+version = "5.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225"
+dependencies = [
+ "dirs-sys",
+]
+
+[[package]]
+name = "dirs-sys"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c"
+dependencies = [
+ "libc",
+ "option-ext",
+ "redox_users",
+ "windows-sys 0.48.0",
+]
+
+[[package]]
name = "displaydoc"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2513,7 +2541,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f606daca1ce18c69ccdabc59aa1c7e077356b8ffcd74e12c7646f545320a2fd"
dependencies = [
"anyhow",
- "base64",
+ "base64 0.21.7",
"deno_ast",
"deno_graph",
"deno_npm",
@@ -2570,7 +2598,7 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1"
dependencies = [
- "base64",
+ "base64 0.21.7",
"http-body-util",
"hyper 1.1.0",
"hyper-util",
@@ -3826,6 +3854,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
+name = "libredox"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d"
+dependencies = [
+ "bitflags 2.5.0",
+ "libc",
+]
+
+[[package]]
name = "libsqlite3-sys"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4383,6 +4421,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
+name = "option-ext"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
+
+[[package]]
name = "ordered-float"
version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5123,6 +5167,17 @@ dependencies = [
]
[[package]]
+name = "redox_users"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891"
+dependencies = [
+ "getrandom",
+ "libredox",
+ "thiserror",
+]
+
+[[package]]
name = "ref-cast"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5184,7 +5239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1"
dependencies = [
"async-compression",
- "base64",
+ "base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
@@ -5270,7 +5325,7 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
dependencies = [
- "base64",
+ "base64 0.21.7",
"bitflags 2.5.0",
"serde",
"serde_derive",
@@ -5297,6 +5352,29 @@ dependencies = [
]
[[package]]
+name = "runtimelib"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4300b46ab6f2970f81c176f4f2f7ff0a48809f52be7a8fd4ca5a32e9002f6e8f"
+dependencies = [
+ "anyhow",
+ "base64 0.22.1",
+ "bytes",
+ "chrono",
+ "data-encoding",
+ "dirs",
+ "glob",
+ "rand",
+ "ring",
+ "serde",
+ "serde_json",
+ "shellexpand",
+ "tokio",
+ "uuid",
+ "zeromq",
+]
+
+[[package]]
name = "rusqlite"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5392,7 +5470,7 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
- "base64",
+ "base64 0.21.7",
]
[[package]]
@@ -5707,6 +5785,12 @@ dependencies = [
]
[[package]]
+name = "sha1_smol"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
+
+[[package]]
name = "sha2"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5724,6 +5808,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f"
[[package]]
+name = "shellexpand"
+version = "3.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b"
+dependencies = [
+ "dirs",
+]
+
+[[package]]
name = "signal-hook"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -6298,7 +6391,7 @@ version = "0.184.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "565a76c4ca47ce31d78301c0beab878e4c2cb4f624691254d834ec8c0e236755"
dependencies = [
- "base64",
+ "base64 0.21.7",
"dashmap",
"indexmap",
"once_cell",
@@ -6561,7 +6654,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
- "base64",
+ "base64 0.21.7",
"bytes",
"console_static_text",
"deno_unsync",
@@ -7193,6 +7286,7 @@ checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [
"getrandom",
"serde",
+ "sha1_smol",
]
[[package]]
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 2bc2d4c17..a7856b8d7 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -129,6 +129,7 @@ rand = { workspace = true, features = ["small_rng"] }
regex.workspace = true
reqwest.workspace = true
ring.workspace = true
+runtimelib = "=0.9.0"
rustyline.workspace = true
rustyline-derive = "=0.7.0"
serde.workspace = true
diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs
index 1c60bc2bc..57ca93ff4 100644
--- a/cli/ops/jupyter.rs
+++ b/cli/ops/jupyter.rs
@@ -4,9 +4,11 @@ use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
-use crate::tools::jupyter::jupyter_msg::Connection;
-use crate::tools::jupyter::jupyter_msg::JupyterMessage;
-use crate::tools::jupyter::server::StdioMsg;
+use runtimelib::JupyterMessage;
+use runtimelib::JupyterMessageContent;
+use runtimelib::KernelIoPubConnection;
+use runtimelib::StreamContent;
+
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::serde_json;
@@ -19,7 +21,7 @@ deno_core::extension!(deno_jupyter,
op_jupyter_broadcast,
],
options = {
- sender: mpsc::UnboundedSender<StdioMsg>,
+ sender: mpsc::UnboundedSender<StreamContent>,
},
middleware = |op| match op.name {
"op_print" => op_print(),
@@ -38,28 +40,40 @@ pub async fn op_jupyter_broadcast(
#[serde] metadata: serde_json::Value,
#[serde] buffers: Vec<deno_core::JsBuffer>,
) -> Result<(), AnyError> {
- let (iopub_socket, last_execution_request) = {
+ let (iopub_connection, last_execution_request) = {
let s = state.borrow();
(
- s.borrow::<Arc<Mutex<Connection<zeromq::PubSocket>>>>()
- .clone(),
+ s.borrow::<Arc<Mutex<KernelIoPubConnection>>>().clone(),
s.borrow::<Rc<RefCell<Option<JupyterMessage>>>>().clone(),
)
};
let maybe_last_request = last_execution_request.borrow().clone();
if let Some(last_request) = maybe_last_request {
- (*iopub_socket.lock().await)
- .send(
- &last_request
- .new_message(&message_type)
- .with_content(content)
- .with_metadata(metadata)
- .with_buffers(
- buffers.into_iter().map(|b| b.to_vec().into()).collect(),
- ),
- )
+ let content = JupyterMessageContent::from_type_and_content(
+ &message_type,
+ content.clone(),
+ )
+ .map_err(|err| {
+ log::error!(
+ "Error deserializing content from jupyter.broadcast, message_type: {}:\n\n{}\n\n{}",
+ &message_type,
+ content,
+ err
+ );
+ err
+ })?;
+
+ let mut jupyter_message = JupyterMessage::new(content, Some(&last_request));
+
+ jupyter_message.metadata = metadata;
+ jupyter_message.buffers =
+ buffers.into_iter().map(|b| b.to_vec().into()).collect();
+ jupyter_message.set_parent(last_request);
+
+ (iopub_connection.lock().await)
+ .send(jupyter_message)
.await?;
}
@@ -72,16 +86,16 @@ pub fn op_print(
#[string] msg: &str,
is_err: bool,
) -> Result<(), AnyError> {
- let sender = state.borrow_mut::<mpsc::UnboundedSender<StdioMsg>>();
+ let sender = state.borrow_mut::<mpsc::UnboundedSender<StreamContent>>();
if is_err {
- if let Err(err) = sender.send(StdioMsg::Stderr(msg.into())) {
+ if let Err(err) = sender.send(StreamContent::stderr(msg.into())) {
log::error!("Failed to send stderr message: {}", err);
}
return Ok(());
}
- if let Err(err) = sender.send(StdioMsg::Stdout(msg.into())) {
+ if let Err(err) = sender.send(StreamContent::stdout(msg.into())) {
log::error!("Failed to send stdout message: {}", err);
}
Ok(())
diff --git a/cli/tools/jupyter/install.rs b/cli/tools/jupyter/install.rs
index 69a75837e..40f21d3c1 100644
--- a/cli/tools/jupyter/install.rs
+++ b/cli/tools/jupyter/install.rs
@@ -1,40 +1,31 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
-use deno_core::anyhow::bail;
-use deno_core::anyhow::Context;
use deno_core::error::AnyError;
use deno_core::serde_json;
use deno_core::serde_json::json;
use std::env::current_exe;
-use std::io::ErrorKind;
use std::io::Write;
use std::path::Path;
-use tempfile::TempDir;
+
+use runtimelib::dirs::user_data_dir;
const DENO_ICON_32: &[u8] = include_bytes!("./resources/deno-logo-32x32.png");
const DENO_ICON_64: &[u8] = include_bytes!("./resources/deno-logo-64x64.png");
const DENO_ICON_SVG: &[u8] = include_bytes!("./resources/deno-logo-svg.svg");
pub fn status() -> Result<(), AnyError> {
- let output = std::process::Command::new("jupyter")
- .args(["kernelspec", "list", "--json"])
- .output()
- .context("Failed to get list of installed kernelspecs")?;
- let json_output: serde_json::Value =
- serde_json::from_slice(&output.stdout)
- .context("Failed to parse JSON from kernelspec list")?;
+ let user_data_dir = user_data_dir()?;
- if let Some(specs) = json_output.get("kernelspecs") {
- if let Some(specs_obj) = specs.as_object() {
- if specs_obj.contains_key("deno") {
- log::info!("✅ Deno kernel already installed");
- return Ok(());
- }
- }
- }
+ let kernel_spec_dir_path = user_data_dir.join("kernels").join("deno");
+ let kernel_spec_path = kernel_spec_dir_path.join("kernel.json");
- log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up");
- Ok(())
+ if kernel_spec_path.exists() {
+ log::info!("✅ Deno kernel already installed");
+ Ok(())
+ } else {
+ log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up");
+ Ok(())
+ }
}
fn install_icon(
@@ -49,8 +40,12 @@ fn install_icon(
}
pub fn install() -> Result<(), AnyError> {
- let temp_dir = TempDir::new().unwrap();
- let kernel_json_path = temp_dir.path().join("kernel.json");
+ let user_data_dir = user_data_dir()?;
+ let kernel_dir = user_data_dir.join("kernels").join("deno");
+
+ std::fs::create_dir_all(&kernel_dir)?;
+
+ let kernel_json_path = kernel_dir.join("kernel.json");
// TODO(bartlomieju): add remaining fields as per
// https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs
@@ -63,51 +58,10 @@ pub fn install() -> Result<(), AnyError> {
let f = std::fs::File::create(kernel_json_path)?;
serde_json::to_writer_pretty(f, &json_data)?;
- install_icon(temp_dir.path(), "logo-32x32.png", DENO_ICON_32)?;
- install_icon(temp_dir.path(), "logo-64x64.png", DENO_ICON_64)?;
- install_icon(temp_dir.path(), "logo-svg.svg", DENO_ICON_SVG)?;
-
- let child_result = std::process::Command::new("jupyter")
- .args([
- "kernelspec",
- "install",
- "--user",
- "--name",
- "deno",
- &temp_dir.path().to_string_lossy(),
- ])
- .spawn();
- let mut child = match child_result {
- Ok(child) => child,
- Err(err)
- if matches!(
- err.kind(),
- ErrorKind::NotFound | ErrorKind::PermissionDenied
- ) =>
- {
- return Err(err).context(concat!(
- "Failed to spawn 'jupyter' command. Is JupyterLab installed ",
- "(https://jupyter.org/install) and available on the PATH?"
- ));
- }
- Err(err) => {
- return Err(err).context("Failed to spawn 'jupyter' command.");
- }
- };
-
- let wait_result = child.wait();
- match wait_result {
- Ok(status) => {
- if !status.success() {
- bail!("Failed to install kernelspec, try again.");
- }
- }
- Err(err) => {
- bail!("Failed to install kernelspec: {}", err);
- }
- }
+ install_icon(&user_data_dir, "logo-32x32.png", DENO_ICON_32)?;
+ install_icon(&user_data_dir, "logo-64x64.png", DENO_ICON_64)?;
+ install_icon(&user_data_dir, "logo-svg.svg", DENO_ICON_SVG)?;
- let _ = std::fs::remove_dir(temp_dir);
log::info!("✅ Deno kernelspec installed successfully.");
Ok(())
}
diff --git a/cli/tools/jupyter/jupyter_msg.rs b/cli/tools/jupyter/jupyter_msg.rs
deleted file mode 100644
index 233efcc8e..000000000
--- a/cli/tools/jupyter/jupyter_msg.rs
+++ /dev/null
@@ -1,305 +0,0 @@
-// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
-
-// This file is forked/ported from <https://github.com/evcxr/evcxr>
-// Copyright 2020 The Evcxr Authors. MIT license.
-
-use bytes::Bytes;
-use data_encoding::HEXLOWER;
-use deno_core::anyhow::anyhow;
-use deno_core::anyhow::bail;
-use deno_core::error::AnyError;
-use deno_core::serde_json;
-use deno_core::serde_json::json;
-use ring::hmac;
-use std::fmt;
-use uuid::Uuid;
-
-use crate::util::time::utc_now;
-
-pub struct Connection<S> {
- socket: S,
- /// Will be None if our key was empty (digest authentication disabled).
- mac: Option<hmac::Key>,
-}
-
-impl<S: zeromq::Socket> Connection<S> {
- pub fn new(socket: S, key: &str) -> Self {
- let mac = if key.is_empty() {
- None
- } else {
- Some(hmac::Key::new(hmac::HMAC_SHA256, key.as_bytes()))
- };
- Connection { socket, mac }
- }
-}
-
-impl<S: zeromq::SocketSend + zeromq::SocketRecv> Connection<S> {
- pub async fn single_heartbeat(&mut self) -> Result<(), AnyError> {
- self.socket.recv().await?;
- self
- .socket
- .send(zeromq::ZmqMessage::from(b"ping".to_vec()))
- .await?;
- Ok(())
- }
-}
-
-impl<S: zeromq::SocketRecv> Connection<S> {
- pub async fn read(&mut self) -> Result<JupyterMessage, AnyError> {
- let multipart = self.socket.recv().await?;
- let raw_message = RawMessage::from_multipart(multipart, self.mac.as_ref())?;
- JupyterMessage::from_raw_message(raw_message)
- }
-}
-
-impl<S: zeromq::SocketSend> Connection<S> {
- pub async fn send(
- &mut self,
- message: &JupyterMessage,
- ) -> Result<(), AnyError> {
- // If performance is a concern, we can probably avoid the clone and to_vec calls with a bit
- // of refactoring.
- let mut jparts: Vec<Bytes> = vec![
- serde_json::to_string(&message.header)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- serde_json::to_string(&message.parent_header)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- serde_json::to_string(&message.metadata)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- serde_json::to_string(&message.content)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- ];
- jparts.extend_from_slice(&message.buffers);
- let raw_message = RawMessage {
- zmq_identities: message.zmq_identities.clone(),
- jparts,
- };
- self.send_raw(raw_message).await
- }
-
- async fn send_raw(
- &mut self,
- raw_message: RawMessage,
- ) -> Result<(), AnyError> {
- let hmac = if let Some(key) = &self.mac {
- let ctx = digest(key, &raw_message.jparts);
- let tag = ctx.sign();
- HEXLOWER.encode(tag.as_ref())
- } else {
- String::new()
- };
- let mut parts: Vec<bytes::Bytes> = Vec::new();
- for part in &raw_message.zmq_identities {
- parts.push(part.to_vec().into());
- }
- parts.push(DELIMITER.into());
- parts.push(hmac.as_bytes().to_vec().into());
- for part in &raw_message.jparts {
- parts.push(part.to_vec().into());
- }
- // ZmqMessage::try_from only fails if parts is empty, which it never
- // will be here.
- let message = zeromq::ZmqMessage::try_from(parts).unwrap();
- self.socket.send(message).await?;
- Ok(())
- }
-}
-
-fn digest(mac: &hmac::Key, jparts: &[Bytes]) -> hmac::Context {
- let mut hmac_ctx = hmac::Context::with_key(mac);
- for part in jparts {
- hmac_ctx.update(part);
- }
- hmac_ctx
-}
-
-struct RawMessage {
- zmq_identities: Vec<Bytes>,
- jparts: Vec<Bytes>,
-}
-
-impl RawMessage {
- pub fn from_multipart(
- multipart: zeromq::ZmqMessage,
- mac: Option<&hmac::Key>,
- ) -> Result<RawMessage, AnyError> {
- let delimiter_index = multipart
- .iter()
- .position(|part| &part[..] == DELIMITER)
- .ok_or_else(|| anyhow!("Missing delimiter"))?;
- let mut parts = multipart.into_vec();
- let jparts: Vec<_> = parts.drain(delimiter_index + 2..).collect();
- let expected_hmac = parts.pop().unwrap();
- // Remove delimiter, so that what's left is just the identities.
- parts.pop();
- let zmq_identities = parts;
-
- let raw_message = RawMessage {
- zmq_identities,
- jparts,
- };
-
- if let Some(key) = mac {
- let sig = HEXLOWER.decode(&expected_hmac)?;
- let mut msg = Vec::new();
- for part in &raw_message.jparts {
- msg.extend(part);
- }
-
- if let Err(err) = hmac::verify(key, msg.as_ref(), sig.as_ref()) {
- bail!("{}", err);
- }
- }
-
- Ok(raw_message)
- }
-}
-
-#[derive(Clone)]
-pub struct JupyterMessage {
- zmq_identities: Vec<Bytes>,
- header: serde_json::Value,
- parent_header: serde_json::Value,
- metadata: serde_json::Value,
- content: serde_json::Value,
- buffers: Vec<Bytes>,
-}
-
-const DELIMITER: &[u8] = b"<IDS|MSG>";
-
-impl JupyterMessage {
- fn from_raw_message(
- raw_message: RawMessage,
- ) -> Result<JupyterMessage, AnyError> {
- if raw_message.jparts.len() < 4 {
- bail!("Insufficient message parts {}", raw_message.jparts.len());
- }
-
- Ok(JupyterMessage {
- zmq_identities: raw_message.zmq_identities,
- header: serde_json::from_slice(&raw_message.jparts[0])?,
- parent_header: serde_json::from_slice(&raw_message.jparts[1])?,
- metadata: serde_json::from_slice(&raw_message.jparts[2])?,
- content: serde_json::from_slice(&raw_message.jparts[3])?,
- buffers: if raw_message.jparts.len() > 4 {
- raw_message.jparts[4..].to_vec()
- } else {
- vec![]
- },
- })
- }
-
- pub fn message_type(&self) -> &str {
- self.header["msg_type"].as_str().unwrap_or("")
- }
-
- pub fn store_history(&self) -> bool {
- self.content["store_history"].as_bool().unwrap_or(true)
- }
-
- pub fn silent(&self) -> bool {
- self.content["silent"].as_bool().unwrap_or(false)
- }
-
- pub fn code(&self) -> &str {
- self.content["code"].as_str().unwrap_or("")
- }
-
- pub fn cursor_pos(&self) -> usize {
- self.content["cursor_pos"].as_u64().unwrap_or(0) as usize
- }
-
- pub fn comm_id(&self) -> &str {
- self.content["comm_id"].as_str().unwrap_or("")
- }
-
- // Creates a new child message of this message. ZMQ identities are not transferred.
- pub fn new_message(&self, msg_type: &str) -> JupyterMessage {
- let mut header = self.header.clone();
- header["msg_type"] = serde_json::Value::String(msg_type.to_owned());
- header["username"] = serde_json::Value::String("kernel".to_owned());
- header["msg_id"] = serde_json::Value::String(Uuid::new_v4().to_string());
- header["date"] = serde_json::Value::String(utc_now().to_rfc3339());
-
- JupyterMessage {
- zmq_identities: Vec::new(),
- header,
- parent_header: self.header.clone(),
- metadata: json!({}),
- content: json!({}),
- buffers: vec![],
- }
- }
-
- // Creates a reply to this message. This is a child with the message type determined
- // automatically by replacing "request" with "reply". ZMQ identities are transferred.
- pub fn new_reply(&self) -> JupyterMessage {
- let mut reply =
- self.new_message(&self.message_type().replace("_request", "_reply"));
- reply.zmq_identities = self.zmq_identities.clone();
- reply
- }
-
- #[must_use = "Need to send this message for it to have any effect"]
- pub fn comm_close_message(&self) -> JupyterMessage {
- self.new_message("comm_close").with_content(json!({
- "comm_id": self.comm_id()
- }))
- }
-
- pub fn with_content(mut self, content: serde_json::Value) -> JupyterMessage {
- self.content = content;
- self
- }
-
- pub fn with_metadata(
- mut self,
- metadata: serde_json::Value,
- ) -> JupyterMessage {
- self.metadata = metadata;
- self
- }
-
- pub fn with_buffers(mut self, buffers: Vec<Bytes>) -> JupyterMessage {
- self.buffers = buffers;
- self
- }
-}
-
-impl fmt::Debug for JupyterMessage {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- writeln!(
- f,
- "\nHeader: {}",
- serde_json::to_string_pretty(&self.header).unwrap()
- )?;
- writeln!(
- f,
- "Parent header: {}",
- serde_json::to_string_pretty(&self.parent_header).unwrap()
- )?;
- writeln!(
- f,
- "Metadata: {}",
- serde_json::to_string_pretty(&self.metadata).unwrap()
- )?;
- writeln!(
- f,
- "Content: {}\n",
- serde_json::to_string_pretty(&self.content).unwrap()
- )?;
- Ok(())
- }
-}
diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs
index da1c4bc4d..a4d0bb27d 100644
--- a/cli/tools/jupyter/mod.rs
+++ b/cli/tools/jupyter/mod.rs
@@ -3,7 +3,6 @@
use crate::args::Flags;
use crate::args::JupyterFlags;
use crate::ops;
-use crate::tools::jupyter::server::StdioMsg;
use crate::tools::repl;
use crate::tools::test::create_single_test_event_channel;
use crate::tools::test::reporters::PrettyTestReporter;
@@ -15,7 +14,6 @@ use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::located_script_name;
use deno_core::resolve_url_or_path;
-use deno_core::serde::Deserialize;
use deno_core::serde_json;
use deno_core::url::Url;
use deno_runtime::deno_io::Stdio;
@@ -24,11 +22,13 @@ use deno_runtime::permissions::Permissions;
use deno_runtime::permissions::PermissionsContainer;
use deno_runtime::WorkerExecutionMode;
use deno_terminal::colors;
+
+use runtimelib::jupyter::ConnectionInfo;
+use runtimelib::messaging::StreamContent;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
mod install;
-pub mod jupyter_msg;
pub mod server;
pub async fn kernel(
@@ -73,7 +73,7 @@ pub async fn kernel(
std::fs::read_to_string(&connection_filepath).with_context(|| {
format!("Couldn't read connection file: {:?}", connection_filepath)
})?;
- let spec: ConnectionSpec =
+ let spec: ConnectionInfo =
serde_json::from_str(&conn_file).with_context(|| {
format!(
"Connection file is not a valid JSON: {:?}",
@@ -119,12 +119,14 @@ pub async fn kernel(
test_event_receiver,
)
.await?;
- struct TestWriter(UnboundedSender<StdioMsg>);
+ struct TestWriter(UnboundedSender<StreamContent>);
impl std::io::Write for TestWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self
.0
- .send(StdioMsg::Stdout(String::from_utf8_lossy(buf).into_owned()))
+ .send(StreamContent::stdout(
+ String::from_utf8_lossy(buf).into_owned(),
+ ))
.ok();
Ok(buf.len())
}
@@ -150,15 +152,3 @@ pub async fn kernel(
Ok(())
}
-
-#[derive(Debug, Deserialize)]
-pub struct ConnectionSpec {
- ip: String,
- transport: String,
- control_port: u32,
- shell_port: u32,
- stdin_port: u32,
- hb_port: u32,
- iopub_port: u32,
- key: String,
-}
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs
index 4021cf6a3..3d273ee74 100644
--- a/cli/tools/jupyter/server.rs
+++ b/cli/tools/jupyter/server.rs
@@ -19,48 +19,54 @@ use deno_core::CancelHandle;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
-use super::jupyter_msg::Connection;
-use super::jupyter_msg::JupyterMessage;
-use super::ConnectionSpec;
-
-pub enum StdioMsg {
- Stdout(String),
- Stderr(String),
-}
+use runtimelib::ConnectionInfo;
+use runtimelib::KernelControlConnection;
+use runtimelib::KernelHeartbeatConnection;
+use runtimelib::KernelIoPubConnection;
+use runtimelib::KernelShellConnection;
+
+use runtimelib::messaging;
+use runtimelib::AsChildOf;
+use runtimelib::JupyterMessage;
+use runtimelib::JupyterMessageContent;
+use runtimelib::ReplyError;
+use runtimelib::ReplyStatus;
+use runtimelib::StreamContent;
pub struct JupyterServer {
execution_count: usize,
last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
// This is Arc<Mutex<>>, so we don't hold RefCell borrows across await
// points.
- iopub_socket: Arc<Mutex<Connection<zeromq::PubSocket>>>,
+ iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
repl_session: repl::ReplSession,
}
impl JupyterServer {
pub async fn start(
- spec: ConnectionSpec,
- mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>,
+ connection_info: ConnectionInfo,
+ mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>,
mut repl_session: repl::ReplSession,
) -> Result<(), AnyError> {
let mut heartbeat =
- bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?;
- let shell_socket =
- bind_socket::<zeromq::RouterSocket>(&spec, spec.shell_port).await?;
- let control_socket =
- bind_socket::<zeromq::RouterSocket>(&spec, spec.control_port).await?;
- let _stdin_socket =
- bind_socket::<zeromq::RouterSocket>(&spec, spec.stdin_port).await?;
- let iopub_socket =
- bind_socket::<zeromq::PubSocket>(&spec, spec.iopub_port).await?;
- let iopub_socket = Arc::new(Mutex::new(iopub_socket));
+ connection_info.create_kernel_heartbeat_connection().await?;
+ let shell_connection =
+ connection_info.create_kernel_shell_connection().await?;
+ let control_connection =
+ connection_info.create_kernel_control_connection().await?;
+ let _stdin_connection =
+ connection_info.create_kernel_stdin_connection().await?;
+ let iopub_connection =
+ connection_info.create_kernel_iopub_connection().await?;
+
+ let iopub_connection = Arc::new(Mutex::new(iopub_connection));
let last_execution_request = Rc::new(RefCell::new(None));
- // Store `iopub_socket` in the op state so it's accessible to the runtime API.
+ // Store `iopub_connection` in the op state so it's accessible to the runtime API.
{
let op_state_rc = repl_session.worker.js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();
- op_state.put(iopub_socket.clone());
+ op_state.put(iopub_connection.clone());
op_state.put(last_execution_request.clone());
}
@@ -68,14 +74,18 @@ impl JupyterServer {
let mut server = Self {
execution_count: 0,
- iopub_socket: iopub_socket.clone(),
+ iopub_connection: iopub_connection.clone(),
last_execution_request: last_execution_request.clone(),
repl_session,
};
let handle1 = deno_core::unsync::spawn(async move {
if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await {
- log::error!("Heartbeat error: {}", err);
+ log::error!(
+ "Heartbeat error: {}\nBacktrace:\n{}",
+ err,
+ err.backtrace()
+ );
}
});
@@ -83,23 +93,27 @@ impl JupyterServer {
let cancel_handle = cancel_handle.clone();
async move {
if let Err(err) =
- Self::handle_control(control_socket, cancel_handle).await
+ Self::handle_control(control_connection, cancel_handle).await
{
- log::error!("Control error: {}", err);
+ log::error!(
+ "Control error: {}\nBacktrace:\n{}",
+ err,
+ err.backtrace()
+ );
}
}
});
let handle3 = deno_core::unsync::spawn(async move {
- if let Err(err) = server.handle_shell(shell_socket).await {
- log::error!("Shell error: {}", err);
+ if let Err(err) = server.handle_shell(shell_connection).await {
+ log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace());
}
});
let handle4 = deno_core::unsync::spawn(async move {
while let Some(stdio_msg) = stdio_rx.recv().await {
Self::handle_stdio_msg(
- iopub_socket.clone(),
+ iopub_connection.clone(),
last_execution_request.clone(),
stdio_msg,
)
@@ -117,33 +131,25 @@ impl JupyterServer {
Ok(())
}
- async fn handle_stdio_msg<S: zeromq::SocketSend>(
- iopub_socket: Arc<Mutex<Connection<S>>>,
+ async fn handle_stdio_msg(
+ iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
- stdio_msg: StdioMsg,
+ stdio_msg: StreamContent,
) {
let maybe_exec_result = last_execution_request.borrow().clone();
if let Some(exec_request) = maybe_exec_result {
- let (name, text) = match stdio_msg {
- StdioMsg::Stdout(text) => ("stdout", text),
- StdioMsg::Stderr(text) => ("stderr", text),
- };
-
- let result = (*iopub_socket.lock().await)
- .send(&exec_request.new_message("stream").with_content(json!({
- "name": name,
- "text": text
- })))
+ let result = (iopub_connection.lock().await)
+ .send(stdio_msg.as_child_of(&exec_request))
.await;
if let Err(err) = result {
- log::error!("Output {} error: {}", name, err);
+ log::error!("Output error: {}", err);
}
}
}
async fn handle_heartbeat(
- connection: &mut Connection<zeromq::RepSocket>,
+ connection: &mut KernelHeartbeatConnection,
) -> Result<(), AnyError> {
loop {
connection.single_heartbeat().await?;
@@ -151,23 +157,30 @@ impl JupyterServer {
}
async fn handle_control(
- mut connection: Connection<zeromq::RouterSocket>,
+ mut connection: KernelControlConnection,
cancel_handle: Rc<CancelHandle>,
) -> Result<(), AnyError> {
loop {
let msg = connection.read().await?;
- match msg.message_type() {
- "kernel_info_request" => {
- connection
- .send(&msg.new_reply().with_content(kernel_info()))
- .await?;
+
+ match msg.content {
+ JupyterMessageContent::KernelInfoRequest(_) => {
+ // normally kernel info is sent from the shell channel
+ // however, some frontends will send it on the control channel
+ // and it's no harm to send a kernel info reply on control
+ connection.send(kernel_info().as_child_of(&msg)).await?;
}
- "shutdown_request" => {
+ JupyterMessageContent::ShutdownRequest(_) => {
cancel_handle.cancel();
}
- "interrupt_request" => {
+ JupyterMessageContent::InterruptRequest(_) => {
log::error!("Interrupt request currently not supported");
}
+ JupyterMessageContent::DebugRequest(_) => {
+ log::error!("Debug request currently not supported");
+ // See https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request
+ // and https://microsoft.github.io/debug-adapter-protocol/
+ }
_ => {
log::error!(
"Unrecognized control message type: {}",
@@ -180,7 +193,7 @@ impl JupyterServer {
async fn handle_shell(
&mut self,
- mut connection: Connection<zeromq::RouterSocket>,
+ mut connection: KernelShellConnection,
) -> Result<(), AnyError> {
loop {
let msg = connection.read().await?;
@@ -191,43 +204,28 @@ impl JupyterServer {
async fn handle_shell_message(
&mut self,
msg: JupyterMessage,
- connection: &mut Connection<zeromq::RouterSocket>,
+ connection: &mut KernelShellConnection,
) -> Result<(), AnyError> {
+ let parent = &msg.clone();
+
self
- .send_iopub(
- &msg
- .new_message("status")
- .with_content(json!({"execution_state": "busy"})),
- )
+ .send_iopub(messaging::Status::busy().as_child_of(parent))
.await?;
- match msg.message_type() {
- "kernel_info_request" => {
- connection
- .send(&msg.new_reply().with_content(kernel_info()))
- .await?;
- }
- "is_complete_request" => {
- connection
- .send(&msg.new_reply().with_content(json!({"status": "complete"})))
- .await?;
- }
- "execute_request" => {
+ match msg.content {
+ JupyterMessageContent::ExecuteRequest(execute_request) => {
self
- .handle_execution_request(msg.clone(), connection)
+ .handle_execution_request(execute_request, parent, connection)
.await?;
}
- "comm_open" => {
- self.send_iopub(&msg.comm_close_message()).await?;
- }
- "complete_request" => {
- let user_code = msg.code();
- let cursor_pos = msg.cursor_pos();
+ JupyterMessageContent::CompleteRequest(req) => {
+ let user_code = req.code;
+ let cursor_pos = req.cursor_pos;
let lsp_completions = self
.repl_session
.language_server
- .completions(user_code, cursor_pos)
+ .completions(&user_code, cursor_pos)
.await;
if !lsp_completions.is_empty() {
@@ -247,16 +245,20 @@ impl JupyterServer {
.unwrap_or(cursor_pos);
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "ok",
- "matches": matches,
- "cursor_start": cursor_start,
- "cursor_end": cursor_end,
- "metadata": {},
- })))
+ .send(
+ messaging::CompleteReply {
+ matches,
+ cursor_start,
+ cursor_end,
+ metadata: Default::default(),
+ status: ReplyStatus::Ok,
+ error: None,
+ }
+ .as_child_of(parent),
+ )
.await?;
} else {
- let expr = get_expr_from_line_at_pos(user_code, cursor_pos);
+ let expr = get_expr_from_line_at_pos(&user_code, cursor_pos);
// check if the expression is in the form `obj.prop`
let (completions, cursor_start) = if let Some(index) = expr.rfind('.')
{
@@ -292,72 +294,173 @@ impl JupyterServer {
(candidates, cursor_pos - expr.len())
};
+
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "ok",
- "matches": completions,
- "cursor_start": cursor_start,
- "cursor_end": cursor_pos,
- "metadata": {},
- })))
+ .send(
+ messaging::CompleteReply {
+ matches: completions,
+ cursor_start,
+ cursor_end: cursor_pos,
+ metadata: Default::default(),
+ status: ReplyStatus::Ok,
+ error: None,
+ }
+ .as_child_of(parent),
+ )
.await?;
}
}
- "comm_msg" | "comm_info_request" | "history_request" => {
- // We don't handle these messages
+
+ JupyterMessageContent::InspectRequest(_req) => {
+ // TODO(bartlomieju?): implement introspection request
+ // The inspect request is used to get information about an object at cursor position.
+ // There are two detail levels: 0 is typically documentation, 1 is typically source code
+
+ // The response includes a MimeBundle to render the object:
+ // {
+ // "status": "ok",
+ // "found": true,
+ // "data": {
+ // "text/plain": "Plain documentation here",
+ // "text/html": "<div>Rich documentation here</div>",
+ // "application/json": {
+ // "key1": "value1",
+ // "key2": "value2"
+ // }
+ // },
+ // }
+
+ connection
+ .send(
+ messaging::InspectReply {
+ status: ReplyStatus::Ok,
+ found: false,
+ data: Default::default(),
+ metadata: Default::default(),
+ error: None,
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+
+ JupyterMessageContent::IsCompleteRequest(_) => {
+ connection
+ .send(messaging::IsCompleteReply::complete().as_child_of(parent))
+ .await?;
+ }
+ JupyterMessageContent::KernelInfoRequest(_) => {
+ connection.send(kernel_info().as_child_of(parent)).await?;
}
+ JupyterMessageContent::CommOpen(comm) => {
+ connection
+ .send(
+ messaging::CommClose {
+ comm_id: comm.comm_id,
+ data: Default::default(),
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+ JupyterMessageContent::HistoryRequest(_req) => {
+ connection
+ .send(
+ messaging::HistoryReply {
+ history: vec![],
+ error: None,
+ status: ReplyStatus::Ok,
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+ JupyterMessageContent::InputReply(_rep) => {
+ // TODO(@zph): implement input reply from https://github.com/denoland/deno/pull/23592
+ // NOTE: This will belong on the stdin channel, not the shell channel
+ }
+ JupyterMessageContent::CommInfoRequest(_req) => {
+ connection
+ .send(
+ messaging::CommInfoReply {
+ comms: Default::default(),
+ status: ReplyStatus::Ok,
+ error: None,
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+ JupyterMessageContent::CommMsg(_)
+ | JupyterMessageContent::CommClose(_) => {
+ // Do nothing with regular comm messages
+ }
+ // Any unknown message type is ignored
_ => {
- log::error!("Unrecognized shell message type: {}", msg.message_type());
+ log::error!(
+ "Unrecognized shell message type: {}",
+ msg.content.message_type()
+ );
}
}
self
- .send_iopub(
- &msg
- .new_message("status")
- .with_content(json!({"execution_state": "idle"})),
- )
+ .send_iopub(messaging::Status::idle().as_child_of(parent))
.await?;
+
Ok(())
}
async fn handle_execution_request(
&mut self,
- msg: JupyterMessage,
- connection: &mut Connection<zeromq::RouterSocket>,
+ execute_request: messaging::ExecuteRequest,
+ parent_message: &JupyterMessage,
+ connection: &mut KernelShellConnection,
) -> Result<(), AnyError> {
- if !msg.silent() && msg.store_history() {
+ if !execute_request.silent && execute_request.store_history {
self.execution_count += 1;
}
- *self.last_execution_request.borrow_mut() = Some(msg.clone());
+ *self.last_execution_request.borrow_mut() = Some(parent_message.clone());
self
- .send_iopub(&msg.new_message("execute_input").with_content(json!({
- "execution_count": self.execution_count,
- "code": msg.code()
- })))
+ .send_iopub(
+ messaging::ExecuteInput {
+ execution_count: self.execution_count,
+ code: execute_request.code.clone(),
+ }
+ .as_child_of(parent_message),
+ )
.await?;
let result = self
.repl_session
- .evaluate_line_with_object_wrapping(msg.code())
+ .evaluate_line_with_object_wrapping(&execute_request.code)
.await;
let evaluate_response = match result {
Ok(eval_response) => eval_response,
Err(err) => {
self
- .send_iopub(&msg.new_message("error").with_content(json!({
- "ename": err.to_string(),
- "evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error
- "traceback": [],
- })))
+ .send_iopub(
+ messaging::ErrorOutput {
+ ename: err.to_string(),
+ evalue: err.to_string(),
+ traceback: vec![],
+ }
+ .as_child_of(parent_message),
+ )
.await?;
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "error",
- "execution_count": self.execution_count,
- })))
+ .send(
+ messaging::ExecuteReply {
+ execution_count: self.execution_count,
+ status: ReplyStatus::Error,
+ payload: None,
+ user_expressions: None,
+ error: None,
+ }
+ .as_child_of(parent_message),
+ )
.await?;
return Ok(());
}
@@ -373,11 +476,16 @@ impl JupyterServer {
.await?;
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "ok",
- "execution_count": self.execution_count,
- // FIXME: also include user_expressions
- })))
+ .send(
+ messaging::ExecuteReply {
+ execution_count: self.execution_count,
+ status: ReplyStatus::Ok,
+ user_expressions: None,
+ payload: None,
+ error: None,
+ }
+ .as_child_of(parent_message),
+ )
.await?;
// Let's sleep here for a few ms, so we give a chance to the task that is
// handling stdout and stderr streams to receive and flush the content.
@@ -458,17 +566,30 @@ impl JupyterServer {
};
self
- .send_iopub(&msg.new_message("error").with_content(json!({
- "ename": ename,
- "evalue": evalue,
- "traceback": traceback,
- })))
+ .send_iopub(
+ messaging::ErrorOutput {
+ ename: ename.clone(),
+ evalue: evalue.clone(),
+ traceback: traceback.clone(),
+ }
+ .as_child_of(parent_message),
+ )
.await?;
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "error",
- "execution_count": self.execution_count,
- })))
+ .send(
+ messaging::ExecuteReply {
+ execution_count: self.execution_count,
+ status: ReplyStatus::Error,
+ error: Some(ReplyError {
+ ename,
+ evalue,
+ traceback,
+ }),
+ user_expressions: None,
+ payload: None,
+ }
+ .as_child_of(parent_message),
+ )
.await?;
}
@@ -477,42 +598,35 @@ impl JupyterServer {
async fn send_iopub(
&mut self,
- message: &JupyterMessage,
+ message: JupyterMessage,
) -> Result<(), AnyError> {
- self.iopub_socket.lock().await.send(message).await
+ self.iopub_connection.lock().await.send(message).await
}
}
-async fn bind_socket<S: zeromq::Socket>(
- config: &ConnectionSpec,
- port: u32,
-) -> Result<Connection<S>, AnyError> {
- let endpoint = format!("{}://{}:{}", config.transport, config.ip, port);
- let mut socket = S::new();
- socket.bind(&endpoint).await?;
- Ok(Connection::new(socket, &config.key))
-}
-
-fn kernel_info() -> serde_json::Value {
- json!({
- "status": "ok",
- "protocol_version": "5.3",
- "implementation_version": crate::version::deno(),
- "implementation": "Deno kernel",
- "language_info": {
- "name": "typescript",
- "version": crate::version::TYPESCRIPT,
- "mimetype": "text/x.typescript",
- "file_extension": ".ts",
- "pygments_lexer": "typescript",
- "nb_converter": "script"
+fn kernel_info() -> messaging::KernelInfoReply {
+ messaging::KernelInfoReply {
+ status: ReplyStatus::Ok,
+ protocol_version: "5.3".to_string(),
+ implementation: "Deno kernel".to_string(),
+ implementation_version: crate::version::deno().to_string(),
+ language_info: messaging::LanguageInfo {
+ name: "typescript".to_string(),
+ version: crate::version::TYPESCRIPT.to_string(),
+ mimetype: "text/x.typescript".to_string(),
+ file_extension: ".ts".to_string(),
+ pygments_lexer: "typescript".to_string(),
+ codemirror_mode: messaging::CodeMirrorMode::typescript(),
+ nbconvert_exporter: "script".to_string(),
},
- "help_links": [{
- "text": "Visit Deno manual",
- "url": "https://deno.land/manual"
+ banner: "Welcome to Deno kernel".to_string(),
+ help_links: vec![messaging::HelpLink {
+ text: "Visit Deno manual".to_string(),
+ url: "https://deno.land/manual".to_string(),
}],
- "banner": "Welcome to Deno kernel",
- })
+ debugger: false,
+ error: None,
+ }
}
async fn publish_result(
diff --git a/tests/integration/jupyter_tests.rs b/tests/integration/jupyter_tests.rs
index 3c4efbdac..c7b2712e8 100644
--- a/tests/integration/jupyter_tests.rs
+++ b/tests/integration/jupyter_tests.rs
@@ -10,6 +10,8 @@ use test_util::DenoChild;
use test_util::TestContext;
use test_util::TestContextBuilder;
+use chrono::DateTime;
+use chrono::Utc;
use deno_core::anyhow::Result;
use deno_core::serde_json;
use deno_core::serde_json::json;
@@ -119,7 +121,7 @@ impl Default for JupyterMsg {
struct MsgHeader {
msg_id: Uuid,
session: Uuid,
- date: String,
+ date: DateTime<Utc>,
username: String,
msg_type: String,
version: String,
@@ -136,7 +138,7 @@ impl Default for MsgHeader {
Self {
msg_id: Uuid::new_v4(),
session: Uuid::new_v4(),
- date: utc_now().to_rfc3339(),
+ date: utc_now(),
username: "test".into(),
msg_type: "kernel_info_request".into(),
version: "5.3".into(),
@@ -517,7 +519,7 @@ async fn jupyter_kernel_info() -> Result<()> {
"mimetype": "text/x.typescript",
"file_extension": ".ts",
"pygments_lexer": "typescript",
- "nb_converter": "script"
+ "nbconvert_exporter": "script"
},
}),
);
@@ -612,7 +614,7 @@ async fn jupyter_store_history_false() -> Result<()> {
json!({
"silent": false,
"store_history": false,
- "code": "console.log(\"asdf\")"
+ "code": "console.log(\"asdf\")",
}),
)
.await?;
diff --git a/tests/specs/jupyter/install_command_not_exists/__test__.jsonc b/tests/specs/jupyter/install_command/__test__.jsonc
index 9552157bd..df60c3b86 100644
--- a/tests/specs/jupyter/install_command_not_exists/__test__.jsonc
+++ b/tests/specs/jupyter/install_command/__test__.jsonc
@@ -1,8 +1,8 @@
{
"args": "jupyter --install",
- "output": "install_command_not_exists.out",
+ "output": "install_command.out",
"envs": {
"PATH": ""
},
- "exitCode": 1
+ "exitCode": 0
}
diff --git a/tests/specs/jupyter/install_command/install_command.out b/tests/specs/jupyter/install_command/install_command.out
new file mode 100644
index 000000000..62875d9cf
--- /dev/null
+++ b/tests/specs/jupyter/install_command/install_command.out
@@ -0,0 +1,2 @@
+Warning "deno jupyter" is unstable and might change in the future.
+✅ Deno kernelspec installed successfully.
diff --git a/tests/specs/jupyter/install_command_not_exists/install_command_not_exists.out b/tests/specs/jupyter/install_command_not_exists/install_command_not_exists.out
deleted file mode 100644
index 1bb176e20..000000000
--- a/tests/specs/jupyter/install_command_not_exists/install_command_not_exists.out
+++ /dev/null
@@ -1,5 +0,0 @@
-Warning "deno jupyter" is unstable and might change in the future.
-error: Failed to spawn 'jupyter' command. Is JupyterLab installed (https://jupyter.org/install) and available on the PATH?
-
-Caused by:
-[WILDCARD]