summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tools/repl.rs69
-rw-r--r--runtime/worker.rs3
2 files changed, 33 insertions, 39 deletions
diff --git a/cli/tools/repl.rs b/cli/tools/repl.rs
index 957d60b66..6c6d26240 100644
--- a/cli/tools/repl.rs
+++ b/cli/tools/repl.rs
@@ -25,24 +25,25 @@ use rustyline::Context;
use rustyline::Editor;
use rustyline_derive::{Helper, Hinter};
use std::borrow::Cow;
+use std::cell::RefCell;
use std::path::PathBuf;
-use std::sync::mpsc::channel;
-use std::sync::mpsc::sync_channel;
-use std::sync::mpsc::Receiver;
-use std::sync::mpsc::Sender;
-use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use std::sync::Mutex;
use swc_ecmascript::parser::token::{Token, Word};
-use tokio::pin;
+use tokio::sync::mpsc::channel;
+use tokio::sync::mpsc::unbounded_channel;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::sync::mpsc::UnboundedSender;
// Provides helpers to the editor like validation for multi-line edits, completion candidates for
// tab completion.
#[derive(Helper, Hinter)]
struct EditorHelper {
context_id: u64,
- message_tx: SyncSender<(String, Option<Value>)>,
- response_rx: Receiver<Result<Value, AnyError>>,
+ message_tx: Sender<(String, Option<Value>)>,
+ response_rx: RefCell<UnboundedReceiver<Result<Value, AnyError>>>,
}
impl EditorHelper {
@@ -51,8 +52,10 @@ impl EditorHelper {
method: &str,
params: Option<Value>,
) -> Result<Value, AnyError> {
- self.message_tx.send((method.to_string(), params))?;
- self.response_rx.recv()?
+ self
+ .message_tx
+ .blocking_send((method.to_string(), params))?;
+ self.response_rx.borrow_mut().blocking_recv().unwrap()
}
fn get_global_lexical_scope_names(&self) -> Vec<String> {
@@ -444,7 +447,7 @@ impl ReplSession {
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
- self.worker.run_event_loop(false).await
+ self.worker.run_event_loop(true).await
}
pub async fn evaluate_line_and_get_output(
@@ -622,39 +625,31 @@ impl ReplSession {
async fn read_line_and_poll(
repl_session: &mut ReplSession,
- message_rx: &Receiver<(String, Option<Value>)>,
- response_tx: &Sender<Result<Value, AnyError>>,
+ message_rx: &mut Receiver<(String, Option<Value>)>,
+ response_tx: &UnboundedSender<Result<Value, AnyError>>,
editor: ReplEditor,
) -> Result<String, ReadlineError> {
- let mut line = tokio::task::spawn_blocking(move || editor.readline());
-
+ let mut line_fut = tokio::task::spawn_blocking(move || editor.readline());
let mut poll_worker = true;
loop {
- for (method, params) in message_rx.try_iter() {
- let result = repl_session
- .post_message_with_event_loop(&method, params)
- .await;
- response_tx.send(result).unwrap();
- }
-
- // Because an inspector websocket client may choose to connect at anytime when we have an
- // inspector server we need to keep polling the worker to pick up new connections.
- // TODO(piscisaureus): the above comment is a red herring; figure out if/why
- // the event loop isn't woken by a waker when a websocket client connects.
- let timeout = tokio::time::sleep(tokio::time::Duration::from_millis(100));
- pin!(timeout);
-
tokio::select! {
- result = &mut line => {
+ result = &mut line_fut => {
return result.unwrap();
}
+ result = message_rx.recv() => {
+ if let Some((method, params)) = result {
+ let result = repl_session
+ .post_message_with_event_loop(&method, params)
+ .await;
+ response_tx.send(result).unwrap();
+ }
+
+ poll_worker = true;
+ },
_ = repl_session.run_event_loop(), if poll_worker => {
poll_worker = false;
}
- _ = timeout => {
- poll_worker = true
- }
}
}
}
@@ -664,13 +659,13 @@ pub async fn run(
worker: MainWorker,
) -> Result<(), AnyError> {
let mut repl_session = ReplSession::initialize(worker).await?;
- let (message_tx, message_rx) = sync_channel(1);
- let (response_tx, response_rx) = channel();
+ let (message_tx, mut message_rx) = channel(1);
+ let (response_tx, response_rx) = unbounded_channel();
let helper = EditorHelper {
context_id: repl_session.context_id,
message_tx,
- response_rx,
+ response_rx: RefCell::new(response_rx),
};
let history_file_path = program_state.dir.root.join("deno_history.txt");
@@ -682,7 +677,7 @@ pub async fn run(
loop {
let line = read_line_and_poll(
&mut repl_session,
- &message_rx,
+ &mut message_rx,
&response_tx,
editor.clone(),
)
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 567e75253..091b97119 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -7,7 +7,6 @@ use crate::ops;
use crate::permissions::Permissions;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_core::error::AnyError;
-use deno_core::futures::future::poll_fn;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::Future;
use deno_core::located_script_name;
@@ -247,7 +246,7 @@ impl MainWorker {
&mut self,
wait_for_inspector: bool,
) -> Result<(), AnyError> {
- poll_fn(|cx| self.poll_event_loop(cx, wait_for_inspector)).await
+ self.js_runtime.run_event_loop(wait_for_inspector).await
}
/// A utility function that runs provided future concurrently with the event loop.