summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/server.rs
diff options
context:
space:
mode:
authorKyle Kelley <rgbkrk@gmail.com>2024-05-21 13:35:21 -0700
committerGitHub <noreply@github.com>2024-05-21 22:35:21 +0200
commit8698e80304815353ec52be1b16f96483ebe559a0 (patch)
tree9abd53d5b656cd8cc0c1aa3940684f3ce1d9c8ef /cli/tools/jupyter/server.rs
parentcc8c0609ebec9f101a1739a0c42c91718ca2abba (diff)
refactor(jupyter): use runtimelib for Jupyter structures and directory paths (#23826)
This brings in [`runtimelib`](https://github.com/runtimed/runtimed) to use: ## Fully typed structs for Jupyter Messages ```rust let msg = connection.read().await?; self .send_iopub( runtimelib::Status::busy().as_child_of(msg), ) .await?; ``` ## Jupyter paths Jupyter paths are implemented in Rust, allowing the Deno kernel to be installed completely via Deno without a requirement on Python or Jupyter. Deno users will be able to install and use the kernel with just VS Code or other editors that support Jupyter. ```rust pub fn status() -> Result<(), AnyError> { let user_data_dir = user_data_dir()?; let kernel_spec_dir_path = user_data_dir.join("kernels").join("deno"); let kernel_spec_path = kernel_spec_dir_path.join("kernel.json"); if kernel_spec_path.exists() { log::info!("✅ Deno kernel already installed"); Ok(()) } else { log::warn!("ℹ️ Deno kernel is not yet installed, run `deno jupyter --install` to set it up"); Ok(()) } } ``` Closes https://github.com/denoland/deno/issues/21619
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r--cli/tools/jupyter/server.rs444
1 files changed, 279 insertions, 165 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs
index 4021cf6a3..3d273ee74 100644
--- a/cli/tools/jupyter/server.rs
+++ b/cli/tools/jupyter/server.rs
@@ -19,48 +19,54 @@ use deno_core::CancelHandle;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
-use super::jupyter_msg::Connection;
-use super::jupyter_msg::JupyterMessage;
-use super::ConnectionSpec;
-
-pub enum StdioMsg {
- Stdout(String),
- Stderr(String),
-}
+use runtimelib::ConnectionInfo;
+use runtimelib::KernelControlConnection;
+use runtimelib::KernelHeartbeatConnection;
+use runtimelib::KernelIoPubConnection;
+use runtimelib::KernelShellConnection;
+
+use runtimelib::messaging;
+use runtimelib::AsChildOf;
+use runtimelib::JupyterMessage;
+use runtimelib::JupyterMessageContent;
+use runtimelib::ReplyError;
+use runtimelib::ReplyStatus;
+use runtimelib::StreamContent;
pub struct JupyterServer {
execution_count: usize,
last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
// This is Arc<Mutex<>>, so we don't hold RefCell borrows across await
// points.
- iopub_socket: Arc<Mutex<Connection<zeromq::PubSocket>>>,
+ iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
repl_session: repl::ReplSession,
}
impl JupyterServer {
pub async fn start(
- spec: ConnectionSpec,
- mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>,
+ connection_info: ConnectionInfo,
+ mut stdio_rx: mpsc::UnboundedReceiver<StreamContent>,
mut repl_session: repl::ReplSession,
) -> Result<(), AnyError> {
let mut heartbeat =
- bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?;
- let shell_socket =
- bind_socket::<zeromq::RouterSocket>(&spec, spec.shell_port).await?;
- let control_socket =
- bind_socket::<zeromq::RouterSocket>(&spec, spec.control_port).await?;
- let _stdin_socket =
- bind_socket::<zeromq::RouterSocket>(&spec, spec.stdin_port).await?;
- let iopub_socket =
- bind_socket::<zeromq::PubSocket>(&spec, spec.iopub_port).await?;
- let iopub_socket = Arc::new(Mutex::new(iopub_socket));
+ connection_info.create_kernel_heartbeat_connection().await?;
+ let shell_connection =
+ connection_info.create_kernel_shell_connection().await?;
+ let control_connection =
+ connection_info.create_kernel_control_connection().await?;
+ let _stdin_connection =
+ connection_info.create_kernel_stdin_connection().await?;
+ let iopub_connection =
+ connection_info.create_kernel_iopub_connection().await?;
+
+ let iopub_connection = Arc::new(Mutex::new(iopub_connection));
let last_execution_request = Rc::new(RefCell::new(None));
- // Store `iopub_socket` in the op state so it's accessible to the runtime API.
+ // Store `iopub_connection` in the op state so it's accessible to the runtime API.
{
let op_state_rc = repl_session.worker.js_runtime.op_state();
let mut op_state = op_state_rc.borrow_mut();
- op_state.put(iopub_socket.clone());
+ op_state.put(iopub_connection.clone());
op_state.put(last_execution_request.clone());
}
@@ -68,14 +74,18 @@ impl JupyterServer {
let mut server = Self {
execution_count: 0,
- iopub_socket: iopub_socket.clone(),
+ iopub_connection: iopub_connection.clone(),
last_execution_request: last_execution_request.clone(),
repl_session,
};
let handle1 = deno_core::unsync::spawn(async move {
if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await {
- log::error!("Heartbeat error: {}", err);
+ log::error!(
+ "Heartbeat error: {}\nBacktrace:\n{}",
+ err,
+ err.backtrace()
+ );
}
});
@@ -83,23 +93,27 @@ impl JupyterServer {
let cancel_handle = cancel_handle.clone();
async move {
if let Err(err) =
- Self::handle_control(control_socket, cancel_handle).await
+ Self::handle_control(control_connection, cancel_handle).await
{
- log::error!("Control error: {}", err);
+ log::error!(
+ "Control error: {}\nBacktrace:\n{}",
+ err,
+ err.backtrace()
+ );
}
}
});
let handle3 = deno_core::unsync::spawn(async move {
- if let Err(err) = server.handle_shell(shell_socket).await {
- log::error!("Shell error: {}", err);
+ if let Err(err) = server.handle_shell(shell_connection).await {
+ log::error!("Shell error: {}\nBacktrace:\n{}", err, err.backtrace());
}
});
let handle4 = deno_core::unsync::spawn(async move {
while let Some(stdio_msg) = stdio_rx.recv().await {
Self::handle_stdio_msg(
- iopub_socket.clone(),
+ iopub_connection.clone(),
last_execution_request.clone(),
stdio_msg,
)
@@ -117,33 +131,25 @@ impl JupyterServer {
Ok(())
}
- async fn handle_stdio_msg<S: zeromq::SocketSend>(
- iopub_socket: Arc<Mutex<Connection<S>>>,
+ async fn handle_stdio_msg(
+ iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
- stdio_msg: StdioMsg,
+ stdio_msg: StreamContent,
) {
let maybe_exec_result = last_execution_request.borrow().clone();
if let Some(exec_request) = maybe_exec_result {
- let (name, text) = match stdio_msg {
- StdioMsg::Stdout(text) => ("stdout", text),
- StdioMsg::Stderr(text) => ("stderr", text),
- };
-
- let result = (*iopub_socket.lock().await)
- .send(&exec_request.new_message("stream").with_content(json!({
- "name": name,
- "text": text
- })))
+ let result = (iopub_connection.lock().await)
+ .send(stdio_msg.as_child_of(&exec_request))
.await;
if let Err(err) = result {
- log::error!("Output {} error: {}", name, err);
+ log::error!("Output error: {}", err);
}
}
}
async fn handle_heartbeat(
- connection: &mut Connection<zeromq::RepSocket>,
+ connection: &mut KernelHeartbeatConnection,
) -> Result<(), AnyError> {
loop {
connection.single_heartbeat().await?;
@@ -151,23 +157,30 @@ impl JupyterServer {
}
async fn handle_control(
- mut connection: Connection<zeromq::RouterSocket>,
+ mut connection: KernelControlConnection,
cancel_handle: Rc<CancelHandle>,
) -> Result<(), AnyError> {
loop {
let msg = connection.read().await?;
- match msg.message_type() {
- "kernel_info_request" => {
- connection
- .send(&msg.new_reply().with_content(kernel_info()))
- .await?;
+
+ match msg.content {
+ JupyterMessageContent::KernelInfoRequest(_) => {
+ // normally kernel info is sent from the shell channel
+ // however, some frontends will send it on the control channel
+ // and it's no harm to send a kernel info reply on control
+ connection.send(kernel_info().as_child_of(&msg)).await?;
}
- "shutdown_request" => {
+ JupyterMessageContent::ShutdownRequest(_) => {
cancel_handle.cancel();
}
- "interrupt_request" => {
+ JupyterMessageContent::InterruptRequest(_) => {
log::error!("Interrupt request currently not supported");
}
+ JupyterMessageContent::DebugRequest(_) => {
+ log::error!("Debug request currently not supported");
+ // See https://jupyter-client.readthedocs.io/en/latest/messaging.html#debug-request
+ // and https://microsoft.github.io/debug-adapter-protocol/
+ }
_ => {
log::error!(
"Unrecognized control message type: {}",
@@ -180,7 +193,7 @@ impl JupyterServer {
async fn handle_shell(
&mut self,
- mut connection: Connection<zeromq::RouterSocket>,
+ mut connection: KernelShellConnection,
) -> Result<(), AnyError> {
loop {
let msg = connection.read().await?;
@@ -191,43 +204,28 @@ impl JupyterServer {
async fn handle_shell_message(
&mut self,
msg: JupyterMessage,
- connection: &mut Connection<zeromq::RouterSocket>,
+ connection: &mut KernelShellConnection,
) -> Result<(), AnyError> {
+ let parent = &msg.clone();
+
self
- .send_iopub(
- &msg
- .new_message("status")
- .with_content(json!({"execution_state": "busy"})),
- )
+ .send_iopub(messaging::Status::busy().as_child_of(parent))
.await?;
- match msg.message_type() {
- "kernel_info_request" => {
- connection
- .send(&msg.new_reply().with_content(kernel_info()))
- .await?;
- }
- "is_complete_request" => {
- connection
- .send(&msg.new_reply().with_content(json!({"status": "complete"})))
- .await?;
- }
- "execute_request" => {
+ match msg.content {
+ JupyterMessageContent::ExecuteRequest(execute_request) => {
self
- .handle_execution_request(msg.clone(), connection)
+ .handle_execution_request(execute_request, parent, connection)
.await?;
}
- "comm_open" => {
- self.send_iopub(&msg.comm_close_message()).await?;
- }
- "complete_request" => {
- let user_code = msg.code();
- let cursor_pos = msg.cursor_pos();
+ JupyterMessageContent::CompleteRequest(req) => {
+ let user_code = req.code;
+ let cursor_pos = req.cursor_pos;
let lsp_completions = self
.repl_session
.language_server
- .completions(user_code, cursor_pos)
+ .completions(&user_code, cursor_pos)
.await;
if !lsp_completions.is_empty() {
@@ -247,16 +245,20 @@ impl JupyterServer {
.unwrap_or(cursor_pos);
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "ok",
- "matches": matches,
- "cursor_start": cursor_start,
- "cursor_end": cursor_end,
- "metadata": {},
- })))
+ .send(
+ messaging::CompleteReply {
+ matches,
+ cursor_start,
+ cursor_end,
+ metadata: Default::default(),
+ status: ReplyStatus::Ok,
+ error: None,
+ }
+ .as_child_of(parent),
+ )
.await?;
} else {
- let expr = get_expr_from_line_at_pos(user_code, cursor_pos);
+ let expr = get_expr_from_line_at_pos(&user_code, cursor_pos);
// check if the expression is in the form `obj.prop`
let (completions, cursor_start) = if let Some(index) = expr.rfind('.')
{
@@ -292,72 +294,173 @@ impl JupyterServer {
(candidates, cursor_pos - expr.len())
};
+
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "ok",
- "matches": completions,
- "cursor_start": cursor_start,
- "cursor_end": cursor_pos,
- "metadata": {},
- })))
+ .send(
+ messaging::CompleteReply {
+ matches: completions,
+ cursor_start,
+ cursor_end: cursor_pos,
+ metadata: Default::default(),
+ status: ReplyStatus::Ok,
+ error: None,
+ }
+ .as_child_of(parent),
+ )
.await?;
}
}
- "comm_msg" | "comm_info_request" | "history_request" => {
- // We don't handle these messages
+
+ JupyterMessageContent::InspectRequest(_req) => {
+ // TODO(bartlomieju?): implement introspection request
+ // The inspect request is used to get information about an object at cursor position.
+ // There are two detail levels: 0 is typically documentation, 1 is typically source code
+
+ // The response includes a MimeBundle to render the object:
+ // {
+ // "status": "ok",
+ // "found": true,
+ // "data": {
+ // "text/plain": "Plain documentation here",
+ // "text/html": "<div>Rich documentation here</div>",
+ // "application/json": {
+ // "key1": "value1",
+ // "key2": "value2"
+ // }
+ // },
+ // }
+
+ connection
+ .send(
+ messaging::InspectReply {
+ status: ReplyStatus::Ok,
+ found: false,
+ data: Default::default(),
+ metadata: Default::default(),
+ error: None,
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+
+ JupyterMessageContent::IsCompleteRequest(_) => {
+ connection
+ .send(messaging::IsCompleteReply::complete().as_child_of(parent))
+ .await?;
+ }
+ JupyterMessageContent::KernelInfoRequest(_) => {
+ connection.send(kernel_info().as_child_of(parent)).await?;
}
+ JupyterMessageContent::CommOpen(comm) => {
+ connection
+ .send(
+ messaging::CommClose {
+ comm_id: comm.comm_id,
+ data: Default::default(),
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+ JupyterMessageContent::HistoryRequest(_req) => {
+ connection
+ .send(
+ messaging::HistoryReply {
+ history: vec![],
+ error: None,
+ status: ReplyStatus::Ok,
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+ JupyterMessageContent::InputReply(_rep) => {
+ // TODO(@zph): implement input reply from https://github.com/denoland/deno/pull/23592
+ // NOTE: This will belong on the stdin channel, not the shell channel
+ }
+ JupyterMessageContent::CommInfoRequest(_req) => {
+ connection
+ .send(
+ messaging::CommInfoReply {
+ comms: Default::default(),
+ status: ReplyStatus::Ok,
+ error: None,
+ }
+ .as_child_of(parent),
+ )
+ .await?;
+ }
+ JupyterMessageContent::CommMsg(_)
+ | JupyterMessageContent::CommClose(_) => {
+ // Do nothing with regular comm messages
+ }
+ // Any unknown message type is ignored
_ => {
- log::error!("Unrecognized shell message type: {}", msg.message_type());
+ log::error!(
+ "Unrecognized shell message type: {}",
+ msg.content.message_type()
+ );
}
}
self
- .send_iopub(
- &msg
- .new_message("status")
- .with_content(json!({"execution_state": "idle"})),
- )
+ .send_iopub(messaging::Status::idle().as_child_of(parent))
.await?;
+
Ok(())
}
async fn handle_execution_request(
&mut self,
- msg: JupyterMessage,
- connection: &mut Connection<zeromq::RouterSocket>,
+ execute_request: messaging::ExecuteRequest,
+ parent_message: &JupyterMessage,
+ connection: &mut KernelShellConnection,
) -> Result<(), AnyError> {
- if !msg.silent() && msg.store_history() {
+ if !execute_request.silent && execute_request.store_history {
self.execution_count += 1;
}
- *self.last_execution_request.borrow_mut() = Some(msg.clone());
+ *self.last_execution_request.borrow_mut() = Some(parent_message.clone());
self
- .send_iopub(&msg.new_message("execute_input").with_content(json!({
- "execution_count": self.execution_count,
- "code": msg.code()
- })))
+ .send_iopub(
+ messaging::ExecuteInput {
+ execution_count: self.execution_count,
+ code: execute_request.code.clone(),
+ }
+ .as_child_of(parent_message),
+ )
.await?;
let result = self
.repl_session
- .evaluate_line_with_object_wrapping(msg.code())
+ .evaluate_line_with_object_wrapping(&execute_request.code)
.await;
let evaluate_response = match result {
Ok(eval_response) => eval_response,
Err(err) => {
self
- .send_iopub(&msg.new_message("error").with_content(json!({
- "ename": err.to_string(),
- "evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error
- "traceback": [],
- })))
+ .send_iopub(
+ messaging::ErrorOutput {
+ ename: err.to_string(),
+ evalue: err.to_string(),
+ traceback: vec![],
+ }
+ .as_child_of(parent_message),
+ )
.await?;
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "error",
- "execution_count": self.execution_count,
- })))
+ .send(
+ messaging::ExecuteReply {
+ execution_count: self.execution_count,
+ status: ReplyStatus::Error,
+ payload: None,
+ user_expressions: None,
+ error: None,
+ }
+ .as_child_of(parent_message),
+ )
.await?;
return Ok(());
}
@@ -373,11 +476,16 @@ impl JupyterServer {
.await?;
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "ok",
- "execution_count": self.execution_count,
- // FIXME: also include user_expressions
- })))
+ .send(
+ messaging::ExecuteReply {
+ execution_count: self.execution_count,
+ status: ReplyStatus::Ok,
+ user_expressions: None,
+ payload: None,
+ error: None,
+ }
+ .as_child_of(parent_message),
+ )
.await?;
// Let's sleep here for a few ms, so we give a chance to the task that is
// handling stdout and stderr streams to receive and flush the content.
@@ -458,17 +566,30 @@ impl JupyterServer {
};
self
- .send_iopub(&msg.new_message("error").with_content(json!({
- "ename": ename,
- "evalue": evalue,
- "traceback": traceback,
- })))
+ .send_iopub(
+ messaging::ErrorOutput {
+ ename: ename.clone(),
+ evalue: evalue.clone(),
+ traceback: traceback.clone(),
+ }
+ .as_child_of(parent_message),
+ )
.await?;
connection
- .send(&msg.new_reply().with_content(json!({
- "status": "error",
- "execution_count": self.execution_count,
- })))
+ .send(
+ messaging::ExecuteReply {
+ execution_count: self.execution_count,
+ status: ReplyStatus::Error,
+ error: Some(ReplyError {
+ ename,
+ evalue,
+ traceback,
+ }),
+ user_expressions: None,
+ payload: None,
+ }
+ .as_child_of(parent_message),
+ )
.await?;
}
@@ -477,42 +598,35 @@ impl JupyterServer {
async fn send_iopub(
&mut self,
- message: &JupyterMessage,
+ message: JupyterMessage,
) -> Result<(), AnyError> {
- self.iopub_socket.lock().await.send(message).await
+ self.iopub_connection.lock().await.send(message).await
}
}
-async fn bind_socket<S: zeromq::Socket>(
- config: &ConnectionSpec,
- port: u32,
-) -> Result<Connection<S>, AnyError> {
- let endpoint = format!("{}://{}:{}", config.transport, config.ip, port);
- let mut socket = S::new();
- socket.bind(&endpoint).await?;
- Ok(Connection::new(socket, &config.key))
-}
-
-fn kernel_info() -> serde_json::Value {
- json!({
- "status": "ok",
- "protocol_version": "5.3",
- "implementation_version": crate::version::deno(),
- "implementation": "Deno kernel",
- "language_info": {
- "name": "typescript",
- "version": crate::version::TYPESCRIPT,
- "mimetype": "text/x.typescript",
- "file_extension": ".ts",
- "pygments_lexer": "typescript",
- "nb_converter": "script"
+fn kernel_info() -> messaging::KernelInfoReply {
+ messaging::KernelInfoReply {
+ status: ReplyStatus::Ok,
+ protocol_version: "5.3".to_string(),
+ implementation: "Deno kernel".to_string(),
+ implementation_version: crate::version::deno().to_string(),
+ language_info: messaging::LanguageInfo {
+ name: "typescript".to_string(),
+ version: crate::version::TYPESCRIPT.to_string(),
+ mimetype: "text/x.typescript".to_string(),
+ file_extension: ".ts".to_string(),
+ pygments_lexer: "typescript".to_string(),
+ codemirror_mode: messaging::CodeMirrorMode::typescript(),
+ nbconvert_exporter: "script".to_string(),
},
- "help_links": [{
- "text": "Visit Deno manual",
- "url": "https://deno.land/manual"
+ banner: "Welcome to Deno kernel".to_string(),
+ help_links: vec![messaging::HelpLink {
+ text: "Visit Deno manual".to_string(),
+ url: "https://deno.land/manual".to_string(),
}],
- "banner": "Welcome to Deno kernel",
- })
+ debugger: false,
+ error: None,
+ }
}
async fn publish_result(