summaryrefslogtreecommitdiff
path: root/cli/tools/jupyter/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/tools/jupyter/server.rs')
-rw-r--r--cli/tools/jupyter/server.rs148
1 files changed, 64 insertions, 84 deletions
diff --git a/cli/tools/jupyter/server.rs b/cli/tools/jupyter/server.rs
index cd02b9891..2107dcfbf 100644
--- a/cli/tools/jupyter/server.rs
+++ b/cli/tools/jupyter/server.rs
@@ -18,8 +18,6 @@ use deno_core::CancelFuture;
use deno_core::CancelHandle;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
-use zeromq::SocketRecv;
-use zeromq::SocketSend;
use super::jupyter_msg::Connection;
use super::jupyter_msg::JupyterMessage;
@@ -67,7 +65,6 @@ impl JupyterServer {
}
let cancel_handle = CancelHandle::new_rc();
- let cancel_handle2 = CancelHandle::new_rc();
let mut server = Self {
execution_count: 0,
@@ -82,11 +79,14 @@ impl JupyterServer {
}
});
- let handle2 = deno_core::unsync::spawn(async move {
- if let Err(err) =
- Self::handle_control(control_socket, cancel_handle2).await
- {
- eprintln!("Control error: {}", err);
+ let handle2 = deno_core::unsync::spawn({
+ let cancel_handle = cancel_handle.clone();
+ async move {
+ if let Err(err) =
+ Self::handle_control(control_socket, cancel_handle).await
+ {
+ eprintln!("Control error: {}", err);
+ }
}
});
@@ -129,13 +129,11 @@ impl JupyterServer {
StdioMsg::Stderr(text) => ("stderr", text),
};
- let result = exec_request
- .new_message("stream")
- .with_content(json!({
+ let result = (*iopub_socket.lock().await)
+ .send(&exec_request.new_message("stream").with_content(json!({
"name": name,
"text": text
- }))
- .send(&mut *iopub_socket.lock().await)
+ })))
.await;
if let Err(err) = result {
@@ -148,11 +146,7 @@ impl JupyterServer {
connection: &mut Connection<zeromq::RepSocket>,
) -> Result<(), AnyError> {
loop {
- connection.socket.recv().await?;
- connection
- .socket
- .send(zeromq::ZmqMessage::from(b"ping".to_vec()))
- .await?;
+ connection.single_heartbeat().await?;
}
}
@@ -161,13 +155,11 @@ impl JupyterServer {
cancel_handle: Rc<CancelHandle>,
) -> Result<(), AnyError> {
loop {
- let msg = JupyterMessage::read(&mut connection).await?;
+ let msg = connection.read().await?;
match msg.message_type() {
"kernel_info_request" => {
- msg
- .new_reply()
- .with_content(kernel_info())
- .send(&mut connection)
+ connection
+ .send(&msg.new_reply().with_content(kernel_info()))
.await?;
}
"shutdown_request" => {
@@ -191,7 +183,7 @@ impl JupyterServer {
mut connection: Connection<zeromq::RouterSocket>,
) -> Result<(), AnyError> {
loop {
- let msg = JupyterMessage::read(&mut connection).await?;
+ let msg = connection.read().await?;
self.handle_shell_message(msg, &mut connection).await?;
}
}
@@ -201,25 +193,23 @@ impl JupyterServer {
msg: JupyterMessage,
connection: &mut Connection<zeromq::RouterSocket>,
) -> Result<(), AnyError> {
- msg
- .new_message("status")
- .with_content(json!({"execution_state": "busy"}))
- .send(&mut *self.iopub_socket.lock().await)
+ self
+ .send_iopub(
+ &msg
+ .new_message("status")
+ .with_content(json!({"execution_state": "busy"})),
+ )
.await?;
match msg.message_type() {
"kernel_info_request" => {
- msg
- .new_reply()
- .with_content(kernel_info())
- .send(connection)
+ connection
+ .send(&msg.new_reply().with_content(kernel_info()))
.await?;
}
"is_complete_request" => {
- msg
- .new_reply()
- .with_content(json!({"status": "complete"}))
- .send(connection)
+ connection
+ .send(&msg.new_reply().with_content(json!({"status": "complete"})))
.await?;
}
"execute_request" => {
@@ -228,10 +218,7 @@ impl JupyterServer {
.await?;
}
"comm_open" => {
- msg
- .comm_close_message()
- .send(&mut *self.iopub_socket.lock().await)
- .await?;
+ self.send_iopub(&msg.comm_close_message()).await?;
}
"complete_request" => {
let user_code = msg.code();
@@ -259,16 +246,14 @@ impl JupyterServer {
.map(|item| item.range.end)
.unwrap_or(cursor_pos);
- msg
- .new_reply()
- .with_content(json!({
+ connection
+ .send(&msg.new_reply().with_content(json!({
"status": "ok",
"matches": matches,
"cursor_start": cursor_start,
"cursor_end": cursor_end,
"metadata": {},
- }))
- .send(connection)
+ })))
.await?;
} else {
let expr = get_expr_from_line_at_pos(user_code, cursor_pos);
@@ -307,16 +292,14 @@ impl JupyterServer {
(candidates, cursor_pos - expr.len())
};
- msg
- .new_reply()
- .with_content(json!({
+ connection
+ .send(&msg.new_reply().with_content(json!({
"status": "ok",
"matches": completions,
"cursor_start": cursor_start,
"cursor_end": cursor_pos,
"metadata": {},
- }))
- .send(connection)
+ })))
.await?;
}
}
@@ -328,10 +311,12 @@ impl JupyterServer {
}
}
- msg
- .new_message("status")
- .with_content(json!({"execution_state": "idle"}))
- .send(&mut *self.iopub_socket.lock().await)
+ self
+ .send_iopub(
+ &msg
+ .new_message("status")
+ .with_content(json!({"execution_state": "idle"})),
+ )
.await?;
Ok(())
}
@@ -346,13 +331,11 @@ impl JupyterServer {
}
*self.last_execution_request.borrow_mut() = Some(msg.clone());
- msg
- .new_message("execute_input")
- .with_content(json!({
+ self
+ .send_iopub(&msg.new_message("execute_input").with_content(json!({
"execution_count": self.execution_count,
"code": msg.code()
- }))
- .send(&mut *self.iopub_socket.lock().await)
+ })))
.await?;
let result = self
@@ -363,22 +346,18 @@ impl JupyterServer {
let evaluate_response = match result {
Ok(eval_response) => eval_response,
Err(err) => {
- msg
- .new_message("error")
- .with_content(json!({
+ self
+ .send_iopub(&msg.new_message("error").with_content(json!({
"ename": err.to_string(),
"evalue": " ", // Fake value, otherwise old Jupyter frontends don't show the error
"traceback": [],
- }))
- .send(&mut *self.iopub_socket.lock().await)
+ })))
.await?;
- msg
- .new_reply()
- .with_content(json!({
+ connection
+ .send(&msg.new_reply().with_content(json!({
"status": "error",
"execution_count": self.execution_count,
- }))
- .send(connection)
+ })))
.await?;
return Ok(());
}
@@ -393,14 +372,12 @@ impl JupyterServer {
publish_result(&mut self.repl_session, &result, self.execution_count)
.await?;
- msg
- .new_reply()
- .with_content(json!({
+ connection
+ .send(&msg.new_reply().with_content(json!({
"status": "ok",
"execution_count": self.execution_count,
// FIXME: also include user_expressions
- }))
- .send(connection)
+ })))
.await?;
// Let's sleep here for a few ms, so we give a chance to the task that is
// handling stdout and stderr streams to receive and flush the content.
@@ -479,27 +456,30 @@ impl JupyterServer {
message
};
- msg
- .new_message("error")
- .with_content(json!({
+ self
+ .send_iopub(&msg.new_message("error").with_content(json!({
"ename": ename,
"evalue": evalue,
"traceback": traceback,
- }))
- .send(&mut *self.iopub_socket.lock().await)
+ })))
.await?;
- msg
- .new_reply()
- .with_content(json!({
+ connection
+ .send(&msg.new_reply().with_content(json!({
"status": "error",
"execution_count": self.execution_count,
- }))
- .send(connection)
+ })))
.await?;
}
Ok(())
}
+
+ async fn send_iopub(
+ &mut self,
+ message: &JupyterMessage,
+ ) -> Result<(), AnyError> {
+ self.iopub_socket.lock().await.send(message).await
+ }
}
async fn bind_socket<S: zeromq::Socket>(