diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-02-03 18:08:44 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-03 18:08:44 -0500 |
commit | 161cf7cdfd44ace8937fb7940727984990742d18 (patch) | |
tree | 1ef88b3cd6427353366d930ea9be5ae494504255 /cli/worker.rs | |
parent | 0471243334ac1aeb76dcaadbc3f0b8114d188fb8 (diff) |
refactor: Use Tokio's single-threaded runtime (#3844)
This change simplifies how we execute V8. Previously V8 Isolates jumped
around threads every time they were woken up. This was overly complex and
potentially hurting performance in a myriad ways. Now isolates run on
their own dedicated thread and never move.
- blocking_json spawns a thread and does not use a thread pool
- op_host_poll_worker and op_host_resume_worker are non-operational
- removes Worker::get_message and Worker::post_message
- ThreadSafeState::workers table contains WorkerChannel entries instead
of actual Worker instances.
- MainWorker and CompilerWorker are no longer Futures.
- The multi-threaded version of deno_core_http_bench was removed.
- AyncOps no longer need to be Send + Sync
This PR is very large and several tests were disabled to speed
integration:
- installer_test_local_module_run
- installer_test_remote_module_run
- _015_duplicate_parallel_import
- _026_workers
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 201 |
1 files changed, 81 insertions, 120 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index 6fb235ceb..07a96af16 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -33,6 +33,25 @@ pub struct WorkerChannels { pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>, } +impl WorkerChannels { + /// Post message to worker as a host. + pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { + let mut sender = self.sender.clone(); + sender.send(buf).map_err(ErrBox::from).await + } + + /// Get message from worker as a host. + pub fn get_message(&self) -> Pin<Box<dyn Future<Output = Option<Buf>>>> { + let receiver_mutex = self.receiver.clone(); + + async move { + let mut receiver = receiver_mutex.lock().await; + receiver.next().await + } + .boxed_local() + } +} + /// Worker is a CLI wrapper for `deno_core::Isolate`. /// /// It provides infrastructure to communicate with a worker and @@ -45,10 +64,9 @@ pub struct WorkerChannels { /// - `MainWorker` /// - `CompilerWorker` /// - `WebWorker` -#[derive(Clone)] pub struct Worker { pub name: String, - pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>, + pub isolate: Box<deno_core::EsIsolate>, pub state: ThreadSafeState, external_channels: WorkerChannels, } @@ -70,7 +88,7 @@ impl Worker { Self { name, - isolate: Arc::new(AsyncMutex::new(isolate)), + isolate, state, external_channels, } @@ -90,13 +108,10 @@ impl Worker { js_filename: &str, js_source: &str, ) -> Result<(), ErrBox> { - let mut isolate = self.isolate.try_lock().unwrap(); - isolate.execute(js_filename, js_source) + self.isolate.execute(js_filename, js_source) } /// Executes the provided JavaScript module. - /// - /// Takes ownership of the isolate behind mutex. pub async fn execute_mod_async( &mut self, module_specifier: &ModuleSpecifier, @@ -104,40 +119,17 @@ impl Worker { is_prefetch: bool, ) -> Result<(), ErrBox> { let specifier = module_specifier.to_string(); - let worker = self.clone(); - - let mut isolate = self.isolate.lock().await; - let id = isolate.load_module(&specifier, maybe_code).await?; - worker.state.global_state.progress.done(); - + let id = self.isolate.load_module(&specifier, maybe_code).await?; + self.state.global_state.progress.done(); if !is_prefetch { - return isolate.mod_evaluate(id); + return self.isolate.mod_evaluate(id); } - Ok(()) } - /// Post message to worker as a host. - /// - /// This method blocks current thread. - pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { - let mut sender = self.external_channels.sender.clone(); - let result = sender.send(buf).map_err(ErrBox::from).await; - drop(sender); - result - } - - /// Get message from worker as a host. - pub fn get_message( - &self, - ) -> Pin<Box<dyn Future<Output = Option<Buf>> + Send>> { - let receiver_mutex = self.external_channels.receiver.clone(); - - async move { - let mut receiver = receiver_mutex.lock().await; - receiver.next().await - } - .boxed() + /// Returns a way to communicate with the Worker from other threads. + pub fn thread_safe_handle(&self) -> WorkerChannels { + self.external_channels.clone() } } @@ -148,13 +140,7 @@ impl Future for Worker { let inner = self.get_mut(); let waker = AtomicWaker::new(); waker.register(cx.waker()); - match inner.isolate.try_lock() { - Ok(mut isolate) => isolate.poll_unpin(cx), - Err(_) => { - waker.wake(); - Poll::Pending - } - } + inner.isolate.poll_unpin(cx) } } @@ -164,7 +150,6 @@ impl Future for Worker { /// /// All WebWorkers created during program execution are decendants of /// this worker. -#[derive(Clone)] pub struct MainWorker(Worker); impl MainWorker { @@ -175,33 +160,31 @@ impl MainWorker { external_channels: WorkerChannels, ) -> Self { let state_ = state.clone(); - let worker = Worker::new(name, startup_data, state_, external_channels); + let mut worker = Worker::new(name, startup_data, state_, external_channels); { - let mut isolate = worker.isolate.try_lock().unwrap(); - let op_registry = isolate.op_registry.clone(); - - ops::runtime::init(&mut isolate, &state); - ops::runtime_compiler::init(&mut isolate, &state); - ops::errors::init(&mut isolate, &state); - ops::fetch::init(&mut isolate, &state); - ops::files::init(&mut isolate, &state); - ops::fs::init(&mut isolate, &state); - ops::io::init(&mut isolate, &state); - ops::plugins::init(&mut isolate, &state, op_registry); - ops::net::init(&mut isolate, &state); - ops::tls::init(&mut isolate, &state); - ops::os::init(&mut isolate, &state); - ops::permissions::init(&mut isolate, &state); - ops::process::init(&mut isolate, &state); - ops::random::init(&mut isolate, &state); - ops::repl::init(&mut isolate, &state); - ops::resources::init(&mut isolate, &state); - ops::signal::init(&mut isolate, &state); - ops::timers::init(&mut isolate, &state); - ops::worker_host::init(&mut isolate, &state); - ops::web_worker::init(&mut isolate, &state); + let op_registry = worker.isolate.op_registry.clone(); + let isolate = &mut worker.isolate; + ops::runtime::init(isolate, &state); + ops::runtime_compiler::init(isolate, &state); + ops::errors::init(isolate, &state); + ops::fetch::init(isolate, &state); + ops::files::init(isolate, &state); + ops::fs::init(isolate, &state); + ops::io::init(isolate, &state); + ops::plugins::init(isolate, &state, op_registry); + ops::net::init(isolate, &state); + ops::tls::init(isolate, &state); + ops::os::init(isolate, &state); + ops::permissions::init(isolate, &state); + ops::process::init(isolate, &state); + ops::random::init(isolate, &state); + ops::repl::init(isolate, &state); + ops::resources::init(isolate, &state); + ops::signal::init(isolate, &state); + ops::timers::init(isolate, &state); + ops::worker_host::init(isolate, &state); + ops::web_worker::init(isolate, &state); } - Self(worker) } } @@ -219,15 +202,6 @@ impl DerefMut for MainWorker { } } -impl Future for MainWorker { - type Output = Result<(), ErrBox>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - inner.0.poll_unpin(cx) - } -} - #[cfg(test)] mod tests { use super::*; @@ -245,18 +219,7 @@ mod tests { F: FnOnce() + Send + 'static, { let fut = futures::future::lazy(move |_cx| f()); - tokio_util::run(fut) - } - - pub async fn panic_on_error<I, E, F>(f: F) -> I - where - F: Future<Output = Result<I, E>>, - E: std::fmt::Debug, - { - match f.await { - Ok(v) => v, - Err(e) => panic!("Future got unexpected error: {:?}", e), - } + tokio_util::run_basic(fut) } #[test] @@ -284,7 +247,7 @@ mod tests { ) .unwrap(); let state_ = state.clone(); - tokio_util::run(async move { + tokio_util::run_basic(async move { let mut worker = MainWorker::new("TEST".to_string(), StartupData::None, state, ext); let result = worker @@ -293,7 +256,9 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - panic_on_error(worker).await + if let Err(e) = (&mut *worker).await { + panic!("Future got unexpected error: {:?}", e); + } }); let metrics = &state_.metrics; @@ -327,7 +292,7 @@ mod tests { ) .unwrap(); let state_ = state.clone(); - tokio_util::run(async move { + tokio_util::run_basic(async move { let mut worker = MainWorker::new("TEST".to_string(), StartupData::None, state, ext); let result = worker @@ -336,7 +301,9 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - panic_on_error(worker).await + if let Err(e) = (&mut *worker).await { + panic!("Future got unexpected error: {:?}", e); + } }); let metrics = &state_.metrics; @@ -345,10 +312,9 @@ mod tests { assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 0); } - #[test] - fn execute_006_url_imports() { + #[tokio::test] + async fn execute_006_url_imports() { let http_server_guard = crate::test_util::http_server(); - let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) .parent() .unwrap() @@ -368,31 +334,26 @@ mod tests { int, ) .unwrap(); - let global_state_ = global_state; - let state_ = state.clone(); - tokio_util::run(async move { - let mut worker = MainWorker::new( - "TEST".to_string(), - startup_data::deno_isolate_init(), - state, - ext, - ); - - worker.execute("bootstrapMainRuntime()").unwrap(); - let result = worker - .execute_mod_async(&module_specifier, None, false) - .await; - - if let Err(err) = result { - eprintln!("execute_mod err {:?}", err); - } - panic_on_error(worker).await - }); - - assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3); + let mut worker = MainWorker::new( + "TEST".to_string(), + startup_data::deno_isolate_init(), + state.clone(), + ext, + ); + worker.execute("bootstrapMainRuntime()").unwrap(); + let result = worker + .execute_mod_async(&module_specifier, None, false) + .await; + if let Err(err) = result { + eprintln!("execute_mod err {:?}", err); + } + if let Err(e) = (&mut *worker).await { + panic!("Future got unexpected error: {:?}", e); + } + assert_eq!(state.metrics.resolve_count.load(Ordering::SeqCst), 3); // Check that we've only invoked the compiler once. assert_eq!( - global_state_.metrics.compiler_starts.load(Ordering::SeqCst), + global_state.metrics.compiler_starts.load(Ordering::SeqCst), 1 ); drop(http_server_guard); |