summaryrefslogtreecommitdiff
path: root/cli/compilers/ts.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2020-02-11 10:04:59 +0100
committerGitHub <noreply@github.com>2020-02-11 10:04:59 +0100
commit79b3bc05d6de520f1df73face1744ae3d8be0bb8 (patch)
treef4c449efa67b88c767df52dd3ecec2246dded2e5 /cli/compilers/ts.rs
parent81905a867ea3f942619229e330840d132c57a5da (diff)
workers: basic event loop (#3828)
* establish basic event loop for workers * make "self.close()" inside worker * remove "runWorkerMessageLoop() - instead manually call global function in Rust when message arrives. This is done in preparation for structured clone * refactor "WorkerChannel" and use distinct structs for internal and external channels; "WorkerChannelsInternal" and "WorkerHandle" * move "State.worker_channels_internal" to "Worker.internal_channels" * add "WorkerEvent" enum for child->host communication; currently "Message(Buf)" and "Error(ErrBox)" variants are supported * add tests for nested workers * add tests for worker throwing error on startup
Diffstat (limited to 'cli/compilers/ts.rs')
-rw-r--r--cli/compilers/ts.rs84
1 files changed, 42 insertions, 42 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index 91c264345..d1fb3a6ac 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -8,11 +8,15 @@ use crate::file_fetcher::SourceFile;
use crate::file_fetcher::SourceFileFetcher;
use crate::global_state::GlobalState;
use crate::msg;
+use crate::ops::worker_host::run_worker_loop;
use crate::ops::JsonResult;
use crate::source_maps::SourceMapGetter;
use crate::startup_data;
use crate::state::*;
+use crate::tokio_util::create_basic_runtime;
use crate::version;
+use crate::worker::WorkerEvent;
+use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
@@ -288,13 +292,11 @@ impl TsCompiler {
true,
);
- let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?;
- if let Some(ref msg) = maybe_msg {
- let json_str = std::str::from_utf8(msg).unwrap();
- debug!("Message: {}", json_str);
- if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
- return Err(ErrBox::from(diagnostics));
- }
+ let msg = execute_in_thread(global_state.clone(), req_msg).await?;
+ let json_str = std::str::from_utf8(&msg).unwrap();
+ debug!("Message: {}", json_str);
+ if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
+ return Err(ErrBox::from(diagnostics));
}
Ok(())
}
@@ -376,13 +378,11 @@ impl TsCompiler {
let compiling_job = global_state
.progress
.add("Compile", &module_url.to_string());
- let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?;
+ let msg = execute_in_thread(global_state.clone(), req_msg).await?;
- if let Some(ref msg) = maybe_msg {
- let json_str = std::str::from_utf8(msg).unwrap();
- if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
- return Err(ErrBox::from(diagnostics));
- }
+ let json_str = std::str::from_utf8(&msg).unwrap();
+ if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) {
+ return Err(ErrBox::from(diagnostics));
}
let compiled_module = ts_compiler.get_compiled_module(&source_file_.url)?;
drop(compiling_job);
@@ -602,45 +602,45 @@ impl TsCompiler {
}
}
+// TODO(bartlomieju): exactly same function is in `wasm.rs` - only difference
+// it created WasmCompiler instead of TsCompiler - deduplicate
async fn execute_in_thread(
global_state: GlobalState,
req: Buf,
-) -> Result<Option<Buf>, ErrBox> {
- let (load_sender, load_receiver) =
- tokio::sync::oneshot::channel::<Result<Option<Buf>, ErrBox>>();
- std::thread::spawn(move || {
- debug!(">>>>> compile_async START");
-
+) -> Result<Buf, ErrBox> {
+ let (handle_sender, handle_receiver) =
+ std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
+ let builder =
+ std::thread::Builder::new().name("deno-ts-compiler".to_string());
+ let join_handle = builder.spawn(move || {
let mut worker = TsCompiler::setup_worker(global_state.clone());
- let handle = worker.thread_safe_handle();
-
- crate::tokio_util::run_basic(
- async move {
- if let Err(err) = handle.post_message(req).await {
- load_sender.send(Err(err)).unwrap();
- return;
- }
- if let Err(err) = (&mut *worker).await {
- load_sender.send(Err(err)).unwrap();
- return;
- }
- let maybe_msg = handle.get_message().await;
- load_sender.send(Ok(maybe_msg)).unwrap();
- debug!(">>>>> compile_sync END");
- }
- .boxed_local(),
- );
- });
-
- load_receiver.await.unwrap()
+ handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
+ drop(handle_sender);
+ let mut rt = create_basic_runtime();
+ run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ })?;
+ let mut handle = handle_receiver.recv().unwrap()?;
+ handle.post_message(req).await?;
+ let event = handle.get_event().await.expect("Compiler didn't respond");
+ let buf = match event {
+ WorkerEvent::Message(buf) => Ok(buf),
+ WorkerEvent::Error(error) => Err(error),
+ }?;
+ // Compiler worker finishes after one request
+ // so we should receive signal that channel was closed.
+ // Then close worker's channel and join the thread.
+ let event = handle.get_event().await;
+ assert!(event.is_none());
+ handle.sender.close_channel();
+ join_handle.join().unwrap();
+ Ok(buf)
}
async fn execute_in_thread_json(
req_msg: Buf,
global_state: GlobalState,
) -> JsonResult {
- let maybe_msg = execute_in_thread(global_state, req_msg).await?;
- let msg = maybe_msg.unwrap();
+ let msg = execute_in_thread(global_state, req_msg).await?;
let json_str = std::str::from_utf8(&msg).unwrap();
Ok(json!(json_str))
}