diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-02-03 18:08:44 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-03 18:08:44 -0500 |
commit | 161cf7cdfd44ace8937fb7940727984990742d18 (patch) | |
tree | 1ef88b3cd6427353366d930ea9be5ae494504255 /cli/compilers/ts.rs | |
parent | 0471243334ac1aeb76dcaadbc3f0b8114d188fb8 (diff) |
refactor: Use Tokio's single-threaded runtime (#3844)
This change simplifies how we execute V8. Previously V8 Isolates jumped
around threads every time they were woken up. This was overly complex and
potentially hurting performance in a myriad ways. Now isolates run on
their own dedicated thread and never move.
- blocking_json spawns a thread and does not use a thread pool
- op_host_poll_worker and op_host_resume_worker are non-operational
- removes Worker::get_message and Worker::post_message
- ThreadSafeState::workers table contains WorkerChannel entries instead
of actual Worker instances.
- MainWorker and CompilerWorker are no longer Futures.
- The multi-threaded version of deno_core_http_bench was removed.
- AyncOps no longer need to be Send + Sync
This PR is very large and several tests were disabled to speed
integration:
- installer_test_local_module_run
- installer_test_remote_module_run
- _015_duplicate_parallel_import
- _026_workers
Diffstat (limited to 'cli/compilers/ts.rs')
-rw-r--r-- | cli/compilers/ts.rs | 265 |
1 files changed, 146 insertions, 119 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index b86a1a7ae..27203484f 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -9,7 +9,7 @@ use crate::file_fetcher::SourceFile; use crate::file_fetcher::SourceFileFetcher; use crate::global_state::ThreadSafeGlobalState; use crate::msg; -use crate::serde_json::json; +use crate::ops::JsonResult; use crate::source_maps::SourceMapGetter; use crate::startup_data; use crate::state::*; @@ -20,6 +20,7 @@ use deno_core::ModuleSpecifier; use futures::future::FutureExt; use futures::Future; use regex::Regex; +use serde_json::json; use std::collections::HashMap; use std::collections::HashSet; use std::fs; @@ -37,7 +38,7 @@ lazy_static! { Regex::new(r#""checkJs"\s*?:\s*?true"#).unwrap(); } -#[derive(Clone, Copy)] +#[derive(Clone)] pub enum TargetLib { Main, Worker, @@ -236,7 +237,8 @@ impl TsCompiler { Ok(compiler) } - /// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime. + /// Create a new V8 worker with snapshot of TS compiler and setup compiler's + /// runtime. fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker { let (int, ext) = ThreadSafeState::create_channels(); let worker_state = @@ -280,34 +282,52 @@ impl TsCompiler { true, ); - let worker = TsCompiler::setup_worker(global_state); - let worker_ = worker.clone(); - - async move { - worker.post_message(req_msg).await?; - worker.await?; - debug!("Sent message to worker"); - let maybe_msg = worker_.get_message().await; - debug!("Received message from worker"); - if let Some(msg) = maybe_msg { - let json_str = std::str::from_utf8(&msg).unwrap(); - debug!("Message: {}", json_str); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - return Err(ErrBox::from(diagnostics)); + // TODO(ry) The code below looks very similar to spawn_ts_compiler_worker. + // Can we combine them? + let (load_sender, load_receiver) = + tokio::sync::oneshot::channel::<Result<(), ErrBox>>(); + std::thread::spawn(move || { + let mut worker = TsCompiler::setup_worker(global_state); + let handle = worker.thread_safe_handle(); + + let fut = async move { + if let Err(err) = handle.post_message(req_msg).await { + load_sender.send(Err(err)).unwrap(); + return; + } + debug!("Sent message to worker"); + if let Err(err) = (&mut *worker).await { + load_sender.send(Err(err)).unwrap(); + return; } + let maybe_msg = handle.get_message().await; + debug!("Received message from worker"); + if let Some(ref msg) = maybe_msg { + let json_str = std::str::from_utf8(msg).unwrap(); + debug!("Message: {}", json_str); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + let err = ErrBox::from(diagnostics); + load_sender.send(Err(err)).unwrap(); + return; + } + } + load_sender.send(Ok(())).unwrap(); } - Ok(()) - } + .boxed_local(); + crate::tokio_util::run_basic(fut); + }); + async { load_receiver.await.unwrap() }.boxed_local() } - /// Mark given module URL as compiled to avoid multiple compilations of same module - /// in single run. + /// Mark given module URL as compiled to avoid multiple compilations of same + /// module in single run. fn mark_compiled(&self, url: &Url) { let mut c = self.compiled.lock().unwrap(); c.insert(url.clone()); } - /// Check if given module URL has already been compiled and can be fetched directly from disk. + /// Check if given module URL has already been compiled and can be fetched + /// directly from disk. fn has_compiled(&self, url: &Url) -> bool { let c = self.compiled.lock().unwrap(); c.contains(url) @@ -317,9 +337,11 @@ impl TsCompiler { /// /// This method compiled every module at most once. /// - /// If `--reload` flag was provided then compiler will not on-disk cache and force recompilation. + /// If `--reload` flag was provided then compiler will not on-disk cache and + /// force recompilation. /// - /// If compilation is required then new V8 worker is spawned with fresh TS compiler. + /// If compilation is required then new V8 worker is spawned with fresh TS + /// compiler. pub fn compile_async( &self, global_state: ThreadSafeGlobalState, @@ -356,22 +378,12 @@ impl TsCompiler { } } } - let source_file_ = source_file.clone(); - - debug!(">>>>> compile_sync START"); let module_url = source_file.url.clone(); - - debug!( - "Running rust part of compile_sync, module specifier: {}", - &source_file.url - ); - let target = match target { TargetLib::Main => "main", TargetLib::Worker => "worker", }; - let root_names = vec![module_url.to_string()]; let req_msg = req( msg::CompilerRequestType::Compile, @@ -382,34 +394,51 @@ impl TsCompiler { false, ); - let worker = TsCompiler::setup_worker(global_state.clone()); - let worker_ = worker.clone(); - let compiling_job = global_state - .progress - .add("Compile", &module_url.to_string()); - let global_state_ = global_state; + // TODO(ry) The code below looks very similar to spawn_ts_compiler_worker. + // Can we combine them? + let (load_sender, load_receiver) = + tokio::sync::oneshot::channel::<Result<CompiledModule, ErrBox>>(); + std::thread::spawn(move || { + debug!(">>>>> compile_async START"); - async move { - worker.post_message(req_msg).await?; - worker.await?; - debug!("Sent message to worker"); - let maybe_msg = worker_.get_message().await; - if let Some(msg) = maybe_msg { - let json_str = std::str::from_utf8(&msg).unwrap(); - debug!("Message: {}", json_str); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - return Err(ErrBox::from(diagnostics)); + let mut worker = TsCompiler::setup_worker(global_state.clone()); + let handle = worker.thread_safe_handle(); + + let compiling_job = global_state + .progress + .add("Compile", &module_url.to_string()); + + let fut = async move { + if let Err(err) = handle.post_message(req_msg).await { + load_sender.send(Err(err)).unwrap(); + return; + } + if let Err(err) = (&mut *worker).await { + load_sender.send(Err(err)).unwrap(); + return; } + let maybe_msg = handle.get_message().await; + if let Some(ref msg) = maybe_msg { + let json_str = std::str::from_utf8(msg).unwrap(); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + let err = ErrBox::from(diagnostics); + load_sender.send(Err(err)).unwrap(); + return; + } + } + let compiled_module = global_state + .ts_compiler + .get_compiled_module(&source_file_.url) + .expect("Expected to find compiled file"); + drop(compiling_job); + debug!(">>>>> compile_sync END"); + load_sender.send(Ok(compiled_module)).unwrap(); } - let compiled_module = global_state_ - .ts_compiler - .get_compiled_module(&source_file_.url) - .expect("Expected to find compiled file"); - drop(compiling_job); - debug!(">>>>> compile_sync END"); - Ok(compiled_module) - } - .boxed() + .boxed_local(); + crate::tokio_util::run_basic(fut); + }); + + async { load_receiver.await.unwrap() }.boxed_local() } /// Get associated `CompiledFileMetadata` for given module if it exists. @@ -625,6 +654,39 @@ impl TsCompiler { } } +// TODO(ry) this is pretty general purpose and should be lifted and generalized. +fn spawn_ts_compiler_worker( + req_msg: Buf, + global_state: ThreadSafeGlobalState, +) -> Pin<Box<CompilationResultFuture>> { + let (load_sender, load_receiver) = + tokio::sync::oneshot::channel::<JsonResult>(); + + std::thread::spawn(move || { + let mut worker = TsCompiler::setup_worker(global_state); + let handle = worker.thread_safe_handle(); + + let fut = async move { + debug!("Sent message to worker"); + if let Err(err) = handle.post_message(req_msg).await { + load_sender.send(Err(err)).unwrap(); + return; + } + if let Err(err) = (&mut *worker).await { + load_sender.send(Err(err)).unwrap(); + return; + } + let msg = handle.get_message().await.unwrap(); + let json_str = std::str::from_utf8(&msg).unwrap(); + load_sender.send(Ok(json!(json_str))).unwrap(); + }; + crate::tokio_util::run_basic(fut); + }); + + let fut = async { load_receiver.await.unwrap() }; + fut.boxed_local() +} + pub fn runtime_compile_async<S: BuildHasher>( global_state: ThreadSafeGlobalState, root_name: &str, @@ -644,18 +706,7 @@ pub fn runtime_compile_async<S: BuildHasher>( .into_boxed_str() .into_boxed_bytes(); - let worker = TsCompiler::setup_worker(global_state); - let worker_ = worker.clone(); - - async move { - worker.post_message(req_msg).await?; - worker.await?; - debug!("Sent message to worker"); - let msg = (worker_.get_message().await).unwrap(); - let json_str = std::str::from_utf8(&msg).unwrap(); - Ok(json!(json_str)) - } - .boxed() + spawn_ts_compiler_worker(req_msg, global_state) } pub fn runtime_transpile_async<S: BuildHasher>( @@ -672,38 +723,25 @@ pub fn runtime_transpile_async<S: BuildHasher>( .into_boxed_str() .into_boxed_bytes(); - let worker = TsCompiler::setup_worker(global_state); - let worker_ = worker.clone(); - - async move { - worker.post_message(req_msg).await?; - worker.await?; - debug!("Sent message to worker"); - let msg = (worker_.get_message().await).unwrap(); - let json_str = std::str::from_utf8(&msg).unwrap(); - Ok(json!(json_str)) - } - .boxed() + spawn_ts_compiler_worker(req_msg, global_state) } #[cfg(test)] mod tests { use super::*; use crate::fs as deno_fs; - use crate::tokio_util; use deno_core::ModuleSpecifier; use std::path::PathBuf; use tempfile::TempDir; - #[test] - fn test_compile_async() { + #[tokio::test] + async fn test_compile_async() { let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) .parent() .unwrap() .join("cli/tests/002_hello.ts"); let specifier = ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap(); - let out = SourceFile { url: specifier.as_url().clone(), filename: PathBuf::from(p.to_str().unwrap().to_string()), @@ -711,31 +749,24 @@ mod tests { source_code: include_bytes!("../tests/002_hello.ts").to_vec(), types_url: None, }; - let mock_state = ThreadSafeGlobalState::mock(vec![ String::from("deno"), String::from("hello.js"), ]); - - let fut = async move { - let result = mock_state - .ts_compiler - .compile_async(mock_state.clone(), &out, TargetLib::Main) - .await; - - assert!(result.is_ok()); - assert!(result - .unwrap() - .code - .as_bytes() - .starts_with(b"console.log(\"Hello World\");")); - }; - - tokio_util::run(fut.boxed()) + let result = mock_state + .ts_compiler + .compile_async(mock_state.clone(), &out, TargetLib::Main) + .await; + assert!(result.is_ok()); + assert!(result + .unwrap() + .code + .as_bytes() + .starts_with(b"console.log(\"Hello World\");")); } - #[test] - fn test_bundle_async() { + #[tokio::test] + async fn test_bundle_async() { let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) .parent() .unwrap() @@ -751,19 +782,15 @@ mod tests { String::from("$deno$/bundle.js"), ]); - let fut = async move { - let result = state - .ts_compiler - .bundle_async( - state.clone(), - module_name, - Some(String::from("$deno$/bundle.js")), - ) - .await; - - assert!(result.is_ok()); - }; - tokio_util::run(fut.boxed()) + let result = state + .ts_compiler + .bundle_async( + state.clone(), + module_name, + Some(String::from("$deno$/bundle.js")), + ) + .await; + assert!(result.is_ok()); } #[test] |