From 1cab4f07a3e6125a089726f022dd6bc9af517536 Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Wed, 6 Nov 2024 16:57:57 -0800 Subject: refactor: use concrete error type for remaining ops (#26746) --- cli/ops/jupyter.rs | 43 +++++++++++++++++++++++++------------------ 1 file changed, 25 insertions(+), 18 deletions(-) (limited to 'cli/ops/jupyter.rs') diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs index f7f006d9b..5bdf97e60 100644 --- a/cli/ops/jupyter.rs +++ b/cli/ops/jupyter.rs @@ -46,7 +46,7 @@ pub fn op_jupyter_input( state: &mut OpState, #[string] prompt: String, is_password: bool, -) -> Result, AnyError> { +) -> Option { let (last_execution_request, stdin_connection_proxy) = { ( state.borrow::>>>().clone(), @@ -58,11 +58,11 @@ pub fn op_jupyter_input( if let Some(last_request) = maybe_last_request { let JupyterMessageContent::ExecuteRequest(msg) = &last_request.content else { - return Ok(None); + return None; }; if !msg.allow_stdin { - return Ok(None); + return None; } let content = InputRequest { @@ -73,7 +73,7 @@ pub fn op_jupyter_input( let msg = JupyterMessage::new(content, Some(&last_request)); let Ok(()) = stdin_connection_proxy.lock().tx.send(msg) else { - return Ok(None); + return None; }; // Need to spawn a separate thread here, because `blocking_recv()` can't @@ -82,17 +82,25 @@ pub fn op_jupyter_input( stdin_connection_proxy.lock().rx.blocking_recv() }); let Ok(Some(response)) = join_handle.join() else { - return Ok(None); + return None; }; let JupyterMessageContent::InputReply(msg) = response.content else { - return Ok(None); + return None; }; - return Ok(Some(msg.value)); + return Some(msg.value); } - Ok(None) + None +} + +#[derive(Debug, thiserror::Error)] +pub enum JupyterBroadcastError { + #[error(transparent)] + SerdeJson(serde_json::Error), + #[error(transparent)] + ZeroMq(AnyError), } #[op2(async)] @@ -102,7 +110,7 @@ pub async fn op_jupyter_broadcast( #[serde] content: serde_json::Value, #[serde] metadata: serde_json::Value, #[serde] buffers: Vec, -) -> Result<(), AnyError> { +) -> Result<(), JupyterBroadcastError> { let (iopub_connection, last_execution_request) = { let s = state.borrow(); @@ -125,36 +133,35 @@ pub async fn op_jupyter_broadcast( content, err ); - err + JupyterBroadcastError::SerdeJson(err) })?; let jupyter_message = JupyterMessage::new(content, Some(&last_request)) .with_metadata(metadata) .with_buffers(buffers.into_iter().map(|b| b.to_vec().into()).collect()); - iopub_connection.lock().send(jupyter_message).await?; + iopub_connection + .lock() + .send(jupyter_message) + .await + .map_err(JupyterBroadcastError::ZeroMq)?; } Ok(()) } #[op2(fast)] -pub fn op_print( - state: &mut OpState, - #[string] msg: &str, - is_err: bool, -) -> Result<(), AnyError> { +pub fn op_print(state: &mut OpState, #[string] msg: &str, is_err: bool) { let sender = state.borrow_mut::>(); if is_err { if let Err(err) = sender.send(StreamContent::stderr(msg)) { log::error!("Failed to send stderr message: {}", err); } - return Ok(()); + return; } if let Err(err) = sender.send(StreamContent::stdout(msg)) { log::error!("Failed to send stdout message: {}", err); } - Ok(()) } -- cgit v1.2.3