summaryrefslogtreecommitdiff
path: root/cli
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2019-04-08 17:10:00 -0400
committerGitHub <noreply@github.com>2019-04-08 17:10:00 -0400
commit2debbdacb935cfe1eb7bb8d1f40a5063b339d90b (patch)
tree317dbe65c90a10811c62bb9fd2888641af83f5da /cli
parent734cf781c6e606a8a836863a391c94cf4fad22d7 (diff)
Merge Worker and Isolate types (#2078)
Reduces generics.
Diffstat (limited to 'cli')
-rw-r--r--cli/cli_behavior.rs14
-rw-r--r--cli/compiler.rs171
-rw-r--r--cli/isolate_state.rs38
-rw-r--r--cli/main.rs18
-rw-r--r--cli/ops.rs299
-rw-r--r--cli/resources.rs17
-rw-r--r--cli/worker.rs (renamed from cli/isolate.rs)143
-rw-r--r--cli/workers.rs284
8 files changed, 355 insertions, 629 deletions
diff --git a/cli/cli_behavior.rs b/cli/cli_behavior.rs
index 860ec76ec..510b2608c 100644
--- a/cli/cli_behavior.rs
+++ b/cli/cli_behavior.rs
@@ -17,24 +17,12 @@ impl CliBehavior {
}
}
-impl IsolateStateContainer for &CliBehavior {
- fn state(&self) -> Arc<IsolateState> {
- self.state.clone()
- }
-}
-
-impl IsolateStateContainer for CliBehavior {
- fn state(&self) -> Arc<IsolateState> {
- self.state.clone()
- }
-}
-
impl Behavior for CliBehavior {
fn dispatch(
&mut self,
control: &[u8],
zero_copy: deno_buf,
) -> (bool, Box<Op>) {
- ops::dispatch_all(self, control, zero_copy, ops::op_selector_std)
+ ops::dispatch_all(&self.state, control, zero_copy, ops::op_selector_std)
}
}
diff --git a/cli/compiler.rs b/cli/compiler.rs
index e4d76c44c..d01e9fc0b 100644
--- a/cli/compiler.rs
+++ b/cli/compiler.rs
@@ -1,22 +1,17 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::flags::DenoFlags;
+use crate::cli_behavior::CliBehavior;
use crate::isolate_state::*;
use crate::js_errors;
use crate::js_errors::JSErrorColor;
use crate::msg;
-use crate::ops;
use crate::resources;
use crate::resources::ResourceId;
use crate::startup_data;
use crate::tokio_util;
-use crate::workers;
-use crate::workers::WorkerBehavior;
-use crate::workers::WorkerInit;
-use deno::deno_buf;
-use deno::Behavior;
+use crate::worker::Worker;
+use deno::js_check;
use deno::Buf;
use deno::JSError;
-use deno::Op;
use futures::future::*;
use futures::sync::oneshot;
use futures::Future;
@@ -44,51 +39,6 @@ lazy_static! {
static ref C_RUNTIME: Mutex<Runtime> = Mutex::new(Runtime::new().unwrap());
}
-pub struct CompilerBehavior {
- pub state: Arc<IsolateState>,
-}
-
-impl CompilerBehavior {
- pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
- Self {
- state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
- }
- }
-}
-
-impl IsolateStateContainer for CompilerBehavior {
- fn state(&self) -> Arc<IsolateState> {
- self.state.clone()
- }
-}
-
-impl IsolateStateContainer for &CompilerBehavior {
- fn state(&self) -> Arc<IsolateState> {
- self.state.clone()
- }
-}
-
-impl Behavior for CompilerBehavior {
- fn dispatch(
- &mut self,
- control: &[u8],
- zero_copy: deno_buf,
- ) -> (bool, Box<Op>) {
- ops::dispatch_all(self, control, zero_copy, ops::op_selector_compiler)
- }
-}
-
-impl WorkerBehavior for CompilerBehavior {
- fn set_internal_channels(&mut self, worker_channels: WorkerChannels) {
- self.state = Arc::new(IsolateState::new(
- self.state.flags.clone(),
- self.state.argv.clone(),
- Some(worker_channels),
- true,
- ));
- }
-}
-
// This corresponds to JS ModuleMetaData.
// TODO Rename one or the other so they correspond.
#[derive(Debug, Clone)]
@@ -142,74 +92,67 @@ fn lazy_start(parent_state: Arc<IsolateState>) -> ResourceId {
let mut cell = C_RID.lock().unwrap();
cell
.get_or_insert_with(|| {
- let worker_result = workers::spawn(
+ let child_state = Arc::new(IsolateState::new(
+ parent_state.flags.clone(),
+ parent_state.argv.clone(),
+ ));
+ let rid = child_state.resource.rid;
+ let resource = child_state.resource.clone();
+ let behavior = CliBehavior::new(child_state);
+
+ let mut worker = Worker::new(
+ "TS".to_string(),
startup_data::compiler_isolate_init(),
- CompilerBehavior::new(
- parent_state.flags.clone(),
- parent_state.argv.clone(),
- ),
- "TS",
- WorkerInit::Script("compilerMain()".to_string()),
+ behavior,
);
- match worker_result {
- Ok(worker) => {
- let rid = worker.resource.rid;
- let mut runtime = C_RUNTIME.lock().unwrap();
- runtime.spawn(lazy(move || {
- let resource = worker.resource.clone();
- worker.then(move |result| -> Result<(), ()> {
- // Close resource so the future created by
- // handle_worker_message_stream exits
- resource.close();
- debug!("Compiler worker exited!");
- if let Err(e) = result {
- eprintln!("{}", JSErrorColor(&e).to_string());
- }
- std::process::exit(1);
- })
- }));
- runtime.spawn(lazy(move || {
- debug!("Start worker stream handler!");
- let worker_stream = resources::get_message_stream_from_worker(rid);
- worker_stream
- .for_each(|msg: Buf| {
- // All worker responses are handled here first before being sent via
- // their respective sender. This system can be compared to the
- // promise system used on the js side. This provides a way to
- // resolve many futures via the same channel.
- let res_json = std::str::from_utf8(&msg).unwrap();
- debug!("Got message from worker: {}", res_json);
- // Get the intended receiver's cmd_id from the message.
- let cmd_id = parse_cmd_id(res_json);
- let mut table = C_RES_SENDER_TABLE.lock().unwrap();
- debug!("Cmd id for get message handler: {}", cmd_id);
- // Get the corresponding response sender from the table and
- // send a response.
- let response_sender = table.remove(&(cmd_id as CmdId)).unwrap();
- response_sender.send(msg).unwrap();
- Ok(())
- }).map_err(|_| ())
- }));
- rid
- }
- Err(err) => {
- println!("{}", err.to_string());
+
+ js_check(worker.execute("denoMain()"));
+ js_check(worker.execute("workerMain()"));
+ js_check(worker.execute("compilerMain()"));
+
+ let mut runtime = C_RUNTIME.lock().unwrap();
+ runtime.spawn(lazy(move || {
+ worker.then(move |result| -> Result<(), ()> {
+ // Close resource so the future created by
+ // handle_worker_message_stream exits
+ resource.close();
+ debug!("Compiler worker exited!");
+ if let Err(e) = result {
+ eprintln!("{}", JSErrorColor(&e).to_string());
+ }
std::process::exit(1);
- }
- }
+ })
+ }));
+ runtime.spawn(lazy(move || {
+ debug!("Start worker stream handler!");
+ let worker_stream = resources::get_message_stream_from_worker(rid);
+ worker_stream
+ .for_each(|msg: Buf| {
+ // All worker responses are handled here first before being sent via
+ // their respective sender. This system can be compared to the
+ // promise system used on the js side. This provides a way to
+ // resolve many futures via the same channel.
+ let res_json = std::str::from_utf8(&msg).unwrap();
+ debug!("Got message from worker: {}", res_json);
+ // Get the intended receiver's cmd_id from the message.
+ let cmd_id = parse_cmd_id(res_json);
+ let mut table = C_RES_SENDER_TABLE.lock().unwrap();
+ debug!("Cmd id for get message handler: {}", cmd_id);
+ // Get the corresponding response sender from the table and
+ // send a response.
+ let response_sender = table.remove(&(cmd_id as CmdId)).unwrap();
+ response_sender.send(msg).unwrap();
+ Ok(())
+ }).map_err(|_| ())
+ }));
+ rid
}).clone()
}
-fn req(
- specifier: &str,
- referrer: &str,
- is_worker_main: bool,
- cmd_id: u32,
-) -> Buf {
+fn req(specifier: &str, referrer: &str, cmd_id: u32) -> Buf {
json!({
"specifier": specifier,
"referrer": referrer,
- "isWorker": is_worker_main,
"cmdId": cmd_id,
}).to_string()
.into_boxed_str()
@@ -228,7 +171,7 @@ pub fn compile_async(
);
let cmd_id = new_cmd_id();
- let req_msg = req(&specifier, &referrer, parent_state.is_worker, cmd_id);
+ let req_msg = req(&specifier, &referrer, cmd_id);
let module_meta_data_ = module_meta_data.clone();
let compiler_rid = lazy_start(parent_state.clone());
@@ -362,7 +305,7 @@ mod tests {
fn test_parse_cmd_id() {
let cmd_id = new_cmd_id();
- let msg = req("Hello", "World", false, cmd_id);
+ let msg = req("Hello", "World", cmd_id);
let res_json = std::str::from_utf8(&msg).unwrap();
diff --git a/cli/isolate_state.rs b/cli/isolate_state.rs
index 313f4f6ce..a672f5ee2 100644
--- a/cli/isolate_state.rs
+++ b/cli/isolate_state.rs
@@ -5,25 +5,23 @@ use crate::flags;
use crate::global_timer::GlobalTimer;
use crate::modules::Modules;
use crate::permissions::DenoPermissions;
+use crate::resources;
use crate::resources::ResourceId;
-use crate::workers::UserWorkerBehavior;
-use crate::workers::Worker;
+use crate::worker::Worker;
use deno::Buf;
use futures::future::Shared;
-use futures::sync::mpsc as async_mpsc;
use std;
use std::collections::HashMap;
use std::env;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
+use tokio::sync::mpsc as async_mpsc;
pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);
-pub type UserWorkerTable =
- HashMap<ResourceId, Shared<Worker<UserWorkerBehavior>>>;
+pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;
// AtomicU64 is currently unstable
#[derive(Default)]
@@ -48,22 +46,23 @@ pub struct IsolateState {
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub modules: Mutex<Modules>,
- pub worker_channels: Option<Mutex<WorkerChannels>>,
+ pub worker_channels: Mutex<WorkerChannels>,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<UserWorkerTable>,
- pub is_worker: bool,
pub start_time: Instant,
+ pub resource: resources::Resource,
}
impl IsolateState {
- pub fn new(
- flags: flags::DenoFlags,
- argv_rest: Vec<String>,
- worker_channels: Option<WorkerChannels>,
- is_worker: bool,
- ) -> Self {
+ pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();
+ let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
+ let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
+ let internal_channels = (worker_out_tx, worker_in_rx);
+ let external_channels = (worker_in_tx, worker_out_rx);
+ let resource = resources::add_worker(external_channels);
+
Self {
dir: deno_dir::DenoDir::new(custom_root).unwrap(),
argv: argv_rest,
@@ -71,11 +70,11 @@ impl IsolateState {
flags,
metrics: Metrics::default(),
modules: Mutex::new(Modules::new()),
- worker_channels: worker_channels.map(Mutex::new),
+ worker_channels: Mutex::new(internal_channels),
global_timer: Mutex::new(GlobalTimer::new()),
workers: Mutex::new(UserWorkerTable::new()),
- is_worker,
start_time: Instant::now(),
+ resource,
}
}
@@ -126,7 +125,7 @@ impl IsolateState {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv) = flags::set_flags(argv).unwrap();
- IsolateState::new(flags, rest_argv, None, false)
+ IsolateState::new(flags, rest_argv)
}
pub fn metrics_op_dispatched(
@@ -153,8 +152,3 @@ impl IsolateState {
.fetch_add(bytes_received, Ordering::SeqCst);
}
}
-
-/// Provides state getter function
-pub trait IsolateStateContainer {
- fn state(&self) -> Arc<IsolateState>;
-}
diff --git a/cli/main.rs b/cli/main.rs
index bda4f7b8f..346bbbd89 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -20,7 +20,6 @@ mod fs;
mod global_timer;
mod http_body;
mod http_util;
-pub mod isolate;
pub mod isolate_state;
pub mod js_errors;
pub mod modules;
@@ -35,12 +34,12 @@ mod startup_data;
mod tokio_util;
mod tokio_write;
pub mod version;
-pub mod workers;
+pub mod worker;
use crate::cli_behavior::CliBehavior;
use crate::errors::RustOrJsError;
-use crate::isolate::Isolate;
use crate::isolate_state::IsolateState;
+use crate::worker::Worker;
use futures::lazy;
use futures::Future;
use log::{LevelFilter, Metadata, Record};
@@ -105,27 +104,28 @@ fn main() {
let should_prefetch = flags.prefetch || flags.info;
let should_display_info = flags.info;
- let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
+ let state = Arc::new(IsolateState::new(flags, rest_argv));
let state_ = state.clone();
let cli = CliBehavior::new(state_);
- let mut isolate = Isolate::new(startup_data::deno_isolate_init(), cli);
+ let mut main_worker =
+ Worker::new("main".to_string(), startup_data::deno_isolate_init(), cli);
let main_future = lazy(move || {
// Setup runtime.
- js_check(isolate.execute("denoMain()"));
+ js_check(main_worker.execute("denoMain()"));
// Execute main module.
if let Some(main_module) = state.main_module() {
debug!("main_module {}", main_module);
- js_check(isolate.execute_mod(&main_module, should_prefetch));
+ js_check(main_worker.execute_mod(&main_module, should_prefetch));
if should_display_info {
// Display file info and exit. Do not run file
- isolate.print_file_info(&main_module);
+ main_worker.print_file_info(&main_module);
std::process::exit(0);
}
}
- isolate.then(|result| {
+ main_worker.then(|result| {
js_check(result);
Ok(())
})
diff --git a/cli/ops.rs b/cli/ops.rs
index d5656a3b6..3d43b2401 100644
--- a/cli/ops.rs
+++ b/cli/ops.rs
@@ -1,11 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use atty;
use crate::ansi;
+use crate::cli_behavior::CliBehavior;
use crate::errors;
use crate::errors::{DenoError, DenoResult, ErrorKind};
use crate::fs as deno_fs;
use crate::http_util;
-use crate::isolate_state::{IsolateState, IsolateStateContainer};
+use crate::isolate_state::IsolateState;
use crate::js_errors::apply_source_map;
use crate::js_errors::JSErrorColor;
use crate::msg;
@@ -19,8 +20,9 @@ use crate::startup_data;
use crate::tokio_util;
use crate::tokio_write;
use crate::version;
-use crate::workers;
+use crate::worker::Worker;
use deno::deno_buf;
+use deno::js_check;
use deno::Buf;
use deno::JSError;
use deno::Op;
@@ -60,7 +62,7 @@ pub type OpWithError = dyn Future<Item = Buf, Error = DenoError> + Send;
// TODO Ideally we wouldn't have to box the OpWithError being returned.
// The box is just to make it easier to get a prototype refactor working.
type OpCreator =
- fn(sc: &IsolateStateContainer, base: &msg::Base<'_>, data: deno_buf)
+ fn(state: &Arc<IsolateState>, base: &msg::Base<'_>, data: deno_buf)
-> Box<OpWithError>;
type OpSelector = fn(inner_type: msg::Any) -> Option<OpCreator>;
@@ -75,7 +77,7 @@ fn empty_buf() -> Buf {
/// control corresponds to the first argument of Deno.core.dispatch().
/// data corresponds to the second argument of Deno.core.dispatch().
pub fn dispatch_all(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
control: &[u8],
zero_copy: deno_buf,
op_selector: OpSelector,
@@ -92,10 +94,9 @@ pub fn dispatch_all(
None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)),
};
- let state = sc.state().clone();
-
- let op: Box<OpWithError> = op_func(sc, &base, zero_copy);
+ let op: Box<OpWithError> = op_func(state, &base, zero_copy);
+ let state = state.clone();
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
let boxed_op = Box::new(
@@ -143,23 +144,6 @@ pub fn dispatch_all(
(base.sync(), boxed_op)
}
-/// Superset of op_selector_worker for compiler isolates
-pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> {
- match inner_type {
- msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data),
- _ => op_selector_worker(inner_type),
- }
-}
-
-/// Superset of op_selector_std for worker isolates
-pub fn op_selector_worker(inner_type: msg::Any) -> Option<OpCreator> {
- match inner_type {
- msg::Any::WorkerGetMessage => Some(op_worker_get_message),
- msg::Any::WorkerPostMessage => Some(op_worker_post_message),
- _ => op_selector_std(inner_type),
- }
-}
-
/// Standard ops set for most isolates
pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
match inner_type {
@@ -208,6 +192,14 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
msg::Any::HostGetMessage => Some(op_host_get_message),
msg::Any::HostPostMessage => Some(op_host_post_message),
msg::Any::Write => Some(op_write),
+
+ // TODO(ry) split these out so that only the appropriate Workers can access
+ // them. Only the compiler worker should be able to access
+ // FetchModuleMetaData.
+ msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data),
+ msg::Any::WorkerGetMessage => Some(op_worker_get_message),
+ msg::Any::WorkerPostMessage => Some(op_worker_post_message),
+
_ => None,
}
}
@@ -217,19 +209,19 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> {
// If the High precision flag is not set, the
// nanoseconds are rounded on 2ms.
fn op_now(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
- let seconds = sc.state().start_time.elapsed().as_secs();
- let mut subsec_nanos = sc.state().start_time.elapsed().subsec_nanos();
+ let seconds = state.start_time.elapsed().as_secs();
+ let mut subsec_nanos = state.start_time.elapsed().subsec_nanos();
let reduced_time_precision = 2000000; // 2ms in nanoseconds
// If the permission is not enabled
// Round the nano result on 2 milliseconds
// see: https://developer.mozilla.org/en-US/docs/Web/API/DOMHighResTimeStamp#Reduced_time_precision
- if !sc.state().permissions.allows_high_precision() {
+ if !state.permissions.allows_high_precision() {
subsec_nanos -= subsec_nanos % reduced_time_precision
}
@@ -253,7 +245,7 @@ fn op_now(
}
fn op_is_tty(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
_data: deno_buf,
) -> Box<OpWithError> {
@@ -278,7 +270,7 @@ fn op_is_tty(
}
fn op_exit(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
_data: deno_buf,
) -> Box<OpWithError> {
@@ -287,14 +279,14 @@ fn op_exit(
}
fn op_start(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
let mut builder = FlatBufferBuilder::new();
- let state = sc.state();
+ let state = state;
let argv = state.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let argv_off = builder.create_vector_of_strings(argv.as_slice());
@@ -311,7 +303,7 @@ fn op_start(
let deno_version = version::DENO;
let deno_version_off = builder.create_string(deno_version);
- let main_module = sc.state().main_module().map(|m| builder.create_string(&m));
+ let main_module = state.main_module().map(|m| builder.create_string(&m));
let inner = msg::StartRes::create(
&mut builder,
@@ -320,9 +312,9 @@ fn op_start(
pid: std::process::id(),
argv: Some(argv_off),
main_module,
- debug_flag: sc.state().flags.log_debug,
- types_flag: sc.state().flags.types,
- version_flag: sc.state().flags.version,
+ debug_flag: state.flags.log_debug,
+ types_flag: state.flags.types,
+ version_flag: state.flags.version,
v8_version: Some(v8_version_off),
deno_version: Some(deno_version_off),
no_color: !ansi::use_color(),
@@ -343,7 +335,7 @@ fn op_start(
}
fn op_format_error(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -352,7 +344,7 @@ fn op_format_error(
let orig_error = String::from(inner.error().unwrap());
let js_error = JSError::from_v8_exception(&orig_error).unwrap();
- let js_error_mapped = apply_source_map(&js_error, &sc.state().dir);
+ let js_error_mapped = apply_source_map(&js_error, &state.dir);
let js_error_string = JSErrorColor(&js_error_mapped).to_string();
let mut builder = FlatBufferBuilder::new();
@@ -402,7 +394,7 @@ pub fn odd_future(err: DenoError) -> Box<OpWithError> {
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154
fn op_fetch_module_meta_data(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -412,19 +404,14 @@ fn op_fetch_module_meta_data(
let specifier = inner.specifier().unwrap();
let referrer = inner.referrer().unwrap();
- assert_eq!(
- sc.state().dir.root.join("gen"),
- sc.state().dir.gen,
- "Sanity check"
- );
+ assert_eq!(state.dir.root.join("gen"), state.dir.gen, "Sanity check");
- let use_cache = !sc.state().flags.reload;
+ let use_cache = !state.flags.reload;
Box::new(futures::future::result(|| -> OpResult {
let builder = &mut FlatBufferBuilder::new();
// TODO(ry) Use fetch_module_meta_data_async.
- let out = sc
- .state()
+ let out = state
.dir
.fetch_module_meta_data(specifier, referrer, use_cache)?;
let data_off = builder.create_vector(out.source_code.as_slice());
@@ -448,7 +435,7 @@ fn op_fetch_module_meta_data(
}
fn op_chdir(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -462,20 +449,20 @@ fn op_chdir(
}
fn op_global_timer_stop(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert!(base.sync());
assert_eq!(data.len(), 0);
- let state = sc.state();
+ let state = state;
let mut t = state.global_timer.lock().unwrap();
t.cancel();
ok_future(empty_buf())
}
fn op_global_timer(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -486,7 +473,7 @@ fn op_global_timer(
let val = inner.timeout();
assert!(val >= 0);
- let state = sc.state();
+ let state = state;
let mut t = state.global_timer.lock().unwrap();
let deadline = Instant::now() + Duration::from_millis(val as u64);
let f = t.new_timeout(deadline);
@@ -508,7 +495,7 @@ fn op_global_timer(
}
fn op_set_env(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -516,7 +503,7 @@ fn op_set_env(
let inner = base.inner_as_set_env().unwrap();
let key = inner.key().unwrap();
let value = inner.value().unwrap();
- if let Err(e) = sc.state().check_env() {
+ if let Err(e) = state.check_env() {
return odd_future(e);
}
std::env::set_var(key, value);
@@ -524,14 +511,14 @@ fn op_set_env(
}
fn op_env(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
let cmd_id = base.cmd_id();
- if let Err(e) = sc.state().check_env() {
+ if let Err(e) = state.check_env() {
return odd_future(e);
}
@@ -556,7 +543,7 @@ fn op_env(
}
fn op_permissions(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -566,12 +553,12 @@ fn op_permissions(
let inner = msg::PermissionsRes::create(
builder,
&msg::PermissionsResArgs {
- run: sc.state().permissions.allows_run(),
- read: sc.state().permissions.allows_read(),
- write: sc.state().permissions.allows_write(),
- net: sc.state().permissions.allows_net(),
- env: sc.state().permissions.allows_env(),
- high_precision: sc.state().permissions.allows_high_precision(),
+ run: state.permissions.allows_run(),
+ read: state.permissions.allows_read(),
+ write: state.permissions.allows_write(),
+ net: state.permissions.allows_net(),
+ env: state.permissions.allows_env(),
+ high_precision: state.permissions.allows_high_precision(),
},
);
ok_future(serialize_response(
@@ -586,7 +573,7 @@ fn op_permissions(
}
fn op_revoke_permission(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -594,12 +581,12 @@ fn op_revoke_permission(
let inner = base.inner_as_permission_revoke().unwrap();
let permission = inner.permission().unwrap();
let result = match permission {
- "run" => sc.state().permissions.revoke_run(),
- "read" => sc.state().permissions.revoke_read(),
- "write" => sc.state().permissions.revoke_write(),
- "net" => sc.state().permissions.revoke_net(),
- "env" => sc.state().permissions.revoke_env(),
- "highPrecision" => sc.state().permissions.revoke_high_precision(),
+ "run" => state.permissions.revoke_run(),
+ "read" => state.permissions.revoke_read(),
+ "write" => state.permissions.revoke_write(),
+ "net" => state.permissions.revoke_net(),
+ "env" => state.permissions.revoke_env(),
+ "highPrecision" => state.permissions.revoke_high_precision(),
_ => Ok(()),
};
if let Err(e) = result {
@@ -609,7 +596,7 @@ fn op_revoke_permission(
}
fn op_fetch(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -632,7 +619,7 @@ fn op_fetch(
}
let req = maybe_req.unwrap();
- if let Err(e) = sc.state().check_net(url) {
+ if let Err(e) = state.check_net(url) {
return odd_future(e);
}
@@ -696,7 +683,7 @@ where
}
fn op_make_temp_dir(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -706,7 +693,7 @@ fn op_make_temp_dir(
let cmd_id = base.cmd_id();
// FIXME
- if let Err(e) = sc.state().check_write("make_temp") {
+ if let Err(e) = state.check_write("make_temp") {
return odd_future(e);
}
@@ -745,7 +732,7 @@ fn op_make_temp_dir(
}
fn op_mkdir(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -755,7 +742,7 @@ fn op_mkdir(
let recursive = inner.recursive();
let mode = inner.mode();
- if let Err(e) = sc.state().check_write(&path) {
+ if let Err(e) = state.check_write(&path) {
return odd_future(e);
}
@@ -767,7 +754,7 @@ fn op_mkdir(
}
fn op_chmod(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -776,7 +763,7 @@ fn op_chmod(
let _mode = inner.mode();
let path = String::from(inner.path().unwrap());
- if let Err(e) = sc.state().check_write(&path) {
+ if let Err(e) = state.check_write(&path) {
return odd_future(e);
}
@@ -806,7 +793,7 @@ fn op_chmod(
}
fn op_open(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -855,20 +842,20 @@ fn op_open(
match mode {
"r" => {
- if let Err(e) = sc.state().check_read(&filename_str) {
+ if let Err(e) = state.check_read(&filename_str) {
return odd_future(e);
}
}
"w" | "a" | "x" => {
- if let Err(e) = sc.state().check_write(&filename_str) {
+ if let Err(e) = state.check_write(&filename_str) {
return odd_future(e);
}
}
&_ => {
- if let Err(e) = sc.state().check_read(&filename_str) {
+ if let Err(e) = state.check_read(&filename_str) {
return odd_future(e);
}
- if let Err(e) = sc.state().check_write(&filename_str) {
+ if let Err(e) = state.check_write(&filename_str) {
return odd_future(e);
}
}
@@ -896,7 +883,7 @@ fn op_open(
}
fn op_close(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -913,7 +900,7 @@ fn op_close(
}
fn op_shutdown(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -939,7 +926,7 @@ fn op_shutdown(
}
fn op_read(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -977,7 +964,7 @@ fn op_read(
}
fn op_write(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1014,7 +1001,7 @@ fn op_write(
}
fn op_seek(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1036,7 +1023,7 @@ fn op_seek(
}
fn op_remove(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1046,7 +1033,7 @@ fn op_remove(
let path = PathBuf::from(path_);
let recursive = inner.recursive();
- if let Err(e) = sc.state().check_write(path.to_str().unwrap()) {
+ if let Err(e) = state.check_write(path.to_str().unwrap()) {
return odd_future(e);
}
@@ -1065,7 +1052,7 @@ fn op_remove(
}
fn op_copy_file(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1076,10 +1063,10 @@ fn op_copy_file(
let to_ = inner.to().unwrap();
let to = PathBuf::from(to_);
- if let Err(e) = sc.state().check_read(&from_) {
+ if let Err(e) = state.check_read(&from_) {
return odd_future(e);
}
- if let Err(e) = sc.state().check_write(&to_) {
+ if let Err(e) = state.check_write(&to_) {
return odd_future(e);
}
@@ -1121,7 +1108,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 {
}
fn op_cwd(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1147,7 +1134,7 @@ fn op_cwd(
}
fn op_stat(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1158,7 +1145,7 @@ fn op_stat(
let filename = PathBuf::from(filename_);
let lstat = inner.lstat();
- if let Err(e) = sc.state().check_read(&filename_) {
+ if let Err(e) = state.check_read(&filename_) {
return odd_future(e);
}
@@ -1199,7 +1186,7 @@ fn op_stat(
}
fn op_read_dir(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1208,7 +1195,7 @@ fn op_read_dir(
let cmd_id = base.cmd_id();
let path = String::from(inner.path().unwrap());
- if let Err(e) = sc.state().check_read(&path) {
+ if let Err(e) = state.check_read(&path) {
return odd_future(e);
}
@@ -1260,7 +1247,7 @@ fn op_read_dir(
}
fn op_rename(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1269,7 +1256,7 @@ fn op_rename(
let oldpath = PathBuf::from(inner.oldpath().unwrap());
let newpath_ = inner.newpath().unwrap();
let newpath = PathBuf::from(newpath_);
- if let Err(e) = sc.state().check_write(&newpath_) {
+ if let Err(e) = state.check_write(&newpath_) {
return odd_future(e);
}
blocking(base.sync(), move || -> OpResult {
@@ -1280,7 +1267,7 @@ fn op_rename(
}
fn op_link(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1290,7 +1277,7 @@ fn op_link(
let newname_ = inner.newname().unwrap();
let newname = PathBuf::from(newname_);
- if let Err(e) = sc.state().check_write(&newname_) {
+ if let Err(e) = state.check_write(&newname_) {
return odd_future(e);
}
@@ -1302,7 +1289,7 @@ fn op_link(
}
fn op_symlink(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1312,7 +1299,7 @@ fn op_symlink(
let newname_ = inner.newname().unwrap();
let newname = PathBuf::from(newname_);
- if let Err(e) = sc.state().check_write(&newname_) {
+ if let Err(e) = state.check_write(&newname_) {
return odd_future(e);
}
// TODO Use type for Windows.
@@ -1331,7 +1318,7 @@ fn op_symlink(
}
fn op_read_link(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1341,7 +1328,7 @@ fn op_read_link(
let name_ = inner.name().unwrap();
let name = PathBuf::from(name_);
- if let Err(e) = sc.state().check_read(&name_) {
+ if let Err(e) = state.check_read(&name_) {
return odd_future(e);
}
@@ -1369,7 +1356,7 @@ fn op_read_link(
}
fn op_repl_start(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1379,7 +1366,7 @@ fn op_repl_start(
let history_file = String::from(inner.history_file().unwrap());
debug!("op_repl_start {}", history_file);
- let history_path = repl::history_path(&sc.state().dir, &history_file);
+ let history_path = repl::history_path(&state.dir, &history_file);
let repl = repl::Repl::new(history_path);
let resource = resources::add_repl(repl);
@@ -1400,7 +1387,7 @@ fn op_repl_start(
}
fn op_repl_readline(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1436,7 +1423,7 @@ fn op_repl_readline(
}
fn op_truncate(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1446,7 +1433,7 @@ fn op_truncate(
let filename = String::from(inner.name().unwrap());
let len = inner.len();
- if let Err(e) = sc.state().check_write(&filename) {
+ if let Err(e) = state.check_write(&filename) {
return odd_future(e);
}
@@ -1459,12 +1446,12 @@ fn op_truncate(
}
fn op_listen(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
- if let Err(e) = sc.state().check_net("listen") {
+ if let Err(e) = state.check_net("listen") {
return odd_future(e);
}
@@ -1521,12 +1508,12 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
}
fn op_accept(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
- if let Err(e) = sc.state().check_net("accept") {
+ if let Err(e) = state.check_net("accept") {
return odd_future(e);
}
let cmd_id = base.cmd_id();
@@ -1547,12 +1534,12 @@ fn op_accept(
}
fn op_dial(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert_eq!(data.len(), 0);
- if let Err(e) = sc.state().check_net("dial") {
+ if let Err(e) = state.check_net("dial") {
return odd_future(e);
}
let cmd_id = base.cmd_id();
@@ -1573,7 +1560,7 @@ fn op_dial(
}
fn op_metrics(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1583,7 +1570,7 @@ fn op_metrics(
let builder = &mut FlatBufferBuilder::new();
let inner = msg::MetricsRes::create(
builder,
- &msg::MetricsResArgs::from(&sc.state().metrics),
+ &msg::MetricsResArgs::from(&state.metrics),
);
ok_future(serialize_response(
cmd_id,
@@ -1597,7 +1584,7 @@ fn op_metrics(
}
fn op_resources(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1649,14 +1636,14 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio {
}
fn op_run(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
assert!(base.sync());
let cmd_id = base.cmd_id();
- if let Err(e) = sc.state().check_run() {
+ if let Err(e) = state.check_run() {
return odd_future(e);
}
@@ -1722,7 +1709,7 @@ fn op_run(
}
fn op_run_status(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1731,7 +1718,7 @@ fn op_run_status(
let inner = base.inner_as_run_status().unwrap();
let rid = inner.rid();
- if let Err(e) = sc.state().check_run() {
+ if let Err(e) = state.check_run() {
return odd_future(e);
}
@@ -1786,20 +1773,16 @@ impl Future for GetMessageFuture {
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
- assert!(self.state.worker_channels.is_some());
- match self.state.worker_channels {
- None => panic!("expected worker_channels"),
- Some(ref wc) => {
- let mut wc = wc.lock().unwrap();
- wc.1.poll()
- }
- }
+ let mut wc = self.state.worker_channels.lock().unwrap();
+ wc.1
+ .poll()
+ .map_err(|err| panic!("worker_channel recv err {:?}", err))
}
}
/// Get message from host as guest worker
fn op_worker_get_message(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1807,7 +1790,7 @@ fn op_worker_get_message(
let cmd_id = base.cmd_id();
let op = GetMessageFuture {
- state: sc.state().clone(),
+ state: state.clone(),
};
let op = op.map_err(move |_| -> DenoError { unimplemented!() });
let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
@@ -1834,7 +1817,7 @@ fn op_worker_get_message(
/// Post message to host as guest worker
fn op_worker_post_message(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1842,13 +1825,9 @@ fn op_worker_post_message(
let d = Vec::from(data.as_ref()).into_boxed_slice();
- assert!(sc.state().worker_channels.is_some());
- let tx = match sc.state().worker_channels {
- None => panic!("expected worker_channels"),
- Some(ref wc) => {
- let wc = wc.lock().unwrap();
- wc.0.clone()
- }
+ let tx = {
+ let wc = state.worker_channels.lock().unwrap();
+ wc.0.clone()
};
let op = tx.send(d);
let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
@@ -1868,7 +1847,7 @@ fn op_worker_post_message(
/// Create worker as the host
fn op_create_worker(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1878,20 +1857,24 @@ fn op_create_worker(
let specifier = inner.specifier().unwrap();
Box::new(futures::future::result(move || -> OpResult {
- let parent_state = sc.state().clone();
- let behavior = workers::UserWorkerBehavior::new(
+ let parent_state = state.clone();
+
+ let child_state = Arc::new(IsolateState::new(
parent_state.flags.clone(),
parent_state.argv.clone(),
- );
- match workers::spawn(
- startup_data::deno_isolate_init(),
- behavior,
- &format!("USER-WORKER-{}", specifier),
- workers::WorkerInit::Module(specifier.to_string()),
- ) {
- Ok(worker) => {
+ ));
+ let rid = child_state.resource.rid;
+ let behavior = CliBehavior::new(child_state);
+ let name = format!("USER-WORKER-{}", specifier);
+
+ let mut worker =
+ Worker::new(name, startup_data::deno_isolate_init(), behavior);
+ js_check(worker.execute("denoMain()"));
+ js_check(worker.execute("workerMain()"));
+ let result = worker.execute_mod(specifier, false);
+ match result {
+ Ok(_) => {
let mut workers_tl = parent_state.workers.lock().unwrap();
- let rid = worker.resource.rid;
workers_tl.insert(rid, worker.shared());
let builder = &mut FlatBufferBuilder::new();
let msg_inner = msg::CreateWorkerRes::create(
@@ -1916,7 +1899,7 @@ fn op_create_worker(
/// Return when the worker closes
fn op_host_get_worker_closed(
- sc: &IsolateStateContainer,
+ state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1924,7 +1907,7 @@ fn op_host_get_worker_closed(
let cmd_id = base.cmd_id();
let inner = base.inner_as_host_get_worker_closed().unwrap();
let rid = inner.rid();
- let state = sc.state().clone();
+ let state = state.clone();
let shared_worker_future = {
let workers_tl = state.workers.lock().unwrap();
@@ -1947,7 +1930,7 @@ fn op_host_get_worker_closed(
/// Get message from guest worker as host
fn op_host_get_message(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
@@ -1981,7 +1964,7 @@ fn op_host_get_message(
/// Post message to guest worker as host
fn op_host_post_message(
- _sc: &IsolateStateContainer,
+ _state: &Arc<IsolateState>,
base: &msg::Base<'_>,
data: deno_buf,
) -> Box<OpWithError> {
diff --git a/cli/resources.rs b/cli/resources.rs
index 701d5a937..b1689c890 100644
--- a/cli/resources.rs
+++ b/cli/resources.rs
@@ -35,6 +35,7 @@ use std::sync::{Arc, Mutex};
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
+use tokio::sync::mpsc;
use tokio_process;
pub type ResourceId = u32; // Sometimes referred to RID.
@@ -309,7 +310,7 @@ pub fn add_worker(wc: WorkerChannels) -> Resource {
pub fn post_message_to_worker(
rid: ResourceId,
buf: Buf,
-) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> {
+) -> futures::sink::Send<mpsc::Sender<Buf>> {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
@@ -334,9 +335,10 @@ impl Future for WorkerReceiver {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
- Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| {
- errors::new(errors::ErrorKind::Other, "recv msg error".to_string())
- }),
+ Some(Repr::Worker(ref mut wc)) => wc
+ .1
+ .poll()
+ .map_err(|err| errors::new(errors::ErrorKind::Other, err.to_string())),
_ => Err(bad_resource()),
}
}
@@ -359,9 +361,10 @@ impl Stream for WorkerReceiverStream {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
- Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| {
- errors::new(errors::ErrorKind::Other, "recv msg error".to_string())
- }),
+ Some(Repr::Worker(ref mut wc)) => wc
+ .1
+ .poll()
+ .map_err(|err| errors::new(errors::ErrorKind::Other, err.to_string())),
_ => Err(bad_resource()),
}
}
diff --git a/cli/isolate.rs b/cli/worker.rs
index 2e6d69d87..6609c5005 100644
--- a/cli/isolate.rs
+++ b/cli/worker.rs
@@ -1,17 +1,16 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+use crate::cli_behavior::CliBehavior;
use crate::compiler::compile_async;
use crate::compiler::ModuleMetaData;
use crate::errors::DenoError;
use crate::errors::RustOrJsError;
use crate::isolate_state::IsolateState;
-use crate::isolate_state::IsolateStateContainer;
use crate::js_errors;
use crate::js_errors::JSErrorColor;
use crate::msg;
use crate::tokio_util;
use deno;
use deno::deno_mod;
-use deno::Behavior;
use deno::JSError;
use deno::StartupData;
use futures::future::Either;
@@ -20,23 +19,22 @@ use futures::Future;
use std::sync::atomic::Ordering;
use std::sync::Arc;
-pub trait DenoBehavior: Behavior + IsolateStateContainer + Send {}
-impl<T> DenoBehavior for T where T: Behavior + IsolateStateContainer + Send {}
-
-type CoreIsolate<B> = deno::Isolate<B>;
-
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
/// high-level module loading
-pub struct Isolate<B: Behavior> {
- inner: CoreIsolate<B>,
+pub struct Worker {
+ inner: deno::Isolate<CliBehavior>,
state: Arc<IsolateState>,
}
-impl<B: DenoBehavior> Isolate<B> {
- pub fn new(startup_data: StartupData, behavior: B) -> Isolate<B> {
- let state = behavior.state().clone();
+impl Worker {
+ pub fn new(
+ _name: String,
+ startup_data: StartupData,
+ behavior: CliBehavior,
+ ) -> Worker {
+ let state = behavior.state.clone();
Self {
- inner: CoreIsolate::new(startup_data, behavior),
+ inner: deno::Isolate::new(startup_data, behavior),
state,
}
}
@@ -196,7 +194,7 @@ impl<B: DenoBehavior> Isolate<B> {
}
}
-impl<B: DenoBehavior> Future for Isolate<B> {
+impl Future for Worker {
type Item = ();
type Error = JSError;
@@ -255,8 +253,14 @@ mod tests {
use super::*;
use crate::cli_behavior::CliBehavior;
use crate::flags;
+ use crate::isolate_state::IsolateState;
+ use crate::resources;
+ use crate::startup_data;
+ use crate::tokio_util;
+ use deno::js_check;
use futures::future::lazy;
use std::sync::atomic::Ordering;
+ use std::thread;
#[test]
fn execute_mod() {
@@ -268,15 +272,15 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
+ let state = Arc::new(IsolateState::new(flags, rest_argv));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(state.clone());
- let mut isolate = Isolate::new(StartupData::None, cli);
- if let Err(err) = isolate.execute_mod(&filename, false) {
+ let mut worker = Worker::new("TEST".to_string(), StartupData::None, cli);
+ if let Err(err) = worker.execute_mod(&filename, false) {
eprintln!("execute_mod err {:?}", err);
}
- tokio_util::panic_on_error(isolate)
+ tokio_util::panic_on_error(worker)
}));
let metrics = &state_.metrics;
@@ -291,18 +295,113 @@ mod tests {
let argv = vec![String::from("./deno"), filename.clone()];
let (flags, rest_argv) = flags::set_flags(argv).unwrap();
- let state = Arc::new(IsolateState::new(flags, rest_argv, None, false));
+ let state = Arc::new(IsolateState::new(flags, rest_argv));
let state_ = state.clone();
tokio_util::run(lazy(move || {
let cli = CliBehavior::new(state.clone());
- let mut isolate = Isolate::new(StartupData::None, cli);
- if let Err(err) = isolate.execute_mod(&filename, false) {
+ let mut worker = Worker::new("TEST".to_string(), StartupData::None, cli);
+ if let Err(err) = worker.execute_mod(&filename, false) {
eprintln!("execute_mod err {:?}", err);
}
- tokio_util::panic_on_error(isolate)
+ tokio_util::panic_on_error(worker)
}));
let metrics = &state_.metrics;
assert_eq!(metrics.resolve_count.load(Ordering::SeqCst), 2);
}
+
+ fn create_test_worker() -> Worker {
+ let state = Arc::new(IsolateState::mock());
+ let cli = CliBehavior::new(state.clone());
+ let mut worker =
+ Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), cli);
+ js_check(worker.execute("denoMain()"));
+ js_check(worker.execute("workerMain()"));
+ worker
+ }
+
+ #[test]
+ fn test_worker_messages() {
+ tokio_util::init(|| {
+ let mut worker = create_test_worker();
+ let source = r#"
+ onmessage = function(e) {
+ console.log("msg from main script", e.data);
+ if (e.data == "exit") {
+ close();
+ return;
+ } else {
+ console.assert(e.data === "hi");
+ }
+ postMessage([1, 2, 3]);
+ console.log("after postMessage");
+ }
+ "#;
+ js_check(worker.execute(source));
+
+ let resource = worker.state.resource.clone();
+ let resource_ = resource.clone();
+
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource_.close();
+ js_check(r);
+ Ok(())
+ })
+ }));
+
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+
+ let maybe_msg = resources::get_message_from_worker(resource.rid)
+ .wait()
+ .unwrap();
+ assert!(maybe_msg.is_some());
+ // Check if message received is [1, 2, 3] in json
+ assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
+
+ let msg = json!("exit")
+ .to_string()
+ .into_boxed_str()
+ .into_boxed_bytes();
+ let r = resources::post_message_to_worker(resource.rid, msg).wait();
+ assert!(r.is_ok());
+ })
+ }
+
+ #[test]
+ fn removed_from_resource_table_on_close() {
+ tokio_util::init(|| {
+ let mut worker = create_test_worker();
+ js_check(
+ worker.execute("onmessage = () => { delete window['onmessage']; }"),
+ );
+
+ let resource = worker.state.resource.clone();
+ let rid = resource.rid;
+
+ tokio::spawn(lazy(move || {
+ worker.then(move |r| -> Result<(), ()> {
+ resource.close();
+ println!("workers.rs after resource close");
+ js_check(r);
+ Ok(())
+ })
+ }));
+
+ assert_eq!(resources::get_type(rid), Some("worker".to_string()));
+
+ let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
+ let r = resources::post_message_to_worker(rid, msg).wait();
+ assert!(r.is_ok());
+ debug!("rid {:?}", rid);
+
+ // TODO Need a way to get a future for when a resource closes.
+ // For now, just sleep for a bit.
+ thread::sleep(std::time::Duration::from_millis(1000));
+ assert_eq!(resources::get_type(rid), None);
+ })
+ }
}
diff --git a/cli/workers.rs b/cli/workers.rs
deleted file mode 100644
index 7c39a5d2e..000000000
--- a/cli/workers.rs
+++ /dev/null
@@ -1,284 +0,0 @@
-// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::errors::*;
-use crate::flags::DenoFlags;
-use crate::isolate::{DenoBehavior, Isolate};
-use crate::isolate_state::IsolateState;
-use crate::isolate_state::IsolateStateContainer;
-use crate::isolate_state::WorkerChannels;
-use crate::ops;
-use crate::resources;
-use deno::deno_buf;
-use deno::Behavior;
-use deno::Buf;
-use deno::JSError;
-use deno::Op;
-use deno::StartupData;
-use futures::sync::mpsc;
-use futures::Future;
-use futures::Poll;
-use std::sync::Arc;
-
-pub struct UserWorkerBehavior {
- pub state: Arc<IsolateState>,
-}
-
-impl UserWorkerBehavior {
- pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
- Self {
- state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
- }
- }
-}
-
-impl IsolateStateContainer for UserWorkerBehavior {
- fn state(&self) -> Arc<IsolateState> {
- self.state.clone()
- }
-}
-
-impl IsolateStateContainer for &UserWorkerBehavior {
- fn state(&self) -> Arc<IsolateState> {
- self.state.clone()
- }
-}
-
-impl Behavior for UserWorkerBehavior {
- fn dispatch(
- &mut self,
- control: &[u8],
- zero_copy: deno_buf,
- ) -> (bool, Box<Op>) {
- ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker)
- }
-}
-
-impl WorkerBehavior for UserWorkerBehavior {
- fn set_internal_channels(&mut self, worker_channels: WorkerChannels) {
- self.state = Arc::new(IsolateState::new(
- self.state.flags.clone(),
- self.state.argv.clone(),
- Some(worker_channels),
- true,
- ));
- }
-}
-
-/// Behavior trait specific to workers
-pub trait WorkerBehavior: DenoBehavior {
- /// Used to setup internal channels at worker creation.
- /// This is intended to be temporary fix.
- /// TODO(afinch7) come up with a better solution to set worker channels
- fn set_internal_channels(&mut self, worker_channels: WorkerChannels);
-}
-
-/// Rust interface for WebWorkers.
-pub struct Worker<B: WorkerBehavior> {
- isolate: Isolate<B>,
- pub resource: resources::Resource,
-}
-
-impl<B: WorkerBehavior> Worker<B> {
- pub fn new(startup_data: StartupData, mut behavior: B) -> Self {
- let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
- let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
-
- let internal_channels = (worker_out_tx, worker_in_rx);
- let external_channels = (worker_in_tx, worker_out_rx);
-
- behavior.set_internal_channels(internal_channels);
-
- let isolate = Isolate::new(startup_data, behavior);
-
- Worker {
- isolate,
- resource: resources::add_worker(external_channels),
- }
- }
-
- pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> {
- self.isolate.execute(js_source)
- }
-
- pub fn execute_mod(
- &mut self,
- js_filename: &str,
- is_prefetch: bool,
- ) -> Result<(), RustOrJsError> {
- self.isolate.execute_mod(js_filename, is_prefetch)
- }
-}
-
-impl<B: WorkerBehavior> Future for Worker<B> {
- type Item = ();
- type Error = JSError;
-
- fn poll(&mut self) -> Poll<(), JSError> {
- self.isolate.poll()
- }
-}
-
-/// Method and data used to initalize a worker
-pub enum WorkerInit {
- Script(String),
- Module(String),
-}
-
-pub fn spawn<B: WorkerBehavior + 'static>(
- startup_data: StartupData,
- behavior: B,
- worker_debug_name: &str,
- init: WorkerInit,
-) -> Result<Worker<B>, RustOrJsError> {
- let state = behavior.state().clone();
- let mut worker = Worker::new(startup_data, behavior);
-
- worker
- .execute(&format!("denoMain('{}')", worker_debug_name))
- .expect("worker workerInit failed");
-
- worker
- .execute("workerMain()")
- .expect("worker workerMain failed");
-
- let init_result = match init {
- WorkerInit::Script(script) => match worker.execute(&script) {
- Ok(_) => Ok(()),
- Err(e) => Err(RustOrJsError::Js(e)),
- },
- WorkerInit::Module(specifier) => {
- let should_prefetch = state.flags.prefetch || state.flags.info;
- match state.dir.resolve_module_url(&specifier, ".") {
- Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))),
- Ok(module_url) => {
- worker.execute_mod(&module_url.to_string(), should_prefetch)
- }
- }
- }
- };
-
- match init_result {
- Ok(_) => Ok(worker),
- Err(err) => Err(err),
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::compiler::CompilerBehavior;
- use crate::isolate_state::IsolateState;
- use crate::js_errors::JSErrorColor;
- use crate::startup_data;
- use crate::tokio_util;
- use futures::future::lazy;
- use std::thread;
-
- #[test]
- fn test_spawn() {
- tokio_util::init(|| {
- let worker_result = spawn(
- startup_data::compiler_isolate_init(),
- CompilerBehavior::new(
- IsolateState::mock().flags.clone(),
- IsolateState::mock().argv.clone(),
- ),
- "TEST",
- WorkerInit::Script(
- r#"
- onmessage = function(e) {
- console.log("msg from main script", e.data);
- if (e.data == "exit") {
- close();
- return;
- } else {
- console.assert(e.data === "hi");
- }
- postMessage([1, 2, 3]);
- console.log("after postMessage");
- }
- "#.into(),
- ),
- );
- assert!(worker_result.is_ok());
- let worker = worker_result.unwrap();
- let resource = worker.resource.clone();
- let resource_ = resource.clone();
-
- tokio::spawn(lazy(move || {
- worker.then(move |r| -> Result<(), ()> {
- resource_.close();
- debug!("workers.rs after resource close");
- if let Err(err) = r {
- panic!("{}", JSErrorColor(&err).to_string());
- }
- Ok(())
- })
- }));
-
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
-
- let r = resources::post_message_to_worker(resource.rid, msg).wait();
- assert!(r.is_ok());
-
- let maybe_msg = resources::get_message_from_worker(resource.rid)
- .wait()
- .unwrap();
- assert!(maybe_msg.is_some());
- // Check if message received is [1, 2, 3] in json
- assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
-
- let msg = json!("exit")
- .to_string()
- .into_boxed_str()
- .into_boxed_bytes();
- let r = resources::post_message_to_worker(resource.rid, msg).wait();
- assert!(r.is_ok());
- })
- }
-
- #[test]
- fn removed_from_resource_table_on_close() {
- tokio_util::init(|| {
- let worker_result = spawn(
- startup_data::compiler_isolate_init(),
- CompilerBehavior::new(
- IsolateState::mock().flags.clone(),
- IsolateState::mock().argv.clone(),
- ),
- "TEST",
- WorkerInit::Script("onmessage = () => close();".into()),
- );
- assert!(worker_result.is_ok());
- let worker = worker_result.unwrap();
- let resource = worker.resource.clone();
- let resource_ = resource.clone();
-
- tokio::spawn(lazy(move || {
- worker.then(move |r| -> Result<(), ()> {
- resource_.close();
- debug!("workers.rs after resource close");
- if let Err(err) = r {
- panic!("{}", JSErrorColor(&err).to_string());
- }
- Ok(())
- })
- }));
-
- assert_eq!(
- resources::get_type(resource.rid),
- Some("worker".to_string())
- );
-
- let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
- let r = resources::post_message_to_worker(resource.rid, msg).wait();
- assert!(r.is_ok());
- println!("rid {:?}", resource.rid);
-
- // TODO Need a way to get a future for when a resource closes.
- // For now, just sleep for a bit.
- // resource.close();
- thread::sleep(std::time::Duration::from_millis(1000));
- assert_eq!(resources::get_type(resource.rid), None);
- })
- }
-}