summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/compiler.rs318
-rw-r--r--cli/errors.rs10
-rw-r--r--cli/isolate.rs4
-rw-r--r--cli/isolate_state.rs15
-rw-r--r--cli/main.rs2
-rw-r--r--cli/msg.fbs38
-rw-r--r--cli/ops.rs160
-rw-r--r--cli/resources.rs5
-rw-r--r--cli/startup_data.rs2
-rw-r--r--cli/workers.rs303
-rw-r--r--core/js_errors.rs4
-rw-r--r--js/assets.ts16
-rw-r--r--js/compiler.ts26
-rw-r--r--js/globals.ts9
-rw-r--r--js/main.ts4
-rw-r--r--js/workers.ts147
-rw-r--r--tests/026_workers.test2
-rw-r--r--tests/026_workers.ts14
-rw-r--r--tests/026_workers.ts.out4
-rw-r--r--tests/error_004_missing_module (renamed from tests/error_004_missing_module.disabled)0
-rw-r--r--tests/error_005_missing_dynamic_import (renamed from tests/error_005_missing_dynamic_import.disabled)0
-rw-r--r--tests/error_006_import_ext_failure (renamed from tests/error_006_import_ext_failure.disabled)0
-rw-r--r--tests/subdir/test_worker.js7
-rw-r--r--tests/subdir/test_worker.ts7
-rw-r--r--tools/ts_library_builder/ast_util.ts5
-rw-r--r--tools/ts_library_builder/build_library.ts166
-rw-r--r--tools/ts_library_builder/main.ts1
-rw-r--r--tools/ts_library_builder/test.ts17
28 files changed, 1021 insertions, 265 deletions
diff --git a/cli/compiler.rs b/cli/compiler.rs
index 4613aff99..bd0a763c1 100644
--- a/cli/compiler.rs
+++ b/cli/compiler.rs
@@ -1,26 +1,53 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use core::ops::Deref;
+use crate::flags::DenoFlags;
use crate::isolate_state::*;
+use crate::js_errors::JSErrorColor;
use crate::msg;
use crate::ops;
use crate::resources;
-use crate::resources::Resource;
use crate::resources::ResourceId;
use crate::startup_data;
use crate::workers;
use crate::workers::WorkerBehavior;
+use crate::workers::WorkerInit;
use deno::deno_buf;
use deno::Behavior;
use deno::Buf;
+use deno::JSError;
use deno::Op;
use deno::StartupData;
+use futures::future::*;
+use futures::sync::oneshot;
use futures::Future;
use serde_json;
use std::str;
use std::sync::Arc;
use std::sync::Mutex;
+use tokio::runtime::Runtime;
+
+/// Used for normalization of types on internal future completions
+type CompilerInnerResult = Result<ModuleMetaData, Option<JSError>>;
+type WorkerErrReceiver = oneshot::Receiver<CompilerInnerResult>;
+
+/// Shared resources for used to complete compiler operations.
+/// rid is the resource id for compiler worker resource used for sending it
+/// compile requests
+/// worker_err_receiver is a shared future that will compelete when the
+/// compiler worker future completes, and send back an error if present
+/// or a None if not
+#[derive(Clone)]
+struct CompilerShared {
+ pub rid: ResourceId,
+ pub worker_err_receiver: Shared<WorkerErrReceiver>,
+}
lazy_static! {
- static ref C_RID: Mutex<Option<ResourceId>> = Mutex::new(None);
+ // Shared worker resources so we can spawn
+ static ref C_SHARED: Mutex<Option<CompilerShared>> = Mutex::new(None);
+ // tokio runtime specifically for spawning logic that is dependent on
+ // completetion of the compiler worker future
+ static ref C_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
}
pub struct CompilerBehavior {
@@ -28,8 +55,10 @@ pub struct CompilerBehavior {
}
impl CompilerBehavior {
- pub fn new(state: Arc<IsolateState>) -> Self {
- Self { state }
+ pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
+ Self {
+ state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
+ }
}
}
@@ -65,13 +94,14 @@ impl WorkerBehavior for CompilerBehavior {
self.state.flags.clone(),
self.state.argv.clone(),
Some(worker_channels),
+ true,
));
}
}
// This corresponds to JS ModuleMetaData.
// TODO Rename one or the other so they correspond.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct ModuleMetaData {
pub module_name: String,
pub filename: String,
@@ -102,26 +132,60 @@ impl ModuleMetaData {
}
}
-fn lazy_start(parent_state: Arc<IsolateState>) -> Resource {
- let mut cell = C_RID.lock().unwrap();
- let rid = cell.get_or_insert_with(|| {
- let resource = workers::spawn(
- CompilerBehavior::new(Arc::new(IsolateState::new(
- parent_state.flags.clone(),
- parent_state.argv.clone(),
- None,
- ))),
- "compilerMain()".to_string(),
- );
- resource.rid
- });
- Resource { rid: *rid }
+fn lazy_start(parent_state: Arc<IsolateState>) -> CompilerShared {
+ let mut cell = C_SHARED.lock().unwrap();
+ cell
+ .get_or_insert_with(|| {
+ let worker_result = workers::spawn(
+ CompilerBehavior::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ ),
+ "TS",
+ WorkerInit::Script("compilerMain()".to_string()),
+ );
+ match worker_result {
+ Ok(worker) => {
+ let rid = worker.resource.rid.clone();
+ // create oneshot channels and use the sender to pass back
+ // results from worker future
+ let (err_sender, err_receiver) =
+ oneshot::channel::<CompilerInnerResult>();
+ let mut runtime = C_RUNTIME.lock().unwrap();
+ runtime.spawn(lazy(move || {
+ let resource = worker.resource.clone();
+ worker.then(move |result| -> Result<(), ()> {
+ resource.close();
+ match result {
+ Err(err) => err_sender.send(Err(Some(err))).unwrap(),
+ _ => err_sender.send(Err(None)).unwrap(),
+ };
+ Ok(())
+ })
+ }));
+ CompilerShared {
+ rid,
+ worker_err_receiver: err_receiver.shared(),
+ }
+ }
+ Err(err) => {
+ println!("{}", err.to_string());
+ std::process::exit(1);
+ }
+ }
+ }).clone()
}
-fn req(specifier: &str, referrer: &str) -> Buf {
+fn show_compiler_error(err: JSError) -> ModuleMetaData {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ std::process::exit(1);
+}
+
+fn req(specifier: &str, referrer: &str, is_worker_main: bool) -> Buf {
json!({
"specifier": specifier,
"referrer": referrer,
+ "isWorker": is_worker_main
}).to_string()
.into_boxed_str()
.into_boxed_bytes()
@@ -133,70 +197,172 @@ pub fn compile_sync(
referrer: &str,
module_meta_data: &ModuleMetaData,
) -> ModuleMetaData {
- let req_msg = req(specifier, referrer);
-
- let compiler = lazy_start(parent_state);
-
- let send_future = resources::worker_post_message(compiler.rid, req_msg);
- send_future.wait().unwrap();
-
- let recv_future = resources::worker_recv_message(compiler.rid);
- let result = recv_future.wait().unwrap();
- assert!(result.is_some());
- let res_msg = result.unwrap();
-
- let res_json = std::str::from_utf8(&res_msg).unwrap();
- match serde_json::from_str::<serde_json::Value>(res_json) {
- Ok(serde_json::Value::Object(map)) => ModuleMetaData {
- module_name: module_meta_data.module_name.clone(),
- filename: module_meta_data.filename.clone(),
- media_type: module_meta_data.media_type,
- source_code: module_meta_data.source_code.clone(),
- maybe_output_code: match map["outputCode"].as_str() {
- Some(str) => Some(str.as_bytes().to_owned()),
- _ => None,
- },
- maybe_output_code_filename: None,
- maybe_source_map: match map["sourceMap"].as_str() {
- Some(str) => Some(str.as_bytes().to_owned()),
- _ => None,
- },
- maybe_source_map_filename: None,
- },
- _ => panic!("error decoding compiler response"),
+ let is_worker = parent_state.is_worker.clone();
+ let shared = lazy_start(parent_state);
+
+ let (local_sender, local_receiver) =
+ oneshot::channel::<Result<ModuleMetaData, Option<JSError>>>();
+
+ // Just some extra scoping to keep things clean
+ {
+ let compiler_rid = shared.rid.clone();
+ let module_meta_data_ = module_meta_data.clone();
+ let req_msg = req(specifier, referrer, is_worker);
+ let sender_arc = Arc::new(Some(local_sender));
+ let specifier_ = specifier.clone().to_string();
+ let referrer_ = referrer.clone().to_string();
+
+ let mut runtime = C_RUNTIME.lock().unwrap();
+ runtime.spawn(lazy(move || {
+ debug!(
+ "Running rust part of compile_sync specifier: {} referrer: {}",
+ specifier_, referrer_
+ );
+ let mut send_sender_arc = sender_arc.clone();
+ resources::post_message_to_worker(compiler_rid, req_msg)
+ .map_err(move |_| {
+ let sender = Arc::get_mut(&mut send_sender_arc).unwrap().take();
+ sender.unwrap().send(Err(None)).unwrap()
+ }).and_then(move |_| {
+ debug!(
+ "Sent message to worker specifier: {} referrer: {}",
+ specifier_, referrer_
+ );
+ let mut get_sender_arc = sender_arc.clone();
+ let mut result_sender_arc = sender_arc.clone();
+ resources::get_message_from_worker(compiler_rid)
+ .map_err(move |_| {
+ let sender = Arc::get_mut(&mut get_sender_arc).unwrap().take();
+ sender.unwrap().send(Err(None)).unwrap()
+ }).and_then(move |res_msg_option| -> Result<(), ()> {
+ debug!(
+ "Recieved message from worker specifier: {} referrer: {}",
+ specifier_, referrer_
+ );
+ let res_msg = res_msg_option.unwrap();
+ let res_json = std::str::from_utf8(&res_msg).unwrap();
+ let sender = Arc::get_mut(&mut result_sender_arc).unwrap().take();
+ let sender = sender.unwrap();
+ Ok(
+ sender
+ .send(Ok(match serde_json::from_str::<serde_json::Value>(
+ res_json,
+ ) {
+ Ok(serde_json::Value::Object(map)) => ModuleMetaData {
+ module_name: module_meta_data_.module_name.clone(),
+ filename: module_meta_data_.filename.clone(),
+ media_type: module_meta_data_.media_type,
+ source_code: module_meta_data_.source_code.clone(),
+ maybe_output_code: match map["outputCode"].as_str() {
+ Some(str) => Some(str.as_bytes().to_owned()),
+ _ => None,
+ },
+ maybe_output_code_filename: None,
+ maybe_source_map: match map["sourceMap"].as_str() {
+ Some(str) => Some(str.as_bytes().to_owned()),
+ _ => None,
+ },
+ maybe_source_map_filename: None,
+ },
+ _ => panic!("error decoding compiler response"),
+ })).unwrap(),
+ )
+ })
+ })
+ }));
+ }
+
+ let worker_receiver = shared.worker_err_receiver.clone();
+
+ let union =
+ futures::future::select_all(vec![worker_receiver, local_receiver.shared()]);
+
+ match union.wait() {
+ Ok((result, i, rest)) => {
+ // We got a sucessful finish before any recivers where canceled
+ let mut rest_mut = rest;
+ match ((*result.deref()).clone(), i) {
+ // Either receiver was completed with success.
+ (Ok(v), _) => v,
+ // Either receiver was completed with a valid error
+ // this should be fatal for now since it is not intended
+ // to be possible to recover from a uncaught error in a isolate
+ (Err(Some(err)), _) => show_compiler_error(err),
+ // local_receiver finished first with a none error. This is intended
+ // to catch when the local logic can't complete because it is unable
+ // to send and/or receive messages from the compiler worker.
+ // Due to the way that scheduling works it is very likely that the
+ // compiler worker future has already or will in the near future
+ // complete with a valid JSError or a None.
+ (Err(None), 1) => {
+ debug!("Compiler local exited with None error!");
+ // While technically possible to get stuck here indefinately
+ // in theory it is highly unlikely.
+ debug!(
+ "Waiting on compiler worker result specifier: {} referrer: {}!",
+ specifier, referrer
+ );
+ let worker_result =
+ (*rest_mut.remove(0).wait().unwrap().deref()).clone();
+ debug!(
+ "Finished waiting on worker result specifier: {} referrer: {}!",
+ specifier, referrer
+ );
+ match worker_result {
+ Err(Some(err)) => show_compiler_error(err),
+ Err(None) => panic!("Compiler exit for an unknown reason!"),
+ Ok(v) => v,
+ }
+ }
+ // While possible beccause the compiler worker can exit without error
+ // this shouldn't occurr normally and I don't intend to attempt to
+ // handle it right now
+ (_, i) => panic!("Odd compiler result for future {}!", i),
+ }
+ }
+ // This should always a result of a reciver being cancled
+ // in theory but why not give a print out just in case
+ Err((err, i, _)) => panic!("compile_sync {} failed: {}", i, err),
}
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::tokio_util;
#[test]
fn test_compile_sync() {
- let cwd = std::env::current_dir().unwrap();
- let cwd_string = cwd.to_str().unwrap().to_owned();
-
- let specifier = "./tests/002_hello.ts";
- let referrer = cwd_string + "/";
-
- let mut out = ModuleMetaData {
- module_name: "xxx".to_owned(),
- filename: "/tests/002_hello.ts".to_owned(),
- media_type: msg::MediaType::TypeScript,
- source_code: "console.log(\"Hello World\");".as_bytes().to_owned(),
- maybe_output_code_filename: None,
- maybe_output_code: None,
- maybe_source_map_filename: None,
- maybe_source_map: None,
- };
-
- out =
- compile_sync(Arc::new(IsolateState::mock()), specifier, &referrer, &out);
- assert!(
- out
- .maybe_output_code
- .unwrap()
- .starts_with("console.log(\"Hello World\");".as_bytes())
- );
+ tokio_util::init(|| {
+ let cwd = std::env::current_dir().unwrap();
+ let cwd_string = cwd.to_str().unwrap().to_owned();
+
+ let specifier = "./tests/002_hello.ts";
+ let referrer = cwd_string + "/";
+
+ let mut out = ModuleMetaData {
+ module_name: "xxx".to_owned(),
+ filename: "/tests/002_hello.ts".to_owned(),
+ media_type: msg::MediaType::TypeScript,
+ source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
+ maybe_output_code_filename: None,
+ maybe_output_code: None,
+ maybe_source_map_filename: None,
+ maybe_source_map: None,
+ };
+
+ out = compile_sync(
+ Arc::new(IsolateState::mock()),
+ specifier,
+ &referrer,
+ &out,
+ );
+ assert!(
+ out
+ .maybe_output_code
+ .unwrap()
+ .starts_with("console.log(\"Hello World\");".as_bytes())
+ );
+ });
}
}
diff --git a/cli/errors.rs b/cli/errors.rs
index a2c3c3441..3873f70ff 100644
--- a/cli/errors.rs
+++ b/cli/errors.rs
@@ -180,7 +180,15 @@ pub fn permission_denied() -> DenoError {
}
pub fn op_not_implemented() -> DenoError {
- new(ErrorKind::BadResource, String::from("op not implemented"))
+ new(ErrorKind::OpNotAvaiable, String::from("op not implemented"))
+}
+
+pub fn worker_init_failed() -> DenoError {
+ // TODO(afinch7) pass worker error data through here
+ new(
+ ErrorKind::WorkerInitFailed,
+ String::from("worker init failed"),
+ )
}
#[derive(Debug)]
diff --git a/cli/isolate.rs b/cli/isolate.rs
index 15ad4e125..ced1cb792 100644
--- a/cli/isolate.rs
+++ b/cli/isolate.rs
@@ -226,7 +226,7 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv, None));
+ let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(None, state.clone());
@@ -249,7 +249,7 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv, None));
+ let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(None, state.clone());
diff --git a/cli/isolate_state.rs b/cli/isolate_state.rs
index 9f6749925..b0fb97f10 100644
--- a/cli/isolate_state.rs
+++ b/cli/isolate_state.rs
@@ -5,9 +5,14 @@ use crate::flags;
use crate::global_timer::GlobalTimer;
use crate::modules::Modules;
use crate::permissions::DenoPermissions;
+use crate::resources::ResourceId;
+use crate::workers::UserWorkerBehavior;
+use crate::workers::Worker;
use deno::Buf;
+use futures::future::Shared;
use futures::sync::mpsc as async_mpsc;
use std;
+use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -16,6 +21,8 @@ use std::sync::Mutex;
pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);
+pub type UserWorkerTable =
+ HashMap<ResourceId, Shared<Worker<UserWorkerBehavior>>>;
// AtomicU64 is currently unstable
#[derive(Default)]
@@ -42,6 +49,8 @@ pub struct IsolateState {
pub modules: Mutex<Modules>,
pub worker_channels: Option<Mutex<WorkerChannels>>,
pub global_timer: Mutex<GlobalTimer>,
+ pub workers: Mutex<UserWorkerTable>,
+ pub is_worker: bool,
}
impl IsolateState {
@@ -49,6 +58,7 @@ impl IsolateState {
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
+ is_worker: bool,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();
@@ -61,9 +71,12 @@ impl IsolateState {
modules: Mutex::new(Modules::new()),
worker_channels: worker_channels.map(Mutex::new),
global_timer: Mutex::new(GlobalTimer::new()),
+ workers: Mutex::new(UserWorkerTable::new()),
+ is_worker,
}
}
+ /// Read main module from argv
pub fn main_module(&self) -> Option<String> {
if self.argv.len() <= 1 {
None
@@ -110,7 +123,7 @@ impl IsolateState {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
- IsolateState::new(flags, rest_argv, None)
+ IsolateState::new(flags, rest_argv, None, false)
}
pub fn metrics_op_dispatched(
diff --git a/cli/main.rs b/cli/main.rs
index 12f94650d..f9a88803e 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -108,7 +108,7 @@ fn main() {
let should_prefetch = flags.prefetch || flags.info;
let should_display_info = flags.info;
- let state = Arc::new(IsolateState::new(flags, rest_argv, None));
+ let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
let state_ = state.clone();
let startup_data = startup_data::deno_isolate_init();
let cli = CliBehavior::new(Some(startup_data), state_);
diff --git a/cli/msg.fbs b/cli/msg.fbs
index 45f940f7d..695515f55 100644
--- a/cli/msg.fbs
+++ b/cli/msg.fbs
@@ -63,6 +63,12 @@ union Any {
StatRes,
Symlink,
Truncate,
+ CreateWorker,
+ CreateWorkerRes,
+ HostGetWorkerClosed,
+ HostGetMessage,
+ HostGetMessageRes,
+ HostPostMessage,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
@@ -121,6 +127,8 @@ enum ErrorKind: byte {
// custom errors
InvalidUri,
InvalidSeekMode,
+ OpNotAvaiable,
+ WorkerInitFailed
}
table Cwd {}
@@ -171,6 +179,35 @@ table FormatErrorRes {
error: string;
}
+// Create worker as host
+table CreateWorker {
+ specifier: string;
+}
+
+table CreateWorkerRes {
+ rid: uint32;
+}
+
+table HostGetWorkerClosed {
+ rid: uint32;
+}
+
+// Get message from guest worker as host
+table HostGetMessage {
+ rid: uint32;
+}
+
+table HostGetMessageRes {
+ data: [ubyte];
+}
+
+// Post message to guest worker as host
+table HostPostMessage {
+ rid: uint32;
+ // data passed thru the zero-copy data parameter.
+}
+
+// Get message from host as guest worker
table WorkerGetMessage {
unused: int8;
}
@@ -179,6 +216,7 @@ table WorkerGetMessageRes {
data: [ubyte];
}
+// Post message to host as guest worker
table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}
diff --git a/cli/ops.rs b/cli/ops.rs
index a7c2e868f..c8119771d 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -18,6 +18,7 @@ use crate::resources::Resource;
use crate::tokio_util;
use crate::tokio_write;
use crate::version;
+use crate::workers;
use deno::deno_buf;
use deno::Buf;
use deno::JSError;
@@ -141,13 +142,24 @@ pub fn dispatch_all(
(base.sync(), boxed_op)
}
+/// Superset of op_selector_worker for compiler isolates
pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data),
+ _ => op_selector_worker(inner_type),
+ }
+}
+
+/// Superset of op_selector_std for worker isolates
+pub fn op_selector_worker(inner_type: msg::Any) -> Option<OpCreator> {
+ match inner_type {
+ msg::Any::WorkerGetMessage => Some(op_worker_get_message),
+ msg::Any::WorkerPostMessage => Some(op_worker_post_message),
_ => op_selector_std(inner_type),
}
}
+/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
msg::Any::Accept => Some(op_accept),
@@ -189,8 +201,10 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
msg::Any::Stat => Some(op_stat),
msg::Any::Symlink => Some(op_symlink),
msg::Any::Truncate => Some(op_truncate),
- msg::Any::WorkerGetMessage => Some(op_worker_get_message),
- msg::Any::WorkerPostMessage => Some(op_worker_post_message),
+ msg::Any::CreateWorker => Some(op_create_worker),
+ msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed),
+ msg::Any::HostGetMessage => Some(op_host_get_message),
+ msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::Write => Some(op_write),
_ => None,
}
@@ -1741,6 +1755,7 @@ impl Future for GetMessageFuture {
}
}
+/// Get message from host as guest worker
fn op_worker_get_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
@@ -1775,6 +1790,7 @@ fn op_worker_get_message(
Box::new(op)
}
+/// Post message to host as guest worker
fn op_worker_post_message(
sc: &IsolateStateContainer,
base: &msg::Base<'_>,
@@ -1807,3 +1823,143 @@ fn op_worker_post_message(
});
Box::new(op)
}
+
+/// Create worker as the host
+fn op_create_worker(
+ sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_create_worker().unwrap();
+ let specifier = inner.specifier().unwrap();
+
+ Box::new(futures::future::result(move || -> OpResult {
+ let parent_state = sc.state().clone();
+ let behavior = workers::UserWorkerBehavior::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ );
+ match workers::spawn(
+ behavior,
+ &format!("USER-WORKER-{}", specifier),
+ workers::WorkerInit::Module(specifier.to_string()),
+ ) {
+ Ok(worker) => {
+ let mut workers_tl = parent_state.workers.lock().unwrap();
+ let rid = worker.resource.rid.clone();
+ workers_tl.insert(rid, worker.shared());
+ let builder = &mut FlatBufferBuilder::new();
+ let msg_inner = msg::CreateWorkerRes::create(
+ builder,
+ &msg::CreateWorkerResArgs { rid },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::CreateWorkerRes,
+ ..Default::default()
+ },
+ ))
+ }
+ Err(errors::RustOrJsError::Js(_)) => Err(errors::worker_init_failed()),
+ Err(errors::RustOrJsError::Rust(err)) => Err(err),
+ }
+ }()))
+}
+
+/// Return when the worker closes
+fn op_host_get_worker_closed(
+ sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_worker_closed().unwrap();
+ let rid = inner.rid();
+ let state = sc.state().clone();
+
+ let shared_worker_future = {
+ let workers_tl = state.workers.lock().unwrap();
+ let worker = workers_tl.get(&rid).unwrap();
+ worker.clone()
+ };
+
+ Box::new(shared_worker_future.then(move |_result| {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ }))
+}
+
+/// Get message from guest worker as host
+fn op_host_get_message(
+ _sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ assert_eq!(data.len(), 0);
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_get_message().unwrap();
+ let rid = inner.rid();
+
+ let op = resources::get_message_from_worker(rid);
+ let op = op.map_err(move |_| -> DenoError { unimplemented!() });
+ let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
+ let msg_inner = msg::HostGetMessageRes::create(
+ builder,
+ &msg::HostGetMessageResArgs { data },
+ );
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ inner: Some(msg_inner.as_union_value()),
+ inner_type: msg::Any::HostGetMessageRes,
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
+
+/// Post message to guest worker as host
+fn op_host_post_message(
+ _sc: &IsolateStateContainer,
+ base: &msg::Base<'_>,
+ data: deno_buf,
+) -> Box<OpWithError> {
+ let cmd_id = base.cmd_id();
+ let inner = base.inner_as_host_post_message().unwrap();
+ let rid = inner.rid();
+
+ let d = Vec::from(data.as_ref()).into_boxed_slice();
+
+ let op = resources::post_message_to_worker(rid, d);
+ let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
+ let op = op.and_then(move |_| -> DenoResult<Buf> {
+ let builder = &mut FlatBufferBuilder::new();
+
+ Ok(serialize_response(
+ cmd_id,
+ builder,
+ msg::BaseArgs {
+ ..Default::default()
+ },
+ ))
+ });
+ Box::new(op)
+}
diff --git a/cli/resources.rs b/cli/resources.rs
index 5b08e4b32..817f6062d 100644
--- a/cli/resources.rs
+++ b/cli/resources.rs
@@ -305,7 +305,8 @@ pub fn add_worker(wc: WorkerChannels) -> Resource {
Resource { rid }
}
-pub fn worker_post_message(
+/// Post message to worker as a host or privilged overlord
+pub fn post_message_to_worker(
rid: ResourceId,
buf: Buf,
) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> {
@@ -341,7 +342,7 @@ impl Future for WorkerReceiver {
}
}
-pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver {
+pub fn get_message_from_worker(rid: ResourceId) -> WorkerReceiver {
WorkerReceiver { rid }
}
diff --git a/cli/startup_data.rs b/cli/startup_data.rs
index 7f59c0678..61891ced4 100644
--- a/cli/startup_data.rs
+++ b/cli/startup_data.rs
@@ -31,7 +31,7 @@ pub fn deno_isolate_init() -> StartupData {
pub fn compiler_isolate_init() -> StartupData {
if cfg!(feature = "no-snapshot-init") {
- debug!("Deno isolate init without snapshots.");
+ debug!("Compiler isolate init without snapshots.");
#[cfg(not(feature = "check-only"))]
let source_bytes = include_bytes!(concat!(
env!("GN_OUT_DIR"),
diff --git a/cli/workers.rs b/cli/workers.rs
index 0c8d49fa7..cb919e8ed 100644
--- a/cli/workers.rs
+++ b/cli/workers.rs
@@ -1,17 +1,72 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::errors::*;
+use crate::flags::DenoFlags;
use crate::isolate::{DenoBehavior, Isolate};
+use crate::isolate_state::IsolateState;
+use crate::isolate_state::IsolateStateContainer;
use crate::isolate_state::WorkerChannels;
-use crate::js_errors::JSErrorColor;
+use crate::ops;
use crate::resources;
-use crate::tokio_util;
+use crate::startup_data;
+use deno::deno_buf;
+use deno::Behavior;
use deno::Buf;
use deno::JSError;
-use futures::future::lazy;
+use deno::Op;
+use deno::StartupData;
use futures::sync::mpsc;
-use futures::sync::oneshot;
use futures::Future;
use futures::Poll;
-use std::thread;
+use std::sync::Arc;
+
+pub struct UserWorkerBehavior {
+ pub state: Arc<IsolateState>,
+}
+
+impl UserWorkerBehavior {
+ pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
+ Self {
+ state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
+ }
+ }
+}
+
+impl IsolateStateContainer for UserWorkerBehavior {
+ fn state(&self) -> Arc<IsolateState> {
+ self.state.clone()
+ }
+}
+
+impl IsolateStateContainer for &UserWorkerBehavior {
+ fn state(&self) -> Arc<IsolateState> {
+ self.state.clone()
+ }
+}
+
+impl Behavior for UserWorkerBehavior {
+ fn startup_data(&mut self) -> Option<StartupData> {
+ Some(startup_data::deno_isolate_init())
+ }
+
+ fn dispatch(
+ &mut self,
+ control: &[u8],
+ zero_copy: deno_buf,
+ ) -> (bool, Box<Op>) {
+ ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker)
+ }
+}
+
+impl WorkerBehavior for UserWorkerBehavior {
+ fn set_internal_channels(&mut self, worker_channels: WorkerChannels) {
+ self.state = Arc::new(IsolateState::new(
+ self.state.flags.clone(),
+ self.state.argv.clone(),
+ Some(worker_channels),
+ true,
+ ));
+ }
+}
/// Behavior trait specific to workers
pub trait WorkerBehavior: DenoBehavior {
@@ -24,10 +79,11 @@ pub trait WorkerBehavior: DenoBehavior {
/// Rust interface for WebWorkers.
pub struct Worker<B: WorkerBehavior> {
isolate: Isolate<B>,
+ pub resource: resources::Resource,
}
impl<B: WorkerBehavior> Worker<B> {
- pub fn new(mut behavior: B) -> (Self, WorkerChannels) {
+ pub fn new(mut behavior: B) -> Self {
let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
@@ -38,13 +94,23 @@ impl<B: WorkerBehavior> Worker<B> {
let isolate = Isolate::new(behavior);
- let worker = Worker { isolate };
- (worker, external_channels)
+ Worker {
+ isolate,
+ resource: resources::add_worker(external_channels),
+ }
}
pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> {
self.isolate.execute(js_source)
}
+
+ pub fn execute_mod(
+ &mut self,
+ js_filename: &str,
+ is_prefetch: bool,
+ ) -> Result<(), RustOrJsError> {
+ self.isolate.execute_mod(js_filename, is_prefetch)
+ }
}
impl<B: WorkerBehavior> Future for Worker<B> {
@@ -56,47 +122,48 @@ impl<B: WorkerBehavior> Future for Worker<B> {
}
}
+/// Method and data used to initalize a worker
+pub enum WorkerInit {
+ Script(String),
+ Module(String),
+}
+
pub fn spawn<B: WorkerBehavior + 'static>(
behavior: B,
- js_source: String,
-) -> resources::Resource {
- // TODO This function should return a Future, so that the caller can retrieve
- // the JSError if one is thrown. Currently it just prints to stderr and calls
- // exit(1).
- // let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>();
- let (p, c) = oneshot::channel::<resources::Resource>();
- let builder = thread::Builder::new().name("worker".to_string());
-
- let _tid = builder
- .spawn(move || {
- tokio_util::run(lazy(move || {
- let (mut worker, external_channels) = Worker::new(behavior);
- let resource = resources::add_worker(external_channels);
- p.send(resource.clone()).unwrap();
-
- worker
- .execute("denoMain()")
- .expect("worker denoMain failed");
- worker
- .execute("workerMain()")
- .expect("worker workerMain failed");
- worker.execute(&js_source).expect("worker js_source failed");
+ worker_debug_name: &str,
+ init: WorkerInit,
+) -> Result<Worker<B>, RustOrJsError> {
+ let state = behavior.state().clone();
+ let mut worker = Worker::new(behavior);
- worker.then(move |r| -> Result<(), ()> {
- resource.close();
- debug!("workers.rs after resource close");
- if let Err(err) = r {
- eprintln!("{}", JSErrorColor(&err).to_string());
- std::process::exit(1);
- }
- Ok(())
- })
- }));
+ worker
+ .execute(&format!("denoMain('{}')", worker_debug_name))
+ .expect("worker workerInit failed");
+
+ worker
+ .execute("workerMain()")
+ .expect("worker workerMain failed");
- debug!("workers.rs after spawn");
- }).unwrap();
+ let init_result = match init {
+ WorkerInit::Script(script) => match worker.execute(&script) {
+ Ok(v) => Ok(v),
+ Err(e) => Err(RustOrJsError::Js(e)),
+ },
+ WorkerInit::Module(specifier) => {
+ let should_prefetch = state.flags.prefetch || state.flags.info;
+ match state.dir.resolve_module_url(&specifier, ".") {
+ Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))),
+ Ok(module_url) => {
+ worker.execute_mod(&module_url.to_string(), should_prefetch)
+ }
+ }
+ }
+ };
- c.wait().unwrap()
+ match init_result {
+ Ok(_) => Ok(worker),
+ Err(err) => Err(err),
+ }
}
#[cfg(test)]
@@ -104,63 +171,117 @@ mod tests {
use super::*;
use crate::compiler::CompilerBehavior;
use crate::isolate_state::IsolateState;
- use std::sync::Arc;
+ use crate::js_errors::JSErrorColor;
+ use crate::tokio_util;
+ use futures::future::lazy;
+ use std::thread;
#[test]
fn test_spawn() {
- let resource = spawn(
- CompilerBehavior::new(Arc::new(IsolateState::mock())),
- r#"
- onmessage = function(e) {
- let s = new TextDecoder().decode(e.data);;
- console.log("msg from main script", s);
- if (s == "exit") {
- close();
- return;
- } else {
- console.assert(s === "hi");
+ tokio_util::init(|| {
+ let worker_result = spawn(
+ CompilerBehavior::new(
+ IsolateState::mock().flags.clone(),
+ IsolateState::mock().argv.clone(),
+ ),
+ "TEST",
+ WorkerInit::Script(
+ r#"
+ onmessage = function(e) {
+ console.log("msg from main script", e.data);
+ if (e.data == "exit") {
+ close();
+ return;
+ } else {
+ console.assert(e.data === "hi");
+ }
+ postMessage([1, 2, 3]);
+ console.log("after postMessage");
}
- postMessage(new Uint8Array([1, 2, 3]));
- console.log("after postMessage");
- }
- "#.into(),
- );
- let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
+ "#.into(),
+ ),
+ );
+ assert!(worker_result.is_ok());
+ let worker = worker_result.unwrap();
+ let resource = worker.resource.clone();
+ let resource_ = resource.clone();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource_.close();
+ debug!("workers.rs after resource close");
+ if let Err(err) = r {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ assert!(false)
+ }
+ Ok(())
+ })
+ }));
- let maybe_msg =
- resources::worker_recv_message(resource.rid).wait().unwrap();
- assert!(maybe_msg.is_some());
- assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]);
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let msg = String::from("exit").into_boxed_str().into_boxed_bytes();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+
+ let maybe_msg = resources::get_message_from_worker(resource.rid)
+ .wait()
+ .unwrap();
+ assert!(maybe_msg.is_some());
+ // Check if message received is [1, 2, 3] in json
+ assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
+
+ let msg = json!("exit")
+ .to_string()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ })
}
#[test]
fn removed_from_resource_table_on_close() {
- let resource = spawn(
- CompilerBehavior::new(Arc::new(IsolateState::mock())),
- "onmessage = () => close();".into(),
- );
-
- assert_eq!(
- resources::get_type(resource.rid),
- Some("worker".to_string())
- );
-
- let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
- let r = resources::worker_post_message(resource.rid, msg).wait();
- assert!(r.is_ok());
- println!("rid {:?}", resource.rid);
-
- // TODO Need a way to get a future for when a resource closes.
- // For now, just sleep for a bit.
- // resource.close();
- thread::sleep(std::time::Duration::from_millis(1000));
- assert_eq!(resources::get_type(resource.rid), None);
+ tokio_util::init(|| {
+ let worker_result = spawn(
+ CompilerBehavior::new(
+ IsolateState::mock().flags.clone(),
+ IsolateState::mock().argv.clone(),
+ ),
+ "TEST",
+ WorkerInit::Script("onmessage = () => close();".into()),
+ );
+ assert!(worker_result.is_ok());
+ let worker = worker_result.unwrap();
+ let resource = worker.resource.clone();
+ let resource_ = resource.clone();
+
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource_.close();
+ debug!("workers.rs after resource close");
+ if let Err(err) = r {
+ eprintln!("{}", JSErrorColor(&err).to_string());
+ assert!(false)
+ }
+ Ok(())
+ })
+ }));
+
+ assert_eq!(
+ resources::get_type(resource.rid),
+ Some("worker".to_string())
+ );
+
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ println!("rid {:?}", resource.rid);
+
+ // TODO Need a way to get a future for when a resource closes.
+ // For now, just sleep for a bit.
+ // resource.close();
+ thread::sleep(std::time::Duration::from_millis(1000));
+ assert_eq!(resources::get_type(resource.rid), None);
+ })
}
}
diff --git a/core/js_errors.rs b/core/js_errors.rs
index e8fb0701c..ee3272baf 100644
--- a/core/js_errors.rs
+++ b/core/js_errors.rs
@@ -13,7 +13,7 @@ use serde_json;
use std::fmt;
use std::str;
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
pub struct StackFrame {
pub line: i64, // zero indexed
pub column: i64, // zero indexed
@@ -24,7 +24,7 @@ pub struct StackFrame {
pub is_wasm: bool,
}
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
pub struct JSError {
pub message: String,
diff --git a/js/assets.ts b/js/assets.ts
index 436906783..b3a6e00db 100644
--- a/js/assets.ts
+++ b/js/assets.ts
@@ -44,12 +44,8 @@ import libEsnextDts from "/third_party/node_modules/typescript/lib/lib.esnext.d.
import libEsnextIntlDts from "/third_party/node_modules/typescript/lib/lib.esnext.intl.d.ts!string";
import libEsnextSymbolDts from "/third_party/node_modules/typescript/lib/lib.esnext.symbol.d.ts!string";
-// @internal
-export const assetSourceCode: { [key: string]: string } = {
- // Generated library
- "lib.deno_runtime.d.ts": libDts,
-
- // Static libraries
+// Default static libraries for all compile jobs
+const defaultAssets: { [key: string]: string } = {
"lib.es2015.collection.d.ts": libEs2015CollectionDts,
"lib.es2015.core.d.ts": libEs2015CoreDts,
"lib.es2015.d.ts": libEs2015Dts,
@@ -85,3 +81,11 @@ export const assetSourceCode: { [key: string]: string } = {
"lib.esnext.intl.d.ts": libEsnextIntlDts,
"lib.esnext.symbol.d.ts": libEsnextSymbolDts
};
+
+// assests for normal compile jobs
+// @internal
+export const assetSourceCode: { [key: string]: string } = {
+ // Generated library
+ "lib.deno_runtime.d.ts": libDts,
+ ...defaultAssets
+};
diff --git a/js/compiler.ts b/js/compiler.ts
index 0f7070fd2..72ac391ea 100644
--- a/js/compiler.ts
+++ b/js/compiler.ts
@@ -46,6 +46,7 @@ type SourceMap = string;
interface CompilerLookup {
specifier: ModuleSpecifier;
referrer: ContainingFile;
+ isWorker: boolean;
}
/** Abstraction of the APIs required from the `os` module so they can be
@@ -179,6 +180,8 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost {
// testing
private _ts: Ts = ts;
+ private readonly _assetsSourceCode: { [key: string]: string };
+
/** The TypeScript language service often refers to the resolved fileName of
* a module, this is a shortcut to avoid unnecessary module resolution logic
* for modules that may have been initially resolved by a `moduleSpecifier`
@@ -239,9 +242,12 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost {
// not null assertion
moduleId = moduleSpecifier.split("/").pop()!;
const assetName = moduleId.includes(".") ? moduleId : `${moduleId}.d.ts`;
- assert(assetName in assetSourceCode, `No such asset "${assetName}"`);
+ assert(
+ assetName in this._assetsSourceCode,
+ `No such asset "${assetName}"`
+ );
mediaType = msg.MediaType.TypeScript;
- sourceCode = assetSourceCode[assetName];
+ sourceCode = this._assetsSourceCode[assetName];
fileName = `${ASSETS}/${assetName}`;
} else {
// We query Rust with a CodeFetch message. It will load the sourceCode,
@@ -299,7 +305,8 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost {
innerMap.set(moduleSpecifier, fileName);
}
- constructor() {
+ constructor(assetsSourceCode: { [key: string]: string }) {
+ this._assetsSourceCode = assetsSourceCode;
this._service = this._ts.createLanguageService(this);
}
@@ -498,7 +505,7 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost {
}
}
-const compiler = new Compiler();
+const compiler = new Compiler(assetSourceCode);
// set global objects for compiler web worker
window.clearTimeout = clearTimer;
@@ -514,17 +521,12 @@ window.TextEncoder = TextEncoder;
// lazy instantiating the compiler web worker
window.compilerMain = function compilerMain() {
// workerMain should have already been called since a compiler is a worker.
- const encoder = new TextEncoder();
- const decoder = new TextDecoder();
- window.onmessage = ({ data }: { data: Uint8Array }) => {
- const json = decoder.decode(data);
- const { specifier, referrer } = JSON.parse(json) as CompilerLookup;
+ window.onmessage = ({ data }: { data: CompilerLookup }) => {
+ const { specifier, referrer } = data;
const result = compiler.compile(specifier, referrer);
- const responseJson = JSON.stringify(result);
- const response = encoder.encode(responseJson);
- postMessage(response);
+ postMessage(result);
};
};
diff --git a/js/globals.ts b/js/globals.ts
index 5a0fb18ce..56956b4ad 100644
--- a/js/globals.ts
+++ b/js/globals.ts
@@ -102,7 +102,16 @@ export type TextDecoder = textEncoding.TextDecoder;
window.performance = new performanceUtil.Performance();
+// This variable functioning correctly depends on `declareAsLet`
+// in //tools/ts_library_builder/main.ts
+window.onmessage = workers.onmessage;
+
window.workerMain = workers.workerMain;
+window.workerClose = workers.workerClose;
+window.postMessage = workers.postMessage;
+
+window.Worker = workers.WorkerImpl;
+export type Worker = workers.Worker;
// below are interfaces that are available in TypeScript but
// have different signatures
diff --git a/js/main.ts b/js/main.ts
index e7f7e284e..c32f3ac9e 100644
--- a/js/main.ts
+++ b/js/main.ts
@@ -18,8 +18,8 @@ import * as deno from "./deno";
// TODO(kitsonk) remove with `--types` below
import libDts from "gen/cli/lib/lib.deno_runtime.d.ts!string";
-export default function denoMain(): void {
- const startResMsg = os.start();
+export default function denoMain(name?: string): void {
+ const startResMsg = os.start(name);
setVersions(startResMsg.denoVersion()!, startResMsg.v8Version()!);
diff --git a/js/workers.ts b/js/workers.ts
index bdfbed640..601ffa0b1 100644
--- a/js/workers.ts
+++ b/js/workers.ts
@@ -1,33 +1,110 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-import * as dispatch from "./dispatch";
+/* eslint-disable @typescript-eslint/no-explicit-any */
+import { sendAsync, sendSync } from "./dispatch";
import * as msg from "gen/cli/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { assert, log } from "./util";
+import { TextDecoder, TextEncoder } from "./text_encoding";
import { window } from "./window";
-export async function postMessage(data: Uint8Array): Promise<void> {
+const encoder = new TextEncoder();
+const decoder = new TextDecoder();
+
+export function encodeMessage(data: any): Uint8Array {
+ const dataJson = JSON.stringify(data);
+ return encoder.encode(dataJson);
+}
+
+export function decodeMessage(dataIntArray: Uint8Array): any {
+ const dataJson = decoder.decode(dataIntArray);
+ return JSON.parse(dataJson);
+}
+
+function createWorker(specifier: string): number {
+ const builder = flatbuffers.createBuilder();
+ const specifier_ = builder.createString(specifier);
+ msg.CreateWorker.startCreateWorker(builder);
+ msg.CreateWorker.addSpecifier(builder, specifier_);
+ const inner = msg.CreateWorker.endCreateWorker(builder);
+ const baseRes = sendSync(builder, msg.Any.CreateWorker, inner);
+ assert(baseRes != null);
+ assert(
+ msg.Any.CreateWorkerRes === baseRes!.innerType(),
+ `base.innerType() unexpectedly is ${baseRes!.innerType()}`
+ );
+ const res = new msg.CreateWorkerRes();
+ assert(baseRes!.inner(res) != null);
+ return res.rid();
+}
+
+async function hostGetWorkerClosed(rid: number): Promise<void> {
+ const builder = flatbuffers.createBuilder();
+ msg.HostGetWorkerClosed.startHostGetWorkerClosed(builder);
+ msg.HostGetWorkerClosed.addRid(builder, rid);
+ const inner = msg.HostGetWorkerClosed.endHostGetWorkerClosed(builder);
+ await sendAsync(builder, msg.Any.HostGetWorkerClosed, inner);
+}
+
+function hostPostMessage(rid: number, data: any): void {
+ const dataIntArray = encodeMessage(data);
+ const builder = flatbuffers.createBuilder();
+ msg.HostPostMessage.startHostPostMessage(builder);
+ msg.HostPostMessage.addRid(builder, rid);
+ const inner = msg.HostPostMessage.endHostPostMessage(builder);
+ const baseRes = sendSync(
+ builder,
+ msg.Any.HostPostMessage,
+ inner,
+ dataIntArray
+ );
+ assert(baseRes != null);
+}
+
+async function hostGetMessage(rid: number): Promise<any> {
+ const builder = flatbuffers.createBuilder();
+ msg.HostGetMessage.startHostGetMessage(builder);
+ msg.HostGetMessage.addRid(builder, rid);
+ const inner = msg.HostGetMessage.endHostGetMessage(builder);
+ const baseRes = await sendAsync(builder, msg.Any.HostGetMessage, inner);
+ assert(baseRes != null);
+ assert(
+ msg.Any.HostGetMessageRes === baseRes!.innerType(),
+ `base.innerType() unexpectedly is ${baseRes!.innerType()}`
+ );
+ const res = new msg.HostGetMessageRes();
+ assert(baseRes!.inner(res) != null);
+
+ const dataArray = res.dataArray();
+ if (dataArray != null) {
+ return decodeMessage(dataArray);
+ } else {
+ return null;
+ }
+}
+
+// Stuff for workers
+export let onmessage: (e: { data: any }) => void = (): void => {};
+
+export function postMessage(data: any): void {
+ const dataIntArray = encodeMessage(data);
const builder = flatbuffers.createBuilder();
msg.WorkerPostMessage.startWorkerPostMessage(builder);
const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder);
- const baseRes = await dispatch.sendAsync(
+ const baseRes = sendSync(
builder,
msg.Any.WorkerPostMessage,
inner,
- data
+ dataIntArray
);
assert(baseRes != null);
}
-export async function getMessage(): Promise<null | Uint8Array> {
+export async function getMessage(): Promise<any> {
log("getMessage");
const builder = flatbuffers.createBuilder();
msg.WorkerGetMessage.startWorkerGetMessage(builder);
const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder);
- const baseRes = await dispatch.sendAsync(
- builder,
- msg.Any.WorkerGetMessage,
- inner
- );
+ const baseRes = await sendAsync(builder, msg.Any.WorkerGetMessage, inner);
assert(baseRes != null);
assert(
msg.Any.WorkerGetMessageRes === baseRes!.innerType(),
@@ -37,14 +114,14 @@ export async function getMessage(): Promise<null | Uint8Array> {
assert(baseRes!.inner(res) != null);
const dataArray = res.dataArray();
- if (dataArray == null) {
- return null;
+ if (dataArray != null) {
+ return decodeMessage(dataArray);
} else {
- return new Uint8Array(dataArray!);
+ return null;
}
}
-let isClosing = false;
+export let isClosing = false;
export function workerClose(): void {
isClosing = true;
@@ -67,3 +144,45 @@ export async function workerMain(): Promise<void> {
}
}
}
+
+export interface Worker {
+ onerror?: () => void;
+ onmessage?: (e: { data: any }) => void;
+ onmessageerror?: () => void;
+ postMessage(data: any): void;
+}
+
+export class WorkerImpl implements Worker {
+ private readonly rid: number;
+ private isClosing: boolean = false;
+ public onerror?: () => void;
+ public onmessage?: (data: any) => void;
+ public onmessageerror?: () => void;
+
+ constructor(specifier: string) {
+ this.rid = createWorker(specifier);
+ this.run();
+ hostGetWorkerClosed(this.rid).then(() => {
+ this.isClosing = true;
+ });
+ }
+
+ postMessage(data: any): void {
+ hostPostMessage(this.rid, data);
+ }
+
+ private async run(): Promise<void> {
+ while (!this.isClosing) {
+ const data = await hostGetMessage(this.rid);
+ if (data == null) {
+ log("worker got null message. quitting.");
+ break;
+ }
+ // TODO(afinch7) stop this from eating messages before onmessage has been assigned
+ if (this.onmessage) {
+ const event = { data };
+ this.onmessage(event);
+ }
+ }
+ }
+}
diff --git a/tests/026_workers.test b/tests/026_workers.test
new file mode 100644
index 000000000..1c5b6f4e6
--- /dev/null
+++ b/tests/026_workers.test
@@ -0,0 +1,2 @@
+args: tests/026_workers.ts --reload
+output: tests/026_workers.ts.out \ No newline at end of file
diff --git a/tests/026_workers.ts b/tests/026_workers.ts
new file mode 100644
index 000000000..0cf8f53b1
--- /dev/null
+++ b/tests/026_workers.ts
@@ -0,0 +1,14 @@
+const jsWorker = new Worker("tests/subdir/test_worker.js");
+const tsWorker = new Worker("tests/subdir/test_worker.ts");
+
+tsWorker.onmessage = e => {
+ console.log("Received ts: " + e.data);
+};
+
+jsWorker.onmessage = e => {
+ console.log("Received js: " + e.data);
+
+ tsWorker.postMessage("Hello World");
+};
+
+jsWorker.postMessage("Hello World");
diff --git a/tests/026_workers.ts.out b/tests/026_workers.ts.out
new file mode 100644
index 000000000..7538cc867
--- /dev/null
+++ b/tests/026_workers.ts.out
@@ -0,0 +1,4 @@
+Hello World
+Received js: Hello World
+Hello World
+Received ts: Hello World
diff --git a/tests/error_004_missing_module.disabled b/tests/error_004_missing_module
index b94c86004..b94c86004 100644
--- a/tests/error_004_missing_module.disabled
+++ b/tests/error_004_missing_module
diff --git a/tests/error_005_missing_dynamic_import.disabled b/tests/error_005_missing_dynamic_import
index 2acf467c1..2acf467c1 100644
--- a/tests/error_005_missing_dynamic_import.disabled
+++ b/tests/error_005_missing_dynamic_import
diff --git a/tests/error_006_import_ext_failure.disabled b/tests/error_006_import_ext_failure
index 5fe245739..5fe245739 100644
--- a/tests/error_006_import_ext_failure.disabled
+++ b/tests/error_006_import_ext_failure
diff --git a/tests/subdir/test_worker.js b/tests/subdir/test_worker.js
new file mode 100644
index 000000000..53d38ba96
--- /dev/null
+++ b/tests/subdir/test_worker.js
@@ -0,0 +1,7 @@
+onmessage = function(e) {
+ console.log(e.data);
+
+ postMessage(e.data);
+
+ workerClose();
+};
diff --git a/tests/subdir/test_worker.ts b/tests/subdir/test_worker.ts
new file mode 100644
index 000000000..53d38ba96
--- /dev/null
+++ b/tests/subdir/test_worker.ts
@@ -0,0 +1,7 @@
+onmessage = function(e) {
+ console.log(e.data);
+
+ postMessage(e.data);
+
+ workerClose();
+};
diff --git a/tools/ts_library_builder/ast_util.ts b/tools/ts_library_builder/ast_util.ts
index d195da721..14546f9c5 100644
--- a/tools/ts_library_builder/ast_util.ts
+++ b/tools/ts_library_builder/ast_util.ts
@@ -94,11 +94,14 @@ export function addVariableDeclaration(
node: StatementedNode,
name: string,
type: string,
+ isConst: boolean,
hasDeclareKeyword?: boolean,
jsdocs?: JSDoc[]
): VariableStatement {
return node.addVariableStatement({
- declarationKind: VariableDeclarationKind.Const,
+ declarationKind: isConst
+ ? VariableDeclarationKind.Const
+ : VariableDeclarationKind.Let,
declarations: [{ name, type }],
docs: jsdocs && jsdocs.map(jsdoc => jsdoc.getText()),
hasDeclareKeyword
diff --git a/tools/ts_library_builder/build_library.ts b/tools/ts_library_builder/build_library.ts
index 2045c288f..22268f1dc 100644
--- a/tools/ts_library_builder/build_library.ts
+++ b/tools/ts_library_builder/build_library.ts
@@ -57,6 +57,16 @@ export interface BuildLibraryOptions {
inputs?: string[];
/**
+ * Path to globals file to be used I.E. `js/globals.ts`
+ */
+ additionalGlobals?: string[];
+
+ /**
+ * List of global variables to define as let instead of the default const.
+ */
+ declareAsLet?: string[];
+
+ /**
* The path to the output library
*/
outFile: string;
@@ -170,30 +180,21 @@ export function flatten({
}
}
-interface MergeGlobalOptions {
- basePath: string;
- debug?: boolean;
- declarationProject: Project;
- filePath: string;
+interface PrepareFileForMergeOptions {
globalVarName: string;
- ignore?: string[];
- inputProject: Project;
interfaceName: string;
targetSourceFile: SourceFile;
}
-/** Take a module and merge it into the global scope */
-export function mergeGlobal({
- basePath,
- debug,
- declarationProject,
- filePath,
+interface PrepareFileForMergeReturn {
+ interfaceDeclaration: InterfaceDeclaration;
+}
+
+export function prepareFileForMerge({
globalVarName,
- ignore,
- inputProject,
interfaceName,
targetSourceFile
-}: MergeGlobalOptions): void {
+}: PrepareFileForMergeOptions): PrepareFileForMergeReturn {
// Add the global object interface
const interfaceDeclaration = targetSourceFile.addInterface({
name: interfaceName,
@@ -201,15 +202,56 @@ export function mergeGlobal({
});
// Declare the global variable
- addVariableDeclaration(targetSourceFile, globalVarName, interfaceName, true);
+ addVariableDeclaration(
+ targetSourceFile,
+ globalVarName,
+ interfaceName,
+ true,
+ true
+ );
// `globalThis` accesses the global scope and is defined here:
// https://github.com/tc39/proposal-global
- addVariableDeclaration(targetSourceFile, "globalThis", interfaceName, true);
+ addVariableDeclaration(
+ targetSourceFile,
+ "globalThis",
+ interfaceName,
+ true,
+ true
+ );
// Add self reference to the global variable
addInterfaceProperty(interfaceDeclaration, globalVarName, interfaceName);
+ return {
+ interfaceDeclaration
+ };
+}
+
+interface MergeGlobalOptions extends PrepareFileForMergeOptions {
+ basePath: string;
+ debug?: boolean;
+ declarationProject: Project;
+ filePath: string;
+ ignore?: string[];
+ inputProject: Project;
+ prepareReturn: PrepareFileForMergeReturn;
+ declareAsLet?: string[];
+}
+
+/** Take a module and merge it into the global scope */
+export function mergeGlobals({
+ basePath,
+ debug,
+ declarationProject,
+ filePath,
+ globalVarName,
+ ignore,
+ inputProject,
+ targetSourceFile,
+ declareAsLet,
+ prepareReturn: { interfaceDeclaration }
+}: MergeGlobalOptions): void {
// Retrieve source file from the input project
const sourceFile = inputProject.getSourceFileOrThrow(filePath);
@@ -267,7 +309,8 @@ export function mergeGlobal({
dependentSourceFiles.add(valueDeclaration.getSourceFile());
}
}
- addVariableDeclaration(targetSourceFile, property, type, true);
+ const isConst = !(declareAsLet && declareAsLet.includes(property));
+ addVariableDeclaration(targetSourceFile, property, type, isConst, true);
addInterfaceProperty(interfaceDeclaration, property, type);
}
}
@@ -297,29 +340,32 @@ export function mergeGlobal({
const importDeclarations = sourceFile.getImportDeclarations();
const namespaces = new Set<string>();
for (const declaration of importDeclarations) {
- const declarationSourceFile = declaration.getModuleSpecifierSourceFile();
- if (
- declarationSourceFile &&
- dependentSourceFiles.has(declarationSourceFile)
- ) {
- // the source file will resolve to the original `.ts` file, but the
- // information we really want is in the emitted `.d.ts` file, so we will
- // resolve to that file
- const dtsFilePath = declarationSourceFile
- .getFilePath()
- .replace(/\.ts$/, ".d.ts");
- const dtsSourceFile = declarationProject.getSourceFileOrThrow(
- dtsFilePath
- );
- targetSourceFile.addStatements(
- namespaceSourceFile(dtsSourceFile, {
- debug,
- namespace: declaration.getNamespaceImportOrThrow().getText(),
- namespaces,
- rootPath: basePath,
- sourceFileMap
- })
- );
+ const namespaceImport = declaration.getNamespaceImport();
+ if (namespaceImport) {
+ const declarationSourceFile = declaration.getModuleSpecifierSourceFile();
+ if (
+ declarationSourceFile &&
+ dependentSourceFiles.has(declarationSourceFile)
+ ) {
+ // the source file will resolve to the original `.ts` file, but the
+ // information we really want is in the emitted `.d.ts` file, so we will
+ // resolve to that file
+ const dtsFilePath = declarationSourceFile
+ .getFilePath()
+ .replace(/\.ts$/, ".d.ts");
+ const dtsSourceFile = declarationProject.getSourceFileOrThrow(
+ dtsFilePath
+ );
+ targetSourceFile.addStatements(
+ namespaceSourceFile(dtsSourceFile, {
+ debug,
+ namespace: namespaceImport.getText(),
+ namespaces,
+ rootPath: basePath,
+ sourceFileMap
+ })
+ );
+ }
}
}
@@ -337,6 +383,8 @@ export function main({
buildPath,
inline,
inputs,
+ additionalGlobals,
+ declareAsLet,
debug,
outFile,
silent
@@ -476,20 +524,46 @@ export function main({
}${msgGeneratedDtsText}\n`
};
- mergeGlobal({
+ const prepareForMergeOpts: PrepareFileForMergeOptions = {
+ globalVarName: "window",
+ interfaceName: "Window",
+ targetSourceFile: libDTs
+ };
+
+ const prepareReturn = prepareFileForMerge(prepareForMergeOpts);
+
+ mergeGlobals({
basePath,
debug,
declarationProject,
filePath: `${basePath}/js/globals.ts`,
- globalVarName: "window",
inputProject,
ignore: ["Deno"],
- interfaceName: "Window",
- targetSourceFile: libDTs
+ declareAsLet,
+ ...prepareForMergeOpts,
+ prepareReturn
});
log(`Merged "globals" into global scope.`);
+ if (additionalGlobals) {
+ for (const additionalGlobal of additionalGlobals) {
+ mergeGlobals({
+ basePath,
+ debug,
+ declarationProject,
+ filePath: `${basePath}/${additionalGlobal}`,
+ inputProject,
+ ignore: ["Deno"],
+ declareAsLet,
+ ...prepareForMergeOpts,
+ prepareReturn
+ });
+ }
+
+ log(`Added additional "globals" into global scope.`);
+ }
+
flatten({
basePath,
customSources,
diff --git a/tools/ts_library_builder/main.ts b/tools/ts_library_builder/main.ts
index 54a659d01..e4e2e73ed 100644
--- a/tools/ts_library_builder/main.ts
+++ b/tools/ts_library_builder/main.ts
@@ -46,6 +46,7 @@ buildRuntimeLib({
"js/deno.ts",
"js/globals.ts"
],
+ declareAsLet: ["onmessage"],
outFile,
silent
});
diff --git a/tools/ts_library_builder/test.ts b/tools/ts_library_builder/test.ts
index 5aeb8d611..2b6abe714 100644
--- a/tools/ts_library_builder/test.ts
+++ b/tools/ts_library_builder/test.ts
@@ -5,7 +5,7 @@
import * as assert from "assert";
import { Project, ts } from "ts-morph";
-import { flatten, mergeGlobal } from "./build_library";
+import { flatten, mergeGlobals, prepareFileForMerge } from "./build_library";
import { inlineFiles, loadDtsFiles } from "./ast_util";
const { ModuleKind, ModuleResolutionKind, ScriptTarget } = ts;
@@ -146,15 +146,22 @@ function buildLibraryMerge(): void {
outputSourceFile: targetSourceFile
} = setupFixtures();
- mergeGlobal({
+ const prepareForMergeOpts = {
+ globalVarName: "foobarbaz",
+ interfaceName: "FooBar",
+ targetSourceFile
+ };
+
+ const prepareReturn = prepareFileForMerge(prepareForMergeOpts);
+
+ mergeGlobals({
basePath,
declarationProject,
debug,
- globalVarName: "foobarbaz",
filePath: `${buildPath}/globals.ts`,
inputProject,
- interfaceName: "FooBar",
- targetSourceFile
+ ...prepareForMergeOpts,
+ prepareReturn
});
assert(targetSourceFile.getNamespace("moduleC") != null);