diff options
Diffstat (limited to 'cli/ops/jupyter.rs')
-rw-r--r-- | cli/ops/jupyter.rs | 54 |
1 files changed, 34 insertions, 20 deletions
diff --git a/cli/ops/jupyter.rs b/cli/ops/jupyter.rs index 1c60bc2bc..57ca93ff4 100644 --- a/cli/ops/jupyter.rs +++ b/cli/ops/jupyter.rs @@ -4,9 +4,11 @@ use std::cell::RefCell; use std::rc::Rc; use std::sync::Arc; -use crate::tools::jupyter::jupyter_msg::Connection; -use crate::tools::jupyter::jupyter_msg::JupyterMessage; -use crate::tools::jupyter::server::StdioMsg; +use runtimelib::JupyterMessage; +use runtimelib::JupyterMessageContent; +use runtimelib::KernelIoPubConnection; +use runtimelib::StreamContent; + use deno_core::error::AnyError; use deno_core::op2; use deno_core::serde_json; @@ -19,7 +21,7 @@ deno_core::extension!(deno_jupyter, op_jupyter_broadcast, ], options = { - sender: mpsc::UnboundedSender<StdioMsg>, + sender: mpsc::UnboundedSender<StreamContent>, }, middleware = |op| match op.name { "op_print" => op_print(), @@ -38,28 +40,40 @@ pub async fn op_jupyter_broadcast( #[serde] metadata: serde_json::Value, #[serde] buffers: Vec<deno_core::JsBuffer>, ) -> Result<(), AnyError> { - let (iopub_socket, last_execution_request) = { + let (iopub_connection, last_execution_request) = { let s = state.borrow(); ( - s.borrow::<Arc<Mutex<Connection<zeromq::PubSocket>>>>() - .clone(), + s.borrow::<Arc<Mutex<KernelIoPubConnection>>>().clone(), s.borrow::<Rc<RefCell<Option<JupyterMessage>>>>().clone(), ) }; let maybe_last_request = last_execution_request.borrow().clone(); if let Some(last_request) = maybe_last_request { - (*iopub_socket.lock().await) - .send( - &last_request - .new_message(&message_type) - .with_content(content) - .with_metadata(metadata) - .with_buffers( - buffers.into_iter().map(|b| b.to_vec().into()).collect(), - ), - ) + let content = JupyterMessageContent::from_type_and_content( + &message_type, + content.clone(), + ) + .map_err(|err| { + log::error!( + "Error deserializing content from jupyter.broadcast, message_type: {}:\n\n{}\n\n{}", + &message_type, + content, + err + ); + err + })?; + + let mut jupyter_message = JupyterMessage::new(content, Some(&last_request)); + + jupyter_message.metadata = metadata; + jupyter_message.buffers = + buffers.into_iter().map(|b| b.to_vec().into()).collect(); + jupyter_message.set_parent(last_request); + + (iopub_connection.lock().await) + .send(jupyter_message) .await?; } @@ -72,16 +86,16 @@ pub fn op_print( #[string] msg: &str, is_err: bool, ) -> Result<(), AnyError> { - let sender = state.borrow_mut::<mpsc::UnboundedSender<StdioMsg>>(); + let sender = state.borrow_mut::<mpsc::UnboundedSender<StreamContent>>(); if is_err { - if let Err(err) = sender.send(StdioMsg::Stderr(msg.into())) { + if let Err(err) = sender.send(StreamContent::stderr(msg.into())) { log::error!("Failed to send stderr message: {}", err); } return Ok(()); } - if let Err(err) = sender.send(StdioMsg::Stdout(msg.into())) { + if let Err(err) = sender.send(StreamContent::stdout(msg.into())) { log::error!("Failed to send stdout message: {}", err); } Ok(()) |