summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
authorandy finch <andyfinch7@gmail.com>2019-04-01 15:09:59 -0400
committerRyan Dahl <ry@tinyclouds.org>2019-04-01 15:09:59 -0400
commitb0a23beb8fae964be3cdd8c23c38af66257d34c7 (patch)
tree8f7875c8ca059dfb0a3ade4da7bfb94e57d6e1aa /cli
parent659acadf77fdbeef8579a37839a464feb408437a (diff)
Add web worker JS API (#1993)
* Refactored the way worker polling is scheduled and errors are handled. * Share the worker future as a Shared
Diffstat (limited to 'cli')
-rw-r--r--cli/compiler.rs318
-rw-r--r--cli/errors.rs10
-rw-r--r--cli/isolate.rs4
-rw-r--r--cli/isolate_state.rs15
-rw-r--r--cli/main.rs2
-rw-r--r--cli/msg.fbs38
-rw-r--r--cli/ops.rs160
-rw-r--r--cli/resources.rs5
-rw-r--r--cli/startup_data.rs2
-rw-r--r--cli/workers.rs303
10 files changed, 680 insertions, 177 deletions
diff --git a/cli/compiler.rs b/cli/compiler.rs
index 4613aff99..bd0a763c1 100644
--- a/cli/compiler.rs
+++ b/cli/compiler.rs
@@ -1,26 +1,53 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use core::ops::Deref;
+use crate::flags::DenoFlags;
use crate::isolate_state::*;
+use crate::js_errors::JSErrorColor;
use crate::msg;
use crate::ops;
use crate::resources;
-use crate::resources::Resource;
use crate::resources::ResourceId;
use crate::startup_data;
use crate::workers;
use crate::workers::WorkerBehavior;
+use crate::workers::WorkerInit;
use deno::deno_buf;
use deno::Behavior;
use deno::Buf;
+use deno::JSError;
use deno::Op;
use deno::StartupData;
+use futures::future::*;
+use futures::sync::oneshot;
use futures::Future;
use serde_json;
use std::str;
use std::sync::Arc;
use std::sync::Mutex;
+use tokio::runtime::Runtime;
+
+/// Used for normalization of types on internal future completions
+type CompilerInnerResult = Result<ModuleMetaData, Option<JSError>>;
+type WorkerErrReceiver = oneshot::Receiver<CompilerInnerResult>;
+
+/// Shared resources for used to complete compiler operations.
+/// rid is the resource id for compiler worker resource used for sending it
+/// compile requests
+/// worker_err_receiver is a shared future that will compelete when the
+/// compiler worker future completes, and send back an error if present
+/// or a None if not
+#[derive(Clone)]
+struct CompilerShared {
+ pub rid: ResourceId,
+ pub worker_err_receiver: Shared<WorkerErrReceiver>,
+}
lazy_static! {
- static ref C_RID: Mutex<Option<ResourceId>> = Mutex::new(None);
+ // Shared worker resources so we can spawn
+ static ref C_SHARED: Mutex<Option<CompilerShared>> = Mutex::new(None);
+ // tokio runtime specifically for spawning logic that is dependent on
+ // completetion of the compiler worker future
+ static ref C_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
}
pub struct CompilerBehavior {
@@ -28,8 +55,10 @@ pub struct CompilerBehavior {
}
impl CompilerBehavior {
- pub fn new(state: Arc<IsolateState>) -> Self {
- Self { state }
+ pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
+ Self {
+ state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
+ }
}
}
@@ -65,13 +94,14 @@ impl WorkerBehavior for CompilerBehavior {
self.state.flags.clone(),
self.state.argv.clone(),
Some(worker_channels),
+ true,
));
}
}
// This corresponds to JS ModuleMetaData.
// TODO Rename one or the other so they correspond.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct ModuleMetaData {
pub module_name: String,
pub filename: String,
@@ -102,26 +132,60 @@ impl ModuleMetaData {
}
}
-fn lazy_start(parent_state: Arc<IsolateState>) -> Resource {
- let mut cell = C_RID.lock().unwrap();
- let rid = cell.get_or_insert_with(|| {
- let resource = workers::spawn(
- CompilerBehavior::new(Arc::new(IsolateState::new(
- parent_state.flags.clone(),
- parent_state.argv.clone(),
- None,
- ))),
- "compilerMain()".to_string(),
- );
- resource.rid
- });
- Resource { rid: *rid }
+fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
+ let mut cell = C_SHARED.lock().unwrap();
+ cell
+ .get_or_insert_with(|| {
+ let worker_result = workers::spawn(
+ CompilerBehavior::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ ),
+ "TS",
+ WorkerInit::Script("compilerMain()".to_string()),
+ );
+ match worker_result {
+ Ok(worker) => {
+ let rid = worker.resource.rid.clone();
+ // create oneshot channels and use the sender to pass back
+ // results from worker future
+ let (err_sender, err_receiver) =
+ oneshot::channel::<CompilerInnerResult>();
+ let mut runtime = C_RUNTIME.lock().unwrap();
+ runtime.spawn(lazy(move || {
+ let resource = worker.resource.clone();
+ worker.then(move |result| -> Result<(), ()> {
+ resource.close();
+ match result {
+ Err(err) => err_sender.send(Err(Some(err))).unwrap(),
+ _ => err_sender.send(Err(None)).unwrap(),
+ };
+ Ok(())
+ })
+ }));
+ CompilerShared {
+ rid,
+ worker_err_receiver: err_receiver.shared(),
+ }
+ }
+ Err(err) => {
+ println!("{}", err.to_string());
+ std::process::exit(1);
+ }
+ }
+ }).clone()
}
-fn req(specifier: &str, referrer: &str) -> Buf {
+fn show_compiler_error(err: JSError) -> ModuleMetaData {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ std::process::exit(1);
+}
+
+fn req(specifier: &str, referrer: &str, is_worker_main: bool) -> Buf {
json!({
"specifier": specifier,
"referrer": referrer,
+ "isWorker": is_worker_main
}).to_string()
.into_boxed_str()
.into_boxed_bytes()
@@ -133,70 +197,172 @@ pub fn compile_sync(
referrer: &str,
module_meta_data: &ModuleMetaData,
) -> ModuleMetaData {
- let req_msg = req(specifier, referrer);
-
- let compiler = lazy_start(parent_state);
-
- let send_future = resources::worker_post_message(compiler.rid, req_msg);
- send_future.wait().unwrap();
-
- let recv_future = resources::worker_recv_message(compiler.rid);
- let result = recv_future.wait().unwrap();
- assert!(result.is_some());
- let res_msg = result.unwrap();
-
- let res_json = std::str::from_utf8(&res_msg).unwrap();
- match serde_json::from_str::<serde_json::Value>(res_json) {
- Ok(serde_json::Value::Object(map)) => ModuleMetaData {
- module_name: module_meta_data.module_name.clone(),
- filename: module_meta_data.filename.clone(),
- media_type: module_meta_data.media_type,
- source_code: module_meta_data.source_code.clone(),
- maybe_output_code: match map["outputCode"].as_str() {
- Some(str) => Some(str.as_bytes().to_owned()),
- _ => None,
- },
- maybe_output_code_filename: None,
- maybe_source_map: match map["sourceMap"].as_str() {
- Some(str) => Some(str.as_bytes().to_owned()),
- _ => None,
- },
- maybe_source_map_filename: None,
- },
- _ => panic!("error decoding compiler response"),
+ let is_worker = parent_state.is_worker.clone();
+ let shared = lazy_start(parent_state);
+
+ let (local_sender, local_receiver) =
+ oneshot::channel::<Result<ModuleMetaData, Option<JSError>>>();
+
+ // Just some extra scoping to keep things clean
+ {
+ let compiler_rid = shared.rid.clone();
+ let module_meta_data_ = module_meta_data.clone();
+ let req_msg = req(specifier, referrer, is_worker);
+ let sender_arc = Arc::new(Some(local_sender));
+ let specifier_ = specifier.clone().to_string();
+ let referrer_ = referrer.clone().to_string();
+
+ let mut runtime = C_RUNTIME.lock().unwrap();
+ runtime.spawn(lazy(move || {
+ debug!(
+ "Running rust part of compile_sync specifier: {} referrer: {}",
+ specifier_, referrer_
+ );
+ let mut send_sender_arc = sender_arc.clone();
+ resources::post_message_to_worker(compiler_rid, req_msg)
+ .map_err(move |_| {
+ let sender = Arc::get_mut(&mut send_sender_arc).unwrap().take();
+ sender.unwrap().send(Err(None)).unwrap()
+ }).and_then(move |_| {
+ debug!(
+ "Sent message to worker specifier: {} referrer: {}",
+ specifier_, referrer_
+ );
+ let mut get_sender_arc = sender_arc.clone();
+ let mut result_sender_arc = sender_arc.clone();
+ resources::get_message_from_worker(compiler_rid)
+ .map_err(move |_| {
+ let sender = Arc::get_mut(&mut get_sender_arc).unwrap().take();
+ sender.unwrap().send(Err(None)).unwrap()
+ }).and_then(move |res_msg_option| -> Result<(), ()> {
+ debug!(
+ "Recieved message from worker specifier: {} referrer: {}",
+ specifier_, referrer_
+ );
+ let res_msg = res_msg_option.unwrap();
+ let res_json = std::str::from_utf8(&res_msg).unwrap();
+ let sender = Arc::get_mut(&mut result_sender_arc).unwrap().take();
+ let sender = sender.unwrap();
+ Ok(
+ sender
+ .send(Ok(match serde_json::from_str::<serde_json::Value>(
+ res_json,
+ ) {
+ Ok(serde_json::Value::Object(map)) => ModuleMetaData {
+ module_name: module_meta_data_.module_name.clone(),
+ filename: module_meta_data_.filename.clone(),
+ media_type: module_meta_data_.media_type,
+ source_code: module_meta_data_.source_code.clone(),
+ maybe_output_code: match map["outputCode"].as_str() {
+ Some(str) => Some(str.as_bytes().to_owned()),
+ _ => None,
+ },
+ maybe_output_code_filename: None,
+ maybe_source_map: match map["sourceMap"].as_str() {
+ Some(str) => Some(str.as_bytes().to_owned()),
+ _ => None,
+ },
+ maybe_source_map_filename: None,
+ },
+ _ => panic!("error decoding compiler response"),
+ })).unwrap(),
+ )
+ })
+ })
+ }));
+ }
+
+ let worker_receiver = shared.worker_err_receiver.clone();
+
+ let union =
+ futures::future::select_all(vec![worker_receiver, local_receiver.shared()]);
+
+ match union.wait() {
+ Ok((result, i, rest)) => {
+ // We got a sucessful finish before any recivers where canceled
+ let mut rest_mut = rest;
+ match ((*result.deref()).clone(), i) {
+ // Either receiver was completed with success.
+ (Ok(v), _) => v,
+ // Either receiver was completed with a valid error
+ // this should be fatal for now since it is not intended
+ // to be possible to recover from a uncaught error in a isolate
+ (Err(Some(err)), _) => show_compiler_error(err),
+ // local_receiver finished first with a none error. This is intended
+ // to catch when the local logic can't complete because it is unable
+ // to send and/or receive messages from the compiler worker.
+ // Due to the way that scheduling works it is very likely that the
+ // compiler worker future has already or will in the near future
+ // complete with a valid JSError or a None.
+ (Err(None), 1) => {
+ debug!("Compiler local exited with None error!");
+ // While technically possible to get stuck here indefinately
+ // in theory it is highly unlikely.
+ debug!(
+ "Waiting on compiler worker result specifier: {} referrer: {}!",
+ specifier, referrer
+ );
+ let worker_result =
+ (*rest_mut.remove(0).wait().unwrap().deref()).clone();
+ debug!(
+ "Finished waiting on worker result specifier: {} referrer: {}!",
+ specifier, referrer
+ );
+ match worker_result {
+ Err(Some(err)) => show_compiler_error(err),
+ Err(None) => panic!("Compiler exit for an unknown reason!"),
+ Ok(v) => v,
+ }
+ }
+ // While possible beccause the compiler worker can exit without error
+ // this shouldn't occurr normally and I don't intend to attempt to
+ // handle it right now
+ (_, i) => panic!("Odd compiler result for future {}!", i),
+ }
+ }
+ // This should always a result of a reciver being cancled
+ // in theory but why not give a print out just in case
+ Err((err, i, _)) => panic!("compile_sync {} failed: {}", i, err),
}
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::tokio_util;
#[test]
fn test_compile_sync() {
- let cwd = std::env::current_dir().unwrap();
- let cwd_string = cwd.to_str().unwrap().to_owned();
-
- let specifier = "./tests/002_hello.ts";
- let referrer = cwd_string + "/";
-
- let mut out = ModuleMetaData {
- module_name: "xxx".to_owned(),
- filename: "/tests/002_hello.ts".to_owned(),
- media_type: msg::MediaType::TypeScript,
- source_code: "console.log(\"Hello World\");".as_bytes().to_owned(),
- maybe_output_code_filename: None,
- maybe_output_code: None,
- maybe_source_map_filename: None,
- maybe_source_map: None,
- };
-
- out =
- compile_sync(Arc::new(IsolateState::mock()), specifier, &referrer, &out);
- assert!(
- out
- .maybe_output_code
- .unwrap()
- .starts_with("console.log(\"Hello World\");".as_bytes())
- );
+ tokio_util::init(|| {
+ let cwd = std::env::current_dir().unwrap();
+ let cwd_string = cwd.to_str().unwrap().to_owned();
+
+ let specifier = "./tests/002_hello.ts";
+ let referrer = cwd_string + "/";
+
+ let mut out = ModuleMetaData {
+ module_name: "xxx".to_owned(),
+ filename: "/tests/002_hello.ts".to_owned(),
+ media_type: msg::MediaType::TypeScript,
+ source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
+ maybe_output_code_filename: None,
+ maybe_output_code: None,
+ maybe_source_map_filename: None,
+ maybe_source_map: None,
+ };
+
+ out = compile_sync(
+ Arc::new(IsolateState::mock()),
+ specifier,
+ &referrer,
+ &out,
+ );
+ assert!(
+ out
+ .maybe_output_code
+ .unwrap()
+ .starts_with("console.log(\"Hello World\");".as_bytes())
+ );
+ });
}
}
diff --git a/cli/errors.rs b/cli/errors.rs
index a2c3c3441..3873f70ff 100644
--- a/cli/errors.rs
+++ b/cli/errors.rs
@@ -180,7 +180,15 @@ pub fn permission_denied() -> DenoError {
}
pub fn op_not_implemented() -> DenoError {
- new(ErrorKind::BadResource, String::from("op not implemented"))
+ new(ErrorKind::OpNotAvaiable, String::from("op not implemented"))
+}
+
+pub fn worker_init_failed() -> DenoError {
+ // TODO(afinch7) pass worker error data through here
+ new(
+ ErrorKind::WorkerInitFailed,
+ String::from("worker init failed"),
+ )
}
#[derive(Debug)]
diff --git a/cli/isolate.rs b/cli/isolate.rs
index 15ad4e125..ced1cb792 100644
--- a/cli/isolate.rs
+++ b/cli/isolate.rs
@@ -226,7 +226,7 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv, None));
+ let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(None, state.clone());
@@ -249,7 +249,7 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv, None));
+ let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(None, state.clone());
diff --git a/cli/isolate_state.rs b/cli/isolate_state.rs
index 9f6749925..b0fb97f10 100644
--- a/cli/isolate_state.rs
+++ b/cli/isolate_state.rs
@@ -5,9 +5,14 @@ use crate::flags;
use crate::global_timer::GlobalTimer;
use crate::modules::Modules;
use crate::permissions::DenoPermissions;
+use crate::resources::ResourceId;
+use crate::workers::UserWorkerBehavior;
+use crate::workers::Worker;
use deno::Buf;
+use futures::future::Shared;
use futures::sync::mpsc as async_mpsc;
use std;
+use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -16,6 +21,8 @@ use std::sync::Mutex;
pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);
+pub type UserWorkerTable =
+ HashMap<ResourceId, Shared<Worker<UserWorkerBehavior>>>;
// AtomicU64 is currently unstable
#[derive(Default)]
@@ -42,6 +49,8 @@ pub struct IsolateState {
pub modules: Mutex<Modules>,
pub worker_channels: Option<Mutex<WorkerChannels>>,
pub global_timer: Mutex<GlobalTimer>,
+ pub workers: Mutex<UserWorkerTable>,
+ pub is_worker: bool,
}
impl IsolateState {
@@ -49,6 +58,7 @@ impl IsolateState {
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
+ is_worker: bool,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();
@@ -61,9 +71,12 @@ impl IsolateState {
modules: Mutex::new(Modules::new()),
worker_channels: worker_channels.map(Mutex::new),
global_timer: Mutex::new(GlobalTimer::new()),
+ workers: Mutex::new(UserWorkerTable::new()),
+ is_worker,
}
}
+ /// Read main module from argv
pub fn main_module(&self) -> Option<String> {
if self.argv.len() <= 1 {
None
@@ -110,7 +123,7 @@ impl IsolateState {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- IsolateState::new(flags, rest_argv, None)
+ IsolateState::new(flags, rest_argv, None, false)
}
pub fn metrics_op_dispatched(
diff --git a/cli/main.rs b/cli/main.rs
index 12f94650d..f9a88803e 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -108,7 +108,7 @@ fn main() {
let should_prefetch = flags.prefetch || flags.info;
let should_display_info = flags.info;
- let state = Arc::new(IsolateState::new(flags, rest_argv, None));
+ let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
let state_ = state.clone();
let startup_data = startup_data::deno_isolate_init();
let cli = CliBehavior::new(Some(startup_data), state_);
diff --git a/cli/msg.fbs b/cli/msg.fbs
index 45f940f7d..695515f55 100644
--- a/cli/msg.fbs
+++ b/cli/msg.fbs
@@ -63,6 +63,12 @@ union Any {
StatRes,
Symlink,
Truncate,
+ CreateWorker,
+ CreateWorkerRes,
+ HostGetWorkerClosed,
+ HostGetMessage,
+ HostGetMessageRes,
+ HostPostMessage,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
@@ -121,6 +127,8 @@ enum ErrorKind: byte {
// custom errors
InvalidUri,
InvalidSeekMode,
+ OpNotAvaiable,
+ WorkerInitFailed
}
table Cwd {}
@@ -171,6 +179,35 @@ table FormatErrorRes {
error: string;
}
+// Create worker as host
+table CreateWorker {
+ specifier: string;
+}
+
+table CreateWorkerRes {
+ rid: uint32;
+}
+
+table HostGetWorkerClosed {
+ rid: uint32;
+}
+
+// Get message from guest worker as host
+table HostGetMessage {
+ rid: uint32;
+}
+
+table HostGetMessageRes {
+ data: [ubyte];
+}
+
+// Post message to guest worker as host
+table HostPostMessage {
+ rid: uint32;
+ // data passed thru the zero-copy data parameter.
+}
+
+// Get message from host as guest worker
table WorkerGetMessage {
unused: int8;
}
@@ -179,6 +216,7 @@ table WorkerGetMessageRes {
data: [ubyte];
}
+// Post message to host as guest worker
table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}
diff --git a/cli/ops.rs b/cli/ops.rs
index a7c2e868f..c8119771d 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -18,6 +18,7 @@ use crate::resources::Resource;
use crate::tokio_util;
use crate::tokio_write;
use crate::version;
+use crate::workers;
use deno::deno_buf;
use deno::Buf;
use deno::JSError;
@@ -141,13 +142,24 @@ pub fn dispatch_all(
(base.sync(), boxed_op)
}
+/// Superset of op_selector_worker for compiler isolates
pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data),
+ _ => op_selector_worker(inner_type),
+ }
+}
+
+/// Superset of op_selector_std for worker isolates
+pub fn op_selector_worker(inner_type: msg::Any) -> Option<OpCreator> {
+ match inner_type {
+ msg::Any::WorkerGetMessage => Some(op_worker_get_message),
+ msg::Any::WorkerPostMessage => Some(op_worker_post_message),
_ => op_selector_std(inner_type),
}
}
+/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::Accept => Some(op_accept),
@@ -189,8 +201,10 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
msg::Any::Stat => Some(op_stat),
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
- msg::Any::WorkerGetMessage => Some(op_worker_get_message),
- msg::Any::WorkerPostMessage => Some(op_worker_post_message),
+ msg::Any::CreateWorker => Some(op_create_worker),
+ msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
+ msg::Any::HostGetMessage => Some(op_host_get_message),
+ msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::Write => Some(op_write),
_ => None,
}
@@ -1741,6 +1755,7 @@ impl Future for GetMessageFuture {
}
}
+/// Get message from host as guest worker
fn op_worker_get_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
@@ -1775,6 +1790,7 @@ fn op_worker_get_message(
Box::new(op)
}
+/// Post message to host as guest worker
fn op_worker_post_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
@@ -1807,3 +1823,143 @@ fn op_worker_post_message(
});
Box::new(op)
}
+
+/// Create worker as the host
+fn op_create_worker(
+ sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_create_worker().unwrap();
+ let specifier = inner.specifier().unwrap();
+
+ Box::new(futures::future::result(move || -> OpResult {
+ let parent_state = sc.state().clone();
+ let behavior = workers::UserWorkerBehavior::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ );
+ match workers::spawn(
+ behavior,
+ &format!("USER-WORKER-{}", specifier),
+ workers::WorkerInit::Module(specifier.to_string()),
+ ) {
+ Ok(worker) => {
+ let mut workers_tl = parent_state.workers.lock().unwrap();
+ let rid = worker.resource.rid.clone();
+ workers_tl.insert(rid, worker.shared());
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner = msg::CreateWorkerRes::create(
+ builder,
+ &msg::CreateWorkerResArgs { rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ ))
+ }
+ Err(errors::RustOrJsError::Js(_)) => Err(errors::worker_init_failed()),
+ Err(errors::RustOrJsError::Rust(err)) => Err(err),
+ }
+ }()))
+}
+
+/// Return when the worker closes
+fn op_host_get_worker_closed(
+ sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_worker_closed().unwrap();
+ let rid = inner.rid();
+ let state = sc.state().clone();
+
+ let shared_worker_future = {
+ let workers_tl = state.workers.lock().unwrap();
+ let worker = workers_tl.get(&rid).unwrap();
+ worker.clone()
+ };
+
+ Box::new(shared_worker_future.then(move |_result| {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ }))
+}
+
+/// Get message from guest worker as host
+fn op_host_get_message(
+ _sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_message().unwrap();
+ let rid = inner.rid();
+
+ let op = resources::get_message_from_worker(rid);
+ let op = op.map_err(move |_| -> DenoError { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let msg_inner = msg::HostGetMessageRes::create(
+ builder,
+ &msg::HostGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::HostGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
+
+/// Post message to guest worker as host
+fn op_host_post_message(
+ _sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_post_message().unwrap();
+ let rid = inner.rid();
+
+ let d = Vec::from(data.as_ref()).into_boxed_slice();
+
+ let op = resources::post_message_to_worker(rid, d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
diff --git a/cli/resources.rs b/cli/resources.rs
index 5b08e4b32..817f6062d 100644
--- a/cli/resources.rs
+++ b/cli/resources.rs
@@ -305,7 +305,8 @@ pub fn add_worker(wc: WorkerChannels) -> Resource {
Resource { rid }
}
-pub fn worker_post_message(
+/// Post message to worker as a host or privilged overlord
+pub fn post_message_to_worker(
rid: ResourceId,
buf: Buf,
) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> {
@@ -341,7 +342,7 @@ impl Future for WorkerReceiver {
}
}
-pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver {
+pub fn get_message_from_worker(rid: ResourceId) -> WorkerReceiver {
WorkerReceiver { rid }
}
diff --git a/cli/startup_data.rs b/cli/startup_data.rs
index 7f59c0678..61891ced4 100644
--- a/cli/startup_data.rs
+++ b/cli/startup_data.rs
@@ -31,7 +31,7 @@ pub fn deno_isolate_init() -> StartupData {
pub fn compiler_isolate_init() -> StartupData {
if cfg!(feature = "no-snapshot-init") {
- debug!("Deno isolate init without snapshots.");
+ debug!("Compiler isolate init without snapshots.");
#[cfg(not(feature = "check-only"))]
let source_bytes = include_bytes!(concat!(
env!("GN_OUT_DIR"),
diff --git a/cli/workers.rs b/cli/workers.rs
index 0c8d49fa7..cb919e8ed 100644
--- a/cli/workers.rs
+++ b/cli/workers.rs
@@ -1,17 +1,72 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::errors::*;
+use crate::flags::DenoFlags;
use crate::isolate::{DenoBehavior, Isolate};
+use crate::isolate_state::IsolateState;
+use crate::isolate_state::IsolateStateContainer;
use crate::isolate_state::WorkerChannels;
-use crate::js_errors::JSErrorColor;
+use crate::ops;
use crate::resources;
-use crate::tokio_util;
+use crate::startup_data;
+use deno::deno_buf;
+use deno::Behavior;
use deno::Buf;
use deno::JSError;
-use futures::future::lazy;
+use deno::Op;
+use deno::StartupData;
use futures::sync::mpsc;
-use futures::sync::oneshot;
use futures::Future;
use futures::Poll;
-use std::thread;
+use std::sync::Arc;
+
+pub struct UserWorkerBehavior {
+ pub state: Arc<IsolateState>,
+}
+
+impl UserWorkerBehavior {
+ pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
+ Self {
+ state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
+ }
+ }
+}
+
+impl IsolateStateContainer for UserWorkerBehavior {
+ fn state(&self) -> Arc<IsolateState> {
+ self.state.clone()
+ }
+}
+
+impl IsolateStateContainer for &UserWorkerBehavior {
+ fn state(&self) -> Arc<IsolateState> {
+ self.state.clone()
+ }
+}
+
+impl Behavior for UserWorkerBehavior {
+ fn startup_data(&mut self) -> Option<StartupData> {
+ Some(startup_data::deno_isolate_init())
+ }
+
+ fn dispatch(
+ &mut self,
+ control: &[u8],
+ zero_copy: deno_buf,
+ ) -> (bool, Box<Op>) {
+ ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker)
+ }
+}
+
+impl WorkerBehavior for UserWorkerBehavior {
+ fn set_internal_channels(&mut self, worker_channels: WorkerChannels) {
+ self.state = Arc::new(IsolateState::new(
+ self.state.flags.clone(),
+ self.state.argv.clone(),
+ Some(worker_channels),
+ true,
+ ));
+ }
+}
/// Behavior trait specific to workers
pub trait WorkerBehavior: DenoBehavior {
@@ -24,10 +79,11 @@ pub trait WorkerBehavior: DenoBehavior {
/// Rust interface for WebWorkers.
pub struct Worker<B: WorkerBehavior> {
isolate: Isolate<B>,
+ pub resource: resources::Resource,
}
impl<B: WorkerBehavior> Worker<B> {
- pub fn new(mut behavior: B) -> (Self, WorkerChannels) {
+ pub fn new(mut behavior: B) -> Self {
let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
@@ -38,13 +94,23 @@ impl<B: WorkerBehavior> Worker<B> {
let isolate = Isolate::new(behavior);
- let worker = Worker { isolate };
- (worker, external_channels)
+ Worker {
+ isolate,
+ resource: resources::add_worker(external_channels),
+ }
}
pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> {
self.isolate.execute(js_source)
}
+
+ pub fn execute_mod(
+ &mut self,
+ js_filename: &str,
+ is_prefetch: bool,
+ ) -> Result<(), RustOrJsError> {
+ self.isolate.execute_mod(js_filename, is_prefetch)
+ }
}
impl<B: WorkerBehavior> Future for Worker<B> {
@@ -56,47 +122,48 @@ impl<B: WorkerBehavior> Future for Worker<B> {
}
}
+/// Method and data used to initalize a worker
+pub enum WorkerInit {
+ Script(String),
+ Module(String),
+}
+
pub fn spawn<B: WorkerBehavior + 'static>(
behavior: B,
- js_source: String,
-) -> resources::Resource {
- // TODO This function should return a Future, so that the caller can retrieve
- // the JSError if one is thrown. Currently it just prints to stderr and calls
- // exit(1).
- // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>();
- let (p, c) = oneshot::channel::<resources::Resource>();
- let builder = thread::Builder::new().name("worker".to_string());
-
- let _tid = builder
- .spawn(move || {
- tokio_util::run(lazy(move || {
- let (mut worker, external_channels) = Worker::new(behavior);
- let resource = resources::add_worker(external_channels);
- p.send(resource.clone()).unwrap();
-
- worker
- .execute("denoMain()")
- .expect("worker denoMain failed");
- worker
- .execute("workerMain()")
- .expect("worker workerMain failed");
- worker.execute(&js_source).expect("worker js_source failed");
+ worker_debug_name: &str,
+ init: WorkerInit,
+) -> Result<Worker<B>, RustOrJsError> {
+ let state = behavior.state().clone();
+ let mut worker = Worker::new(behavior);
- worker.then(move |r| -> Result<(), ()> {
- resource.close();
- debug!("workers.rs after resource close");
- if let Err(err) = r {
- eprintln!("{}", JSErrorColor(&err).to_string());
- std::process::exit(1);
- }
- Ok(())
- })
- }));
+ worker
+ .execute(&format!("denoMain('{}')", worker_debug_name))
+ .expect("worker workerInit failed");
+
+ worker
+ .execute("workerMain()")
+ .expect("worker workerMain failed");
- debug!("workers.rs after spawn");
- }).unwrap();
+ let init_result = match init {
+ WorkerInit::Script(script) => match worker.execute(&script) {
+ Ok(v) => Ok(v),
+ Err(e) => Err(RustOrJsError::Js(e)),
+ },
+ WorkerInit::Module(specifier) => {
+ let should_prefetch = state.flags.prefetch || state.flags.info;
+ match state.dir.resolve_module_url(&specifier, ".") {
+ Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))),
+ Ok(module_url) => {
+ worker.execute_mod(&module_url.to_string(), should_prefetch)
+ }
+ }
+ }
+ };
- c.wait().unwrap()
+ match init_result {
+ Ok(_) => Ok(worker),
+ Err(err) => Err(err),
+ }
}
#[cfg(test)]
@@ -104,63 +171,117 @@ mod tests {
use super::*;
use crate::compiler::CompilerBehavior;
use crate::isolate_state::IsolateState;
- use std::sync::Arc;
+ use crate::js_errors::JSErrorColor;
+ use crate::tokio_util;
+ use futures::future::lazy;
+ use std::thread;
#[test]
fn test_spawn() {
- let resource = spawn(
- CompilerBehavior::new(Arc::new(IsolateState::mock())),
- r#"
- onmessage = function(e) {
- let s = new TextDecoder().decode(e.data);;
- console.log("msg from main script", s);
- if (s == "exit") {
- close();
- return;
- } else {
- console.assert(s === "hi");
+ tokio_util::init(|| {
+ let worker_result = spawn(
+ CompilerBehavior::new(
+ IsolateState::mock().flags.clone(),
+ IsolateState::mock().argv.clone(),
+ ),
+ "TEST",
+ WorkerInit::Script(
+ r#"
+ onmessage = function(e) {
+ console.log("msg from main script", e.data);
+ if (e.data == "exit") {
+ close();
+ return;
+ } else {
+ console.assert(e.data === "hi");
+ }
+ postMessage([1, 2, 3]);
+ console.log("after postMessage");
}
- postMessage(new Uint8Array([1, 2, 3]));
- console.log("after postMessage");
- }
- "#.into(),
- );
- let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+ "#.into(),
+ ),
+ );
+ assert!(worker_result.is_ok());
+ let worker = worker_result.unwrap();
+ let resource = worker.resource.clone();
+ let resource_ = resource.clone();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource_.close();
+ debug!("workers.rs after resource close");
+ if let Err(err) = r {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ assert!(false)
+ }
+ Ok(())
+ })
+ }));
- let maybe_msg =
- resources::worker_recv_message(resource.rid).wait().unwrap();
- assert!(maybe_msg.is_some());
- assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]);
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let msg = String::from("exit").into_boxed_str().into_boxed_bytes();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+
+ let maybe_msg = resources::get_message_from_worker(resource.rid)
+ .wait()
+ .unwrap();
+ assert!(maybe_msg.is_some());
+ // Check if message received is [1, 2, 3] in json
+ assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
+
+ let msg = json!("exit")
+ .to_string()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ })
}
#[test]
fn removed_from_resource_table_on_close() {
- let resource = spawn(
- CompilerBehavior::new(Arc::new(IsolateState::mock())),
- "onmessage = () => close();".into(),
- );
-
- assert_eq!(
- resources::get_type(resource.rid),
- Some("worker".to_string())
- );
-
- let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
- println!("rid {:?}", resource.rid);
-
- // TODO Need a way to get a future for when a resource closes.
- // For now, just sleep for a bit.
- // resource.close();
- thread::sleep(std::time::Duration::from_millis(1000));
- assert_eq!(resources::get_type(resource.rid), None);
+ tokio_util::init(|| {
+ let worker_result = spawn(
+ CompilerBehavior::new(
+ IsolateState::mock().flags.clone(),
+ IsolateState::mock().argv.clone(),
+ ),
+ "TEST",
+ WorkerInit::Script("onmessage = () => close();".into()),
+ );
+ assert!(worker_result.is_ok());
+ let worker = worker_result.unwrap();
+ let resource = worker.resource.clone();
+ let resource_ = resource.clone();
+
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource_.close();
+ debug!("workers.rs after resource close");
+ if let Err(err) = r {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ assert!(false)
+ }
+ Ok(())
+ })
+ }));
+
+ assert_eq!(
+ resources::get_type(resource.rid),
+ Some("worker".to_string())
+ );
+
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ println!("rid {:?}", resource.rid);
+
+ // TODO Need a way to get a future for when a resource closes.
+ // For now, just sleep for a bit.
+ // resource.close();
+ thread::sleep(std::time::Duration::from_millis(1000));
+ assert_eq!(resources::get_type(resource.rid), None);
+ })
}
}