summaryrefslogtreecommitdiff
path: root/cli/ops/jupyter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/jupyter.rs')
-rw-r--r--cli/ops/jupyter.rs43
1 files changed, 25 insertions, 18 deletions
diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs
index f7f006d9b..5bdf97e60 100644
--- a/cli/ops/jupyter.rs
+++ b/cli/ops/jupyter.rs
@@ -46,7 +46,7 @@ pub fn op_jupyter_input(
state: &mut OpState,
#[string] prompt: String,
is_password: bool,
-) -> Result<Option<String>, AnyError> {
+) -> Option<String> {
let (last_execution_request, stdin_connection_proxy) = {
(
state.borrow::<Arc<Mutex<Option<JupyterMessage>>>>().clone(),
@@ -58,11 +58,11 @@ pub fn op_jupyter_input(
if let Some(last_request) = maybe_last_request {
let JupyterMessageContent::ExecuteRequest(msg) = &last_request.content
else {
- return Ok(None);
+ return None;
};
if !msg.allow_stdin {
- return Ok(None);
+ return None;
}
let content = InputRequest {
@@ -73,7 +73,7 @@ pub fn op_jupyter_input(
let msg = JupyterMessage::new(content, Some(&last_request));
let Ok(()) = stdin_connection_proxy.lock().tx.send(msg) else {
- return Ok(None);
+ return None;
};
// Need to spawn a separate thread here, because `blocking_recv()` can't
@@ -82,17 +82,25 @@ pub fn op_jupyter_input(
stdin_connection_proxy.lock().rx.blocking_recv()
});
let Ok(Some(response)) = join_handle.join() else {
- return Ok(None);
+ return None;
};
let JupyterMessageContent::InputReply(msg) = response.content else {
- return Ok(None);
+ return None;
};
- return Ok(Some(msg.value));
+ return Some(msg.value);
}
- Ok(None)
+ None
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum JupyterBroadcastError {
+ #[error(transparent)]
+ SerdeJson(serde_json::Error),
+ #[error(transparent)]
+ ZeroMq(AnyError),
}
#[op2(async)]
@@ -102,7 +110,7 @@ pub async fn op_jupyter_broadcast(
#[serde] content: serde_json::Value,
#[serde] metadata: serde_json::Value,
#[serde] buffers: Vec<deno_core::JsBuffer>,
-) -> Result<(), AnyError> {
+) -> Result<(), JupyterBroadcastError> {
let (iopub_connection, last_execution_request) = {
let s = state.borrow();
@@ -125,36 +133,35 @@ pub async fn op_jupyter_broadcast(
content,
err
);
- err
+ JupyterBroadcastError::SerdeJson(err)
})?;
let jupyter_message = JupyterMessage::new(content, Some(&last_request))
.with_metadata(metadata)
.with_buffers(buffers.into_iter().map(|b| b.to_vec().into()).collect());
- iopub_connection.lock().send(jupyter_message).await?;
+ iopub_connection
+ .lock()
+ .send(jupyter_message)
+ .await
+ .map_err(JupyterBroadcastError::ZeroMq)?;
}
Ok(())
}
#[op2(fast)]
-pub fn op_print(
- state: &mut OpState,
- #[string] msg: &str,
- is_err: bool,
-) -> Result<(), AnyError> {
+pub fn op_print(state: &mut OpState, #[string] msg: &str, is_err: bool) {
let sender = state.borrow_mut::<mpsc::UnboundedSender<StreamContent>>();
if is_err {
if let Err(err) = sender.send(StreamContent::stderr(msg)) {
log::error!("Failed to send stderr message: {}", err);
}
- return Ok(());
+ return;
}
if let Err(err) = sender.send(StreamContent::stdout(msg)) {
log::error!("Failed to send stdout message: {}", err);
}
- Ok(())
}