diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2024-04-30 22:30:40 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-01 02:30:40 +0000 |
commit | 486437fee1ac53610a901b07bda91909844ec9ab (patch) | |
tree | 06f13c58b2cf1be9fd0bbf1ed030ce05d67255f3 /cli/tools/jupyter/server.rs | |
parent | f2216c90a76775f7af6fa06b96b6e56dc5810284 (diff) |
refactor(jupyter): move communication methods out of data structs (#23622)
Moves the communication methods out of the data structs and onto the
`Connection` struct.
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r-- | cli/tools/jupyter/server.rs | 148 |
1 files changed, 64 insertions, 84 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs index cd02b9891..2107dcfbf 100644 --- a/cli/tools/jupyter/server.rs +++ b/cli/tools/jupyter/server.rs @@ -18,8 +18,6 @@ use deno_core::CancelFuture; use deno_core::CancelHandle; use tokio::sync::mpsc; use tokio::sync::Mutex; -use zeromq::SocketRecv; -use zeromq::SocketSend; use super::jupyter_msg::Connection; use super::jupyter_msg::JupyterMessage; @@ -67,7 +65,6 @@ impl JupyterServer { } let cancel_handle = CancelHandle::new_rc(); - let cancel_handle2 = CancelHandle::new_rc(); let mut server = Self { execution_count: 0, @@ -82,11 +79,14 @@ impl JupyterServer { } }); - let handle2 = deno_core::unsync::spawn(async move { - if let Err(err) = - Self::handle_control(control_socket, cancel_handle2).await - { - eprintln!("Control error: {}", err); + let handle2 = deno_core::unsync::spawn({ + let cancel_handle = cancel_handle.clone(); + async move { + if let Err(err) = + Self::handle_control(control_socket, cancel_handle).await + { + eprintln!("Control error: {}", err); + } } }); @@ -129,13 +129,11 @@ impl JupyterServer { StdioMsg::Stderr(text) => ("stderr", text), }; - let result = exec_request - .new_message("stream") - .with_content(json!({ + let result = (*iopub_socket.lock().await) + .send(&exec_request.new_message("stream").with_content(json!({ "name": name, "text": text - })) - .send(&mut *iopub_socket.lock().await) + }))) .await; if let Err(err) = result { @@ -148,11 +146,7 @@ impl JupyterServer { connection: &mut Connection<zeromq::RepSocket>, ) -> Result<(), AnyError> { loop { - connection.socket.recv().await?; - connection - .socket - .send(zeromq::ZmqMessage::from(b"ping".to_vec())) - .await?; + connection.single_heartbeat().await?; } } @@ -161,13 +155,11 @@ impl JupyterServer { cancel_handle: Rc<CancelHandle>, ) -> Result<(), AnyError> { loop { - let msg = JupyterMessage::read(&mut connection).await?; + let msg = connection.read().await?; match msg.message_type() { "kernel_info_request" => { - msg - .new_reply() - .with_content(kernel_info()) - .send(&mut connection) + connection + .send(&msg.new_reply().with_content(kernel_info())) .await?; } "shutdown_request" => { @@ -191,7 +183,7 @@ impl JupyterServer { mut connection: Connection<zeromq::RouterSocket>, ) -> Result<(), AnyError> { loop { - let msg = JupyterMessage::read(&mut connection).await?; + let msg = connection.read().await?; self.handle_shell_message(msg, &mut connection).await?; } } @@ -201,25 +193,23 @@ impl JupyterServer { msg: JupyterMessage, connection: &mut Connection<zeromq::RouterSocket>, ) -> Result<(), AnyError> { - msg - .new_message("status") - .with_content(json!({"execution_state": "busy"})) - .send(&mut *self.iopub_socket.lock().await) + self + .send_iopub( + &msg + .new_message("status") + .with_content(json!({"execution_state": "busy"})), + ) .await?; match msg.message_type() { "kernel_info_request" => { - msg - .new_reply() - .with_content(kernel_info()) - .send(connection) + connection + .send(&msg.new_reply().with_content(kernel_info())) .await?; } "is_complete_request" => { - msg - .new_reply() - .with_content(json!({"status": "complete"})) - .send(connection) + connection + .send(&msg.new_reply().with_content(json!({"status": "complete"}))) .await?; } "execute_request" => { @@ -228,10 +218,7 @@ impl JupyterServer { .await?; } "comm_open" => { - msg - .comm_close_message() - .send(&mut *self.iopub_socket.lock().await) - .await?; + self.send_iopub(&msg.comm_close_message()).await?; } "complete_request" => { let user_code = msg.code(); @@ -259,16 +246,14 @@ impl JupyterServer { .map(|item| item.range.end) .unwrap_or(cursor_pos); - msg - .new_reply() - .with_content(json!({ + connection + .send(&msg.new_reply().with_content(json!({ "status": "ok", "matches": matches, "cursor_start": cursor_start, "cursor_end": cursor_end, "metadata": {}, - })) - .send(connection) + }))) .await?; } else { let expr = get_expr_from_line_at_pos(user_code, cursor_pos); @@ -307,16 +292,14 @@ impl JupyterServer { (candidates, cursor_pos - expr.len()) }; - msg - .new_reply() - .with_content(json!({ + connection + .send(&msg.new_reply().with_content(json!({ "status": "ok", "matches": completions, "cursor_start": cursor_start, "cursor_end": cursor_pos, "metadata": {}, - })) - .send(connection) + }))) .await?; } } @@ -328,10 +311,12 @@ impl JupyterServer { } } - msg - .new_message("status") - .with_content(json!({"execution_state": "idle"})) - .send(&mut *self.iopub_socket.lock().await) + self + .send_iopub( + &msg + .new_message("status") + .with_content(json!({"execution_state": "idle"})), + ) .await?; Ok(()) } @@ -346,13 +331,11 @@ impl JupyterServer { } *self.last_execution_request.borrow_mut() = Some(msg.clone()); - msg - .new_message("execute_input") - .with_content(json!({ + self + .send_iopub(&msg.new_message("execute_input").with_content(json!({ "execution_count": self.execution_count, "code": msg.code() - })) - .send(&mut *self.iopub_socket.lock().await) + }))) .await?; let result = self @@ -363,22 +346,18 @@ impl JupyterServer { let evaluate_response = match result { Ok(eval_response) => eval_response, Err(err) => { - msg - .new_message("error") - .with_content(json!({ + self + .send_iopub(&msg.new_message("error").with_content(json!({ "ename": err.to_string(), "evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error "traceback": [], - })) - .send(&mut *self.iopub_socket.lock().await) + }))) .await?; - msg - .new_reply() - .with_content(json!({ + connection + .send(&msg.new_reply().with_content(json!({ "status": "error", "execution_count": self.execution_count, - })) - .send(connection) + }))) .await?; return Ok(()); } @@ -393,14 +372,12 @@ impl JupyterServer { publish_result(&mut self.repl_session, &result, self.execution_count) .await?; - msg - .new_reply() - .with_content(json!({ + connection + .send(&msg.new_reply().with_content(json!({ "status": "ok", "execution_count": self.execution_count, // FIXME: also include user_expressions - })) - .send(connection) + }))) .await?; // Let's sleep here for a few ms, so we give a chance to the task that is // handling stdout and stderr streams to receive and flush the content. @@ -479,27 +456,30 @@ impl JupyterServer { message }; - msg - .new_message("error") - .with_content(json!({ + self + .send_iopub(&msg.new_message("error").with_content(json!({ "ename": ename, "evalue": evalue, "traceback": traceback, - })) - .send(&mut *self.iopub_socket.lock().await) + }))) .await?; - msg - .new_reply() - .with_content(json!({ + connection + .send(&msg.new_reply().with_content(json!({ "status": "error", "execution_count": self.execution_count, - })) - .send(connection) + }))) .await?; } Ok(()) } + + async fn send_iopub( + &mut self, + message: &JupyterMessage, + ) -> Result<(), AnyError> { + self.iopub_socket.lock().await.send(message).await + } } async fn bind_socket<S: zeromq::Socket>( |