diff options
author | andy finch <andyfinch7@gmail.com> | 2019-04-01 15:09:59 -0400 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-04-01 15:09:59 -0400 |
commit | b0a23beb8fae964be3cdd8c23c38af66257d34c7 (patch) | |
tree | 8f7875c8ca059dfb0a3ade4da7bfb94e57d6e1aa /cli | |
parent | 659acadf77fdbeef8579a37839a464feb408437a (diff) |
Add web worker JS API (#1993)
* Refactored the way worker polling is scheduled and errors are handled.
* Share the worker future as a Shared
Diffstat (limited to 'cli')
-rw-r--r-- | cli/compiler.rs | 318 | ||||
-rw-r--r-- | cli/errors.rs | 10 | ||||
-rw-r--r-- | cli/isolate.rs | 4 | ||||
-rw-r--r-- | cli/isolate_state.rs | 15 | ||||
-rw-r--r-- | cli/main.rs | 2 | ||||
-rw-r--r-- | cli/msg.fbs | 38 | ||||
-rw-r--r-- | cli/ops.rs | 160 | ||||
-rw-r--r-- | cli/resources.rs | 5 | ||||
-rw-r--r-- | cli/startup_data.rs | 2 | ||||
-rw-r--r-- | cli/workers.rs | 303 |
10 files changed, 680 insertions, 177 deletions
diff --git a/cli/compiler.rs b/cli/compiler.rs index 4613aff99..bd0a763c1 100644 --- a/cli/compiler.rs +++ b/cli/compiler.rs @@ -1,26 +1,53 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use core::ops::Deref; +use crate::flags::DenoFlags; use crate::isolate_state::*; +use crate::js_errors::JSErrorColor; use crate::msg; use crate::ops; use crate::resources; -use crate::resources::Resource; use crate::resources::ResourceId; use crate::startup_data; use crate::workers; use crate::workers::WorkerBehavior; +use crate::workers::WorkerInit; use deno::deno_buf; use deno::Behavior; use deno::Buf; +use deno::JSError; use deno::Op; use deno::StartupData; +use futures::future::*; +use futures::sync::oneshot; use futures::Future; use serde_json; use std::str; use std::sync::Arc; use std::sync::Mutex; +use tokio::runtime::Runtime; + +/// Used for normalization of types on internal future completions +type CompilerInnerResult = Result<ModuleMetaData, Option<JSError>>; +type WorkerErrReceiver = oneshot::Receiver<CompilerInnerResult>; + +/// Shared resources for used to complete compiler operations. +/// rid is the resource id for compiler worker resource used for sending it +/// compile requests +/// worker_err_receiver is a shared future that will compelete when the +/// compiler worker future completes, and send back an error if present +/// or a None if not +#[derive(Clone)] +struct CompilerShared { + pub rid: ResourceId, + pub worker_err_receiver: Shared<WorkerErrReceiver>, +} lazy_static! { - static ref C_RID: Mutex<Option<ResourceId>> = Mutex::new(None); + // Shared worker resources so we can spawn + static ref C_SHARED: Mutex<Option<CompilerShared>> = Mutex::new(None); + // tokio runtime specifically for spawning logic that is dependent on + // completetion of the compiler worker future + static ref C_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap()); } pub struct CompilerBehavior { @@ -28,8 +55,10 @@ pub struct CompilerBehavior { } impl CompilerBehavior { - pub fn new(state: Arc<IsolateState>) -> Self { - Self { state } + pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self { + Self { + state: Arc::new(IsolateState::new(flags, argv_rest, None, true)), + } } } @@ -65,13 +94,14 @@ impl WorkerBehavior for CompilerBehavior { self.state.flags.clone(), self.state.argv.clone(), Some(worker_channels), + true, )); } } // This corresponds to JS ModuleMetaData. // TODO Rename one or the other so they correspond. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ModuleMetaData { pub module_name: String, pub filename: String, @@ -102,26 +132,60 @@ impl ModuleMetaData { } } -fn lazy_start(parent_state: Arc<IsolateState>) -> Resource { - let mut cell = C_RID.lock().unwrap(); - let rid = cell.get_or_insert_with(|| { - let resource = workers::spawn( - CompilerBehavior::new(Arc::new(IsolateState::new( - parent_state.flags.clone(), - parent_state.argv.clone(), - None, - ))), - "compilerMain()".to_string(), - ); - resource.rid - }); - Resource { rid: *rid } +fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared { + let mut cell = C_SHARED.lock().unwrap(); + cell + .get_or_insert_with(|| { + let worker_result = workers::spawn( + CompilerBehavior::new( + parent_state.flags.clone(), + parent_state.argv.clone(), + ), + "TS", + WorkerInit::Script("compilerMain()".to_string()), + ); + match worker_result { + Ok(worker) => { + let rid = worker.resource.rid.clone(); + // create oneshot channels and use the sender to pass back + // results from worker future + let (err_sender, err_receiver) = + oneshot::channel::<CompilerInnerResult>(); + let mut runtime = C_RUNTIME.lock().unwrap(); + runtime.spawn(lazy(move || { + let resource = worker.resource.clone(); + worker.then(move |result| -> Result<(), ()> { + resource.close(); + match result { + Err(err) => err_sender.send(Err(Some(err))).unwrap(), + _ => err_sender.send(Err(None)).unwrap(), + }; + Ok(()) + }) + })); + CompilerShared { + rid, + worker_err_receiver: err_receiver.shared(), + } + } + Err(err) => { + println!("{}", err.to_string()); + std::process::exit(1); + } + } + }).clone() } -fn req(specifier: &str, referrer: &str) -> Buf { +fn show_compiler_error(err: JSError) -> ModuleMetaData { + eprintln!("{}", JSErrorColor(&err).to_string()); + std::process::exit(1); +} + +fn req(specifier: &str, referrer: &str, is_worker_main: bool) -> Buf { json!({ "specifier": specifier, "referrer": referrer, + "isWorker": is_worker_main }).to_string() .into_boxed_str() .into_boxed_bytes() @@ -133,70 +197,172 @@ pub fn compile_sync( referrer: &str, module_meta_data: &ModuleMetaData, ) -> ModuleMetaData { - let req_msg = req(specifier, referrer); - - let compiler = lazy_start(parent_state); - - let send_future = resources::worker_post_message(compiler.rid, req_msg); - send_future.wait().unwrap(); - - let recv_future = resources::worker_recv_message(compiler.rid); - let result = recv_future.wait().unwrap(); - assert!(result.is_some()); - let res_msg = result.unwrap(); - - let res_json = std::str::from_utf8(&res_msg).unwrap(); - match serde_json::from_str::<serde_json::Value>(res_json) { - Ok(serde_json::Value::Object(map)) => ModuleMetaData { - module_name: module_meta_data.module_name.clone(), - filename: module_meta_data.filename.clone(), - media_type: module_meta_data.media_type, - source_code: module_meta_data.source_code.clone(), - maybe_output_code: match map["outputCode"].as_str() { - Some(str) => Some(str.as_bytes().to_owned()), - _ => None, - }, - maybe_output_code_filename: None, - maybe_source_map: match map["sourceMap"].as_str() { - Some(str) => Some(str.as_bytes().to_owned()), - _ => None, - }, - maybe_source_map_filename: None, - }, - _ => panic!("error decoding compiler response"), + let is_worker = parent_state.is_worker.clone(); + let shared = lazy_start(parent_state); + + let (local_sender, local_receiver) = + oneshot::channel::<Result<ModuleMetaData, Option<JSError>>>(); + + // Just some extra scoping to keep things clean + { + let compiler_rid = shared.rid.clone(); + let module_meta_data_ = module_meta_data.clone(); + let req_msg = req(specifier, referrer, is_worker); + let sender_arc = Arc::new(Some(local_sender)); + let specifier_ = specifier.clone().to_string(); + let referrer_ = referrer.clone().to_string(); + + let mut runtime = C_RUNTIME.lock().unwrap(); + runtime.spawn(lazy(move || { + debug!( + "Running rust part of compile_sync specifier: {} referrer: {}", + specifier_, referrer_ + ); + let mut send_sender_arc = sender_arc.clone(); + resources::post_message_to_worker(compiler_rid, req_msg) + .map_err(move |_| { + let sender = Arc::get_mut(&mut send_sender_arc).unwrap().take(); + sender.unwrap().send(Err(None)).unwrap() + }).and_then(move |_| { + debug!( + "Sent message to worker specifier: {} referrer: {}", + specifier_, referrer_ + ); + let mut get_sender_arc = sender_arc.clone(); + let mut result_sender_arc = sender_arc.clone(); + resources::get_message_from_worker(compiler_rid) + .map_err(move |_| { + let sender = Arc::get_mut(&mut get_sender_arc).unwrap().take(); + sender.unwrap().send(Err(None)).unwrap() + }).and_then(move |res_msg_option| -> Result<(), ()> { + debug!( + "Recieved message from worker specifier: {} referrer: {}", + specifier_, referrer_ + ); + let res_msg = res_msg_option.unwrap(); + let res_json = std::str::from_utf8(&res_msg).unwrap(); + let sender = Arc::get_mut(&mut result_sender_arc).unwrap().take(); + let sender = sender.unwrap(); + Ok( + sender + .send(Ok(match serde_json::from_str::<serde_json::Value>( + res_json, + ) { + Ok(serde_json::Value::Object(map)) => ModuleMetaData { + module_name: module_meta_data_.module_name.clone(), + filename: module_meta_data_.filename.clone(), + media_type: module_meta_data_.media_type, + source_code: module_meta_data_.source_code.clone(), + maybe_output_code: match map["outputCode"].as_str() { + Some(str) => Some(str.as_bytes().to_owned()), + _ => None, + }, + maybe_output_code_filename: None, + maybe_source_map: match map["sourceMap"].as_str() { + Some(str) => Some(str.as_bytes().to_owned()), + _ => None, + }, + maybe_source_map_filename: None, + }, + _ => panic!("error decoding compiler response"), + })).unwrap(), + ) + }) + }) + })); + } + + let worker_receiver = shared.worker_err_receiver.clone(); + + let union = + futures::future::select_all(vec![worker_receiver, local_receiver.shared()]); + + match union.wait() { + Ok((result, i, rest)) => { + // We got a sucessful finish before any recivers where canceled + let mut rest_mut = rest; + match ((*result.deref()).clone(), i) { + // Either receiver was completed with success. + (Ok(v), _) => v, + // Either receiver was completed with a valid error + // this should be fatal for now since it is not intended + // to be possible to recover from a uncaught error in a isolate + (Err(Some(err)), _) => show_compiler_error(err), + // local_receiver finished first with a none error. This is intended + // to catch when the local logic can't complete because it is unable + // to send and/or receive messages from the compiler worker. + // Due to the way that scheduling works it is very likely that the + // compiler worker future has already or will in the near future + // complete with a valid JSError or a None. + (Err(None), 1) => { + debug!("Compiler local exited with None error!"); + // While technically possible to get stuck here indefinately + // in theory it is highly unlikely. + debug!( + "Waiting on compiler worker result specifier: {} referrer: {}!", + specifier, referrer + ); + let worker_result = + (*rest_mut.remove(0).wait().unwrap().deref()).clone(); + debug!( + "Finished waiting on worker result specifier: {} referrer: {}!", + specifier, referrer + ); + match worker_result { + Err(Some(err)) => show_compiler_error(err), + Err(None) => panic!("Compiler exit for an unknown reason!"), + Ok(v) => v, + } + } + // While possible beccause the compiler worker can exit without error + // this shouldn't occurr normally and I don't intend to attempt to + // handle it right now + (_, i) => panic!("Odd compiler result for future {}!", i), + } + } + // This should always a result of a reciver being cancled + // in theory but why not give a print out just in case + Err((err, i, _)) => panic!("compile_sync {} failed: {}", i, err), } } #[cfg(test)] mod tests { use super::*; + use crate::tokio_util; #[test] fn test_compile_sync() { - let cwd = std::env::current_dir().unwrap(); - let cwd_string = cwd.to_str().unwrap().to_owned(); - - let specifier = "./tests/002_hello.ts"; - let referrer = cwd_string + "/"; - - let mut out = ModuleMetaData { - module_name: "xxx".to_owned(), - filename: "/tests/002_hello.ts".to_owned(), - media_type: msg::MediaType::TypeScript, - source_code: "console.log(\"Hello World\");".as_bytes().to_owned(), - maybe_output_code_filename: None, - maybe_output_code: None, - maybe_source_map_filename: None, - maybe_source_map: None, - }; - - out = - compile_sync(Arc::new(IsolateState::mock()), specifier, &referrer, &out); - assert!( - out - .maybe_output_code - .unwrap() - .starts_with("console.log(\"Hello World\");".as_bytes()) - ); + tokio_util::init(|| { + let cwd = std::env::current_dir().unwrap(); + let cwd_string = cwd.to_str().unwrap().to_owned(); + + let specifier = "./tests/002_hello.ts"; + let referrer = cwd_string + "/"; + + let mut out = ModuleMetaData { + module_name: "xxx".to_owned(), + filename: "/tests/002_hello.ts".to_owned(), + media_type: msg::MediaType::TypeScript, + source_code: include_bytes!("../tests/002_hello.ts").to_vec(), + maybe_output_code_filename: None, + maybe_output_code: None, + maybe_source_map_filename: None, + maybe_source_map: None, + }; + + out = compile_sync( + Arc::new(IsolateState::mock()), + specifier, + &referrer, + &out, + ); + assert!( + out + .maybe_output_code + .unwrap() + .starts_with("console.log(\"Hello World\");".as_bytes()) + ); + }); } } diff --git a/cli/errors.rs b/cli/errors.rs index a2c3c3441..3873f70ff 100644 --- a/cli/errors.rs +++ b/cli/errors.rs @@ -180,7 +180,15 @@ pub fn permission_denied() -> DenoError { } pub fn op_not_implemented() -> DenoError { - new(ErrorKind::BadResource, String::from("op not implemented")) + new(ErrorKind::OpNotAvaiable, String::from("op not implemented")) +} + +pub fn worker_init_failed() -> DenoError { + // TODO(afinch7) pass worker error data through here + new( + ErrorKind::WorkerInitFailed, + String::from("worker init failed"), + ) } #[derive(Debug)] diff --git a/cli/isolate.rs b/cli/isolate.rs index 15ad4e125..ced1cb792 100644 --- a/cli/isolate.rs +++ b/cli/isolate.rs @@ -226,7 +226,7 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv, None)); + let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); let state_ = state.clone(); tokio_util::run(lazy(move || { let cli = CliBehavior::new(None, state.clone()); @@ -249,7 +249,7 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv, None)); + let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); let state_ = state.clone(); tokio_util::run(lazy(move || { let cli = CliBehavior::new(None, state.clone()); diff --git a/cli/isolate_state.rs b/cli/isolate_state.rs index 9f6749925..b0fb97f10 100644 --- a/cli/isolate_state.rs +++ b/cli/isolate_state.rs @@ -5,9 +5,14 @@ use crate::flags; use crate::global_timer::GlobalTimer; use crate::modules::Modules; use crate::permissions::DenoPermissions; +use crate::resources::ResourceId; +use crate::workers::UserWorkerBehavior; +use crate::workers::Worker; use deno::Buf; +use futures::future::Shared; use futures::sync::mpsc as async_mpsc; use std; +use std::collections::HashMap; use std::env; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -16,6 +21,8 @@ use std::sync::Mutex; pub type WorkerSender = async_mpsc::Sender<Buf>; pub type WorkerReceiver = async_mpsc::Receiver<Buf>; pub type WorkerChannels = (WorkerSender, WorkerReceiver); +pub type UserWorkerTable = + HashMap<ResourceId, Shared<Worker<UserWorkerBehavior>>>; // AtomicU64 is currently unstable #[derive(Default)] @@ -42,6 +49,8 @@ pub struct IsolateState { pub modules: Mutex<Modules>, pub worker_channels: Option<Mutex<WorkerChannels>>, pub global_timer: Mutex<GlobalTimer>, + pub workers: Mutex<UserWorkerTable>, + pub is_worker: bool, } impl IsolateState { @@ -49,6 +58,7 @@ impl IsolateState { flags: flags::DenoFlags, argv_rest: Vec<String>, worker_channels: Option<WorkerChannels>, + is_worker: bool, ) -> Self { let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); @@ -61,9 +71,12 @@ impl IsolateState { modules: Mutex::new(Modules::new()), worker_channels: worker_channels.map(Mutex::new), global_timer: Mutex::new(GlobalTimer::new()), + workers: Mutex::new(UserWorkerTable::new()), + is_worker, } } + /// Read main module from argv pub fn main_module(&self) -> Option<String> { if self.argv.len() <= 1 { None @@ -110,7 +123,7 @@ impl IsolateState { let argv = vec![String::from("./deno"), String::from("hello.js")]; // For debugging: argv.push_back(String::from("-D")); let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - IsolateState::new(flags, rest_argv, None) + IsolateState::new(flags, rest_argv, None, false) } pub fn metrics_op_dispatched( diff --git a/cli/main.rs b/cli/main.rs index 12f94650d..f9a88803e 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -108,7 +108,7 @@ fn main() { let should_prefetch = flags.prefetch || flags.info; let should_display_info = flags.info; - let state = Arc::new(IsolateState::new(flags, rest_argv, None)); + let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); let state_ = state.clone(); let startup_data = startup_data::deno_isolate_init(); let cli = CliBehavior::new(Some(startup_data), state_); diff --git a/cli/msg.fbs b/cli/msg.fbs index 45f940f7d..695515f55 100644 --- a/cli/msg.fbs +++ b/cli/msg.fbs @@ -63,6 +63,12 @@ union Any { StatRes, Symlink, Truncate, + CreateWorker, + CreateWorkerRes, + HostGetWorkerClosed, + HostGetMessage, + HostGetMessageRes, + HostPostMessage, WorkerGetMessage, WorkerGetMessageRes, WorkerPostMessage, @@ -121,6 +127,8 @@ enum ErrorKind: byte { // custom errors InvalidUri, InvalidSeekMode, + OpNotAvaiable, + WorkerInitFailed } table Cwd {} @@ -171,6 +179,35 @@ table FormatErrorRes { error: string; } +// Create worker as host +table CreateWorker { + specifier: string; +} + +table CreateWorkerRes { + rid: uint32; +} + +table HostGetWorkerClosed { + rid: uint32; +} + +// Get message from guest worker as host +table HostGetMessage { + rid: uint32; +} + +table HostGetMessageRes { + data: [ubyte]; +} + +// Post message to guest worker as host +table HostPostMessage { + rid: uint32; + // data passed thru the zero-copy data parameter. +} + +// Get message from host as guest worker table WorkerGetMessage { unused: int8; } @@ -179,6 +216,7 @@ table WorkerGetMessageRes { data: [ubyte]; } +// Post message to host as guest worker table WorkerPostMessage { // data passed thru the zero-copy data parameter. } diff --git a/cli/ops.rs b/cli/ops.rs index a7c2e868f..c8119771d 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -18,6 +18,7 @@ use crate::resources::Resource; use crate::tokio_util; use crate::tokio_write; use crate::version; +use crate::workers; use deno::deno_buf; use deno::Buf; use deno::JSError; @@ -141,13 +142,24 @@ pub fn dispatch_all( (base.sync(), boxed_op) } +/// Superset of op_selector_worker for compiler isolates pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> { match inner_type { msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data), + _ => op_selector_worker(inner_type), + } +} + +/// Superset of op_selector_std for worker isolates +pub fn op_selector_worker(inner_type: msg::Any) -> Option<OpCreator> { + match inner_type { + msg::Any::WorkerGetMessage => Some(op_worker_get_message), + msg::Any::WorkerPostMessage => Some(op_worker_post_message), _ => op_selector_std(inner_type), } } +/// Standard ops set for most isolates pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> { match inner_type { msg::Any::Accept => Some(op_accept), @@ -189,8 +201,10 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> { msg::Any::Stat => Some(op_stat), msg::Any::Symlink => Some(op_symlink), msg::Any::Truncate => Some(op_truncate), - msg::Any::WorkerGetMessage => Some(op_worker_get_message), - msg::Any::WorkerPostMessage => Some(op_worker_post_message), + msg::Any::CreateWorker => Some(op_create_worker), + msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed), + msg::Any::HostGetMessage => Some(op_host_get_message), + msg::Any::HostPostMessage => Some(op_host_post_message), msg::Any::Write => Some(op_write), _ => None, } @@ -1741,6 +1755,7 @@ impl Future for GetMessageFuture { } } +/// Get message from host as guest worker fn op_worker_get_message( sc: &IsolateStateContainer, base: &msg::Base<'_>, @@ -1775,6 +1790,7 @@ fn op_worker_get_message( Box::new(op) } +/// Post message to host as guest worker fn op_worker_post_message( sc: &IsolateStateContainer, base: &msg::Base<'_>, @@ -1807,3 +1823,143 @@ fn op_worker_post_message( }); Box::new(op) } + +/// Create worker as the host +fn op_create_worker( + sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_create_worker().unwrap(); + let specifier = inner.specifier().unwrap(); + + Box::new(futures::future::result(move || -> OpResult { + let parent_state = sc.state().clone(); + let behavior = workers::UserWorkerBehavior::new( + parent_state.flags.clone(), + parent_state.argv.clone(), + ); + match workers::spawn( + behavior, + &format!("USER-WORKER-{}", specifier), + workers::WorkerInit::Module(specifier.to_string()), + ) { + Ok(worker) => { + let mut workers_tl = parent_state.workers.lock().unwrap(); + let rid = worker.resource.rid.clone(); + workers_tl.insert(rid, worker.shared()); + let builder = &mut FlatBufferBuilder::new(); + let msg_inner = msg::CreateWorkerRes::create( + builder, + &msg::CreateWorkerResArgs { rid }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::CreateWorkerRes, + ..Default::default() + }, + )) + } + Err(errors::RustOrJsError::Js(_)) => Err(errors::worker_init_failed()), + Err(errors::RustOrJsError::Rust(err)) => Err(err), + } + }())) +} + +/// Return when the worker closes +fn op_host_get_worker_closed( + sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_worker_closed().unwrap(); + let rid = inner.rid(); + let state = sc.state().clone(); + + let shared_worker_future = { + let workers_tl = state.workers.lock().unwrap(); + let worker = workers_tl.get(&rid).unwrap(); + worker.clone() + }; + + Box::new(shared_worker_future.then(move |_result| { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + })) +} + +/// Get message from guest worker as host +fn op_host_get_message( + _sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_message().unwrap(); + let rid = inner.rid(); + + let op = resources::get_message_from_worker(rid); + let op = op.map_err(move |_| -> DenoError { unimplemented!() }); + let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> { + let builder = &mut FlatBufferBuilder::new(); + + let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); + let msg_inner = msg::HostGetMessageRes::create( + builder, + &msg::HostGetMessageResArgs { data }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::HostGetMessageRes, + ..Default::default() + }, + )) + }); + Box::new(op) +} + +/// Post message to guest worker as host +fn op_host_post_message( + _sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box<OpWithError> { + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_post_message().unwrap(); + let rid = inner.rid(); + + let d = Vec::from(data.as_ref()).into_boxed_slice(); + + let op = resources::post_message_to_worker(rid, d); + let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); + let op = op.and_then(move |_| -> DenoResult<Buf> { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + }); + Box::new(op) +} diff --git a/cli/resources.rs b/cli/resources.rs index 5b08e4b32..817f6062d 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -305,7 +305,8 @@ pub fn add_worker(wc: WorkerChannels) -> Resource { Resource { rid } } -pub fn worker_post_message( +/// Post message to worker as a host or privilged overlord +pub fn post_message_to_worker( rid: ResourceId, buf: Buf, ) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> { @@ -341,7 +342,7 @@ impl Future for WorkerReceiver { } } -pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver { +pub fn get_message_from_worker(rid: ResourceId) -> WorkerReceiver { WorkerReceiver { rid } } diff --git a/cli/startup_data.rs b/cli/startup_data.rs index 7f59c0678..61891ced4 100644 --- a/cli/startup_data.rs +++ b/cli/startup_data.rs @@ -31,7 +31,7 @@ pub fn deno_isolate_init() -> StartupData { pub fn compiler_isolate_init() -> StartupData { if cfg!(feature = "no-snapshot-init") { - debug!("Deno isolate init without snapshots."); + debug!("Compiler isolate init without snapshots."); #[cfg(not(feature = "check-only"))] let source_bytes = include_bytes!(concat!( env!("GN_OUT_DIR"), diff --git a/cli/workers.rs b/cli/workers.rs index 0c8d49fa7..cb919e8ed 100644 --- a/cli/workers.rs +++ b/cli/workers.rs @@ -1,17 +1,72 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::errors::*; +use crate::flags::DenoFlags; use crate::isolate::{DenoBehavior, Isolate}; +use crate::isolate_state::IsolateState; +use crate::isolate_state::IsolateStateContainer; use crate::isolate_state::WorkerChannels; -use crate::js_errors::JSErrorColor; +use crate::ops; use crate::resources; -use crate::tokio_util; +use crate::startup_data; +use deno::deno_buf; +use deno::Behavior; use deno::Buf; use deno::JSError; -use futures::future::lazy; +use deno::Op; +use deno::StartupData; use futures::sync::mpsc; -use futures::sync::oneshot; use futures::Future; use futures::Poll; -use std::thread; +use std::sync::Arc; + +pub struct UserWorkerBehavior { + pub state: Arc<IsolateState>, +} + +impl UserWorkerBehavior { + pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self { + Self { + state: Arc::new(IsolateState::new(flags, argv_rest, None, true)), + } + } +} + +impl IsolateStateContainer for UserWorkerBehavior { + fn state(&self) -> Arc<IsolateState> { + self.state.clone() + } +} + +impl IsolateStateContainer for &UserWorkerBehavior { + fn state(&self) -> Arc<IsolateState> { + self.state.clone() + } +} + +impl Behavior for UserWorkerBehavior { + fn startup_data(&mut self) -> Option<StartupData> { + Some(startup_data::deno_isolate_init()) + } + + fn dispatch( + &mut self, + control: &[u8], + zero_copy: deno_buf, + ) -> (bool, Box<Op>) { + ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker) + } +} + +impl WorkerBehavior for UserWorkerBehavior { + fn set_internal_channels(&mut self, worker_channels: WorkerChannels) { + self.state = Arc::new(IsolateState::new( + self.state.flags.clone(), + self.state.argv.clone(), + Some(worker_channels), + true, + )); + } +} /// Behavior trait specific to workers pub trait WorkerBehavior: DenoBehavior { @@ -24,10 +79,11 @@ pub trait WorkerBehavior: DenoBehavior { /// Rust interface for WebWorkers. pub struct Worker<B: WorkerBehavior> { isolate: Isolate<B>, + pub resource: resources::Resource, } impl<B: WorkerBehavior> Worker<B> { - pub fn new(mut behavior: B) -> (Self, WorkerChannels) { + pub fn new(mut behavior: B) -> Self { let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1); let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1); @@ -38,13 +94,23 @@ impl<B: WorkerBehavior> Worker<B> { let isolate = Isolate::new(behavior); - let worker = Worker { isolate }; - (worker, external_channels) + Worker { + isolate, + resource: resources::add_worker(external_channels), + } } pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> { self.isolate.execute(js_source) } + + pub fn execute_mod( + &mut self, + js_filename: &str, + is_prefetch: bool, + ) -> Result<(), RustOrJsError> { + self.isolate.execute_mod(js_filename, is_prefetch) + } } impl<B: WorkerBehavior> Future for Worker<B> { @@ -56,47 +122,48 @@ impl<B: WorkerBehavior> Future for Worker<B> { } } +/// Method and data used to initalize a worker +pub enum WorkerInit { + Script(String), + Module(String), +} + pub fn spawn<B: WorkerBehavior + 'static>( behavior: B, - js_source: String, -) -> resources::Resource { - // TODO This function should return a Future, so that the caller can retrieve - // the JSError if one is thrown. Currently it just prints to stderr and calls - // exit(1). - // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>(); - let (p, c) = oneshot::channel::<resources::Resource>(); - let builder = thread::Builder::new().name("worker".to_string()); - - let _tid = builder - .spawn(move || { - tokio_util::run(lazy(move || { - let (mut worker, external_channels) = Worker::new(behavior); - let resource = resources::add_worker(external_channels); - p.send(resource.clone()).unwrap(); - - worker - .execute("denoMain()") - .expect("worker denoMain failed"); - worker - .execute("workerMain()") - .expect("worker workerMain failed"); - worker.execute(&js_source).expect("worker js_source failed"); + worker_debug_name: &str, + init: WorkerInit, +) -> Result<Worker<B>, RustOrJsError> { + let state = behavior.state().clone(); + let mut worker = Worker::new(behavior); - worker.then(move |r| -> Result<(), ()> { - resource.close(); - debug!("workers.rs after resource close"); - if let Err(err) = r { - eprintln!("{}", JSErrorColor(&err).to_string()); - std::process::exit(1); - } - Ok(()) - }) - })); + worker + .execute(&format!("denoMain('{}')", worker_debug_name)) + .expect("worker workerInit failed"); + + worker + .execute("workerMain()") + .expect("worker workerMain failed"); - debug!("workers.rs after spawn"); - }).unwrap(); + let init_result = match init { + WorkerInit::Script(script) => match worker.execute(&script) { + Ok(v) => Ok(v), + Err(e) => Err(RustOrJsError::Js(e)), + }, + WorkerInit::Module(specifier) => { + let should_prefetch = state.flags.prefetch || state.flags.info; + match state.dir.resolve_module_url(&specifier, ".") { + Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))), + Ok(module_url) => { + worker.execute_mod(&module_url.to_string(), should_prefetch) + } + } + } + }; - c.wait().unwrap() + match init_result { + Ok(_) => Ok(worker), + Err(err) => Err(err), + } } #[cfg(test)] @@ -104,63 +171,117 @@ mod tests { use super::*; use crate::compiler::CompilerBehavior; use crate::isolate_state::IsolateState; - use std::sync::Arc; + use crate::js_errors::JSErrorColor; + use crate::tokio_util; + use futures::future::lazy; + use std::thread; #[test] fn test_spawn() { - let resource = spawn( - CompilerBehavior::new(Arc::new(IsolateState::mock())), - r#" - onmessage = function(e) { - let s = new TextDecoder().decode(e.data);; - console.log("msg from main script", s); - if (s == "exit") { - close(); - return; - } else { - console.assert(s === "hi"); + tokio_util::init(|| { + let worker_result = spawn( + CompilerBehavior::new( + IsolateState::mock().flags.clone(), + IsolateState::mock().argv.clone(), + ), + "TEST", + WorkerInit::Script( + r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + close(); + return; + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); } - postMessage(new Uint8Array([1, 2, 3])); - console.log("after postMessage"); - } - "#.into(), - ); - let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); + "#.into(), + ), + ); + assert!(worker_result.is_ok()); + let worker = worker_result.unwrap(); + let resource = worker.resource.clone(); + let resource_ = resource.clone(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource_.close(); + debug!("workers.rs after resource close"); + if let Err(err) = r { + eprintln!("{}", JSErrorColor(&err).to_string()); + assert!(false) + } + Ok(()) + }) + })); - let maybe_msg = - resources::worker_recv_message(resource.rid).wait().unwrap(); - assert!(maybe_msg.is_some()); - assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]); + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let msg = String::from("exit").into_boxed_str().into_boxed_bytes(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + + let maybe_msg = resources::get_message_from_worker(resource.rid) + .wait() + .unwrap(); + assert!(maybe_msg.is_some()); + // Check if message received is [1, 2, 3] in json + assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + }) } #[test] fn removed_from_resource_table_on_close() { - let resource = spawn( - CompilerBehavior::new(Arc::new(IsolateState::mock())), - "onmessage = () => close();".into(), - ); - - assert_eq!( - resources::get_type(resource.rid), - Some("worker".to_string()) - ); - - let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); - println!("rid {:?}", resource.rid); - - // TODO Need a way to get a future for when a resource closes. - // For now, just sleep for a bit. - // resource.close(); - thread::sleep(std::time::Duration::from_millis(1000)); - assert_eq!(resources::get_type(resource.rid), None); + tokio_util::init(|| { + let worker_result = spawn( + CompilerBehavior::new( + IsolateState::mock().flags.clone(), + IsolateState::mock().argv.clone(), + ), + "TEST", + WorkerInit::Script("onmessage = () => close();".into()), + ); + assert!(worker_result.is_ok()); + let worker = worker_result.unwrap(); + let resource = worker.resource.clone(); + let resource_ = resource.clone(); + + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource_.close(); + debug!("workers.rs after resource close"); + if let Err(err) = r { + eprintln!("{}", JSErrorColor(&err).to_string()); + assert!(false) + } + Ok(()) + }) + })); + + assert_eq!( + resources::get_type(resource.rid), + Some("worker".to_string()) + ); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + println!("rid {:?}", resource.rid); + + // TODO Need a way to get a future for when a resource closes. + // For now, just sleep for a bit. + // resource.close(); + thread::sleep(std::time::Duration::from_millis(1000)); + assert_eq!(resources::get_type(resource.rid), None); + }) } } |