summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrevor Manz <trevor.j.manz@gmail.com>2023-10-04 07:05:20 -0400
committerGitHub <noreply@github.com>2023-10-04 13:05:20 +0200
commit9a46a824bd897e240af8a14f9d950ab6d95f42a5 (patch)
tree9c6bf70579452767b194112d6a531c14feec5652
parentda0b945804f19903beac71b23ff1040ebdb9b554 (diff)
feat(jupyter): send binary data with `Deno.jupyter.broadcast` (#20755)
Adds `buffers` to the `Deno.jupyter.broadcast` API to send binary data via comms. This affords the ability to send binary data via websockets to the jupyter widget frontend.
-rw-r--r--cli/js/40_jupyter.js4
-rw-r--r--cli/ops/jupyter.rs2
-rw-r--r--cli/tests/testdata/jupyter/integration_test.ipynb41
-rw-r--r--cli/tools/jupyter/jupyter_msg.rs58
-rw-r--r--cli/tsc/dts/lib.deno.unstable.d.ts1
5 files changed, 81 insertions, 25 deletions
diff --git a/cli/js/40_jupyter.js b/cli/js/40_jupyter.js
index 10dbccf4c..5a30a6b8e 100644
--- a/cli/js/40_jupyter.js
+++ b/cli/js/40_jupyter.js
@@ -9,8 +9,8 @@ function enableJupyter() {
} = core.ensureFastOps();
globalThis.Deno.jupyter = {
- async broadcast(msgType, content, { metadata = {} } = {}) {
- await op_jupyter_broadcast(msgType, content, metadata);
+ async broadcast(msgType, content, { metadata = {}, buffers = [] } = {}) {
+ await op_jupyter_broadcast(msgType, content, metadata, buffers);
},
};
}
diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs
index 12d19fab5..f63edebe8 100644
--- a/cli/ops/jupyter.rs
+++ b/cli/ops/jupyter.rs
@@ -37,6 +37,7 @@ pub async fn op_jupyter_broadcast(
#[string] message_type: String,
#[serde] content: serde_json::Value,
#[serde] metadata: serde_json::Value,
+ #[serde] buffers: Vec<deno_core::JsBuffer>,
) -> Result<(), AnyError> {
let (iopub_socket, last_execution_request) = {
let s = state.borrow();
@@ -54,6 +55,7 @@ pub async fn op_jupyter_broadcast(
.new_message(&message_type)
.with_content(content)
.with_metadata(metadata)
+ .with_buffers(buffers.into_iter().map(|b| b.into()).collect())
.send(&mut *iopub_socket.lock().await)
.await?;
}
diff --git a/cli/tests/testdata/jupyter/integration_test.ipynb b/cli/tests/testdata/jupyter/integration_test.ipynb
index 76064cea2..450df4e9e 100644
--- a/cli/tests/testdata/jupyter/integration_test.ipynb
+++ b/cli/tests/testdata/jupyter/integration_test.ipynb
@@ -721,8 +721,47 @@
},
{
"cell_type": "code",
+ "execution_count": 22,
+ "id": "6e9b530f-554d-4ef7-a5d6-69432283fd40",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "// Smoke test: Send example Jupyter Widgets messages with \"extra\" context.\n",
+ "// No return because we don't have a front-end widget to get the message from.\n",
+ "await Deno.jupyter.broadcast(\n",
+ " \"comm_open\",\n",
+ " {\n",
+ " \"comm_id\": \"foo\",\n",
+ " \"target_name\": \"jupyter.widget\",\n",
+ " \"data\": {\n",
+ " \"state\": {},\n",
+ " },\n",
+ " },\n",
+ " {\n",
+ " \"metadata\": { \"version\": \"2.1.0\" },\n",
+ " },\n",
+ ");\n",
+ "\n",
+ "await Deno.jupyter.broadcast(\n",
+ " \"comm_msg\",\n",
+ " {\n",
+ " \"comm_id\": \"foo\",\n",
+ " \"data\": {\n",
+ " \"method\": \"update\",\n",
+ " \"state\": { \"answer\": null },\n",
+ " \"buffer_paths\": [[\"answer\"]]\n",
+ " },\n",
+ " },\n",
+ " {\n",
+ " \"buffers\": [new Uint8Array([42])],\n",
+ " },\n",
+ ");"
+ ]
+ },
+ {
+ "cell_type": "code",
"execution_count": null,
- "id": "0181f28e",
+ "id": "f678313e-06c6-4fb8-a4ef-54a417129a82",
"metadata": {},
"outputs": [],
"source": []
diff --git a/cli/tools/jupyter/jupyter_msg.rs b/cli/tools/jupyter/jupyter_msg.rs
index beb9f34e4..da6654cae 100644
--- a/cli/tools/jupyter/jupyter_msg.rs
+++ b/cli/tools/jupyter/jupyter_msg.rs
@@ -122,6 +122,7 @@ pub(crate) struct JupyterMessage {
parent_header: serde_json::Value,
metadata: serde_json::Value,
content: serde_json::Value,
+ buffers: Vec<Bytes>,
}
const DELIMITER: &[u8] = b"<IDS|MSG>";
@@ -146,6 +147,11 @@ impl JupyterMessage {
parent_header: serde_json::from_slice(&raw_message.jparts[1])?,
metadata: serde_json::from_slice(&raw_message.jparts[2])?,
content: serde_json::from_slice(&raw_message.jparts[3])?,
+ buffers: if raw_message.jparts.len() > 4 {
+ raw_message.jparts[4..].to_vec()
+ } else {
+ vec![]
+ },
})
}
@@ -179,6 +185,7 @@ impl JupyterMessage {
parent_header: self.header.clone(),
metadata: json!({}),
content: json!({}),
+ buffers: vec![],
}
}
@@ -214,36 +221,43 @@ impl JupyterMessage {
self
}
+ pub(crate) fn with_buffers(mut self, buffers: Vec<Bytes>) -> JupyterMessage {
+ self.buffers = buffers;
+ self
+ }
+
pub(crate) async fn send<S: zeromq::SocketSend>(
&self,
connection: &mut Connection<S>,
) -> Result<(), AnyError> {
// If performance is a concern, we can probably avoid the clone and to_vec calls with a bit
// of refactoring.
+ let mut jparts: Vec<Bytes> = vec![
+ serde_json::to_string(&self.header)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ serde_json::to_string(&self.parent_header)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ serde_json::to_string(&self.metadata)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ serde_json::to_string(&self.content)
+ .unwrap()
+ .as_bytes()
+ .to_vec()
+ .into(),
+ ];
+ jparts.extend_from_slice(&self.buffers);
let raw_message = RawMessage {
zmq_identities: self.zmq_identities.clone(),
- jparts: vec![
- serde_json::to_string(&self.header)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- serde_json::to_string(&self.parent_header)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- serde_json::to_string(&self.metadata)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- serde_json::to_string(&self.content)
- .unwrap()
- .as_bytes()
- .to_vec()
- .into(),
- ],
+ jparts,
};
raw_message.send(connection).await
}
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index 4d909a789..782e8eba4 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -2101,6 +2101,7 @@ declare namespace Deno {
content: Record<string, unknown>,
extra?: {
metadata?: Record<string, unknown>;
+ buffers?: Uint8Array[];
},
): Promise<void>;
}