summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/server.rs
diff options
context:
space:
mode:
authorZander Hill <zander@xargs.io>2024-07-04 15:12:14 -0700
committerGitHub <noreply@github.com>2024-07-04 22:12:14 +0000
commitf00f0f92983d6966a5b97e539ec3f3407c3d851f (patch)
tree73966bbfbd836dd3dd36ff22647c97d0f839baed /cli/tools/jupyter/server.rs
parent96b527b8df3c9e7e29c98a6a0d6876089b88bc09 (diff)
feat(jupyter): support `confirm` and `prompt` in notebooks (#23592)
Closes: https://github.com/denoland/deno/issues/22633 This commit adds support for `confirm` and `prompt` APIs, that instead of reading from stdin are using notebook frontend to show modal boxes and wait for answers. --------- Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r--cli/tools/jupyter/server.rs55
1 files changed, 48 insertions, 7 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs
index 6a3831c49..6e203d17d 100644
--- a/cli/tools/jupyter/server.rs
+++ b/cli/tools/jupyter/server.rs
@@ -3,6 +3,10 @@
// This file is forked/ported from <https://github.com/evcxr/evcxr>
// Copyright 2020 The Evcxr Authors. MIT license.
+// NOTE(bartlomieju): unfortunately it appears that clippy is broken
+// and can't allow a single line ignore for `await_holding_lock`.
+#![allow(clippy::await_holding_lock)]
+
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
@@ -12,12 +16,12 @@ use crate::tools::repl;
use deno_core::anyhow::bail;
use deno_core::error::AnyError;
use deno_core::futures;
+use deno_core::parking_lot::Mutex;
use deno_core::serde_json;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
-use tokio::sync::Mutex;
use jupyter_runtime::messaging;
use jupyter_runtime::AsChildOf;
@@ -40,8 +44,14 @@ pub struct JupyterServer {
repl_session_proxy: JupyterReplProxy,
}
+pub struct StdinConnectionProxy {
+ pub tx: mpsc::UnboundedSender<JupyterMessage>,
+ pub rx: mpsc::UnboundedReceiver<JupyterMessage>,
+}
+
pub struct StartupData {
pub iopub_connection: Arc<Mutex<KernelIoPubConnection>>,
+ pub stdin_connection_proxy: Arc<Mutex<StdinConnectionProxy>>,
pub last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
}
@@ -58,7 +68,7 @@ impl JupyterServer {
connection_info.create_kernel_shell_connection().await?;
let control_connection =
connection_info.create_kernel_control_connection().await?;
- let _stdin_connection =
+ let mut stdin_connection =
connection_info.create_kernel_stdin_connection().await?;
let iopub_connection =
connection_info.create_kernel_iopub_connection().await?;
@@ -66,9 +76,19 @@ impl JupyterServer {
let iopub_connection = Arc::new(Mutex::new(iopub_connection));
let last_execution_request = Arc::new(Mutex::new(None));
+ let (stdin_tx1, mut stdin_rx1) =
+ mpsc::unbounded_channel::<JupyterMessage>();
+ let (stdin_tx2, stdin_rx2) = mpsc::unbounded_channel::<JupyterMessage>();
+
+ let stdin_connection_proxy = Arc::new(Mutex::new(StdinConnectionProxy {
+ tx: stdin_tx1,
+ rx: stdin_rx2,
+ }));
+
let Ok(()) = setup_tx.send(StartupData {
iopub_connection: iopub_connection.clone(),
last_execution_request: last_execution_request.clone(),
+ stdin_connection_proxy,
}) else {
bail!("Failed to send startup data");
};
@@ -82,6 +102,24 @@ impl JupyterServer {
repl_session_proxy,
};
+ let stdin_fut = deno_core::unsync::spawn(async move {
+ loop {
+ let Some(msg) = stdin_rx1.recv().await else {
+ return;
+ };
+ let Ok(()) = stdin_connection.send(msg).await else {
+ return;
+ };
+
+ let Ok(msg) = stdin_connection.read().await else {
+ return;
+ };
+ let Ok(()) = stdin_tx2.send(msg) else {
+ return;
+ };
+ }
+ });
+
let hearbeat_fut = deno_core::unsync::spawn(async move {
loop {
if let Err(err) = heartbeat.single_heartbeat().await {
@@ -134,6 +172,7 @@ impl JupyterServer {
shell_fut,
stdio_fut,
repl_session_fut,
+ stdin_fut,
]);
if let Ok(result) = join_fut.or_cancel(cancel_handle).await {
@@ -148,13 +187,15 @@ impl JupyterServer {
last_execution_request: Arc<Mutex<Option<JupyterMessage>>>,
stdio_msg: StreamContent,
) {
- let maybe_exec_result = last_execution_request.lock().await.clone();
+ let maybe_exec_result = last_execution_request.lock().clone();
let Some(exec_request) = maybe_exec_result else {
return;
};
- let mut iopub_conn = iopub_connection.lock().await;
- let result = iopub_conn.send(stdio_msg.as_child_of(&exec_request)).await;
+ let result = iopub_connection
+ .lock()
+ .send(stdio_msg.as_child_of(&exec_request))
+ .await;
if let Err(err) = result {
log::error!("Output error: {}", err);
@@ -429,7 +470,7 @@ impl JupyterServer {
if !execute_request.silent && execute_request.store_history {
self.execution_count += 1;
}
- *self.last_execution_request.lock().await = Some(parent_message.clone());
+ *self.last_execution_request.lock() = Some(parent_message.clone());
self
.send_iopub(
@@ -613,7 +654,7 @@ impl JupyterServer {
&mut self,
message: JupyterMessage,
) -> Result<(), AnyError> {
- self.iopub_connection.lock().await.send(message).await
+ self.iopub_connection.lock().send(message).await
}
}