summaryrefslogtreecommitdiff
path: root/cli/compiler.rs
diff options
context:
space:
mode:
authorandy finch <andyfinch7@gmail.com>2019-04-04 05:33:32 -0400
committerRyan Dahl <ry@tinyclouds.org>2019-04-04 05:33:32 -0400
commit0e7311e1717edd312d371148f331fb558d9bcc4b (patch)
tree38957fde88f8359886f4f7a00bea91668c7609b3 /cli/compiler.rs
parent8c8576619852ee8b8095ca735f6d517a7e707e79 (diff)
Non-fatal compile_sync failures (#2039)
And model worker resources as Stream
Diffstat (limited to 'cli/compiler.rs')
-rw-r--r--cli/compiler.rs232
1 files changed, 157 insertions, 75 deletions
diff --git a/cli/compiler.rs b/cli/compiler.rs
index 410842871..0fc03a6d6 100644
--- a/cli/compiler.rs
+++ b/cli/compiler.rs
@@ -2,7 +2,7 @@
use core::ops::Deref;
use crate::flags::DenoFlags;
use crate::isolate_state::*;
-use crate::js_errors::JSErrorColor;
+use crate::js_errors;
use crate::msg;
use crate::ops;
use crate::resources;
@@ -20,8 +20,12 @@ use deno::StartupData;
use futures::future::*;
use futures::sync::oneshot;
use futures::Future;
+use futures::Stream;
use serde_json;
+use std::collections::HashMap;
use std::str;
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::runtime::Runtime;
@@ -29,6 +33,8 @@ use tokio::runtime::Runtime;
/// Used for normalization of types on internal future completions
type CompilerInnerResult = Result<ModuleMetaData, Option<JSError>>;
type WorkerErrReceiver = oneshot::Receiver<CompilerInnerResult>;
+type CmdId = u32;
+type ResponseSenderTable = HashMap<CmdId, oneshot::Sender<Buf>>;
/// Shared resources for used to complete compiler operations.
/// rid is the resource id for compiler worker resource used for sending it
@@ -43,6 +49,9 @@ struct CompilerShared {
}
lazy_static! {
+ static ref C_NEXT_CMD_ID: AtomicUsize = AtomicUsize::new(1);
+ // Map of response senders
+ static ref C_RES_SENDER_TABLE: Mutex<ResponseSenderTable> = Mutex::new(ResponseSenderTable::new());
// Shared worker resources so we can spawn
static ref C_SHARED: Mutex<Option<CompilerShared>> = Mutex::new(None);
// tokio runtime specifically for spawning logic that is dependent on
@@ -133,6 +142,21 @@ impl ModuleMetaData {
}
}
+fn new_cmd_id() -> CmdId {
+ let next_rid = C_NEXT_CMD_ID.fetch_add(1, Ordering::SeqCst);
+ next_rid as CmdId
+}
+
+fn parse_cmd_id(res_json: &str) -> CmdId {
+ match serde_json::from_str::<serde_json::Value>(res_json) {
+ Ok(serde_json::Value::Object(map)) => match map["cmdId"].as_u64() {
+ Some(cmd_id) => cmd_id as CmdId,
+ _ => panic!("Error decoding compiler response: expected cmdId"),
+ },
+ _ => panic!("Error decoding compiler response"),
+ }
+}
+
fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
let mut cell = C_SHARED.lock().unwrap();
cell
@@ -156,6 +180,8 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
runtime.spawn(lazy(move || {
let resource = worker.resource.clone();
worker.then(move |result| -> Result<(), ()> {
+ // Close resource so the future created by
+ // handle_worker_message_stream exits
resource.close();
match result {
Err(err) => err_sender.send(Err(Some(err))).unwrap(),
@@ -164,6 +190,28 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
Ok(())
})
}));
+ runtime.spawn(lazy(move || {
+ debug!("Start worker stream handler!");
+ let worker_stream = resources::get_message_stream_from_worker(rid);
+ worker_stream
+ .for_each(|msg: Buf| {
+ // All worker responses are handled here first before being sent via
+ // their respective sender. This system can be compared to the
+ // promise system used on the js side. This provides a way to
+ // resolve many futures via the same channel.
+ let res_json = std::str::from_utf8(&msg).unwrap();
+ debug!("Got message from worker: {}", res_json);
+ // Get the intended receiver's cmd_id from the message.
+ let cmd_id = parse_cmd_id(res_json);
+ let mut table = C_RES_SENDER_TABLE.lock().unwrap();
+ debug!("Cmd id for get message handler: {}", cmd_id);
+ // Get the corresponding response sender from the table and
+ // send a response.
+ let response_sender = table.remove(&(cmd_id as CmdId)).unwrap();
+ response_sender.send(msg).unwrap();
+ Ok(())
+ }).map_err(|_| ())
+ }));
CompilerShared {
rid,
worker_err_receiver: err_receiver.shared(),
@@ -177,16 +225,17 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
}).clone()
}
-fn show_compiler_error(err: JSError) -> ModuleMetaData {
- eprintln!("{}", JSErrorColor(&err).to_string());
- std::process::exit(1);
-}
-
-fn req(specifier: &str, referrer: &str, is_worker_main: bool) -> Buf {
+fn req(
+ specifier: &str,
+ referrer: &str,
+ is_worker_main: bool,
+ cmd_id: u32,
+) -> Buf {
json!({
"specifier": specifier,
"referrer": referrer,
- "isWorker": is_worker_main
+ "isWorker": is_worker_main,
+ "cmdId": cmd_id,
}).to_string()
.into_boxed_str()
.into_boxed_bytes()
@@ -197,48 +246,76 @@ pub fn compile_sync(
specifier: &str,
referrer: &str,
module_meta_data: &ModuleMetaData,
-) -> ModuleMetaData {
+) -> Result<ModuleMetaData, JSError> {
debug!(
"Running rust part of compile_sync. specifier: {}, referrer: {}",
&specifier, &referrer
);
+ let cmd_id = new_cmd_id();
- let req_msg = req(specifier, referrer, parent_state.is_worker);
+ let req_msg = req(specifier, referrer, parent_state.is_worker, cmd_id);
let module_meta_data_ = module_meta_data.clone();
- let shared = lazy_start(parent_state);
+ let shared = lazy_start(parent_state.clone());
let compiler_rid = shared.rid;
let (local_sender, local_receiver) =
oneshot::channel::<Result<ModuleMetaData, Option<JSError>>>();
- let mut runtime = C_RUNTIME.lock().unwrap();
- runtime.spawn(lazy(move || {
- resources::post_message_to_worker(compiler_rid, req_msg)
- .then(move |_| {
- debug!("Sent message to worker");
- resources::get_message_from_worker(compiler_rid)
- }).and_then(move |res_msg| {
- debug!("Received message from worker");
- let res_json = std::str::from_utf8(res_msg.as_ref().unwrap()).unwrap();
- let res_data = serde_json::from_str::<serde_json::Value>(res_json)
- .expect("Error decoding compiler response");
- let res_module_meta_data = ModuleMetaData {
- maybe_output_code: res_data["outputCode"]
- .as_str()
- .map(|s| s.as_bytes().to_owned()),
- maybe_source_map: res_data["sourceMap"]
- .as_str()
- .map(|s| s.as_bytes().to_owned()),
- ..module_meta_data_
- };
- Ok(res_module_meta_data)
- }).map_err(|_| None)
- .then(move |result| {
- local_sender.send(result).expect("Oneshot send() failed");
- Ok(())
- })
- }));
+ let (response_sender, response_receiver) = oneshot::channel::<Buf>();
+
+ // Scoping to auto dispose of locks when done using them
+ {
+ let mut table = C_RES_SENDER_TABLE.lock().unwrap();
+ debug!("Cmd id for response sender insert: {}", cmd_id);
+ // Place our response sender in the table so we can find it later.
+ table.insert(cmd_id, response_sender);
+
+ let mut runtime = C_RUNTIME.lock().unwrap();
+ runtime.spawn(lazy(move || {
+ resources::post_message_to_worker(compiler_rid, req_msg)
+ .then(move |_| {
+ debug!("Sent message to worker");
+ response_receiver.map_err(|_| None)
+ }).and_then(move |res_msg| {
+ debug!("Received message from worker");
+ let res_json = std::str::from_utf8(res_msg.as_ref()).unwrap();
+ let res = serde_json::from_str::<serde_json::Value>(res_json)
+ .expect("Error decoding compiler response");
+ let res_data = res["data"].as_object().expect(
+ "Error decoding compiler response: expected object field 'data'",
+ );
+ match res["success"].as_bool() {
+ Some(true) => Ok(ModuleMetaData {
+ maybe_output_code: res_data["outputCode"]
+ .as_str()
+ .map(|s| s.as_bytes().to_owned()),
+ maybe_source_map: res_data["sourceMap"]
+ .as_str()
+ .map(|s| s.as_bytes().to_owned()),
+ ..module_meta_data_
+ }),
+ Some(false) => {
+ let js_error = JSError::from_json_value(
+ serde_json::Value::Object(res_data.clone()),
+ ).expect(
+ "Error decoding compiler response: failed to parse error",
+ );
+ Err(Some(js_errors::apply_source_map(
+ &js_error,
+ &parent_state.dir,
+ )))
+ }
+ _ => panic!(
+ "Error decoding compiler response: expected bool field 'success'"
+ ),
+ }
+ }).then(move |result| {
+ local_sender.send(result).expect("Oneshot send() failed");
+ Ok(())
+ })
+ }));
+ }
let worker_receiver = shared.worker_err_receiver.clone();
@@ -251,11 +328,11 @@ pub fn compile_sync(
let mut rest_mut = rest;
match ((*result.deref()).clone(), i) {
// Either receiver was completed with success.
- (Ok(v), _) => v,
+ (Ok(v), _) => Ok(v),
// Either receiver was completed with a valid error
// this should be fatal for now since it is not intended
// to be possible to recover from a uncaught error in a isolate
- (Err(Some(err)), _) => show_compiler_error(err),
+ (Err(Some(err)), _) => Err(err),
// local_receiver finished first with a none error. This is intended
// to catch when the local logic can't complete because it is unable
// to send and/or receive messages from the compiler worker.
@@ -277,9 +354,9 @@ pub fn compile_sync(
specifier, referrer
);
match worker_result {
- Err(Some(err)) => show_compiler_error(err),
+ Err(Some(err)) => Err(err),
Err(None) => panic!("Compiler exit for an unknown reason!"),
- Ok(v) => v,
+ Ok(v) => Ok(v),
}
}
// While possible beccause the compiler worker can exit without error
@@ -297,41 +374,46 @@ pub fn compile_sync(
#[cfg(test)]
mod tests {
use super::*;
- use crate::tokio_util;
#[test]
fn test_compile_sync() {
- tokio_util::init(|| {
- let cwd = std::env::current_dir().unwrap();
- let cwd_string = cwd.to_str().unwrap().to_owned();
-
- let specifier = "./tests/002_hello.ts";
- let referrer = cwd_string + "/";
-
- let mut out = ModuleMetaData {
- module_name: "xxx".to_owned(),
- module_redirect_source_name: None,
- filename: "/tests/002_hello.ts".to_owned(),
- media_type: msg::MediaType::TypeScript,
- source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
- maybe_output_code_filename: None,
- maybe_output_code: None,
- maybe_source_map_filename: None,
- maybe_source_map: None,
- };
-
- out = compile_sync(
- Arc::new(IsolateState::mock()),
- specifier,
- &referrer,
- &out,
- );
- assert!(
- out
- .maybe_output_code
- .unwrap()
- .starts_with("console.log(\"Hello World\");".as_bytes())
- );
- });
+ let cwd = std::env::current_dir().unwrap();
+ let cwd_string = cwd.to_str().unwrap().to_owned();
+
+ let specifier = "./tests/002_hello.ts";
+ let referrer = cwd_string + "/";
+
+ let mut out = ModuleMetaData {
+ module_name: "xxx".to_owned(),
+ module_redirect_source_name: None,
+ filename: "/tests/002_hello.ts".to_owned(),
+ media_type: msg::MediaType::TypeScript,
+ source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
+ maybe_output_code_filename: None,
+ maybe_output_code: None,
+ maybe_source_map_filename: None,
+ maybe_source_map: None,
+ };
+
+ out =
+ compile_sync(Arc::new(IsolateState::mock()), specifier, &referrer, &out)
+ .unwrap();
+ assert!(
+ out
+ .maybe_output_code
+ .unwrap()
+ .starts_with("console.log(\"Hello World\");".as_bytes())
+ );
+ }
+
+ #[test]
+ fn test_parse_cmd_id() {
+ let cmd_id = new_cmd_id();
+
+ let msg = req("Hello", "World", false, cmd_id);
+
+ let res_json = std::str::from_utf8(&msg).unwrap();
+
+ assert_eq!(parse_cmd_id(res_json), cmd_id);
}
}