summaryrefslogtreecommitdiff
path: root/cli/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/worker.rs')
-rw-r--r--cli/worker.rs201
1 files changed, 81 insertions, 120 deletions
diff --git a/cli/worker.rs b/cli/worker.rs
index 6fb235ceb..07a96af16 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -33,6 +33,25 @@ pub struct WorkerChannels {
pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>,
}
+impl WorkerChannels {
+ /// Post message to worker as a host.
+ pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
+ let mut sender = self.sender.clone();
+ sender.send(buf).map_err(ErrBox::from).await
+ }
+
+ /// Get message from worker as a host.
+ pub fn get_message(&self) -> Pin<Box<dyn Future<Output = Option<Buf>>>> {
+ let receiver_mutex = self.receiver.clone();
+
+ async move {
+ let mut receiver = receiver_mutex.lock().await;
+ receiver.next().await
+ }
+ .boxed_local()
+ }
+}
+
/// Worker is a CLI wrapper for `deno_core::Isolate`.
///
/// It provides infrastructure to communicate with a worker and
@@ -45,10 +64,9 @@ pub struct WorkerChannels {
/// - `MainWorker`
/// - `CompilerWorker`
/// - `WebWorker`
-#[derive(Clone)]
pub struct Worker {
pub name: String,
- pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
+ pub isolate: Box<deno_core::EsIsolate>,
pub state: ThreadSafeState,
external_channels: WorkerChannels,
}
@@ -70,7 +88,7 @@ impl Worker {
Self {
name,
- isolate: Arc::new(AsyncMutex::new(isolate)),
+ isolate,
state,
external_channels,
}
@@ -90,13 +108,10 @@ impl Worker {
js_filename: &str,
js_source: &str,
) -> Result<(), ErrBox> {
- let mut isolate = self.isolate.try_lock().unwrap();
- isolate.execute(js_filename, js_source)
+ self.isolate.execute(js_filename, js_source)
}
/// Executes the provided JavaScript module.
- ///
- /// Takes ownership of the isolate behind mutex.
pub async fn execute_mod_async(
&mut self,
module_specifier: &ModuleSpecifier,
@@ -104,40 +119,17 @@ impl Worker {
is_prefetch: bool,
) -> Result<(), ErrBox> {
let specifier = module_specifier.to_string();
- let worker = self.clone();
-
- let mut isolate = self.isolate.lock().await;
- let id = isolate.load_module(&specifier, maybe_code).await?;
- worker.state.global_state.progress.done();
-
+ let id = self.isolate.load_module(&specifier, maybe_code).await?;
+ self.state.global_state.progress.done();
if !is_prefetch {
- return isolate.mod_evaluate(id);
+ return self.isolate.mod_evaluate(id);
}
-
Ok(())
}
- /// Post message to worker as a host.
- ///
- /// This method blocks current thread.
- pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
- let mut sender = self.external_channels.sender.clone();
- let result = sender.send(buf).map_err(ErrBox::from).await;
- drop(sender);
- result
- }
-
- /// Get message from worker as a host.
- pub fn get_message(
- &self,
- ) -> Pin<Box<dyn Future<Output = Option<Buf>> + Send>> {
- let receiver_mutex = self.external_channels.receiver.clone();
-
- async move {
- let mut receiver = receiver_mutex.lock().await;
- receiver.next().await
- }
- .boxed()
+ /// Returns a way to communicate with the Worker from other threads.
+ pub fn thread_safe_handle(&self) -> WorkerChannels {
+ self.external_channels.clone()
}
}
@@ -148,13 +140,7 @@ impl Future for Worker {
let inner = self.get_mut();
let waker = AtomicWaker::new();
waker.register(cx.waker());
- match inner.isolate.try_lock() {
- Ok(mut isolate) => isolate.poll_unpin(cx),
- Err(_) => {
- waker.wake();
- Poll::Pending
- }
- }
+ inner.isolate.poll_unpin(cx)
}
}
@@ -164,7 +150,6 @@ impl Future for Worker {
///
/// All WebWorkers created during program execution are decendants of
/// this worker.
-#[derive(Clone)]
pub struct MainWorker(Worker);
impl MainWorker {
@@ -175,33 +160,31 @@ impl MainWorker {
external_channels: WorkerChannels,
) -> Self {
let state_ = state.clone();
- let worker = Worker::new(name, startup_data, state_, external_channels);
+ let mut worker = Worker::new(name, startup_data, state_, external_channels);
{
- let mut isolate = worker.isolate.try_lock().unwrap();
- let op_registry = isolate.op_registry.clone();
-
- ops::runtime::init(&mut isolate, &state);
- ops::runtime_compiler::init(&mut isolate, &state);
- ops::errors::init(&mut isolate, &state);
- ops::fetch::init(&mut isolate, &state);
- ops::files::init(&mut isolate, &state);
- ops::fs::init(&mut isolate, &state);
- ops::io::init(&mut isolate, &state);
- ops::plugins::init(&mut isolate, &state, op_registry);
- ops::net::init(&mut isolate, &state);
- ops::tls::init(&mut isolate, &state);
- ops::os::init(&mut isolate, &state);
- ops::permissions::init(&mut isolate, &state);
- ops::process::init(&mut isolate, &state);
- ops::random::init(&mut isolate, &state);
- ops::repl::init(&mut isolate, &state);
- ops::resources::init(&mut isolate, &state);
- ops::signal::init(&mut isolate, &state);
- ops::timers::init(&mut isolate, &state);
- ops::worker_host::init(&mut isolate, &state);
- ops::web_worker::init(&mut isolate, &state);
+ let op_registry = worker.isolate.op_registry.clone();
+ let isolate = &mut worker.isolate;
+ ops::runtime::init(isolate, &state);
+ ops::runtime_compiler::init(isolate, &state);
+ ops::errors::init(isolate, &state);
+ ops::fetch::init(isolate, &state);
+ ops::files::init(isolate, &state);
+ ops::fs::init(isolate, &state);
+ ops::io::init(isolate, &state);
+ ops::plugins::init(isolate, &state, op_registry);
+ ops::net::init(isolate, &state);
+ ops::tls::init(isolate, &state);
+ ops::os::init(isolate, &state);
+ ops::permissions::init(isolate, &state);
+ ops::process::init(isolate, &state);
+ ops::random::init(isolate, &state);
+ ops::repl::init(isolate, &state);
+ ops::resources::init(isolate, &state);
+ ops::signal::init(isolate, &state);
+ ops::timers::init(isolate, &state);
+ ops::worker_host::init(isolate, &state);
+ ops::web_worker::init(isolate, &state);
}
-
Self(worker)
}
}
@@ -219,15 +202,6 @@ impl DerefMut for MainWorker {
}
}
-impl Future for MainWorker {
- type Output = Result<(), ErrBox>;
-
- fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
- let inner = self.get_mut();
- inner.0.poll_unpin(cx)
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -245,18 +219,7 @@ mod tests {
F: FnOnce() + Send + 'static,
{
let fut = futures::future::lazy(move |_cx| f());
- tokio_util::run(fut)
- }
-
- pub async fn panic_on_error<I, E, F>(f: F) -> I
- where
- F: Future<Output = Result<I, E>>,
- E: std::fmt::Debug,
- {
- match f.await {
- Ok(v) => v,
- Err(e) => panic!("Future got unexpected error: {:?}", e),
- }
+ tokio_util::run_basic(fut)
}
#[test]
@@ -284,7 +247,7 @@ mod tests {
)
.unwrap();
let state_ = state.clone();
- tokio_util::run(async move {
+ tokio_util::run_basic(async move {
let mut worker =
MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
let result = worker
@@ -293,7 +256,9 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
- panic_on_error(worker).await
+ if let Err(e) = (&mut *worker).await {
+ panic!("Future got unexpected error: {:?}", e);
+ }
});
let metrics = &state_.metrics;
@@ -327,7 +292,7 @@ mod tests {
)
.unwrap();
let state_ = state.clone();
- tokio_util::run(async move {
+ tokio_util::run_basic(async move {
let mut worker =
MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
let result = worker
@@ -336,7 +301,9 @@ mod tests {
if let Err(err) = result {
eprintln!("execute_mod err {:?}", err);
}
- panic_on_error(worker).await
+ if let Err(e) = (&mut *worker).await {
+ panic!("Future got unexpected error: {:?}", e);
+ }
});
let metrics = &state_.metrics;
@@ -345,10 +312,9 @@ mod tests {
assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0);
}
- #[test]
- fn execute_006_url_imports() {
+ #[tokio::test]
+ async fn execute_006_url_imports() {
let http_server_guard = crate::test_util::http_server();
-
let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
@@ -368,31 +334,26 @@ mod tests {
int,
)
.unwrap();
- let global_state_ = global_state;
- let state_ = state.clone();
- tokio_util::run(async move {
- let mut worker = MainWorker::new(
- "TEST".to_string(),
- startup_data::deno_isolate_init(),
- state,
- ext,
- );
-
- worker.execute("bootstrapMainRuntime()").unwrap();
- let result = worker
- .execute_mod_async(&module_specifier, None, false)
- .await;
-
- if let Err(err) = result {
- eprintln!("execute_mod err {:?}", err);
- }
- panic_on_error(worker).await
- });
-
- assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3);
+ let mut worker = MainWorker::new(
+ "TEST".to_string(),
+ startup_data::deno_isolate_init(),
+ state.clone(),
+ ext,
+ );
+ worker.execute("bootstrapMainRuntime()").unwrap();
+ let result = worker
+ .execute_mod_async(&module_specifier, None, false)
+ .await;
+ if let Err(err) = result {
+ eprintln!("execute_mod err {:?}", err);
+ }
+ if let Err(e) = (&mut *worker).await {
+ panic!("Future got unexpected error: {:?}", e);
+ }
+ assert_eq!(state.metrics.resolve_count.load(Ordering::SeqCst), 3);
// Check that we've only invoked the compiler once.
assert_eq!(
- global_state_.metrics.compiler_starts.load(Ordering::SeqCst),
+ global_state.metrics.compiler_starts.load(Ordering::SeqCst),
1
);
drop(http_server_guard);