diff options
Diffstat (limited to 'cli/tools/jupyter')
-rw-r--r-- | cli/tools/jupyter/install.rs | 95 | ||||
-rw-r--r-- | cli/tools/jupyter/jupyter_msg.rs | 268 | ||||
-rw-r--r-- | cli/tools/jupyter/mod.rs | 139 | ||||
-rw-r--r-- | cli/tools/jupyter/resources/deno-logo-32x32.png | bin | 0 -> 1029 bytes | |||
-rw-r--r-- | cli/tools/jupyter/resources/deno-logo-64x64.png | bin | 0 -> 2066 bytes | |||
-rw-r--r-- | cli/tools/jupyter/server.rs | 724 |
6 files changed, 1226 insertions, 0 deletions
diff --git a/cli/tools/jupyter/install.rs b/cli/tools/jupyter/install.rs new file mode 100644 index 000000000..d1777d92d --- /dev/null +++ b/cli/tools/jupyter/install.rs @@ -0,0 +1,95 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use deno_core::anyhow::bail; +use deno_core::anyhow::Context; +use deno_core::error::AnyError; +use deno_core::serde_json; +use deno_core::serde_json::json; +use std::env::current_exe; +use std::io::Write; +use std::path::Path; +use tempfile::TempDir; + +const DENO_ICON_32: &[u8] = include_bytes!("./resources/deno-logo-32x32.png"); +const DENO_ICON_64: &[u8] = include_bytes!("./resources/deno-logo-64x64.png"); + +pub fn status() -> Result<(), AnyError> { + let output = std::process::Command::new("jupyter") + .args(["kernelspec", "list", "--json"]) + .output() + .context("Failed to get list of installed kernelspecs")?; + let json_output: serde_json::Value = + serde_json::from_slice(&output.stdout) + .context("Failed to parse JSON from kernelspec list")?; + + if let Some(specs) = json_output.get("kernelspecs") { + if let Some(specs_obj) = specs.as_object() { + if specs_obj.contains_key("deno") { + println!("â
Deno kernel already installed"); + return Ok(()); + } + } + } + + println!("âšī¸ Deno kernel is not yet installed, run `deno jupyter --unstable --install` to set it up"); + Ok(()) +} + +fn install_icon( + dir_path: &Path, + filename: &str, + icon_data: &[u8], +) -> Result<(), AnyError> { + let path = dir_path.join(filename); + let mut file = std::fs::File::create(path)?; + file.write_all(icon_data)?; + Ok(()) +} + +pub fn install() -> Result<(), AnyError> { + let temp_dir = TempDir::new().unwrap(); + let kernel_json_path = temp_dir.path().join("kernel.json"); + + // TODO(bartlomieju): add remaining fields as per + // https://jupyter-client.readthedocs.io/en/stable/kernels.html#kernel-specs + // FIXME(bartlomieju): replace `current_exe` before landing? + let json_data = json!({ + "argv": [current_exe().unwrap().to_string_lossy(), "--unstable", "jupyter", "--kernel", "--conn", "{connection_file}"], + "display_name": "Deno", + "language": "typescript", + }); + + let f = std::fs::File::create(kernel_json_path)?; + serde_json::to_writer_pretty(f, &json_data)?; + install_icon(temp_dir.path(), "icon-32x32.png", DENO_ICON_32)?; + install_icon(temp_dir.path(), "icon-64x64.png", DENO_ICON_64)?; + + let child_result = std::process::Command::new("jupyter") + .args([ + "kernelspec", + "install", + "--user", + "--name", + "deno", + &temp_dir.path().to_string_lossy(), + ]) + .spawn(); + + if let Ok(mut child) = child_result { + let wait_result = child.wait(); + match wait_result { + Ok(status) => { + if !status.success() { + bail!("Failed to install kernelspec, try again."); + } + } + Err(err) => { + bail!("Failed to install kernelspec: {}", err); + } + } + } + + let _ = std::fs::remove_dir(temp_dir); + println!("â
Deno kernelspec installed successfully."); + Ok(()) +} diff --git a/cli/tools/jupyter/jupyter_msg.rs b/cli/tools/jupyter/jupyter_msg.rs new file mode 100644 index 000000000..c28dd3b48 --- /dev/null +++ b/cli/tools/jupyter/jupyter_msg.rs @@ -0,0 +1,268 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// This file is forked/ported from <https://github.com/evcxr/evcxr> +// Copyright 2020 The Evcxr Authors. MIT license. + +use bytes::Bytes; +use chrono::Utc; +use data_encoding::HEXLOWER; +use deno_core::anyhow::anyhow; +use deno_core::anyhow::bail; +use deno_core::error::AnyError; +use deno_core::serde_json; +use deno_core::serde_json::json; +use ring::hmac; +use std::fmt; +use uuid::Uuid; + +pub(crate) struct Connection<S> { + pub(crate) socket: S, + /// Will be None if our key was empty (digest authentication disabled). + pub(crate) mac: Option<hmac::Key>, +} + +impl<S: zeromq::Socket> Connection<S> { + pub(crate) fn new(socket: S, key: &str) -> Self { + let mac = if key.is_empty() { + None + } else { + Some(hmac::Key::new(hmac::HMAC_SHA256, key.as_bytes())) + }; + Connection { socket, mac } + } +} + +struct RawMessage { + zmq_identities: Vec<Bytes>, + jparts: Vec<Bytes>, +} + +impl RawMessage { + pub(crate) async fn read<S: zeromq::SocketRecv>( + connection: &mut Connection<S>, + ) -> Result<RawMessage, AnyError> { + Self::from_multipart(connection.socket.recv().await?, connection) + } + + pub(crate) fn from_multipart<S>( + multipart: zeromq::ZmqMessage, + connection: &Connection<S>, + ) -> Result<RawMessage, AnyError> { + let delimiter_index = multipart + .iter() + .position(|part| &part[..] == DELIMITER) + .ok_or_else(|| anyhow!("Missing delimiter"))?; + let mut parts = multipart.into_vec(); + let jparts: Vec<_> = parts.drain(delimiter_index + 2..).collect(); + let expected_hmac = parts.pop().unwrap(); + // Remove delimiter, so that what's left is just the identities. + parts.pop(); + let zmq_identities = parts; + + let raw_message = RawMessage { + zmq_identities, + jparts, + }; + + if let Some(key) = &connection.mac { + let sig = HEXLOWER.decode(&expected_hmac)?; + let mut msg = Vec::new(); + for part in &raw_message.jparts { + msg.extend(part); + } + + if let Err(err) = hmac::verify(key, msg.as_ref(), sig.as_ref()) { + bail!("{}", err); + } + } + + Ok(raw_message) + } + + async fn send<S: zeromq::SocketSend>( + self, + connection: &mut Connection<S>, + ) -> Result<(), AnyError> { + let hmac = if let Some(key) = &connection.mac { + let ctx = self.digest(key); + let tag = ctx.sign(); + HEXLOWER.encode(tag.as_ref()) + } else { + String::new() + }; + let mut parts: Vec<bytes::Bytes> = Vec::new(); + for part in &self.zmq_identities { + parts.push(part.to_vec().into()); + } + parts.push(DELIMITER.into()); + parts.push(hmac.as_bytes().to_vec().into()); + for part in &self.jparts { + parts.push(part.to_vec().into()); + } + // ZmqMessage::try_from only fails if parts is empty, which it never + // will be here. + let message = zeromq::ZmqMessage::try_from(parts).unwrap(); + connection.socket.send(message).await?; + Ok(()) + } + + fn digest(&self, mac: &hmac::Key) -> hmac::Context { + let mut hmac_ctx = hmac::Context::with_key(mac); + for part in &self.jparts { + hmac_ctx.update(part); + } + hmac_ctx + } +} + +#[derive(Clone)] +pub(crate) struct JupyterMessage { + zmq_identities: Vec<Bytes>, + header: serde_json::Value, + parent_header: serde_json::Value, + metadata: serde_json::Value, + content: serde_json::Value, +} + +const DELIMITER: &[u8] = b"<IDS|MSG>"; + +impl JupyterMessage { + pub(crate) async fn read<S: zeromq::SocketRecv>( + connection: &mut Connection<S>, + ) -> Result<JupyterMessage, AnyError> { + Self::from_raw_message(RawMessage::read(connection).await?) + } + + fn from_raw_message( + raw_message: RawMessage, + ) -> Result<JupyterMessage, AnyError> { + if raw_message.jparts.len() < 4 { + bail!("Insufficient message parts {}", raw_message.jparts.len()); + } + + Ok(JupyterMessage { + zmq_identities: raw_message.zmq_identities, + header: serde_json::from_slice(&raw_message.jparts[0])?, + parent_header: serde_json::from_slice(&raw_message.jparts[1])?, + metadata: serde_json::from_slice(&raw_message.jparts[2])?, + content: serde_json::from_slice(&raw_message.jparts[3])?, + }) + } + + pub(crate) fn message_type(&self) -> &str { + self.header["msg_type"].as_str().unwrap_or("") + } + + pub(crate) fn code(&self) -> &str { + self.content["code"].as_str().unwrap_or("") + } + + pub(crate) fn cursor_pos(&self) -> usize { + self.content["cursor_pos"].as_u64().unwrap_or(0) as usize + } + + pub(crate) fn comm_id(&self) -> &str { + self.content["comm_id"].as_str().unwrap_or("") + } + + // Creates a new child message of this message. ZMQ identities are not transferred. + pub(crate) fn new_message(&self, msg_type: &str) -> JupyterMessage { + let mut header = self.header.clone(); + header["msg_type"] = serde_json::Value::String(msg_type.to_owned()); + header["username"] = serde_json::Value::String("kernel".to_owned()); + header["msg_id"] = serde_json::Value::String(Uuid::new_v4().to_string()); + header["date"] = serde_json::Value::String(Utc::now().to_rfc3339()); + + JupyterMessage { + zmq_identities: Vec::new(), + header, + parent_header: self.header.clone(), + metadata: json!({}), + content: json!({}), + } + } + + // Creates a reply to this message. This is a child with the message type determined + // automatically by replacing "request" with "reply". ZMQ identities are transferred. + pub(crate) fn new_reply(&self) -> JupyterMessage { + let mut reply = + self.new_message(&self.message_type().replace("_request", "_reply")); + reply.zmq_identities = self.zmq_identities.clone(); + reply + } + + #[must_use = "Need to send this message for it to have any effect"] + pub(crate) fn comm_close_message(&self) -> JupyterMessage { + self.new_message("comm_close").with_content(json!({ + "comm_id": self.comm_id() + })) + } + + pub(crate) fn with_content( + mut self, + content: serde_json::Value, + ) -> JupyterMessage { + self.content = content; + self + } + + pub(crate) async fn send<S: zeromq::SocketSend>( + &self, + connection: &mut Connection<S>, + ) -> Result<(), AnyError> { + // If performance is a concern, we can probably avoid the clone and to_vec calls with a bit + // of refactoring. + let raw_message = RawMessage { + zmq_identities: self.zmq_identities.clone(), + jparts: vec![ + serde_json::to_string(&self.header) + .unwrap() + .as_bytes() + .to_vec() + .into(), + serde_json::to_string(&self.parent_header) + .unwrap() + .as_bytes() + .to_vec() + .into(), + serde_json::to_string(&self.metadata) + .unwrap() + .as_bytes() + .to_vec() + .into(), + serde_json::to_string(&self.content) + .unwrap() + .as_bytes() + .to_vec() + .into(), + ], + }; + raw_message.send(connection).await + } +} + +impl fmt::Debug for JupyterMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!( + f, + "\nHeader: {}", + serde_json::to_string_pretty(&self.header).unwrap() + )?; + writeln!( + f, + "Parent header: {}", + serde_json::to_string_pretty(&self.parent_header).unwrap() + )?; + writeln!( + f, + "Metadata: {}", + serde_json::to_string_pretty(&self.metadata).unwrap() + )?; + writeln!( + f, + "Content: {}\n", + serde_json::to_string_pretty(&self.content).unwrap() + )?; + Ok(()) + } +} diff --git a/cli/tools/jupyter/mod.rs b/cli/tools/jupyter/mod.rs new file mode 100644 index 000000000..b704d58cd --- /dev/null +++ b/cli/tools/jupyter/mod.rs @@ -0,0 +1,139 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use crate::args::Flags; +use crate::args::JupyterFlags; +use crate::tools::repl; +use crate::util::logger; +use crate::CliFactory; +use deno_core::anyhow::Context; +use deno_core::error::AnyError; +use deno_core::futures::channel::mpsc; +use deno_core::op; +use deno_core::resolve_url_or_path; +use deno_core::serde::Deserialize; +use deno_core::serde_json; +use deno_core::Op; +use deno_core::OpState; +use deno_runtime::permissions::Permissions; +use deno_runtime::permissions::PermissionsContainer; + +mod install; +mod jupyter_msg; +mod server; + +pub async fn kernel( + flags: Flags, + jupyter_flags: JupyterFlags, +) -> Result<(), AnyError> { + if !flags.unstable { + eprintln!( + "Unstable subcommand 'deno jupyter'. The --unstable flag must be provided." + ); + std::process::exit(70); + } + + if !jupyter_flags.install && !jupyter_flags.kernel { + install::status()?; + return Ok(()); + } + + if jupyter_flags.install { + install::install()?; + return Ok(()); + } + + let connection_filepath = jupyter_flags.conn_file.unwrap(); + + // This env var might be set by notebook + if std::env::var("DEBUG").is_ok() { + logger::init(Some(log::Level::Debug)); + } + + let factory = CliFactory::from_flags(flags).await?; + let cli_options = factory.cli_options(); + let main_module = + resolve_url_or_path("./$deno$jupyter.ts", cli_options.initial_cwd()) + .unwrap(); + // TODO(bartlomieju): should we run with all permissions? + let permissions = PermissionsContainer::new(Permissions::allow_all()); + let npm_resolver = factory.npm_resolver().await?.clone(); + let resolver = factory.resolver().await?.clone(); + let worker_factory = factory.create_cli_main_worker_factory().await?; + let (stdio_tx, stdio_rx) = mpsc::unbounded(); + + let conn_file = + std::fs::read_to_string(&connection_filepath).with_context(|| { + format!("Couldn't read connection file: {:?}", connection_filepath) + })?; + let spec: ConnectionSpec = + serde_json::from_str(&conn_file).with_context(|| { + format!( + "Connection file is not a valid JSON: {:?}", + connection_filepath + ) + })?; + + let mut worker = worker_factory + .create_custom_worker( + main_module.clone(), + permissions, + vec![deno_jupyter::init_ops(stdio_tx)], + Default::default(), + ) + .await?; + worker.setup_repl().await?; + let worker = worker.into_main_worker(); + let repl_session = + repl::ReplSession::initialize(cli_options, npm_resolver, resolver, worker) + .await?; + + server::JupyterServer::start(spec, stdio_rx, repl_session).await?; + + Ok(()) +} + +deno_core::extension!(deno_jupyter, + options = { + sender: mpsc::UnboundedSender<server::StdioMsg>, + }, + middleware = |op| match op.name { + "op_print" => op_print::DECL, + _ => op, + }, + state = |state, options| { + state.put(options.sender); + }, +); + +#[op] +pub fn op_print( + state: &mut OpState, + msg: String, + is_err: bool, +) -> Result<(), AnyError> { + let sender = state.borrow_mut::<mpsc::UnboundedSender<server::StdioMsg>>(); + + if is_err { + if let Err(err) = sender.unbounded_send(server::StdioMsg::Stderr(msg)) { + eprintln!("Failed to send stderr message: {}", err); + } + return Ok(()); + } + + if let Err(err) = sender.unbounded_send(server::StdioMsg::Stdout(msg)) { + eprintln!("Failed to send stdout message: {}", err); + } + Ok(()) +} + +#[derive(Debug, Deserialize)] +pub struct ConnectionSpec { + ip: String, + transport: String, + control_port: u32, + shell_port: u32, + stdin_port: u32, + hb_port: u32, + iopub_port: u32, + key: String, +} diff --git a/cli/tools/jupyter/resources/deno-logo-32x32.png b/cli/tools/jupyter/resources/deno-logo-32x32.png Binary files differnew file mode 100644 index 000000000..97871a02e --- /dev/null +++ b/cli/tools/jupyter/resources/deno-logo-32x32.png diff --git a/cli/tools/jupyter/resources/deno-logo-64x64.png b/cli/tools/jupyter/resources/deno-logo-64x64.png Binary files differnew file mode 100644 index 000000000..1b9444ef6 --- /dev/null +++ b/cli/tools/jupyter/resources/deno-logo-64x64.png diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs new file mode 100644 index 000000000..c15dab6c2 --- /dev/null +++ b/cli/tools/jupyter/server.rs @@ -0,0 +1,724 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// This file is forked/ported from <https://github.com/evcxr/evcxr> +// Copyright 2020 The Evcxr Authors. MIT license. + +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; +use std::sync::Arc; + +use crate::tools::repl; +use crate::tools::repl::cdp; +use deno_core::error::AnyError; +use deno_core::futures; +use deno_core::futures::channel::mpsc; +use deno_core::futures::StreamExt; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use tokio::sync::Mutex; +use zeromq::SocketRecv; +use zeromq::SocketSend; + +use super::jupyter_msg::Connection; +use super::jupyter_msg::JupyterMessage; +use super::ConnectionSpec; + +pub enum StdioMsg { + Stdout(String), + Stderr(String), +} + +pub struct JupyterServer { + execution_count: usize, + last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, + // This is Arc<Mutex<>>, so we don't hold RefCell borrows across await + // points. + iopub_socket: Arc<Mutex<Connection<zeromq::PubSocket>>>, + repl_session: repl::ReplSession, +} + +impl JupyterServer { + pub async fn start( + spec: ConnectionSpec, + mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>, + repl_session: repl::ReplSession, + ) -> Result<(), AnyError> { + let mut heartbeat = + bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?; + let shell_socket = + bind_socket::<zeromq::RouterSocket>(&spec, spec.shell_port).await?; + let control_socket = + bind_socket::<zeromq::RouterSocket>(&spec, spec.control_port).await?; + let _stdin_socket = + bind_socket::<zeromq::RouterSocket>(&spec, spec.stdin_port).await?; + let iopub_socket = + bind_socket::<zeromq::PubSocket>(&spec, spec.iopub_port).await?; + let iopub_socket = Arc::new(Mutex::new(iopub_socket)); + let last_execution_request = Rc::new(RefCell::new(None)); + + let cancel_handle = CancelHandle::new_rc(); + let cancel_handle2 = CancelHandle::new_rc(); + + let mut server = Self { + execution_count: 0, + iopub_socket: iopub_socket.clone(), + last_execution_request: last_execution_request.clone(), + repl_session, + }; + + let handle1 = deno_core::unsync::spawn(async move { + if let Err(err) = Self::handle_heartbeat(&mut heartbeat).await { + eprintln!("Heartbeat error: {}", err); + } + }); + + let handle2 = deno_core::unsync::spawn(async move { + if let Err(err) = + Self::handle_control(control_socket, cancel_handle2).await + { + eprintln!("Control error: {}", err); + } + }); + + let handle3 = deno_core::unsync::spawn(async move { + if let Err(err) = server.handle_shell(shell_socket).await { + eprintln!("Shell error: {}", err); + } + }); + + let handle4 = deno_core::unsync::spawn(async move { + while let Some(stdio_msg) = stdio_rx.next().await { + Self::handle_stdio_msg( + iopub_socket.clone(), + last_execution_request.clone(), + stdio_msg, + ) + .await; + } + }); + + let join_fut = + futures::future::try_join_all(vec![handle1, handle2, handle3, handle4]); + + if let Ok(result) = join_fut.or_cancel(cancel_handle).await { + result?; + } + + Ok(()) + } + + async fn handle_stdio_msg<S: zeromq::SocketSend>( + iopub_socket: Arc<Mutex<Connection<S>>>, + last_execution_request: Rc<RefCell<Option<JupyterMessage>>>, + stdio_msg: StdioMsg, + ) { + let maybe_exec_result = last_execution_request.borrow().clone(); + if let Some(exec_request) = maybe_exec_result { + let (name, text) = match stdio_msg { + StdioMsg::Stdout(text) => ("stdout", text), + StdioMsg::Stderr(text) => ("stderr", text), + }; + + let result = exec_request + .new_message("stream") + .with_content(json!({ + "name": name, + "text": text + })) + .send(&mut *iopub_socket.lock().await) + .await; + + if let Err(err) = result { + eprintln!("Output {} error: {}", name, err); + } + } + } + + async fn handle_heartbeat( + connection: &mut Connection<zeromq::RepSocket>, + ) -> Result<(), AnyError> { + loop { + connection.socket.recv().await?; + connection + .socket + .send(zeromq::ZmqMessage::from(b"ping".to_vec())) + .await?; + } + } + + async fn handle_control( + mut connection: Connection<zeromq::RouterSocket>, + cancel_handle: Rc<CancelHandle>, + ) -> Result<(), AnyError> { + loop { + let msg = JupyterMessage::read(&mut connection).await?; + match msg.message_type() { + "kernel_info_request" => { + msg + .new_reply() + .with_content(kernel_info()) + .send(&mut connection) + .await?; + } + "shutdown_request" => { + cancel_handle.cancel(); + } + "interrupt_request" => { + eprintln!("Interrupt request currently not supported"); + } + _ => { + eprintln!( + "Unrecognized control message type: {}", + msg.message_type() + ); + } + } + } + } + + async fn handle_shell( + &mut self, + mut connection: Connection<zeromq::RouterSocket>, + ) -> Result<(), AnyError> { + loop { + let msg = JupyterMessage::read(&mut connection).await?; + self.handle_shell_message(msg, &mut connection).await?; + } + } + + async fn handle_shell_message( + &mut self, + msg: JupyterMessage, + connection: &mut Connection<zeromq::RouterSocket>, + ) -> Result<(), AnyError> { + msg + .new_message("status") + .with_content(json!({"execution_state": "busy"})) + .send(&mut *self.iopub_socket.lock().await) + .await?; + + match msg.message_type() { + "kernel_info_request" => { + msg + .new_reply() + .with_content(kernel_info()) + .send(connection) + .await?; + } + "is_complete_request" => { + msg + .new_reply() + .with_content(json!({"status": "complete"})) + .send(connection) + .await?; + } + "execute_request" => { + self + .handle_execution_request(msg.clone(), connection) + .await?; + } + "comm_open" => { + msg + .comm_close_message() + .send(&mut *self.iopub_socket.lock().await) + .await?; + } + "complete_request" => { + let user_code = msg.code(); + let cursor_pos = msg.cursor_pos(); + + let lsp_completions = self + .repl_session + .language_server + .completions(user_code, cursor_pos) + .await; + + if !lsp_completions.is_empty() { + let matches: Vec<String> = lsp_completions + .iter() + .map(|item| item.new_text.clone()) + .collect(); + + let cursor_start = lsp_completions + .first() + .map(|item| item.range.start) + .unwrap_or(cursor_pos); + + let cursor_end = lsp_completions + .last() + .map(|item| item.range.end) + .unwrap_or(cursor_pos); + + msg + .new_reply() + .with_content(json!({ + "status": "ok", + "matches": matches, + "cursor_start": cursor_start, + "cursor_end": cursor_end, + "metadata": {}, + })) + .send(connection) + .await?; + } else { + let expr = get_expr_from_line_at_pos(user_code, cursor_pos); + // check if the expression is in the form `obj.prop` + let (completions, cursor_start) = if let Some(index) = expr.rfind('.') + { + let sub_expr = &expr[..index]; + let prop_name = &expr[index + 1..]; + let candidates = + get_expression_property_names(&mut self.repl_session, sub_expr) + .await + .into_iter() + .filter(|n| { + !n.starts_with("Symbol(") + && n.starts_with(prop_name) + && n != &*repl::REPL_INTERNALS_NAME + }) + .collect(); + + (candidates, cursor_pos - prop_name.len()) + } else { + // combine results of declarations and globalThis properties + let mut candidates = get_expression_property_names( + &mut self.repl_session, + "globalThis", + ) + .await + .into_iter() + .chain(get_global_lexical_scope_names(&mut self.repl_session).await) + .filter(|n| n.starts_with(expr) && n != &*repl::REPL_INTERNALS_NAME) + .collect::<Vec<_>>(); + + // sort and remove duplicates + candidates.sort(); + candidates.dedup(); // make sure to sort first + + (candidates, cursor_pos - expr.len()) + }; + msg + .new_reply() + .with_content(json!({ + "status": "ok", + "matches": completions, + "cursor_start": cursor_start, + "cursor_end": cursor_pos, + "metadata": {}, + })) + .send(connection) + .await?; + } + } + "comm_msg" | "comm_info_request" | "history_request" => { + // We don't handle these messages + } + _ => { + eprintln!("Unrecognized shell message type: {}", msg.message_type()); + } + } + + msg + .new_message("status") + .with_content(json!({"execution_state": "idle"})) + .send(&mut *self.iopub_socket.lock().await) + .await?; + Ok(()) + } + + async fn handle_execution_request( + &mut self, + msg: JupyterMessage, + connection: &mut Connection<zeromq::RouterSocket>, + ) -> Result<(), AnyError> { + self.execution_count += 1; + *self.last_execution_request.borrow_mut() = Some(msg.clone()); + + msg + .new_message("execute_input") + .with_content(json!({ + "execution_count": self.execution_count, + "code": msg.code() + })) + .send(&mut *self.iopub_socket.lock().await) + .await?; + + let result = self + .repl_session + .evaluate_line_with_object_wrapping(msg.code()) + .await; + + let evaluate_response = match result { + Ok(eval_response) => eval_response, + Err(err) => { + msg + .new_message("error") + .with_content(json!({ + "ename": err.to_string(), + "evalue": "", + "traceback": [], + })) + .send(&mut *self.iopub_socket.lock().await) + .await?; + msg + .new_reply() + .with_content(json!({ + "status": "error", + "execution_count": self.execution_count, + })) + .send(connection) + .await?; + return Ok(()); + } + }; + + let repl::cdp::EvaluateResponse { + result, + exception_details, + } = evaluate_response.value; + + if exception_details.is_none() { + let output = + get_jupyter_display_or_eval_value(&mut self.repl_session, &result) + .await?; + msg + .new_message("execute_result") + .with_content(json!({ + "execution_count": self.execution_count, + "data": output, + "metadata": {}, + })) + .send(&mut *self.iopub_socket.lock().await) + .await?; + msg + .new_reply() + .with_content(json!({ + "status": "ok", + "execution_count": self.execution_count, + })) + .send(connection) + .await?; + // Let's sleep here for a few ms, so we give a chance to the task that is + // handling stdout and stderr streams to receive and flush the content. + // Otherwise, executing multiple cells one-by-one might lead to output + // from various cells be grouped together in another cell result. + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + } else { + let exception_details = exception_details.unwrap(); + let name = if let Some(exception) = exception_details.exception { + if let Some(description) = exception.description { + description + } else if let Some(value) = exception.value { + value.to_string() + } else { + "undefined".to_string() + } + } else { + "Unknown exception".to_string() + }; + + // TODO(bartlomieju): fill all the fields + msg + .new_message("error") + .with_content(json!({ + "ename": name, + "evalue": "", + "traceback": [], + })) + .send(&mut *self.iopub_socket.lock().await) + .await?; + msg + .new_reply() + .with_content(json!({ + "status": "error", + "execution_count": self.execution_count, + })) + .send(connection) + .await?; + } + + Ok(()) + } +} + +async fn bind_socket<S: zeromq::Socket>( + config: &ConnectionSpec, + port: u32, +) -> Result<Connection<S>, AnyError> { + let endpoint = format!("{}://{}:{}", config.transport, config.ip, port); + let mut socket = S::new(); + socket.bind(&endpoint).await?; + Ok(Connection::new(socket, &config.key)) +} + +fn kernel_info() -> serde_json::Value { + json!({ + "status": "ok", + "protocol_version": "5.3", + "implementation_version": crate::version::deno(), + "implementation": "Deno kernel", + "language_info": { + "name": "typescript", + "version": crate::version::TYPESCRIPT, + "mimetype": "text/x.typescript", + "file_extension": ".ts", + "pygments_lexer": "typescript", + "nb_converter": "script" + }, + "help_links": [{ + "text": "Visit Deno manual", + "url": "https://deno.land/manual" + }], + "banner": "Welcome to Deno kernel", + }) +} + +async fn get_jupyter_display( + session: &mut repl::ReplSession, + evaluate_result: &cdp::RemoteObject, +) -> Result<Option<HashMap<String, serde_json::Value>>, AnyError> { + let mut data = HashMap::default(); + let response = session + .call_function_on_args( + r#"function (object) {{ + return object[Symbol.for("Jupyter.display")](); + }}"# + .to_string(), + &[evaluate_result.clone()], + ) + .await?; + + if response.exception_details.is_some() { + return Ok(None); + } + + let object_id = response.result.object_id.unwrap(); + + let get_properties_response_result = session + .post_message_with_event_loop( + "Runtime.getProperties", + Some(cdp::GetPropertiesArgs { + object_id, + own_properties: Some(true), + accessor_properties_only: None, + generate_preview: None, + non_indexed_properties_only: Some(true), + }), + ) + .await; + + let Ok(get_properties_response) = get_properties_response_result else { + return Ok(None); + }; + + let get_properties_response: cdp::GetPropertiesResponse = + serde_json::from_value(get_properties_response).unwrap(); + + for prop in get_properties_response.result.into_iter() { + if let Some(value) = &prop.value { + data.insert( + prop.name.to_string(), + value + .value + .clone() + .unwrap_or_else(|| serde_json::Value::Null), + ); + } + } + + if !data.is_empty() { + return Ok(Some(data)); + } + + Ok(None) +} + +async fn get_jupyter_display_or_eval_value( + session: &mut repl::ReplSession, + evaluate_result: &cdp::RemoteObject, +) -> Result<HashMap<String, serde_json::Value>, AnyError> { + // Printing "undefined" generates a lot of noise, so let's skip + // these. + if evaluate_result.kind == "undefined" { + return Ok(HashMap::default()); + } + + if let Some(data) = get_jupyter_display(session, evaluate_result).await? { + return Ok(data); + } + + let response = session + .call_function_on_args( + format!( + r#"function (object) {{ + try {{ + return {0}.inspectArgs(["%o", object], {{ colors: !{0}.noColor }}); + }} catch (err) {{ + return {0}.inspectArgs(["%o", err]); + }} + }}"#, + *repl::REPL_INTERNALS_NAME + ), + &[evaluate_result.clone()], + ) + .await?; + let mut data = HashMap::default(); + if let Some(value) = response.result.value { + data.insert("text/plain".to_string(), value); + } + + Ok(data) +} + +// TODO(bartlomieju): dedup with repl::editor +fn get_expr_from_line_at_pos(line: &str, cursor_pos: usize) -> &str { + let start = line[..cursor_pos].rfind(is_word_boundary).unwrap_or(0); + let end = line[cursor_pos..] + .rfind(is_word_boundary) + .map(|i| cursor_pos + i) + .unwrap_or(cursor_pos); + + let word = &line[start..end]; + let word = word.strip_prefix(is_word_boundary).unwrap_or(word); + let word = word.strip_suffix(is_word_boundary).unwrap_or(word); + + word +} + +// TODO(bartlomieju): dedup with repl::editor +fn is_word_boundary(c: char) -> bool { + if matches!(c, '.' | '_' | '$') { + false + } else { + char::is_ascii_whitespace(&c) || char::is_ascii_punctuation(&c) + } +} + +// TODO(bartlomieju): dedup with repl::editor +async fn get_global_lexical_scope_names( + session: &mut repl::ReplSession, +) -> Vec<String> { + let evaluate_response = session + .post_message_with_event_loop( + "Runtime.globalLexicalScopeNames", + Some(cdp::GlobalLexicalScopeNamesArgs { + execution_context_id: Some(session.context_id), + }), + ) + .await + .unwrap(); + let evaluate_response: cdp::GlobalLexicalScopeNamesResponse = + serde_json::from_value(evaluate_response).unwrap(); + evaluate_response.names +} + +// TODO(bartlomieju): dedup with repl::editor +async fn get_expression_property_names( + session: &mut repl::ReplSession, + expr: &str, +) -> Vec<String> { + // try to get the properties from the expression + if let Some(properties) = get_object_expr_properties(session, expr).await { + return properties; + } + + // otherwise fall back to the prototype + let expr_type = get_expression_type(session, expr).await; + let object_expr = match expr_type.as_deref() { + // possibilities: https://chromedevtools.github.io/devtools-protocol/v8/Runtime/#type-RemoteObject + Some("object") => "Object.prototype", + Some("function") => "Function.prototype", + Some("string") => "String.prototype", + Some("boolean") => "Boolean.prototype", + Some("bigint") => "BigInt.prototype", + Some("number") => "Number.prototype", + _ => return Vec::new(), // undefined, symbol, and unhandled + }; + + get_object_expr_properties(session, object_expr) + .await + .unwrap_or_default() +} + +// TODO(bartlomieju): dedup with repl::editor +async fn get_expression_type( + session: &mut repl::ReplSession, + expr: &str, +) -> Option<String> { + evaluate_expression(session, expr) + .await + .map(|res| res.result.kind) +} + +// TODO(bartlomieju): dedup with repl::editor +async fn get_object_expr_properties( + session: &mut repl::ReplSession, + object_expr: &str, +) -> Option<Vec<String>> { + let evaluate_result = evaluate_expression(session, object_expr).await?; + let object_id = evaluate_result.result.object_id?; + + let get_properties_response = session + .post_message_with_event_loop( + "Runtime.getProperties", + Some(cdp::GetPropertiesArgs { + object_id, + own_properties: None, + accessor_properties_only: None, + generate_preview: None, + non_indexed_properties_only: Some(true), + }), + ) + .await + .ok()?; + let get_properties_response: cdp::GetPropertiesResponse = + serde_json::from_value(get_properties_response).ok()?; + Some( + get_properties_response + .result + .into_iter() + .map(|prop| prop.name) + .collect(), + ) +} + +// TODO(bartlomieju): dedup with repl::editor +async fn evaluate_expression( + session: &mut repl::ReplSession, + expr: &str, +) -> Option<cdp::EvaluateResponse> { + let evaluate_response = session + .post_message_with_event_loop( + "Runtime.evaluate", + Some(cdp::EvaluateArgs { + expression: expr.to_string(), + object_group: None, + include_command_line_api: None, + silent: None, + context_id: Some(session.context_id), + return_by_value: None, + generate_preview: None, + user_gesture: None, + await_promise: None, + throw_on_side_effect: Some(true), + timeout: Some(200), + disable_breaks: None, + repl_mode: None, + allow_unsafe_eval_blocked_by_csp: None, + unique_context_id: None, + }), + ) + .await + .ok()?; + let evaluate_response: cdp::EvaluateResponse = + serde_json::from_value(evaluate_response).ok()?; + + if evaluate_response.exception_details.is_some() { + None + } else { + Some(evaluate_response) + } +} |