summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r--cli/tools/jupyter/server.rs724
1 files changed, 724 insertions, 0 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs
new file mode 100644
index 000000000..c15dab6c2
--- /dev/null
+++ b/cli/tools/jupyter/server.rs
@@ -0,0 +1,724 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// This file is forked/ported from <https://github.com/evcxr/evcxr>
+// Copyright 2020 The Evcxr Authors. MIT license.
+
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::sync::Arc;
+
+use crate::tools::repl;
+use crate::tools::repl::cdp;
+use deno_core::error::AnyError;
+use deno_core::futures;
+use deno_core::futures::channel::mpsc;
+use deno_core::futures::StreamExt;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use tokio::sync::Mutex;
+use zeromq::SocketRecv;
+use zeromq::SocketSend;
+
+use super::jupyter_msg::Connection;
+use super::jupyter_msg::JupyterMessage;
+use super::ConnectionSpec;
+
+pub enum StdioMsg {
+ Stdout(String),
+ Stderr(String),
+}
+
+pub struct JupyterServer {
+ execution_count: usize,
+ last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
+ // This is Arc<Mutex<>>, so we don't hold RefCell borrows across await
+ // points.
+ iopub_socket: Arc<Mutex<Connection<zeromq::PubSocket>>>,
+ repl_session: repl::ReplSession,
+}
+
+impl JupyterServer {
+ pub async fn start(
+ spec: ConnectionSpec,
+ mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>,
+ repl_session: repl::ReplSession,
+ ) -> Result<(), AnyError> {
+ let mut heartbeat =
+ bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?;
+ let shell_socket =
+ bind_socket::<zeromq::RouterSocket>(&spec, spec.shell_port).await?;
+ let control_socket =
+ bind_socket::<zeromq::RouterSocket>(&spec, spec.control_port).await?;
+ let _stdin_socket =
+ bind_socket::<zeromq::RouterSocket>(&spec, spec.stdin_port).await?;
+ let iopub_socket =
+ bind_socket::<zeromq::PubSocket>(&spec, spec.iopub_port).await?;
+ let iopub_socket = Arc::new(Mutex::new(iopub_socket));
+ let last_execution_request = Rc::new(RefCell::new(None));
+
+ let cancel_handle = CancelHandle::new_rc();
+ let cancel_handle2 = CancelHandle::new_rc();
+
+ let mut server = Self {
+ execution_count: 0,
+ iopub_socket: iopub_socket.clone(),
+ last_execution_request: last_execution_request.clone(),
+ repl_session,
+ };
+
+ let handle1 = deno_core::unsync::spawn(async move {
+ if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await {
+ eprintln!("Heartbeat error: {}", err);
+ }
+ });
+
+ let handle2 = deno_core::unsync::spawn(async move {
+ if let Err(err) =
+ Self::handle_control(control_socket, cancel_handle2).await
+ {
+ eprintln!("Control error: {}", err);
+ }
+ });
+
+ let handle3 = deno_core::unsync::spawn(async move {
+ if let Err(err) = server.handle_shell(shell_socket).await {
+ eprintln!("Shell error: {}", err);
+ }
+ });
+
+ let handle4 = deno_core::unsync::spawn(async move {
+ while let Some(stdio_msg) = stdio_rx.next().await {
+ Self::handle_stdio_msg(
+ iopub_socket.clone(),
+ last_execution_request.clone(),
+ stdio_msg,
+ )
+ .await;
+ }
+ });
+
+ let join_fut =
+ futures::future::try_join_all(vec![handle1, handle2, handle3, handle4]);
+
+ if let Ok(result) = join_fut.or_cancel(cancel_handle).await {
+ result?;
+ }
+
+ Ok(())
+ }
+
+ async fn handle_stdio_msg<S: zeromq::SocketSend>(
+ iopub_socket: Arc<Mutex<Connection<S>>>,
+ last_execution_request: Rc<RefCell<Option<JupyterMessage>>>,
+ stdio_msg: StdioMsg,
+ ) {
+ let maybe_exec_result = last_execution_request.borrow().clone();
+ if let Some(exec_request) = maybe_exec_result {
+ let (name, text) = match stdio_msg {
+ StdioMsg::Stdout(text) => ("stdout", text),
+ StdioMsg::Stderr(text) => ("stderr", text),
+ };
+
+ let result = exec_request
+ .new_message("stream")
+ .with_content(json!({
+ "name": name,
+ "text": text
+ }))
+ .send(&mut *iopub_socket.lock().await)
+ .await;
+
+ if let Err(err) = result {
+ eprintln!("Output {} error: {}", name, err);
+ }
+ }
+ }
+
+ async fn handle_heartbeat(
+ connection: &mut Connection<zeromq::RepSocket>,
+ ) -> Result<(), AnyError> {
+ loop {
+ connection.socket.recv().await?;
+ connection
+ .socket
+ .send(zeromq::ZmqMessage::from(b"ping".to_vec()))
+ .await?;
+ }
+ }
+
+ async fn handle_control(
+ mut connection: Connection<zeromq::RouterSocket>,
+ cancel_handle: Rc<CancelHandle>,
+ ) -> Result<(), AnyError> {
+ loop {
+ let msg = JupyterMessage::read(&mut connection).await?;
+ match msg.message_type() {
+ "kernel_info_request" => {
+ msg
+ .new_reply()
+ .with_content(kernel_info())
+ .send(&mut connection)
+ .await?;
+ }
+ "shutdown_request" => {
+ cancel_handle.cancel();
+ }
+ "interrupt_request" => {
+ eprintln!("Interrupt request currently not supported");
+ }
+ _ => {
+ eprintln!(
+ "Unrecognized control message type: {}",
+ msg.message_type()
+ );
+ }
+ }
+ }
+ }
+
+ async fn handle_shell(
+ &mut self,
+ mut connection: Connection<zeromq::RouterSocket>,
+ ) -> Result<(), AnyError> {
+ loop {
+ let msg = JupyterMessage::read(&mut connection).await?;
+ self.handle_shell_message(msg, &mut connection).await?;
+ }
+ }
+
+ async fn handle_shell_message(
+ &mut self,
+ msg: JupyterMessage,
+ connection: &mut Connection<zeromq::RouterSocket>,
+ ) -> Result<(), AnyError> {
+ msg
+ .new_message("status")
+ .with_content(json!({"execution_state": "busy"}))
+ .send(&mut *self.iopub_socket.lock().await)
+ .await?;
+
+ match msg.message_type() {
+ "kernel_info_request" => {
+ msg
+ .new_reply()
+ .with_content(kernel_info())
+ .send(connection)
+ .await?;
+ }
+ "is_complete_request" => {
+ msg
+ .new_reply()
+ .with_content(json!({"status": "complete"}))
+ .send(connection)
+ .await?;
+ }
+ "execute_request" => {
+ self
+ .handle_execution_request(msg.clone(), connection)
+ .await?;
+ }
+ "comm_open" => {
+ msg
+ .comm_close_message()
+ .send(&mut *self.iopub_socket.lock().await)
+ .await?;
+ }
+ "complete_request" => {
+ let user_code = msg.code();
+ let cursor_pos = msg.cursor_pos();
+
+ let lsp_completions = self
+ .repl_session
+ .language_server
+ .completions(user_code, cursor_pos)
+ .await;
+
+ if !lsp_completions.is_empty() {
+ let matches: Vec<String> = lsp_completions
+ .iter()
+ .map(|item| item.new_text.clone())
+ .collect();
+
+ let cursor_start = lsp_completions
+ .first()
+ .map(|item| item.range.start)
+ .unwrap_or(cursor_pos);
+
+ let cursor_end = lsp_completions
+ .last()
+ .map(|item| item.range.end)
+ .unwrap_or(cursor_pos);
+
+ msg
+ .new_reply()
+ .with_content(json!({
+ "status": "ok",
+ "matches": matches,
+ "cursor_start": cursor_start,
+ "cursor_end": cursor_end,
+ "metadata": {},
+ }))
+ .send(connection)
+ .await?;
+ } else {
+ let expr = get_expr_from_line_at_pos(user_code, cursor_pos);
+ // check if the expression is in the form `obj.prop`
+ let (completions, cursor_start) = if let Some(index) = expr.rfind('.')
+ {
+ let sub_expr = &expr[..index];
+ let prop_name = &expr[index + 1..];
+ let candidates =
+ get_expression_property_names(&mut self.repl_session, sub_expr)
+ .await
+ .into_iter()
+ .filter(|n| {
+ !n.starts_with("Symbol(")
+ && n.starts_with(prop_name)
+ && n != &*repl::REPL_INTERNALS_NAME
+ })
+ .collect();
+
+ (candidates, cursor_pos - prop_name.len())
+ } else {
+ // combine results of declarations and globalThis properties
+ let mut candidates = get_expression_property_names(
+ &mut self.repl_session,
+ "globalThis",
+ )
+ .await
+ .into_iter()
+ .chain(get_global_lexical_scope_names(&mut self.repl_session).await)
+ .filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME)
+ .collect::<Vec<_>>();
+
+ // sort and remove duplicates
+ candidates.sort();
+ candidates.dedup(); // make sure to sort first
+
+ (candidates, cursor_pos - expr.len())
+ };
+ msg
+ .new_reply()
+ .with_content(json!({
+ "status": "ok",
+ "matches": completions,
+ "cursor_start": cursor_start,
+ "cursor_end": cursor_pos,
+ "metadata": {},
+ }))
+ .send(connection)
+ .await?;
+ }
+ }
+ "comm_msg" | "comm_info_request" | "history_request" => {
+ // We don't handle these messages
+ }
+ _ => {
+ eprintln!("Unrecognized shell message type: {}", msg.message_type());
+ }
+ }
+
+ msg
+ .new_message("status")
+ .with_content(json!({"execution_state": "idle"}))
+ .send(&mut *self.iopub_socket.lock().await)
+ .await?;
+ Ok(())
+ }
+
+ async fn handle_execution_request(
+ &mut self,
+ msg: JupyterMessage,
+ connection: &mut Connection<zeromq::RouterSocket>,
+ ) -> Result<(), AnyError> {
+ self.execution_count += 1;
+ *self.last_execution_request.borrow_mut() = Some(msg.clone());
+
+ msg
+ .new_message("execute_input")
+ .with_content(json!({
+ "execution_count": self.execution_count,
+ "code": msg.code()
+ }))
+ .send(&mut *self.iopub_socket.lock().await)
+ .await?;
+
+ let result = self
+ .repl_session
+ .evaluate_line_with_object_wrapping(msg.code())
+ .await;
+
+ let evaluate_response = match result {
+ Ok(eval_response) => eval_response,
+ Err(err) => {
+ msg
+ .new_message("error")
+ .with_content(json!({
+ "ename": err.to_string(),
+ "evalue": "",
+ "traceback": [],
+ }))
+ .send(&mut *self.iopub_socket.lock().await)
+ .await?;
+ msg
+ .new_reply()
+ .with_content(json!({
+ "status": "error",
+ "execution_count": self.execution_count,
+ }))
+ .send(connection)
+ .await?;
+ return Ok(());
+ }
+ };
+
+ let repl::cdp::EvaluateResponse {
+ result,
+ exception_details,
+ } = evaluate_response.value;
+
+ if exception_details.is_none() {
+ let output =
+ get_jupyter_display_or_eval_value(&mut self.repl_session, &result)
+ .await?;
+ msg
+ .new_message("execute_result")
+ .with_content(json!({
+ "execution_count": self.execution_count,
+ "data": output,
+ "metadata": {},
+ }))
+ .send(&mut *self.iopub_socket.lock().await)
+ .await?;
+ msg
+ .new_reply()
+ .with_content(json!({
+ "status": "ok",
+ "execution_count": self.execution_count,
+ }))
+ .send(connection)
+ .await?;
+ // Let's sleep here for a few ms, so we give a chance to the task that is
+ // handling stdout and stderr streams to receive and flush the content.
+ // Otherwise, executing multiple cells one-by-one might lead to output
+ // from various cells be grouped together in another cell result.
+ tokio::time::sleep(std::time::Duration::from_millis(5)).await;
+ } else {
+ let exception_details = exception_details.unwrap();
+ let name = if let Some(exception) = exception_details.exception {
+ if let Some(description) = exception.description {
+ description
+ } else if let Some(value) = exception.value {
+ value.to_string()
+ } else {
+ "undefined".to_string()
+ }
+ } else {
+ "Unknown exception".to_string()
+ };
+
+ // TODO(bartlomieju): fill all the fields
+ msg
+ .new_message("error")
+ .with_content(json!({
+ "ename": name,
+ "evalue": "",
+ "traceback": [],
+ }))
+ .send(&mut *self.iopub_socket.lock().await)
+ .await?;
+ msg
+ .new_reply()
+ .with_content(json!({
+ "status": "error",
+ "execution_count": self.execution_count,
+ }))
+ .send(connection)
+ .await?;
+ }
+
+ Ok(())
+ }
+}
+
+async fn bind_socket<S: zeromq::Socket>(
+ config: &ConnectionSpec,
+ port: u32,
+) -> Result<Connection<S>, AnyError> {
+ let endpoint = format!("{}://{}:{}", config.transport, config.ip, port);
+ let mut socket = S::new();
+ socket.bind(&endpoint).await?;
+ Ok(Connection::new(socket, &config.key))
+}
+
+fn kernel_info() -> serde_json::Value {
+ json!({
+ "status": "ok",
+ "protocol_version": "5.3",
+ "implementation_version": crate::version::deno(),
+ "implementation": "Deno kernel",
+ "language_info": {
+ "name": "typescript",
+ "version": crate::version::TYPESCRIPT,
+ "mimetype": "text/x.typescript",
+ "file_extension": ".ts",
+ "pygments_lexer": "typescript",
+ "nb_converter": "script"
+ },
+ "help_links": [{
+ "text": "Visit Deno manual",
+ "url": "https://deno.land/manual"
+ }],
+ "banner": "Welcome to Deno kernel",
+ })
+}
+
+async fn get_jupyter_display(
+ session: &mut repl::ReplSession,
+ evaluate_result: &cdp::RemoteObject,
+) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> {
+ let mut data = HashMap::default();
+ let response = session
+ .call_function_on_args(
+ r#"function (object) {{
+ return object[Symbol.for("Jupyter.display")]();
+ }}"#
+ .to_string(),
+ &[evaluate_result.clone()],
+ )
+ .await?;
+
+ if response.exception_details.is_some() {
+ return Ok(None);
+ }
+
+ let object_id = response.result.object_id.unwrap();
+
+ let get_properties_response_result = session
+ .post_message_with_event_loop(
+ "Runtime.getProperties",
+ Some(cdp::GetPropertiesArgs {
+ object_id,
+ own_properties: Some(true),
+ accessor_properties_only: None,
+ generate_preview: None,
+ non_indexed_properties_only: Some(true),
+ }),
+ )
+ .await;
+
+ let Ok(get_properties_response) = get_properties_response_result else {
+ return Ok(None);
+ };
+
+ let get_properties_response: cdp::GetPropertiesResponse =
+ serde_json::from_value(get_properties_response).unwrap();
+
+ for prop in get_properties_response.result.into_iter() {
+ if let Some(value) = &prop.value {
+ data.insert(
+ prop.name.to_string(),
+ value
+ .value
+ .clone()
+ .unwrap_or_else(|| serde_json::Value::Null),
+ );
+ }
+ }
+
+ if !data.is_empty() {
+ return Ok(Some(data));
+ }
+
+ Ok(None)
+}
+
+async fn get_jupyter_display_or_eval_value(
+ session: &mut repl::ReplSession,
+ evaluate_result: &cdp::RemoteObject,
+) -> Result<HashMap<String, serde_json::Value>, AnyError> {
+ // Printing "undefined" generates a lot of noise, so let's skip
+ // these.
+ if evaluate_result.kind == "undefined" {
+ return Ok(HashMap::default());
+ }
+
+ if let Some(data) = get_jupyter_display(session, evaluate_result).await? {
+ return Ok(data);
+ }
+
+ let response = session
+ .call_function_on_args(
+ format!(
+ r#"function (object) {{
+ try {{
+ return {0}.inspectArgs(["%o", object], {{ colors: !{0}.noColor }});
+ }} catch (err) {{
+ return {0}.inspectArgs(["%o", err]);
+ }}
+ }}"#,
+ *repl::REPL_INTERNALS_NAME
+ ),
+ &[evaluate_result.clone()],
+ )
+ .await?;
+ let mut data = HashMap::default();
+ if let Some(value) = response.result.value {
+ data.insert("text/plain".to_string(), value);
+ }
+
+ Ok(data)
+}
+
+// TODO(bartlomieju): dedup with repl::editor
+fn get_expr_from_line_at_pos(line: &str, cursor_pos: usize) -> &str {
+ let start = line[..cursor_pos].rfind(is_word_boundary).unwrap_or(0);
+ let end = line[cursor_pos..]
+ .rfind(is_word_boundary)
+ .map(|i| cursor_pos + i)
+ .unwrap_or(cursor_pos);
+
+ let word = &line[start..end];
+ let word = word.strip_prefix(is_word_boundary).unwrap_or(word);
+ let word = word.strip_suffix(is_word_boundary).unwrap_or(word);
+
+ word
+}
+
+// TODO(bartlomieju): dedup with repl::editor
+fn is_word_boundary(c: char) -> bool {
+ if matches!(c, '.' | '_' | '$') {
+ false
+ } else {
+ char::is_ascii_whitespace(&c) || char::is_ascii_punctuation(&c)
+ }
+}
+
+// TODO(bartlomieju): dedup with repl::editor
+async fn get_global_lexical_scope_names(
+ session: &mut repl::ReplSession,
+) -> Vec<String> {
+ let evaluate_response = session
+ .post_message_with_event_loop(
+ "Runtime.globalLexicalScopeNames",
+ Some(cdp::GlobalLexicalScopeNamesArgs {
+ execution_context_id: Some(session.context_id),
+ }),
+ )
+ .await
+ .unwrap();
+ let evaluate_response: cdp::GlobalLexicalScopeNamesResponse =
+ serde_json::from_value(evaluate_response).unwrap();
+ evaluate_response.names
+}
+
+// TODO(bartlomieju): dedup with repl::editor
+async fn get_expression_property_names(
+ session: &mut repl::ReplSession,
+ expr: &str,
+) -> Vec<String> {
+ // try to get the properties from the expression
+ if let Some(properties) = get_object_expr_properties(session, expr).await {
+ return properties;
+ }
+
+ // otherwise fall back to the prototype
+ let expr_type = get_expression_type(session, expr).await;
+ let object_expr = match expr_type.as_deref() {
+ // possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject
+ Some("object") => "Object.prototype",
+ Some("function") => "Function.prototype",
+ Some("string") => "String.prototype",
+ Some("boolean") => "Boolean.prototype",
+ Some("bigint") => "BigInt.prototype",
+ Some("number") => "Number.prototype",
+ _ => return Vec::new(), // undefined, symbol, and unhandled
+ };
+
+ get_object_expr_properties(session, object_expr)
+ .await
+ .unwrap_or_default()
+}
+
+// TODO(bartlomieju): dedup with repl::editor
+async fn get_expression_type(
+ session: &mut repl::ReplSession,
+ expr: &str,
+) -> Option<String> {
+ evaluate_expression(session, expr)
+ .await
+ .map(|res| res.result.kind)
+}
+
+// TODO(bartlomieju): dedup with repl::editor
+async fn get_object_expr_properties(
+ session: &mut repl::ReplSession,
+ object_expr: &str,
+) -> Option<Vec<String>> {
+ let evaluate_result = evaluate_expression(session, object_expr).await?;
+ let object_id = evaluate_result.result.object_id?;
+
+ let get_properties_response = session
+ .post_message_with_event_loop(
+ "Runtime.getProperties",
+ Some(cdp::GetPropertiesArgs {
+ object_id,
+ own_properties: None,
+ accessor_properties_only: None,
+ generate_preview: None,
+ non_indexed_properties_only: Some(true),
+ }),
+ )
+ .await
+ .ok()?;
+ let get_properties_response: cdp::GetPropertiesResponse =
+ serde_json::from_value(get_properties_response).ok()?;
+ Some(
+ get_properties_response
+ .result
+ .into_iter()
+ .map(|prop| prop.name)
+ .collect(),
+ )
+}
+
+// TODO(bartlomieju): dedup with repl::editor
+async fn evaluate_expression(
+ session: &mut repl::ReplSession,
+ expr: &str,
+) -> Option<cdp::EvaluateResponse> {
+ let evaluate_response = session
+ .post_message_with_event_loop(
+ "Runtime.evaluate",
+ Some(cdp::EvaluateArgs {
+ expression: expr.to_string(),
+ object_group: None,
+ include_command_line_api: None,
+ silent: None,
+ context_id: Some(session.context_id),
+ return_by_value: None,
+ generate_preview: None,
+ user_gesture: None,
+ await_promise: None,
+ throw_on_side_effect: Some(true),
+ timeout: Some(200),
+ disable_breaks: None,
+ repl_mode: None,
+ allow_unsafe_eval_blocked_by_csp: None,
+ unique_context_id: None,
+ }),
+ )
+ .await
+ .ok()?;
+ let evaluate_response: cdp::EvaluateResponse =
+ serde_json::from_value(evaluate_response).ok()?;
+
+ if evaluate_response.exception_details.is_some() {
+ None
+ } else {
+ Some(evaluate_response)
+ }
+}