summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/jupyter_msg.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tools/jupyter/jupyter_msg.rs')
-rw-r--r--cli/tools/jupyter/jupyter_msg.rs268
1 files changed, 268 insertions, 0 deletions
diff --git a/cli/tools/jupyter/jupyter_msg.rs b/cli/tools/jupyter/jupyter_msg.rs
new file mode 100644
index 000000000..c28dd3b48
--- /dev/null
+++ b/cli/tools/jupyter/jupyter_msg.rs
@@ -0,0 +1,268 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// This file is forked/ported from <https://github.com/evcxr/evcxr>
+// Copyright 2020 The Evcxr Authors. MIT license.
+
+use bytes::Bytes;
+use chrono::Utc;
+use data_encoding::HEXLOWER;
+use deno_core::anyhow::anyhow;
+use deno_core::anyhow::bail;
+use deno_core::error::AnyError;
+use deno_core::serde_json;
+use deno_core::serde_json::json;
+use ring::hmac;
+use std::fmt;
+use uuid::Uuid;
+
+pub(crate) struct Connection<S> {
+ pub(crate) socket: S,
+ /// Will be None if our key was empty (digest authentication disabled).
+ pub(crate) mac: Option<hmac::Key>,
+}
+
+impl<S: zeromq::Socket> Connection<S> {
+ pub(crate) fn new(socket: S, key: &str) -> Self {
+ let mac = if key.is_empty() {
+ None
+ } else {
+ Some(hmac::Key::new(hmac::HMAC_SHA256, key.as_bytes()))
+ };
+ Connection { socket, mac }
+ }
+}
+
+struct RawMessage {
+ zmq_identities: Vec<Bytes>,
+ jparts: Vec<Bytes>,
+}
+
+impl RawMessage {
+ pub(crate) async fn read<S: zeromq::SocketRecv>(
+ connection: &mut Connection<S>,
+ ) -> Result<RawMessage, AnyError> {
+ Self::from_multipart(connection.socket.recv().await?, connection)
+ }
+
+ pub(crate) fn from_multipart<S>(
+ multipart: zeromq::ZmqMessage,
+ connection: &Connection<S>,
+ ) -> Result<RawMessage, AnyError> {
+ let delimiter_index = multipart
+ .iter()
+ .position(|part| &part[..] == DELIMITER)
+ .ok_or_else(|| anyhow!("Missing delimiter"))?;
+ let mut parts = multipart.into_vec();
+ let jparts: Vec<_> = parts.drain(delimiter_index + 2..).collect();
+ let expected_hmac = parts.pop().unwrap();
+ // Remove delimiter, so that what's left is just the identities.
+ parts.pop();
+ let zmq_identities = parts;
+
+ let raw_message = RawMessage {
+ zmq_identities,
+ jparts,
+ };
+
+ if let Some(key) = &connection.mac {
+ let sig = HEXLOWER.decode(&expected_hmac)?;
+ let mut msg = Vec::new();
+ for part in &raw_message.jparts {
+ msg.extend(part);
+ }
+
+ if let Err(err) = hmac::verify(key, msg.as_ref(), sig.as_ref()) {
+ bail!("{}", err);
+ }
+ }
+
+ Ok(raw_message)
+ }
+
+ async fn send<S: zeromq::SocketSend>(
+ self,
+ connection: &mut Connection<S>,
+ ) -> Result<(), AnyError> {
+ let hmac = if let Some(key) = &connection.mac {
+ let ctx = self.digest(key);
+ let tag = ctx.sign();
+ HEXLOWER.encode(tag.as_ref())
+ } else {
+ String::new()
+ };
+ let mut parts: Vec<bytes::Bytes> = Vec::new();
+ for part in &self.zmq_identities {
+ parts.push(part.to_vec().into());
+ }
+ parts.push(DELIMITER.into());
+ parts.push(hmac.as_bytes().to_vec().into());
+ for part in &self.jparts {
+ parts.push(part.to_vec().into());
+ }
+ // ZmqMessage::try_from only fails if parts is empty, which it never
+ // will be here.
+ let message = zeromq::ZmqMessage::try_from(parts).unwrap();
+ connection.socket.send(message).await?;
+ Ok(())
+ }
+
+ fn digest(&self, mac: &hmac::Key) -> hmac::Context {
+ let mut hmac_ctx = hmac::Context::with_key(mac);
+ for part in &self.jparts {
+ hmac_ctx.update(part);
+ }
+ hmac_ctx
+ }
+}
+
+#[derive(Clone)]
+pub(crate) struct JupyterMessage {
+ zmq_identities: Vec<Bytes>,
+ header: serde_json::Value,
+ parent_header: serde_json::Value,
+ metadata: serde_json::Value,
+ content: serde_json::Value,
+}
+
+const DELIMITER: &[u8] = b"<IDS|MSG>";
+
+impl JupyterMessage {
+ pub(crate) async fn read<S: zeromq::SocketRecv>(
+ connection: &mut Connection<S>,
+ ) -> Result<JupyterMessage, AnyError> {
+ Self::from_raw_message(RawMessage::read(connection).await?)
+ }
+
+ fn from_raw_message(
+ raw_message: RawMessage,
+ ) -> Result<JupyterMessage, AnyError> {
+ if raw_message.jparts.len() < 4 {
+ bail!("Insufficient message parts {}", raw_message.jparts.len());
+ }
+
+ Ok(JupyterMessage {
+ zmq_identities: raw_message.zmq_identities,
+ header: serde_json::from_slice(&raw_message.jparts[0])?,
+ parent_header: serde_json::from_slice(&raw_message.jparts[1])?,
+ metadata: serde_json::from_slice(&raw_message.jparts[2])?,
+ content: serde_json::from_slice(&raw_message.jparts[3])?,
+ })
+ }
+
+ pub(crate) fn message_type(&self) -> &str {
+ self.header["msg_type"].as_str().unwrap_or("")
+ }
+
+ pub(crate) fn code(&self) -> &str {
+ self.content["code"].as_str().unwrap_or("")
+ }
+
+ pub(crate) fn cursor_pos(&self) -> usize {
+ self.content["cursor_pos"].as_u64().unwrap_or(0) as usize
+ }
+
+ pub(crate) fn comm_id(&self) -> &str {
+ self.content["comm_id"].as_str().unwrap_or("")
+ }
+
+ // Creates a new child message of this message. ZMQ identities are not transferred.
+ pub(crate) fn new_message(&self, msg_type: &str) -> JupyterMessage {
+ let mut header = self.header.clone();
+ header["msg_type"] = serde_json::Value::String(msg_type.to_owned());
+ header["username"] = serde_json::Value::String("kernel".to_owned());
+ header["msg_id"] = serde_json::Value::String(Uuid::new_v4().to_string());
+ header["date"] = serde_json::Value::String(Utc::now().to_rfc3339());
+
+ JupyterMessage {
+ zmq_identities: Vec::new(),
+ header,
+ parent_header: self.header.clone(),
+ metadata: json!({}),
+ content: json!({}),
+ }
+ }
+
+ // Creates a reply to this message. This is a child with the message type determined
+ // automatically by replacing "request" with "reply". ZMQ identities are transferred.
+ pub(crate) fn new_reply(&self) -> JupyterMessage {
+ let mut reply =
+ self.new_message(&self.message_type().replace("_request", "_reply"));
+ reply.zmq_identities = self.zmq_identities.clone();
+ reply
+ }
+
+ #[must_use = "Need to send this message for it to have any effect"]
+ pub(crate) fn comm_close_message(&self) -> JupyterMessage {
+ self.new_message("comm_close").with_content(json!({
+ "comm_id": self.comm_id()
+ }))
+ }
+
+ pub(crate) fn with_content(
+ mut self,
+ content: serde_json::Value,
+ ) -> JupyterMessage {
+ self.content = content;
+ self
+ }
+
+ pub(crate) async fn send<S: zeromq::SocketSend>(
+ &self,
+ connection: &mut Connection<S>,
+ ) -> Result<(), AnyError> {
+ // If performance is a concern, we can probably avoid the clone and to_vec calls with a bit
+ // of refactoring.
+ let raw_message = RawMessage {
+ zmq_identities: self.zmq_identities.clone(),
+ jparts: vec![
+ serde_json::to_string(&self.header)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ serde_json::to_string(&self.parent_header)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ serde_json::to_string(&self.metadata)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ serde_json::to_string(&self.content)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ ],
+ };
+ raw_message.send(connection).await
+ }
+}
+
+impl fmt::Debug for JupyterMessage {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ writeln!(
+ f,
+ "\nHeader: {}",
+ serde_json::to_string_pretty(&self.header).unwrap()
+ )?;
+ writeln!(
+ f,
+ "Parent header: {}",
+ serde_json::to_string_pretty(&self.parent_header).unwrap()
+ )?;
+ writeln!(
+ f,
+ "Metadata: {}",
+ serde_json::to_string_pretty(&self.metadata).unwrap()
+ )?;
+ writeln!(
+ f,
+ "Content: {}\n",
+ serde_json::to_string_pretty(&self.content).unwrap()
+ )?;
+ Ok(())
+ }
+}