diff options
Diffstat (limited to 'cli/compilers')
-rw-r--r-- | cli/compilers/compiler_worker.rs | 2 | ||||
-rw-r--r-- | cli/compilers/ts.rs | 84 | ||||
-rw-r--r-- | cli/compilers/wasm.rs | 118 |
3 files changed, 106 insertions, 98 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs index 3252aae02..123c29abb 100644 --- a/cli/compilers/compiler_worker.rs +++ b/cli/compilers/compiler_worker.rs @@ -30,7 +30,7 @@ impl CompilerWorker { let isolate = &mut worker.isolate; ops::runtime::init(isolate, &state); ops::compiler::init(isolate, &state); - ops::web_worker::init(isolate, &state); + ops::web_worker::init(isolate, &state, &worker.internal_channels.sender); ops::errors::init(isolate, &state); // for compatibility with Worker scope, though unused at // the moment diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 91c264345..d1fb3a6ac 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -8,11 +8,15 @@ use crate::file_fetcher::SourceFile; use crate::file_fetcher::SourceFileFetcher; use crate::global_state::GlobalState; use crate::msg; +use crate::ops::worker_host::run_worker_loop; use crate::ops::JsonResult; use crate::source_maps::SourceMapGetter; use crate::startup_data; use crate::state::*; +use crate::tokio_util::create_basic_runtime; use crate::version; +use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; @@ -288,13 +292,11 @@ impl TsCompiler { true, ); - let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?; - if let Some(ref msg) = maybe_msg { - let json_str = std::str::from_utf8(msg).unwrap(); - debug!("Message: {}", json_str); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - return Err(ErrBox::from(diagnostics)); - } + let msg = execute_in_thread(global_state.clone(), req_msg).await?; + let json_str = std::str::from_utf8(&msg).unwrap(); + debug!("Message: {}", json_str); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + return Err(ErrBox::from(diagnostics)); } Ok(()) } @@ -376,13 +378,11 @@ impl TsCompiler { let compiling_job = global_state .progress .add("Compile", &module_url.to_string()); - let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?; + let msg = execute_in_thread(global_state.clone(), req_msg).await?; - if let Some(ref msg) = maybe_msg { - let json_str = std::str::from_utf8(msg).unwrap(); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - return Err(ErrBox::from(diagnostics)); - } + let json_str = std::str::from_utf8(&msg).unwrap(); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + return Err(ErrBox::from(diagnostics)); } let compiled_module = ts_compiler.get_compiled_module(&source_file_.url)?; drop(compiling_job); @@ -602,45 +602,45 @@ impl TsCompiler { } } +// TODO(bartlomieju): exactly same function is in `wasm.rs` - only difference +// it created WasmCompiler instead of TsCompiler - deduplicate async fn execute_in_thread( global_state: GlobalState, req: Buf, -) -> Result<Option<Buf>, ErrBox> { - let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::<Result<Option<Buf>, ErrBox>>(); - std::thread::spawn(move || { - debug!(">>>>> compile_async START"); - +) -> Result<Buf, ErrBox> { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1); + let builder = + std::thread::Builder::new().name("deno-ts-compiler".to_string()); + let join_handle = builder.spawn(move || { let mut worker = TsCompiler::setup_worker(global_state.clone()); - let handle = worker.thread_safe_handle(); - - crate::tokio_util::run_basic( - async move { - if let Err(err) = handle.post_message(req).await { - load_sender.send(Err(err)).unwrap(); - return; - } - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - let maybe_msg = handle.get_message().await; - load_sender.send(Ok(maybe_msg)).unwrap(); - debug!(">>>>> compile_sync END"); - } - .boxed_local(), - ); - }); - - load_receiver.await.unwrap() + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + let mut rt = create_basic_runtime(); + run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + })?; + let mut handle = handle_receiver.recv().unwrap()?; + handle.post_message(req).await?; + let event = handle.get_event().await.expect("Compiler didn't respond"); + let buf = match event { + WorkerEvent::Message(buf) => Ok(buf), + WorkerEvent::Error(error) => Err(error), + }?; + // Compiler worker finishes after one request + // so we should receive signal that channel was closed. + // Then close worker's channel and join the thread. + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().unwrap(); + Ok(buf) } async fn execute_in_thread_json( req_msg: Buf, global_state: GlobalState, ) -> JsonResult { - let maybe_msg = execute_in_thread(global_state, req_msg).await?; - let msg = maybe_msg.unwrap(); + let msg = execute_in_thread(global_state, req_msg).await?; let json_str = std::str::from_utf8(&msg).unwrap(); Ok(json!(json_str)) } diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index c1c179f62..9bc9d2ab4 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -3,8 +3,13 @@ use super::compiler_worker::CompilerWorker; use crate::compilers::CompiledModule; use crate::file_fetcher::SourceFile; use crate::global_state::GlobalState; +use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::*; +use crate::tokio_util::create_basic_runtime; +use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; +use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; use serde_derive::Deserialize; @@ -83,64 +88,67 @@ impl WasmCompiler { if let Some(m) = maybe_cached { return Ok(m); } - - let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::<Result<CompiledModule, ErrBox>>(); - - std::thread::spawn(move || { - debug!(">>>>> wasm_compile_async START"); - let base64_data = base64::encode(&source_file.source_code); - let mut worker = WasmCompiler::setup_worker(global_state); - let handle = worker.thread_safe_handle(); - let url = source_file.url.clone(); - - let fut = async move { - let _ = handle - .post_message( - serde_json::to_string(&base64_data) - .unwrap() - .into_boxed_str() - .into_boxed_bytes(), - ) - .await; - - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - - debug!("Sent message to worker"); - let json_msg = handle.get_message().await.expect("not handled"); - - debug!("Received message from worker"); - let module_info: WasmModuleInfo = - serde_json::from_slice(&json_msg).unwrap(); - - debug!("WASM module info: {:#?}", &module_info); - let code = wrap_wasm_code( - &base64_data, - &module_info.import_list, - &module_info.export_list, - ); - - debug!("Generated code: {}", &code); - let module = CompiledModule { - code, - name: url.to_string(), - }; - { - cache_.lock().unwrap().insert(url.clone(), module.clone()); - } - debug!("<<<<< wasm_compile_async END"); - load_sender.send(Ok(module)).unwrap(); - }; - - crate::tokio_util::run_basic(fut); - }); - load_receiver.await.unwrap() + debug!(">>>>> wasm_compile_async START"); + let base64_data = base64::encode(&source_file.source_code); + let url = source_file.url.clone(); + let req_msg = serde_json::to_string(&base64_data) + .unwrap() + .into_boxed_str() + .into_boxed_bytes(); + let msg = execute_in_thread(global_state.clone(), req_msg).await?; + debug!("Received message from worker"); + let module_info: WasmModuleInfo = serde_json::from_slice(&msg).unwrap(); + debug!("WASM module info: {:#?}", &module_info); + let code = wrap_wasm_code( + &base64_data, + &module_info.import_list, + &module_info.export_list, + ); + debug!("Generated code: {}", &code); + let module = CompiledModule { + code, + name: url.to_string(), + }; + { + cache_.lock().unwrap().insert(url.clone(), module.clone()); + } + debug!("<<<<< wasm_compile_async END"); + Ok(module) } } +async fn execute_in_thread( + global_state: GlobalState, + req: Buf, +) -> Result<Buf, ErrBox> { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1); + let builder = + std::thread::Builder::new().name("deno-wasm-compiler".to_string()); + let join_handle = builder.spawn(move || { + let mut worker = WasmCompiler::setup_worker(global_state); + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + let mut rt = create_basic_runtime(); + run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + })?; + let mut handle = handle_receiver.recv().unwrap()?; + handle.post_message(req).await?; + let event = handle.get_event().await.expect("Compiler didn't respond"); + let buf = match event { + WorkerEvent::Message(buf) => Ok(buf), + WorkerEvent::Error(error) => Err(error), + }?; + // Compiler worker finishes after one request + // so we should receive signal that channel was closed. + // Then close worker's channel and join the thread. + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().unwrap(); + Ok(buf) +} + fn build_single_import(index: usize, origin: &str) -> String { let origin_json = serde_json::to_string(origin).unwrap(); format!( |