summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-04 16:38:52 +0100
committerRy Dahl <ry@tinyclouds.org>2019-11-04 10:38:52 -0500
commit0049d4e50c9dd945f25f69b08b08fbf492001f96 (patch)
tree3bcf3c50bdba9abba6b1b8ff9340f76fc19f759c
parent429439d1983269ba40141a1b42f6ac809d1e8c86 (diff)
Refactor Worker and ThreadSafeState (#3242)
* Split ThreadSafeState into State and GlobalState. State is a "local" state belonging to "Worker" while "GlobalState" is state shared by whole program. * Update "Worker" and ops to use "GlobalState" where applicable * Move and refactor "WorkerChannels" resource
-rw-r--r--cli/compilers/js.rs2
-rw-r--r--cli/compilers/json.rs2
-rw-r--r--cli/compilers/ts.rs104
-rw-r--r--cli/global_state.rs247
-rw-r--r--cli/js/workers.ts1
-rw-r--r--cli/lib.rs53
-rw-r--r--cli/metrics.rs12
-rw-r--r--cli/ops/compiler.rs4
-rw-r--r--cli/ops/errors.rs4
-rw-r--r--cli/ops/os.rs14
-rw-r--r--cli/ops/repl.rs3
-rw-r--r--cli/ops/workers.rs53
-rw-r--r--cli/permissions.rs4
-rw-r--r--cli/resources.rs80
-rw-r--r--cli/state.rs228
-rw-r--r--cli/worker.rs162
16 files changed, 558 insertions, 415 deletions
diff --git a/cli/compilers/js.rs b/cli/compilers/js.rs
index 56c9b672e..af79690d6 100644
--- a/cli/compilers/js.rs
+++ b/cli/compilers/js.rs
@@ -2,7 +2,6 @@
use crate::compilers::CompiledModule;
use crate::compilers::CompiledModuleFuture;
use crate::file_fetcher::SourceFile;
-use crate::state::ThreadSafeState;
use std::str;
pub struct JsCompiler {}
@@ -10,7 +9,6 @@ pub struct JsCompiler {}
impl JsCompiler {
pub fn compile_async(
self: &Self,
- _state: ThreadSafeState,
source_file: &SourceFile,
) -> Box<CompiledModuleFuture> {
let module = CompiledModule {
diff --git a/cli/compilers/json.rs b/cli/compilers/json.rs
index e61b38649..22a1d5f3d 100644
--- a/cli/compilers/json.rs
+++ b/cli/compilers/json.rs
@@ -2,7 +2,6 @@
use crate::compilers::CompiledModule;
use crate::compilers::CompiledModuleFuture;
use crate::file_fetcher::SourceFile;
-use crate::state::ThreadSafeState;
use deno::ErrBox;
use regex::Regex;
use std::str;
@@ -15,7 +14,6 @@ pub struct JsonCompiler {}
impl JsonCompiler {
pub fn compile_async(
self: &Self,
- _state: ThreadSafeState,
source_file: &SourceFile,
) -> Box<CompiledModuleFuture> {
let maybe_json_value: serde_json::Result<serde_json::Value> =
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs
index c2a4ccc4a..3c36f14c4 100644
--- a/cli/compilers/ts.rs
+++ b/cli/compilers/ts.rs
@@ -5,8 +5,8 @@ use crate::diagnostics::Diagnostic;
use crate::disk_cache::DiskCache;
use crate::file_fetcher::SourceFile;
use crate::file_fetcher::SourceFileFetcher;
+use crate::global_state::ThreadSafeGlobalState;
use crate::msg;
-use crate::resources;
use crate::source_maps::SourceMapGetter;
use crate::startup_data;
use crate::state::*;
@@ -16,7 +16,7 @@ use deno::Buf;
use deno::ErrBox;
use deno::ModuleSpecifier;
use futures::Future;
-use futures::Stream;
+use futures::IntoFuture;
use regex::Regex;
use std::collections::HashSet;
use std::fs;
@@ -222,16 +222,20 @@ impl TsCompiler {
}
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime.
- fn setup_worker(state: ThreadSafeState) -> Worker {
+ fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
+ let worker_state = ThreadSafeState::new(global_state.clone(), None, true)
+ .expect("Unable to create worker state");
+
// Count how many times we start the compiler worker.
- state.metrics.compiler_starts.fetch_add(1, Ordering::SeqCst);
+ global_state
+ .metrics
+ .compiler_starts
+ .fetch_add(1, Ordering::SeqCst);
let mut worker = Worker::new(
"TS".to_string(),
startup_data::compiler_isolate_init(),
- // TODO(ry) Maybe we should use a separate state for the compiler.
- // as was done previously.
- state.clone(),
+ worker_state,
);
worker.execute("denoMain()").unwrap();
worker.execute("workerMain()").unwrap();
@@ -241,7 +245,7 @@ impl TsCompiler {
pub fn bundle_async(
self: &Self,
- state: ThreadSafeState,
+ global_state: ThreadSafeGlobalState,
module_name: String,
out_file: String,
) -> impl Future<Item = (), Error = ErrBox> {
@@ -253,25 +257,21 @@ impl TsCompiler {
let root_names = vec![module_name.clone()];
let req_msg = req(root_names, self.config.clone(), Some(out_file));
- let worker = TsCompiler::setup_worker(state.clone());
- let resource = worker.state.resource.clone();
- let compiler_rid = resource.rid;
- let first_msg_fut =
- resources::post_message_to_worker(compiler_rid, req_msg)
- .expect("Bad compiler rid")
- .then(move |_| worker)
- .then(move |result| {
- if let Err(err) = result {
- // TODO(ry) Need to forward the error instead of exiting.
- eprintln!("{}", err.to_string());
- std::process::exit(1);
- }
- debug!("Sent message to worker");
- let stream_future =
- resources::get_message_stream_from_worker(compiler_rid)
- .into_future();
- stream_future.map(|(f, _rest)| f).map_err(|(f, _rest)| f)
- });
+ let worker = TsCompiler::setup_worker(global_state.clone());
+ let worker_ = worker.clone();
+ let first_msg_fut = worker
+ .post_message(req_msg)
+ .into_future()
+ .then(move |_| worker)
+ .then(move |result| {
+ if let Err(err) = result {
+ // TODO(ry) Need to forward the error instead of exiting.
+ eprintln!("{}", err.to_string());
+ std::process::exit(1);
+ }
+ debug!("Sent message to worker");
+ worker_.get_message()
+ });
first_msg_fut.map_err(|_| panic!("not handled")).and_then(
move |maybe_msg: Option<Buf>| {
@@ -312,7 +312,7 @@ impl TsCompiler {
/// If compilation is required then new V8 worker is spawned with fresh TS compiler.
pub fn compile_async(
self: &Self,
- state: ThreadSafeState,
+ global_state: ThreadSafeGlobalState,
source_file: &SourceFile,
) -> Box<CompiledModuleFuture> {
if self.has_compiled(&source_file.url) {
@@ -359,28 +359,26 @@ impl TsCompiler {
let root_names = vec![module_url.to_string()];
let req_msg = req(root_names, self.config.clone(), None);
- let worker = TsCompiler::setup_worker(state.clone());
- let compiling_job = state.progress.add("Compile", &module_url.to_string());
- let state_ = state.clone();
-
- let resource = worker.state.resource.clone();
- let compiler_rid = resource.rid;
- let first_msg_fut =
- resources::post_message_to_worker(compiler_rid, req_msg)
- .expect("Bad compiler rid")
- .then(move |_| worker)
- .then(move |result| {
- if let Err(err) = result {
- // TODO(ry) Need to forward the error instead of exiting.
- eprintln!("{}", err.to_string());
- std::process::exit(1);
- }
- debug!("Sent message to worker");
- let stream_future =
- resources::get_message_stream_from_worker(compiler_rid)
- .into_future();
- stream_future.map(|(f, _rest)| f).map_err(|(f, _rest)| f)
- });
+ let worker = TsCompiler::setup_worker(global_state.clone());
+ let worker_ = worker.clone();
+ let compiling_job = global_state
+ .progress
+ .add("Compile", &module_url.to_string());
+ let global_state_ = global_state.clone();
+
+ let first_msg_fut = worker
+ .post_message(req_msg)
+ .into_future()
+ .then(move |_| worker)
+ .then(move |result| {
+ if let Err(err) = result {
+ // TODO(ry) Need to forward the error instead of exiting.
+ eprintln!("{}", err.to_string());
+ std::process::exit(1);
+ }
+ debug!("Sent message to worker");
+ worker_.get_message()
+ });
let fut = first_msg_fut
.map_err(|_| panic!("not handled"))
@@ -400,7 +398,7 @@ impl TsCompiler {
.and_then(move |_| {
// if we are this far it means compilation was successful and we can
// load compiled filed from disk
- state_
+ global_state_
.ts_compiler
.get_compiled_module(&source_file_.url)
.map_err(|e| {
@@ -663,7 +661,7 @@ mod tests {
source_code: include_bytes!("../tests/002_hello.ts").to_vec(),
};
- let mock_state = ThreadSafeState::mock(vec![
+ let mock_state = ThreadSafeGlobalState::mock(vec![
String::from("deno"),
String::from("hello.js"),
]);
@@ -696,7 +694,7 @@ mod tests {
.unwrap()
.to_string();
- let state = ThreadSafeState::mock(vec![
+ let state = ThreadSafeGlobalState::mock(vec![
String::from("deno"),
p.to_string_lossy().into(),
String::from("$deno$/bundle.js"),
diff --git a/cli/global_state.rs b/cli/global_state.rs
new file mode 100644
index 000000000..3e102cb4e
--- /dev/null
+++ b/cli/global_state.rs
@@ -0,0 +1,247 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::compilers::CompiledModule;
+use crate::compilers::JsCompiler;
+use crate::compilers::JsonCompiler;
+use crate::compilers::TsCompiler;
+use crate::deno_dir;
+use crate::deno_error::permission_denied;
+use crate::file_fetcher::SourceFileFetcher;
+use crate::flags;
+use crate::lockfile::Lockfile;
+use crate::metrics::Metrics;
+use crate::msg;
+use crate::permissions::DenoPermissions;
+use crate::progress::Progress;
+use deno::ErrBox;
+use deno::ModuleSpecifier;
+use futures::Future;
+use std;
+use std::env;
+use std::ops::Deref;
+use std::str;
+use std::sync::Arc;
+use std::sync::Mutex;
+
+/// Holds state of the program and can be accessed by V8 isolate.
+pub struct ThreadSafeGlobalState(Arc<GlobalState>);
+
+/// This structure represents state of single "deno" program.
+///
+/// It is shared by all created workers (thus V8 isolates).
+#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
+pub struct GlobalState {
+ /// Vector of CLI arguments - these are user script arguments, all Deno specific flags are removed.
+ pub argv: Vec<String>,
+ /// Flags parsed from `argv` contents.
+ pub flags: flags::DenoFlags,
+ /// Entry script parsed from CLI arguments.
+ pub main_module: Option<ModuleSpecifier>,
+ /// Permissions parsed from `flags`.
+ pub permissions: DenoPermissions,
+ pub dir: deno_dir::DenoDir,
+ pub metrics: Metrics,
+ pub progress: Progress,
+ pub file_fetcher: SourceFileFetcher,
+ pub js_compiler: JsCompiler,
+ pub json_compiler: JsonCompiler,
+ pub ts_compiler: TsCompiler,
+ pub lockfile: Option<Mutex<Lockfile>>,
+}
+
+impl Clone for ThreadSafeGlobalState {
+ fn clone(&self) -> Self {
+ ThreadSafeGlobalState(self.0.clone())
+ }
+}
+
+impl Deref for ThreadSafeGlobalState {
+ type Target = Arc<GlobalState>;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl ThreadSafeGlobalState {
+ pub fn new(
+ flags: flags::DenoFlags,
+ argv_rest: Vec<String>,
+ progress: Progress,
+ ) -> Result<Self, ErrBox> {
+ let custom_root = env::var("DENO_DIR").map(String::into).ok();
+ let dir = deno_dir::DenoDir::new(custom_root)?;
+
+ let file_fetcher = SourceFileFetcher::new(
+ dir.deps_cache.clone(),
+ progress.clone(),
+ !flags.reload,
+ flags.cache_blacklist.clone(),
+ flags.no_fetch,
+ )?;
+
+ let ts_compiler = TsCompiler::new(
+ file_fetcher.clone(),
+ dir.gen_cache.clone(),
+ !flags.reload,
+ flags.config_path.clone(),
+ )?;
+
+ let main_module: Option<ModuleSpecifier> = if argv_rest.len() <= 1 {
+ None
+ } else {
+ let root_specifier = argv_rest[1].clone();
+ Some(ModuleSpecifier::resolve_url_or_path(&root_specifier)?)
+ };
+
+ // Note: reads lazily from disk on first call to lockfile.check()
+ let lockfile = if let Some(filename) = &flags.lock {
+ Some(Mutex::new(Lockfile::new(filename.to_string())))
+ } else {
+ None
+ };
+
+ let state = GlobalState {
+ main_module,
+ dir,
+ argv: argv_rest,
+ permissions: DenoPermissions::from_flags(&flags),
+ flags,
+ metrics: Metrics::default(),
+ progress,
+ file_fetcher,
+ ts_compiler,
+ js_compiler: JsCompiler {},
+ json_compiler: JsonCompiler {},
+ lockfile,
+ };
+
+ Ok(ThreadSafeGlobalState(Arc::new(state)))
+ }
+
+ pub fn fetch_compiled_module(
+ self: &Self,
+ module_specifier: &ModuleSpecifier,
+ ) -> impl Future<Item = CompiledModule, Error = ErrBox> {
+ let state1 = self.clone();
+ let state2 = self.clone();
+
+ self
+ .file_fetcher
+ .fetch_source_file_async(&module_specifier)
+ .and_then(move |out| match out.media_type {
+ msg::MediaType::Unknown => state1.js_compiler.compile_async(&out),
+ msg::MediaType::Json => state1.json_compiler.compile_async(&out),
+ msg::MediaType::TypeScript
+ | msg::MediaType::TSX
+ | msg::MediaType::JSX => {
+ state1.ts_compiler.compile_async(state1.clone(), &out)
+ }
+ msg::MediaType::JavaScript => {
+ if state1.ts_compiler.compile_js {
+ state1.ts_compiler.compile_async(state1.clone(), &out)
+ } else {
+ state1.js_compiler.compile_async(&out)
+ }
+ }
+ })
+ .and_then(move |compiled_module| {
+ if let Some(ref lockfile) = state2.lockfile {
+ let mut g = lockfile.lock().unwrap();
+ if state2.flags.lock_write {
+ g.insert(&compiled_module);
+ } else if !g.check(&compiled_module)? {
+ eprintln!(
+ "Subresource integrety check failed --lock={}\n{}",
+ g.filename, compiled_module.name
+ );
+ std::process::exit(10);
+ }
+ }
+ Ok(compiled_module)
+ })
+ }
+
+ #[inline]
+ pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> {
+ self.permissions.check_read(filename)
+ }
+
+ #[inline]
+ pub fn check_write(&self, filename: &str) -> Result<(), ErrBox> {
+ self.permissions.check_write(filename)
+ }
+
+ #[inline]
+ pub fn check_env(&self) -> Result<(), ErrBox> {
+ self.permissions.check_env()
+ }
+
+ #[inline]
+ pub fn check_net(&self, hostname: &str, port: u16) -> Result<(), ErrBox> {
+ self.permissions.check_net(hostname, port)
+ }
+
+ #[inline]
+ pub fn check_net_url(&self, url: &url::Url) -> Result<(), ErrBox> {
+ self.permissions.check_net_url(url)
+ }
+
+ #[inline]
+ pub fn check_run(&self) -> Result<(), ErrBox> {
+ self.permissions.check_run()
+ }
+
+ pub fn check_dyn_import(
+ self: &Self,
+ module_specifier: &ModuleSpecifier,
+ ) -> Result<(), ErrBox> {
+ let u = module_specifier.as_url();
+ match u.scheme() {
+ "http" | "https" => {
+ self.check_net_url(u)?;
+ Ok(())
+ }
+ "file" => {
+ let filename = u
+ .to_file_path()
+ .unwrap()
+ .into_os_string()
+ .into_string()
+ .unwrap();
+ self.check_read(&filename)?;
+ Ok(())
+ }
+ _ => Err(permission_denied()),
+ }
+ }
+
+ #[cfg(test)]
+ pub fn mock(argv: Vec<String>) -> ThreadSafeGlobalState {
+ ThreadSafeGlobalState::new(
+ flags::DenoFlags::default(),
+ argv,
+ Progress::new(),
+ )
+ .unwrap()
+ }
+}
+
+#[test]
+fn thread_safe() {
+ fn f<S: Send + Sync>(_: S) {}
+ f(ThreadSafeGlobalState::mock(vec![
+ String::from("./deno"),
+ String::from("hello.js"),
+ ]));
+}
+
+#[test]
+fn import_map_given_for_repl() {
+ let _result = ThreadSafeGlobalState::new(
+ flags::DenoFlags {
+ import_map_path: Some("import_map.json".to_string()),
+ ..flags::DenoFlags::default()
+ },
+ vec![String::from("./deno")],
+ Progress::new(),
+ );
+}
diff --git a/cli/js/workers.ts b/cli/js/workers.ts
index 281fe619f..37061063f 100644
--- a/cli/js/workers.ts
+++ b/cli/js/workers.ts
@@ -65,7 +65,6 @@ export function postMessage(data: any): void {
export async function getMessage(): Promise<any> {
log("getMessage");
const res = await sendAsync(dispatch.OP_WORKER_GET_MESSAGE);
-
if (res.data != null) {
return decodeMessage(new Uint8Array(res.data));
} else {
diff --git a/cli/lib.rs b/cli/lib.rs
index 4f5319508..b6922c591 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -15,6 +15,7 @@ extern crate nix;
extern crate rand;
extern crate serde;
extern crate serde_derive;
+extern crate tokio;
extern crate url;
mod checksum;
@@ -28,12 +29,14 @@ mod file_fetcher;
pub mod flags;
pub mod fmt_errors;
mod fs;
+mod global_state;
mod global_timer;
mod http_body;
mod http_util;
mod import_map;
mod js;
mod lockfile;
+mod metrics;
pub mod msg;
pub mod ops;
pub mod permissions;
@@ -55,6 +58,7 @@ pub mod worker;
use crate::deno_error::js_check;
use crate::deno_error::print_err_and_exit;
+use crate::global_state::ThreadSafeGlobalState;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
@@ -97,11 +101,13 @@ impl log::Log for Logger {
fn create_worker_and_state(
flags: DenoFlags,
argv: Vec<String>,
-) -> (Worker, ThreadSafeState) {
+) -> (Worker, ThreadSafeGlobalState) {
use crate::shell::Shell;
use std::sync::Arc;
use std::sync::Mutex;
+
let shell = Arc::new(Mutex::new(Shell::new()));
+
let progress = Progress::new();
progress.set_callback(move |_done, _completed, _total, status, msg| {
if !status.is_empty() {
@@ -109,17 +115,23 @@ fn create_worker_and_state(
s.status(status, msg).expect("shell problem");
}
});
- // TODO(kevinkassimo): maybe make include_deno_namespace also configurable?
- let state = ThreadSafeState::new(flags, argv, progress, true)
+
+ let global_state = ThreadSafeGlobalState::new(flags, argv, progress)
.map_err(deno_error::print_err_and_exit)
.unwrap();
- let worker = Worker::new(
- "main".to_string(),
- startup_data::deno_isolate_init(),
- state.clone(),
- );
- (worker, state)
+ let state = ThreadSafeState::new(
+ global_state.clone(),
+ global_state.main_module.clone(),
+ true,
+ )
+ .map_err(deno_error::print_err_and_exit)
+ .unwrap();
+
+ let worker =
+ Worker::new("main".to_string(), startup_data::deno_isolate_init(), state);
+
+ (worker, global_state)
}
fn types_command() {
@@ -128,7 +140,7 @@ fn types_command() {
}
fn print_cache_info(worker: Worker) {
- let state = worker.state;
+ let state = &worker.state.global_state;
println!(
"{} {:?}",
@@ -151,10 +163,11 @@ pub fn print_file_info(
worker: Worker,
module_specifier: &ModuleSpecifier,
) -> impl Future<Item = Worker, Error = ()> {
+ let global_state_ = worker.state.global_state.clone();
let state_ = worker.state.clone();
let module_specifier_ = module_specifier.clone();
- state_
+ global_state_
.file_fetcher
.fetch_source_file_async(&module_specifier)
.map_err(|err| println!("{}", err))
@@ -171,7 +184,7 @@ pub fn print_file_info(
msg::enum_name_media_type(out.media_type)
);
- state_
+ global_state_
.clone()
.fetch_compiled_module(&module_specifier_)
.map_err(|e| {
@@ -182,9 +195,9 @@ pub fn print_file_info(
.and_then(move |compiled| {
if out.media_type == msg::MediaType::TypeScript
|| (out.media_type == msg::MediaType::JavaScript
- && state_.ts_compiler.compile_js)
+ && global_state_.ts_compiler.compile_js)
{
- let compiled_source_file = state_
+ let compiled_source_file = global_state_
.ts_compiler
.get_compiled_source_file(&out.url)
.unwrap();
@@ -196,7 +209,7 @@ pub fn print_file_info(
);
}
- if let Ok(source_map) = state_
+ if let Ok(source_map) = global_state_
.clone()
.ts_compiler
.get_source_map_file(&module_specifier_)
@@ -209,7 +222,7 @@ pub fn print_file_info(
}
if let Some(deps) =
- worker.state.modules.lock().unwrap().deps(&compiled.name)
+ state_.modules.lock().unwrap().deps(&compiled.name)
{
println!("{}{}", colors::bold("deps:\n".to_string()), deps.name);
if let Some(ref depsdeps) = deps.deps {
@@ -236,7 +249,7 @@ fn info_command(flags: DenoFlags, argv: Vec<String>) {
return print_cache_info(worker);
}
- let main_module = state.main_module().unwrap();
+ let main_module = state.main_module.as_ref().unwrap().clone();
let main_future = lazy(move || {
// Setup runtime.
js_check(worker.execute("denoMain()"));
@@ -259,7 +272,7 @@ fn info_command(flags: DenoFlags, argv: Vec<String>) {
fn fetch_command(flags: DenoFlags, argv: Vec<String>) {
let (mut worker, state) = create_worker_and_state(flags, argv.clone());
- let main_module = state.main_module().unwrap();
+ let main_module = state.main_module.as_ref().unwrap().clone();
let main_future = lazy(move || {
// Setup runtime.
js_check(worker.execute("denoMain()"));
@@ -307,7 +320,7 @@ fn eval_command(flags: DenoFlags, argv: Vec<String>) {
fn bundle_command(flags: DenoFlags, argv: Vec<String>) {
let (worker, state) = create_worker_and_state(flags, argv);
- let main_module = state.main_module().unwrap();
+ let main_module = state.main_module.as_ref().unwrap().clone();
assert!(state.argv.len() >= 3);
let out_file = state.argv[2].clone();
debug!(">>>>> bundle_async START");
@@ -353,7 +366,7 @@ fn run_script(flags: DenoFlags, argv: Vec<String>) {
let use_current_thread = flags.current_thread;
let (mut worker, state) = create_worker_and_state(flags, argv);
- let main_module = state.main_module().unwrap();
+ let main_module = state.main_module.as_ref().unwrap().clone();
// Normal situation of executing a module.
let main_future = lazy(move || {
// Setup runtime.
diff --git a/cli/metrics.rs b/cli/metrics.rs
new file mode 100644
index 000000000..d1e7567b9
--- /dev/null
+++ b/cli/metrics.rs
@@ -0,0 +1,12 @@
+use std::sync::atomic::AtomicUsize;
+
+#[derive(Default)]
+pub struct Metrics {
+ pub ops_dispatched: AtomicUsize,
+ pub ops_completed: AtomicUsize,
+ pub bytes_sent_control: AtomicUsize,
+ pub bytes_sent_data: AtomicUsize,
+ pub bytes_received: AtomicUsize,
+ pub resolve_count: AtomicUsize,
+ pub compiler_starts: AtomicUsize,
+}
diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs
index 720eb65fc..e7d38f364 100644
--- a/cli/ops/compiler.rs
+++ b/cli/ops/compiler.rs
@@ -4,6 +4,7 @@ use crate::futures::future::join_all;
use crate::futures::Future;
use crate::ops::json_op;
use crate::state::ThreadSafeState;
+use deno::Loader;
use deno::*;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
@@ -36,7 +37,7 @@ fn op_cache(
let module_specifier = ModuleSpecifier::resolve_url(&args.module_id)
.expect("Should be valid module specifier");
- state.ts_compiler.cache_compiler_output(
+ state.global_state.ts_compiler.cache_compiler_output(
&module_specifier,
&args.extension,
&args.contents,
@@ -67,6 +68,7 @@ fn op_fetch_source_files(
let resolved_specifier =
state.resolve(specifier, &args.referrer, false, is_dyn_import)?;
let fut = state
+ .global_state
.file_fetcher
.fetch_source_file_async(&resolved_specifier);
futures.push(fut);
diff --git a/cli/ops/errors.rs b/cli/ops/errors.rs
index 2d786b97d..3d94c05c4 100644
--- a/cli/ops/errors.rs
+++ b/cli/ops/errors.rs
@@ -30,7 +30,7 @@ fn op_format_error(
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: FormatErrorArgs = serde_json::from_value(args)?;
- let error = JSError::from_json(&args.error, &state.ts_compiler);
+ let error = JSError::from_json(&args.error, &state.global_state.ts_compiler);
Ok(JsonOp::Sync(json!({
"error": error.to_string(),
@@ -57,7 +57,7 @@ fn op_apply_source_map(
args.line.into(),
args.column.into(),
&mut mappings_map,
- &state.ts_compiler,
+ &state.global_state.ts_compiler,
);
Ok(JsonOp::Sync(json!({
diff --git a/cli/ops/os.rs b/cli/ops/os.rs
index c50b8eedb..78fd940de 100644
--- a/cli/ops/os.rs
+++ b/cli/ops/os.rs
@@ -7,7 +7,6 @@ use crate::state::ThreadSafeState;
use crate::version;
use atty;
use deno::*;
-use log;
use std::collections::HashMap;
use std::env;
use sys_info;
@@ -40,16 +39,15 @@ fn op_start(
_args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
+ let gs = &state.global_state;
+
Ok(JsonOp::Sync(json!({
"cwd": deno_fs::normalize_path(&env::current_dir().unwrap()),
"pid": std::process::id(),
- "argv": state.argv,
- "mainModule": state.main_module().map(|x| x.as_str().to_string()),
- "debugFlag": state
- .flags
- .log_level
- .map_or(false, |l| l == log::Level::Debug),
- "versionFlag": state.flags.version,
+ "argv": gs.argv,
+ "mainModule": gs.main_module.as_ref().map(|x| x.to_string()),
+ "debugFlag": gs.flags.log_level.map_or(false, |l| l == log::Level::Debug),
+ "versionFlag": gs.flags.version,
"v8Version": version::v8(),
"denoVersion": version::DENO,
"tsVersion": version::TYPESCRIPT,
diff --git a/cli/ops/repl.rs b/cli/ops/repl.rs
index 6644ab159..4a3ba68d4 100644
--- a/cli/ops/repl.rs
+++ b/cli/ops/repl.rs
@@ -44,7 +44,8 @@ fn op_repl_start(
let args: ReplStartArgs = serde_json::from_value(args)?;
debug!("op_repl_start {}", args.history_file);
- let history_path = repl::history_path(&state.dir, &args.history_file);
+ let history_path =
+ repl::history_path(&state.global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = ReplResource(Arc::new(Mutex::new(repl)));
let mut table = resources::lock_resource_table();
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index 809aa5d9b..f6dcf8042 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -4,7 +4,6 @@ use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::ops::json_op;
-use crate::resources;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
@@ -12,6 +11,7 @@ use deno::*;
use futures;
use futures::Async;
use futures::Future;
+use futures::IntoFuture;
use futures::Sink;
use futures::Stream;
use std;
@@ -48,18 +48,17 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
}
struct GetMessageFuture {
- pub state: ThreadSafeState,
+ state: ThreadSafeState,
}
impl Future for GetMessageFuture {
type Item = Option<Buf>;
- type Error = ();
+ type Error = ErrBox;
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
- let mut wc = self.state.worker_channels.lock().unwrap();
- wc.1
- .poll()
- .map_err(|err| panic!("worker_channel recv err {:?}", err))
+ let mut channels = self.state.worker_channels.lock().unwrap();
+ let receiver = &mut channels.receiver;
+ receiver.poll().map_err(ErrBox::from)
}
}
@@ -93,12 +92,10 @@ fn op_worker_post_message(
data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
-
- let tx = {
- let wc = state.worker_channels.lock().unwrap();
- wc.0.clone()
- };
- tx.send(d)
+ let mut channels = state.worker_channels.lock().unwrap();
+ let sender = &mut channels.sender;
+ sender
+ .send(d)
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
@@ -132,28 +129,23 @@ fn op_create_worker(
let parent_state = state.clone();
+ // TODO(bartlomieju): Isn't this wrong?
let mut module_specifier = ModuleSpecifier::resolve_url_or_path(specifier)?;
-
- let mut child_argv = parent_state.argv.clone();
-
if !has_source_code {
- if let Some(module) = state.main_module() {
- module_specifier =
- ModuleSpecifier::resolve_import(specifier, &module.to_string())?;
- child_argv[1] = module_specifier.to_string();
+ if let Some(referrer) = parent_state.main_module.as_ref() {
+ let referrer = referrer.clone().to_string();
+ module_specifier = ModuleSpecifier::resolve_import(specifier, &referrer)?;
}
}
let child_state = ThreadSafeState::new(
- parent_state.flags.clone(),
- child_argv,
- parent_state.progress.clone(),
+ state.global_state.clone(),
+ Some(module_specifier.clone()),
include_deno_namespace,
)?;
- let rid = child_state.resource.rid;
+ let rid = child_state.rid;
let name = format!("USER-WORKER-{}", specifier);
let deno_main_call = format!("denoMain({})", include_deno_namespace);
-
let mut worker =
Worker::new(name, startup_data::deno_isolate_init(), child_state);
js_check(worker.execute(&deno_main_call));
@@ -201,9 +193,8 @@ fn op_host_get_worker_closed(
worker.clone()
};
- let op = Box::new(
- shared_worker_future.then(move |_result| futures::future::ok(json!({}))),
- );
+ let op =
+ shared_worker_future.then(move |_result| futures::future::ok(json!({})));
Ok(JsonOp::Async(Box::new(op)))
}
@@ -222,7 +213,7 @@ fn op_host_get_message(
let args: HostGetMessageArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
- let op = resources::get_message_from_worker(rid)
+ let op = Worker::get_message_from_resource(rid)
.map_err(move |_| -> ErrBox { unimplemented!() })
.and_then(move |maybe_buf| {
futures::future::ok(json!({
@@ -250,7 +241,9 @@ fn op_host_post_message(
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
- resources::post_message_to_worker(rid, d)?
+ // TODO: rename to post_message_to_child(rid, d)
+ Worker::post_message_to_resource(rid, d)
+ .into_future()
.wait()
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
diff --git a/cli/permissions.rs b/cli/permissions.rs
index af4c960d7..1a470f551 100644
--- a/cli/permissions.rs
+++ b/cli/permissions.rs
@@ -52,7 +52,7 @@ impl fmt::Display for PermissionAccessorState {
}
}
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub struct PermissionAccessor {
state: Arc<AtomicUsize>,
}
@@ -110,7 +110,7 @@ impl Default for PermissionAccessor {
}
}
-#[derive(Debug, Default)]
+#[derive(Clone, Debug, Default)]
pub struct DenoPermissions {
// Keep in sync with cli/js/permissions.ts
pub allow_read: PermissionAccessor,
diff --git a/cli/resources.rs b/cli/resources.rs
index 4df4e1798..ba7795f5d 100644
--- a/cli/resources.rs
+++ b/cli/resources.rs
@@ -11,9 +11,6 @@
use crate::deno_error;
use crate::deno_error::bad_resource;
use crate::http_body::HttpBody;
-use crate::state::WorkerChannels;
-
-use deno::Buf;
use deno::ErrBox;
pub use deno::Resource as CoreResource;
pub use deno::ResourceId;
@@ -22,8 +19,6 @@ use deno::ResourceTable;
use futures;
use futures::Future;
use futures::Poll;
-use futures::Sink;
-use futures::Stream;
use reqwest::r#async::Decoder as ReqwestDecoder;
use std;
use std::io::{Error, Read, Seek, SeekFrom, Write};
@@ -34,7 +29,6 @@ use std::sync::MutexGuard;
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
-use tokio::sync::mpsc;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
@@ -101,7 +95,6 @@ enum CliResource {
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
- Worker(WorkerChannels),
}
impl CoreResource for CliResource {
@@ -133,7 +126,6 @@ impl CoreResource for CliResource {
CliResource::ChildStdin(_) => "childStdin",
CliResource::ChildStdout(_) => "childStdout",
CliResource::ChildStderr(_) => "childStderr",
- CliResource::Worker(_) => "worker",
}
}
}
@@ -372,78 +364,6 @@ pub fn add_reqwest_body(body: ReqwestDecoder) -> Resource {
Resource { rid }
}
-pub fn add_worker(wc: WorkerChannels) -> Resource {
- let mut table = lock_resource_table();
- let rid = table.add(Box::new(CliResource::Worker(wc)));
- Resource { rid }
-}
-
-/// Post message to worker as a host or privilged overlord
-pub fn post_message_to_worker(
- rid: ResourceId,
- buf: Buf,
-) -> Result<futures::sink::Send<mpsc::Sender<Buf>>, ErrBox> {
- let mut table = lock_resource_table();
- let repr = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?;
- match repr {
- CliResource::Worker(ref mut wc) => {
- let sender = wc.0.clone();
- Ok(sender.send(buf))
- }
- _ => Err(bad_resource()),
- }
-}
-
-pub struct WorkerReceiver {
- rid: ResourceId,
-}
-
-// Invert the dumbness that tokio_process causes by making Child itself a future.
-impl Future for WorkerReceiver {
- type Item = Option<Buf>;
- type Error = ErrBox;
-
- fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
- let mut table = lock_resource_table();
- let repr = table
- .get_mut::<CliResource>(self.rid)
- .ok_or_else(bad_resource)?;
- match repr {
- CliResource::Worker(ref mut wc) => wc.1.poll().map_err(ErrBox::from),
- _ => Err(bad_resource()),
- }
- }
-}
-
-pub fn get_message_from_worker(rid: ResourceId) -> WorkerReceiver {
- WorkerReceiver { rid }
-}
-
-pub struct WorkerReceiverStream {
- rid: ResourceId,
-}
-
-// Invert the dumbness that tokio_process causes by making Child itself a future.
-impl Stream for WorkerReceiverStream {
- type Item = Buf;
- type Error = ErrBox;
-
- fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
- let mut table = lock_resource_table();
- let repr = table
- .get_mut::<CliResource>(self.rid)
- .ok_or_else(bad_resource)?;
- match repr {
- CliResource::Worker(ref mut wc) => wc.1.poll().map_err(ErrBox::from),
- _ => Err(bad_resource()),
- }
- }
-}
-
-pub fn get_message_stream_from_worker(rid: ResourceId) -> WorkerReceiverStream {
- WorkerReceiverStream { rid }
-}
-
pub struct ChildResources {
pub child_rid: Option<ResourceId>,
pub stdin_rid: Option<ResourceId>,
diff --git a/cli/state.rs b/cli/state.rs
index 1f44f254f..544c199b8 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -1,22 +1,15 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::compilers::CompiledModule;
-use crate::compilers::JsCompiler;
-use crate::compilers::JsonCompiler;
-use crate::compilers::TsCompiler;
-use crate::deno_dir;
use crate::deno_error::permission_denied;
-use crate::file_fetcher::SourceFileFetcher;
-use crate::flags;
+use crate::global_state::ThreadSafeGlobalState;
use crate::global_timer::GlobalTimer;
use crate::import_map::ImportMap;
-use crate::lockfile::Lockfile;
-use crate::msg;
+use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::permissions::DenoPermissions;
-use crate::progress::Progress;
use crate::resources;
use crate::resources::ResourceId;
use crate::worker::Worker;
+use crate::worker::WorkerChannels;
use deno::Buf;
use deno::CoreOp;
use deno::ErrBox;
@@ -31,31 +24,18 @@ use rand::SeedableRng;
use serde_json::Value;
use std;
use std::collections::HashMap;
-use std::env;
use std::ops::Deref;
use std::str;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
use tokio::sync::mpsc as async_mpsc;
-pub type WorkerSender = async_mpsc::Sender<Buf>;
-pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
-pub type WorkerChannels = (WorkerSender, WorkerReceiver);
+// TODO: hold references to concrete Workers instead of shared futures of
+// those workers?
pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;
-#[derive(Default)]
-pub struct Metrics {
- pub ops_dispatched: AtomicUsize,
- pub ops_completed: AtomicUsize,
- pub bytes_sent_control: AtomicUsize,
- pub bytes_sent_data: AtomicUsize,
- pub bytes_received: AtomicUsize,
- pub resolve_count: AtomicUsize,
- pub compiler_starts: AtomicUsize,
-}
-
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
/// accessed outside the main V8 thread should be inside ThreadSafeState.
@@ -63,34 +43,22 @@ pub struct ThreadSafeState(Arc<State>);
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub struct State {
+ pub global_state: ThreadSafeGlobalState,
pub modules: Arc<Mutex<deno::Modules>>,
- pub main_module: Option<ModuleSpecifier>,
- pub dir: deno_dir::DenoDir,
- pub argv: Vec<String>,
pub permissions: DenoPermissions,
- pub flags: flags::DenoFlags,
+ pub main_module: Option<ModuleSpecifier>,
+ pub worker_channels: Mutex<WorkerChannels>,
/// When flags contains a `.import_map_path` option, the content of the
/// import map file will be resolved and set.
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
- pub worker_channels: Mutex<WorkerChannels>,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<UserWorkerTable>,
pub start_time: Instant,
/// A reference to this worker's resource.
- pub resource: resources::Resource,
- /// Reference to global progress bar.
- pub progress: Progress,
+ pub rid: ResourceId,
pub seeded_rng: Option<Mutex<StdRng>>,
-
- pub file_fetcher: SourceFileFetcher,
- pub js_compiler: JsCompiler,
- pub json_compiler: JsonCompiler,
- pub ts_compiler: TsCompiler,
-
pub include_deno_namespace: bool,
-
- pub lockfile: Option<Mutex<Lockfile>>,
}
impl Clone for ThreadSafeState {
@@ -195,158 +163,74 @@ impl Loader for ThreadSafeState {
) -> Box<deno::SourceCodeInfoFuture> {
self.metrics.resolve_count.fetch_add(1, Ordering::SeqCst);
let module_url_specified = module_specifier.to_string();
- Box::new(self.fetch_compiled_module(module_specifier).map(
- |compiled_module| deno::SourceCodeInfo {
+ let fut = self
+ .global_state
+ .fetch_compiled_module(module_specifier)
+ .map(|compiled_module| deno::SourceCodeInfo {
// Real module name, might be different from initial specifier
// due to redirections.
code: compiled_module.code,
module_url_specified,
module_url_found: compiled_module.name,
- },
- ))
+ });
+
+ Box::new(fut)
}
}
impl ThreadSafeState {
pub fn new(
- flags: flags::DenoFlags,
- argv_rest: Vec<String>,
- progress: Progress,
+ global_state: ThreadSafeGlobalState,
+ main_module: Option<ModuleSpecifier>,
include_deno_namespace: bool,
) -> Result<Self, ErrBox> {
- let custom_root = env::var("DENO_DIR").map(String::into).ok();
-
let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
- let internal_channels = (worker_out_tx, worker_in_rx);
- let external_channels = (worker_in_tx, worker_out_rx);
- let resource = resources::add_worker(external_channels);
-
- let dir = deno_dir::DenoDir::new(custom_root)?;
-
- let file_fetcher = SourceFileFetcher::new(
- dir.deps_cache.clone(),
- progress.clone(),
- !flags.reload,
- flags.cache_blacklist.clone(),
- flags.no_fetch,
- )?;
-
- let ts_compiler = TsCompiler::new(
- file_fetcher.clone(),
- dir.gen_cache.clone(),
- !flags.reload,
- flags.config_path.clone(),
- )?;
-
- let main_module: Option<ModuleSpecifier> = if argv_rest.len() <= 1 {
- None
- } else {
- let root_specifier = argv_rest[1].clone();
- Some(ModuleSpecifier::resolve_url_or_path(&root_specifier)?)
+ let internal_channels = WorkerChannels {
+ sender: worker_out_tx,
+ receiver: worker_in_rx,
};
-
- let import_map: Option<ImportMap> = match &flags.import_map_path {
- None => None,
- Some(file_path) => Some(ImportMap::load(file_path)?),
+ let external_channels = WorkerChannels {
+ sender: worker_in_tx,
+ receiver: worker_out_rx,
};
- let mut seeded_rng = None;
- if let Some(seed) = flags.seed {
- seeded_rng = Some(Mutex::new(StdRng::seed_from_u64(seed)));
- };
+ let mut table = resources::lock_resource_table();
+ let rid = table.add(Box::new(external_channels));
- let modules = Arc::new(Mutex::new(deno::Modules::new()));
+ let import_map: Option<ImportMap> =
+ match global_state.flags.import_map_path.as_ref() {
+ None => None,
+ Some(file_path) => Some(ImportMap::load(file_path)?),
+ };
- // Note: reads lazily from disk on first call to lockfile.check()
- let lockfile = if let Some(filename) = &flags.lock {
- Some(Mutex::new(Lockfile::new(filename.to_string())))
- } else {
- None
+ let seeded_rng = match global_state.flags.seed {
+ Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))),
+ None => None,
};
+ let modules = Arc::new(Mutex::new(deno::Modules::new()));
+ let permissions = global_state.permissions.clone();
+
let state = State {
- main_module,
+ global_state,
modules,
- dir,
- argv: argv_rest,
- permissions: DenoPermissions::from_flags(&flags),
- flags,
+ main_module,
+ permissions,
import_map,
- metrics: Metrics::default(),
worker_channels: Mutex::new(internal_channels),
+ metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
workers: Mutex::new(UserWorkerTable::new()),
start_time: Instant::now(),
- resource,
- progress,
+ rid,
seeded_rng,
- file_fetcher,
- ts_compiler,
- js_compiler: JsCompiler {},
- json_compiler: JsonCompiler {},
include_deno_namespace,
- lockfile,
};
Ok(ThreadSafeState(Arc::new(state)))
}
- pub fn fetch_compiled_module(
- self: &Self,
- module_specifier: &ModuleSpecifier,
- ) -> impl Future<Item = CompiledModule, Error = ErrBox> {
- let state1 = self.clone();
- let state2 = self.clone();
-
- self
- .file_fetcher
- .fetch_source_file_async(&module_specifier)
- .and_then(move |out| match out.media_type {
- msg::MediaType::Unknown => {
- state1.js_compiler.compile_async(state1.clone(), &out)
- }
- msg::MediaType::Json => {
- state1.json_compiler.compile_async(state1.clone(), &out)
- }
- msg::MediaType::TypeScript
- | msg::MediaType::TSX
- | msg::MediaType::JSX => {
- state1.ts_compiler.compile_async(state1.clone(), &out)
- }
- msg::MediaType::JavaScript => {
- if state1.ts_compiler.compile_js {
- state1.ts_compiler.compile_async(state1.clone(), &out)
- } else {
- state1.js_compiler.compile_async(state1.clone(), &out)
- }
- }
- })
- .and_then(move |compiled_module| {
- if let Some(ref lockfile) = state2.lockfile {
- let mut g = lockfile.lock().unwrap();
- if state2.flags.lock_write {
- g.insert(&compiled_module);
- } else if !g.check(&compiled_module)? {
- eprintln!(
- "Subresource integrety check failed --lock={}\n{}",
- g.filename, compiled_module.name
- );
- std::process::exit(10);
- }
- }
- Ok(compiled_module)
- })
- }
-
- /// Read main module from argv
- pub fn main_module(&self) -> Option<ModuleSpecifier> {
- match &self.main_module {
- Some(module_specifier) => Some(module_specifier.clone()),
- None => None,
- }
- }
-
#[inline]
pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> {
self.permissions.check_read(filename)
@@ -403,10 +287,17 @@ impl ThreadSafeState {
#[cfg(test)]
pub fn mock(argv: Vec<String>) -> ThreadSafeState {
+ let module_specifier = if argv.is_empty() {
+ None
+ } else {
+ let module_specifier = ModuleSpecifier::resolve_url_or_path(&argv[0])
+ .expect("Invalid entry module");
+ Some(module_specifier)
+ };
+
ThreadSafeState::new(
- flags::DenoFlags::default(),
- argv,
- Progress::new(),
+ ThreadSafeGlobalState::mock(argv),
+ module_specifier,
true,
)
.unwrap()
@@ -445,16 +336,3 @@ fn thread_safe() {
String::from("hello.js"),
]));
}
-
-#[test]
-fn import_map_given_for_repl() {
- let _result = ThreadSafeState::new(
- flags::DenoFlags {
- import_map_path: Some("import_map.json".to_string()),
- ..flags::DenoFlags::default()
- },
- vec![String::from("./deno")],
- Progress::new(),
- true,
- );
-}
diff --git a/cli/worker.rs b/cli/worker.rs
index 4bb68fb95..90fb95af7 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -1,33 +1,57 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::deno_error::bad_resource;
use crate::fmt_errors::JSError;
use crate::ops;
+use crate::resources;
+use crate::resources::CoreResource;
+use crate::resources::ResourceId;
use crate::state::ThreadSafeState;
use deno;
+use deno::Buf;
use deno::ErrBox;
use deno::ModuleSpecifier;
use deno::RecursiveLoad;
use deno::StartupData;
use futures::Async;
use futures::Future;
+use futures::Poll;
+use futures::Sink;
+use futures::Stream;
use std::env;
use std::sync::Arc;
use std::sync::Mutex;
+use tokio::sync::mpsc;
use url::Url;
+/// Wraps mpsc channels into a generic resource so they can be referenced
+/// from ops and used to facilitate parent-child communication
+/// for workers.
+pub struct WorkerChannels {
+ pub sender: mpsc::Sender<Buf>,
+ pub receiver: mpsc::Receiver<Buf>,
+}
+
+impl CoreResource for WorkerChannels {
+ fn inspect_repr(&self) -> &str {
+ "worker"
+ }
+}
+
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
-/// high-level module loading
+/// high-level module loading.
#[derive(Clone)]
pub struct Worker {
+ pub name: String,
isolate: Arc<Mutex<deno::Isolate>>,
pub state: ThreadSafeState,
}
impl Worker {
pub fn new(
- _name: String,
+ name: String,
startup_data: StartupData,
state: ThreadSafeState,
- ) -> Worker {
+ ) -> Self {
let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
{
let mut i = isolate.lock().unwrap();
@@ -61,12 +85,16 @@ impl Worker {
Box::new(load_stream)
});
- let state_ = state.clone();
+ let global_state_ = state.global_state.clone();
i.set_js_error_create(move |v8_exception| {
- JSError::from_v8_exception(v8_exception, &state_.ts_compiler)
+ JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
})
}
- Self { isolate, state }
+ Self {
+ name,
+ isolate,
+ state,
+ }
}
/// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
@@ -106,7 +134,7 @@ impl Worker {
)
.get_future(isolate);
recursive_load.and_then(move |id| -> Result<(), ErrBox> {
- worker.state.progress.done();
+ worker.state.global_state.progress.done();
if is_prefetch {
Ok(())
} else {
@@ -115,6 +143,37 @@ impl Worker {
}
})
}
+
+ /// Post message to worker as a host or privileged overlord
+ pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> {
+ Worker::post_message_to_resource(self.state.rid, buf)
+ }
+
+ pub fn post_message_to_resource(
+ rid: resources::ResourceId,
+ buf: Buf,
+ ) -> Result<Async<()>, ErrBox> {
+ debug!("post message to resource {}", rid);
+ let mut table = resources::lock_resource_table();
+ let worker = table
+ .get_mut::<WorkerChannels>(rid)
+ .ok_or_else(bad_resource)?;
+ let sender = &mut worker.sender;
+ sender
+ .send(buf)
+ .poll()
+ .map(|_| Async::Ready(()))
+ .map_err(ErrBox::from)
+ }
+
+ pub fn get_message(self: &Self) -> WorkerReceiver {
+ Worker::get_message_from_resource(self.state.rid)
+ }
+
+ pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver {
+ debug!("get message from resource {}", rid);
+ WorkerReceiver { rid }
+ }
}
impl Future for Worker {
@@ -127,16 +186,39 @@ impl Future for Worker {
}
}
+/// This structure wraps worker's resource id to implement future
+/// that will return message received from worker or None
+/// if worker's channel has been closed.
+pub struct WorkerReceiver {
+ rid: ResourceId,
+}
+
+impl Future for WorkerReceiver {
+ type Item = Option<Buf>;
+ type Error = ErrBox;
+
+ fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
+ let mut table = resources::lock_resource_table();
+ let worker = table
+ .get_mut::<WorkerChannels>(self.rid)
+ .ok_or_else(bad_resource)?;
+ let receiver = &mut worker.receiver;
+ receiver.poll().map_err(ErrBox::from)
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use crate::flags;
+ use crate::flags::DenoFlags;
+ use crate::global_state::ThreadSafeGlobalState;
use crate::progress::Progress;
- use crate::resources;
use crate::startup_data;
use crate::state::ThreadSafeState;
use crate::tokio_util;
use futures::future::lazy;
+ use futures::IntoFuture;
use std::sync::atomic::Ordering;
#[test]
@@ -149,13 +231,15 @@ mod tests {
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let argv = vec![String::from("./deno"), module_specifier.to_string()];
- let state = ThreadSafeState::new(
+ let global_state = ThreadSafeGlobalState::new(
flags::DenoFlags::default(),
argv,
Progress::new(),
- true,
)
.unwrap();
+ let state =
+ ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
@@ -186,13 +270,12 @@ mod tests {
let module_specifier =
ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap();
let argv = vec![String::from("deno"), module_specifier.to_string()];
- let state = ThreadSafeState::new(
- flags::DenoFlags::default(),
- argv,
- Progress::new(),
- true,
- )
- .unwrap();
+ let global_state =
+ ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new())
+ .unwrap();
+ let state =
+ ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
+ .unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
@@ -227,8 +310,15 @@ mod tests {
let argv = vec![String::from("deno"), module_specifier.to_string()];
let mut flags = flags::DenoFlags::default();
flags.reload = true;
- let state =
- ThreadSafeState::new(flags, argv, Progress::new(), true).unwrap();
+ let global_state =
+ ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap();
+ let state = ThreadSafeState::new(
+ global_state.clone(),
+ Some(module_specifier.clone()),
+ true,
+ )
+ .unwrap();
+ let global_state_ = global_state.clone();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker = Worker::new(
@@ -247,10 +337,12 @@ mod tests {
})
}));
- let metrics = &state_.metrics;
- assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 3);
+ assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3);
// Check that we've only invoked the compiler once.
- assert_eq!(metrics.compiler_starts.load(Ordering::SeqCst), 1);
+ assert_eq!(
+ global_state_.metrics.compiler_starts.load(Ordering::SeqCst),
+ 1
+ );
drop(http_server_guard);
}
@@ -285,8 +377,9 @@ mod tests {
"#;
worker.execute(source).unwrap();
- let resource = worker.state.resource.clone();
- let resource_ = resource.clone();
+ let worker_ = worker.clone();
+ let rid = worker.state.rid;
+ let resource_ = resources::Resource { rid };
tokio::spawn(lazy(move || {
worker.then(move |r| -> Result<(), ()> {
@@ -298,14 +391,10 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = resources::post_message_to_worker(resource.rid, msg)
- .expect("Bad resource")
- .wait();
+ let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
- let maybe_msg = resources::get_message_from_worker(resource.rid)
- .wait()
- .unwrap();
+ let maybe_msg = worker_.get_message().wait().unwrap();
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
@@ -314,9 +403,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
- let r = resources::post_message_to_worker(resource.rid, msg)
- .expect("Bad resource")
- .wait();
+ let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
})
}
@@ -329,8 +416,9 @@ mod tests {
.execute("onmessage = () => { delete window.onmessage; }")
.unwrap();
- let resource = worker.state.resource.clone();
- let rid = resource.rid;
+ let rid = worker.state.rid;
+ let resource = resources::Resource { rid };
+ let worker_ = worker.clone();
let worker_future = worker
.then(move |r| -> Result<(), ()> {
@@ -345,9 +433,7 @@ mod tests {
tokio::spawn(lazy(move || worker_future_.then(|_| Ok(()))));
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = resources::post_message_to_worker(rid, msg)
- .expect("Bad resource")
- .wait();
+ let r = worker_.post_message(msg).into_future().wait();
assert!(r.is_ok());
debug!("rid {:?}", rid);