diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-02-11 10:04:59 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-11 10:04:59 +0100 |
commit | 79b3bc05d6de520f1df73face1744ae3d8be0bb8 (patch) | |
tree | f4c449efa67b88c767df52dd3ecec2246dded2e5 /cli/compilers/wasm.rs | |
parent | 81905a867ea3f942619229e330840d132c57a5da (diff) |
workers: basic event loop (#3828)
* establish basic event loop for workers
* make "self.close()" inside worker
* remove "runWorkerMessageLoop() - instead manually call global function
in Rust when message arrives. This is done in preparation for structured clone
* refactor "WorkerChannel" and use distinct structs for internal
and external channels; "WorkerChannelsInternal" and "WorkerHandle"
* move "State.worker_channels_internal" to "Worker.internal_channels"
* add "WorkerEvent" enum for child->host communication;
currently "Message(Buf)" and "Error(ErrBox)" variants are supported
* add tests for nested workers
* add tests for worker throwing error on startup
Diffstat (limited to 'cli/compilers/wasm.rs')
-rw-r--r-- | cli/compilers/wasm.rs | 118 |
1 files changed, 63 insertions, 55 deletions
diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index c1c179f62..9bc9d2ab4 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -3,8 +3,13 @@ use super::compiler_worker::CompilerWorker; use crate::compilers::CompiledModule; use crate::file_fetcher::SourceFile; use crate::global_state::GlobalState; +use crate::ops::worker_host::run_worker_loop; use crate::startup_data; use crate::state::*; +use crate::tokio_util::create_basic_runtime; +use crate::worker::WorkerEvent; +use crate::worker::WorkerHandle; +use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; use serde_derive::Deserialize; @@ -83,64 +88,67 @@ impl WasmCompiler { if let Some(m) = maybe_cached { return Ok(m); } - - let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::<Result<CompiledModule, ErrBox>>(); - - std::thread::spawn(move || { - debug!(">>>>> wasm_compile_async START"); - let base64_data = base64::encode(&source_file.source_code); - let mut worker = WasmCompiler::setup_worker(global_state); - let handle = worker.thread_safe_handle(); - let url = source_file.url.clone(); - - let fut = async move { - let _ = handle - .post_message( - serde_json::to_string(&base64_data) - .unwrap() - .into_boxed_str() - .into_boxed_bytes(), - ) - .await; - - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - - debug!("Sent message to worker"); - let json_msg = handle.get_message().await.expect("not handled"); - - debug!("Received message from worker"); - let module_info: WasmModuleInfo = - serde_json::from_slice(&json_msg).unwrap(); - - debug!("WASM module info: {:#?}", &module_info); - let code = wrap_wasm_code( - &base64_data, - &module_info.import_list, - &module_info.export_list, - ); - - debug!("Generated code: {}", &code); - let module = CompiledModule { - code, - name: url.to_string(), - }; - { - cache_.lock().unwrap().insert(url.clone(), module.clone()); - } - debug!("<<<<< wasm_compile_async END"); - load_sender.send(Ok(module)).unwrap(); - }; - - crate::tokio_util::run_basic(fut); - }); - load_receiver.await.unwrap() + debug!(">>>>> wasm_compile_async START"); + let base64_data = base64::encode(&source_file.source_code); + let url = source_file.url.clone(); + let req_msg = serde_json::to_string(&base64_data) + .unwrap() + .into_boxed_str() + .into_boxed_bytes(); + let msg = execute_in_thread(global_state.clone(), req_msg).await?; + debug!("Received message from worker"); + let module_info: WasmModuleInfo = serde_json::from_slice(&msg).unwrap(); + debug!("WASM module info: {:#?}", &module_info); + let code = wrap_wasm_code( + &base64_data, + &module_info.import_list, + &module_info.export_list, + ); + debug!("Generated code: {}", &code); + let module = CompiledModule { + code, + name: url.to_string(), + }; + { + cache_.lock().unwrap().insert(url.clone(), module.clone()); + } + debug!("<<<<< wasm_compile_async END"); + Ok(module) } } +async fn execute_in_thread( + global_state: GlobalState, + req: Buf, +) -> Result<Buf, ErrBox> { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1); + let builder = + std::thread::Builder::new().name("deno-wasm-compiler".to_string()); + let join_handle = builder.spawn(move || { + let mut worker = WasmCompiler::setup_worker(global_state); + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + let mut rt = create_basic_runtime(); + run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + })?; + let mut handle = handle_receiver.recv().unwrap()?; + handle.post_message(req).await?; + let event = handle.get_event().await.expect("Compiler didn't respond"); + let buf = match event { + WorkerEvent::Message(buf) => Ok(buf), + WorkerEvent::Error(error) => Err(error), + }?; + // Compiler worker finishes after one request + // so we should receive signal that channel was closed. + // Then close worker's channel and join the thread. + let event = handle.get_event().await; + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().unwrap(); + Ok(buf) +} + fn build_single_import(index: usize, origin: &str) -> String { let origin_json = serde_json::to_string(origin).unwrap(); format!( |