diff options
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r-- | cli/tools/jupyter/server.rs | 55 |
1 files changed, 48 insertions, 7 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index 6a3831c49..6e203d17d 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -3,6 +3,10 @@ // This file is forked/ported from <https://github.com/evcxr/evcxr> // Copyright 2020 The Evcxr Authors. MIT license. +// NOTE(bartlomieju): unfortunately it appears that clippy is broken +// and can't allow a single line ignore for `await_holding_lock`. +#![allow(clippy::await_holding_lock)] + use std::collections::HashMap; use std::rc::Rc; use std::sync::Arc; @@ -12,12 +16,12 @@ use crate::tools::repl; use deno_core::anyhow::bail; use deno_core::error::AnyError; use deno_core::futures; +use deno_core::parking_lot::Mutex; use deno_core::serde_json; use deno_core::CancelFuture; use deno_core::CancelHandle; use tokio::sync::mpsc; use tokio::sync::oneshot; -use tokio::sync::Mutex; use jupyter_runtime::messaging; use jupyter_runtime::AsChildOf; @@ -40,8 +44,14 @@ pub struct JupyterServer { repl_session_proxy: JupyterReplProxy, } +pub struct StdinConnectionProxy { + pub tx: mpsc::UnboundedSender<JupyterMessage>, + pub rx: mpsc::UnboundedReceiver<JupyterMessage>, +} + pub struct StartupData { pub iopub_connection: Arc<Mutex<KernelIoPubConnection>>, + pub stdin_connection_proxy: Arc<Mutex<StdinConnectionProxy>>, pub last_execution_request: Arc<Mutex<Option<JupyterMessage>>>, } @@ -58,7 +68,7 @@ impl JupyterServer { connection_info.create_kernel_shell_connection().await?; let control_connection = connection_info.create_kernel_control_connection().await?; - let _stdin_connection = + let mut stdin_connection = connection_info.create_kernel_stdin_connection().await?; let iopub_connection = connection_info.create_kernel_iopub_connection().await?; @@ -66,9 +76,19 @@ impl JupyterServer { let iopub_connection = Arc::new(Mutex::new(iopub_connection)); let last_execution_request = Arc::new(Mutex::new(None)); + let (stdin_tx1, mut stdin_rx1) = + mpsc::unbounded_channel::<JupyterMessage>(); + let (stdin_tx2, stdin_rx2) = mpsc::unbounded_channel::<JupyterMessage>(); + + let stdin_connection_proxy = Arc::new(Mutex::new(StdinConnectionProxy { + tx: stdin_tx1, + rx: stdin_rx2, + })); + let Ok(()) = setup_tx.send(StartupData { iopub_connection: iopub_connection.clone(), last_execution_request: last_execution_request.clone(), + stdin_connection_proxy, }) else { bail!("Failed to send startup data"); }; @@ -82,6 +102,24 @@ impl JupyterServer { repl_session_proxy, }; + let stdin_fut = deno_core::unsync::spawn(async move { + loop { + let Some(msg) = stdin_rx1.recv().await else { + return; + }; + let Ok(()) = stdin_connection.send(msg).await else { + return; + }; + + let Ok(msg) = stdin_connection.read().await else { + return; + }; + let Ok(()) = stdin_tx2.send(msg) else { + return; + }; + } + }); + let hearbeat_fut = deno_core::unsync::spawn(async move { loop { if let Err(err) = heartbeat.single_heartbeat().await { @@ -134,6 +172,7 @@ impl JupyterServer { shell_fut, stdio_fut, repl_session_fut, + stdin_fut, ]); if let Ok(result) = join_fut.or_cancel(cancel_handle).await { @@ -148,13 +187,15 @@ impl JupyterServer { last_execution_request: Arc<Mutex<Option<JupyterMessage>>>, stdio_msg: StreamContent, ) { - let maybe_exec_result = last_execution_request.lock().await.clone(); + let maybe_exec_result = last_execution_request.lock().clone(); let Some(exec_request) = maybe_exec_result else { return; }; - let mut iopub_conn = iopub_connection.lock().await; - let result = iopub_conn.send(stdio_msg.as_child_of(&exec_request)).await; + let result = iopub_connection + .lock() + .send(stdio_msg.as_child_of(&exec_request)) + .await; if let Err(err) = result { log::error!("Output error: {}", err); @@ -429,7 +470,7 @@ impl JupyterServer { if !execute_request.silent && execute_request.store_history { self.execution_count += 1; } - *self.last_execution_request.lock().await = Some(parent_message.clone()); + *self.last_execution_request.lock() = Some(parent_message.clone()); self .send_iopub( @@ -613,7 +654,7 @@ impl JupyterServer { &mut self, message: JupyterMessage, ) -> Result<(), AnyError> { - self.iopub_connection.lock().await.send(message).await + self.iopub_connection.lock().send(message).await } } |