summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-09-27 02:21:06 +0200
committerGitHub <noreply@github.com>2023-09-27 02:21:06 +0200
commit46a4bd5178f5aed22041422c431b5ab6f697865d (patch)
treebb72699b0c18ace70b7dd8c7434a211703edba14 /cli/ops
parentd39659332c224dfee51a43499c2d2d5da12a0da8 (diff)
feat(unstable): add `Deno.jupyter.broadcast` API (#20656)
Closes https://github.com/denoland/deno/issues/20591 --------- Co-authored-by: Kyle Kelley <rgbkrk@gmail.com>
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/jupyter.rs81
-rw-r--r--cli/ops/mod.rs1
2 files changed, 82 insertions, 0 deletions
diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs
new file mode 100644
index 000000000..765b062e5
--- /dev/null
+++ b/cli/ops/jupyter.rs
@@ -0,0 +1,81 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::RefCell;
+use std::rc::Rc;
+use std::sync::Arc;
+
+use crate::tools::jupyter::jupyter_msg::Connection;
+use crate::tools::jupyter::jupyter_msg::JupyterMessage;
+use crate::tools::jupyter::server::StdioMsg;
+use deno_core::error::AnyError;
+use deno_core::op2;
+use deno_core::serde_json;
+use deno_core::Op;
+use deno_core::OpState;
+use tokio::sync::mpsc;
+use tokio::sync::Mutex;
+
+deno_core::extension!(deno_jupyter,
+ ops = [
+ op_jupyter_broadcast,
+ ],
+ options = {
+ sender: mpsc::UnboundedSender<StdioMsg>,
+ },
+ middleware = |op| match op.name {
+ "op_print" => op_print::DECL,
+ _ => op,
+ },
+ state = |state, options| {
+ state.put(options.sender);
+ },
+);
+
+#[op2(async)]
+pub async fn op_jupyter_broadcast(
+ state: Rc<RefCell<OpState>>,
+ #[string] message_type: String,
+ #[serde] content: serde_json::Value,
+) -> Result<(), AnyError> {
+ let (iopub_socket, last_execution_request) = {
+ let s = state.borrow();
+
+ (
+ s.borrow::<Arc<Mutex<Connection<zeromq::PubSocket>>>>()
+ .clone(),
+ s.borrow::<Rc<RefCell<Option<JupyterMessage>>>>().clone(),
+ )
+ };
+
+ let maybe_last_request = last_execution_request.borrow().clone();
+ if let Some(last_request) = maybe_last_request {
+ last_request
+ .new_message(&message_type)
+ .with_content(content)
+ .send(&mut *iopub_socket.lock().await)
+ .await?;
+ }
+
+ Ok(())
+}
+
+#[op2(fast)]
+pub fn op_print(
+ state: &mut OpState,
+ #[string] msg: &str,
+ is_err: bool,
+) -> Result<(), AnyError> {
+ let sender = state.borrow_mut::<mpsc::UnboundedSender<StdioMsg>>();
+
+ if is_err {
+ if let Err(err) = sender.send(StdioMsg::Stderr(msg.into())) {
+ eprintln!("Failed to send stderr message: {}", err);
+ }
+ return Ok(());
+ }
+
+ if let Err(err) = sender.send(StdioMsg::Stdout(msg.into())) {
+ eprintln!("Failed to send stdout message: {}", err);
+ }
+ Ok(())
+}
diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs
index d4d8a84ba..ad4e4bd4e 100644
--- a/cli/ops/mod.rs
+++ b/cli/ops/mod.rs
@@ -9,6 +9,7 @@ use deno_core::Extension;
use deno_core::OpState;
pub mod bench;
+pub mod jupyter;
pub mod testing;
pub fn cli_exts(npm_resolver: Arc<CliNpmResolver>) -> Vec<Extension> {