summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/server.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-09-27 02:21:06 +0200
committerGitHub <noreply@github.com>2023-09-27 02:21:06 +0200
commit46a4bd5178f5aed22041422c431b5ab6f697865d (patch)
treebb72699b0c18ace70b7dd8c7434a211703edba14 /cli/tools/jupyter/server.rs
parentd39659332c224dfee51a43499c2d2d5da12a0da8 (diff)
feat(unstable): add `Deno.jupyter.broadcast` API (#20656)
Closes https://github.com/denoland/deno/issues/20591 --------- Co-authored-by: Kyle Kelley <rgbkrk@gmail.com>
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r--cli/tools/jupyter/server.rs15
1 files changed, 11 insertions, 4 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs
index 2028c7d25..c54dcd275 100644
--- a/cli/tools/jupyter/server.rs
+++ b/cli/tools/jupyter/server.rs
@@ -12,12 +12,11 @@ use crate::tools::repl;
use crate::tools::repl::cdp;
use deno_core::error::AnyError;
use deno_core::futures;
-use deno_core::futures::channel::mpsc;
-use deno_core::futures::StreamExt;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
+use tokio::sync::mpsc;
use tokio::sync::Mutex;
use zeromq::SocketRecv;
use zeromq::SocketSend;
@@ -44,7 +43,7 @@ impl JupyterServer {
pub async fn start(
spec: ConnectionSpec,
mut stdio_rx: mpsc::UnboundedReceiver<StdioMsg>,
- repl_session: repl::ReplSession,
+ mut repl_session: repl::ReplSession,
) -> Result<(), AnyError> {
let mut heartbeat =
bind_socket::<zeromq::RepSocket>(&spec, spec.hb_port).await?;
@@ -59,6 +58,14 @@ impl JupyterServer {
let iopub_socket = Arc::new(Mutex::new(iopub_socket));
let last_execution_request = Rc::new(RefCell::new(None));
+ // Store `iopub_socket` in the op state so it's accessible to the runtime API.
+ {
+ let op_state_rc = repl_session.worker.js_runtime.op_state();
+ let mut op_state = op_state_rc.borrow_mut();
+ op_state.put(iopub_socket.clone());
+ op_state.put(last_execution_request.clone());
+ }
+
let cancel_handle = CancelHandle::new_rc();
let cancel_handle2 = CancelHandle::new_rc();
@@ -90,7 +97,7 @@ impl JupyterServer {
});
let handle4 = deno_core::unsync::spawn(async move {
- while let Some(stdio_msg) = stdio_rx.next().await {
+ while let Some(stdio_msg) = stdio_rx.recv().await {
Self::handle_stdio_msg(
iopub_socket.clone(),
last_execution_request.clone(),