From d2579f45641b437974829c87d58c2a362ef66919 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 9 Apr 2019 13:11:25 -0400 Subject: core: Rename Behavior to Dispatch (#2082) And rename IsolateState to ThreadSafeState. Also make ThreadSafeState directly implement Dispatch. This is simpler. --- cli/cli_behavior.rs | 28 -------- cli/compiler.rs | 25 +++---- cli/isolate_state.rs | 154 ----------------------------------------- cli/main.rs | 18 +++-- cli/msg.rs | 6 +- cli/ops.rs | 113 +++++++++++++++--------------- cli/resources.rs | 2 +- cli/state.rs | 192 +++++++++++++++++++++++++++++++++++++++++++++++++++ cli/worker.rs | 40 +++++------ 9 files changed, 286 insertions(+), 292 deletions(-) delete mode 100644 cli/cli_behavior.rs delete mode 100644 cli/isolate_state.rs create mode 100644 cli/state.rs (limited to 'cli') diff --git a/cli/cli_behavior.rs b/cli/cli_behavior.rs deleted file mode 100644 index 510b2608c..000000000 --- a/cli/cli_behavior.rs +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::isolate_state::*; -use crate::ops; -use deno::deno_buf; -use deno::Behavior; -use deno::Op; -use std::sync::Arc; - -/// Implements deno::Behavior for the main Deno command-line. -pub struct CliBehavior { - pub state: Arc, -} - -impl CliBehavior { - pub fn new(state: Arc) -> Self { - Self { state } - } -} - -impl Behavior for CliBehavior { - fn dispatch( - &mut self, - control: &[u8], - zero_copy: deno_buf, - ) -> (bool, Box) { - ops::dispatch_all(&self.state, control, zero_copy, ops::op_selector_std) - } -} diff --git a/cli/compiler.rs b/cli/compiler.rs index d01e9fc0b..bd9cc4c64 100644 --- a/cli/compiler.rs +++ b/cli/compiler.rs @@ -1,12 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::cli_behavior::CliBehavior; -use crate::isolate_state::*; use crate::js_errors; use crate::js_errors::JSErrorColor; use crate::msg; use crate::resources; use crate::resources::ResourceId; use crate::startup_data; +use crate::state::*; use crate::tokio_util; use crate::worker::Worker; use deno::js_check; @@ -21,7 +20,6 @@ use std::collections::HashMap; use std::str; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::Arc; use std::sync::Mutex; use tokio::runtime::Runtime; @@ -88,22 +86,21 @@ fn parse_cmd_id(res_json: &str) -> CmdId { } } -fn lazy_start(parent_state: Arc) -> ResourceId { +fn lazy_start(parent_state: ThreadSafeState) -> ResourceId { let mut cell = C_RID.lock().unwrap(); cell .get_or_insert_with(|| { - let child_state = Arc::new(IsolateState::new( + let child_state = ThreadSafeState::new( parent_state.flags.clone(), parent_state.argv.clone(), - )); + ); let rid = child_state.resource.rid; let resource = child_state.resource.clone(); - let behavior = CliBehavior::new(child_state); let mut worker = Worker::new( "TS".to_string(), startup_data::compiler_isolate_init(), - behavior, + child_state, ); js_check(worker.execute("denoMain()")); @@ -160,7 +157,7 @@ fn req(specifier: &str, referrer: &str, cmd_id: u32) -> Buf { } pub fn compile_async( - parent_state: Arc, + parent_state: ThreadSafeState, specifier: &str, referrer: &str, module_meta_data: &ModuleMetaData, @@ -248,7 +245,7 @@ pub fn compile_async( } pub fn compile_sync( - parent_state: Arc, + parent_state: ThreadSafeState, specifier: &str, referrer: &str, module_meta_data: &ModuleMetaData, @@ -286,12 +283,8 @@ mod tests { maybe_source_map: None, }; - out = compile_sync( - Arc::new(IsolateState::mock()), - specifier, - &referrer, - &out, - ).unwrap(); + out = compile_sync(ThreadSafeState::mock(), specifier, &referrer, &out) + .unwrap(); assert!( out .maybe_output_code diff --git a/cli/isolate_state.rs b/cli/isolate_state.rs deleted file mode 100644 index a672f5ee2..000000000 --- a/cli/isolate_state.rs +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::deno_dir; -use crate::errors::DenoResult; -use crate::flags; -use crate::global_timer::GlobalTimer; -use crate::modules::Modules; -use crate::permissions::DenoPermissions; -use crate::resources; -use crate::resources::ResourceId; -use crate::worker::Worker; -use deno::Buf; -use futures::future::Shared; -use std; -use std::collections::HashMap; -use std::env; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Mutex; -use std::time::Instant; -use tokio::sync::mpsc as async_mpsc; - -pub type WorkerSender = async_mpsc::Sender; -pub type WorkerReceiver = async_mpsc::Receiver; -pub type WorkerChannels = (WorkerSender, WorkerReceiver); -pub type UserWorkerTable = HashMap>; - -// AtomicU64 is currently unstable -#[derive(Default)] -pub struct Metrics { - pub ops_dispatched: AtomicUsize, - pub ops_completed: AtomicUsize, - pub bytes_sent_control: AtomicUsize, - pub bytes_sent_data: AtomicUsize, - pub bytes_received: AtomicUsize, - pub resolve_count: AtomicUsize, -} - -// Isolate cannot be passed between threads but IsolateState can. -// IsolateState satisfies Send and Sync. -// So any state that needs to be accessed outside the main V8 thread should be -// inside IsolateState. -#[cfg_attr(feature = "cargo-clippy", allow(stutter))] -pub struct IsolateState { - pub dir: deno_dir::DenoDir, - pub argv: Vec, - pub permissions: DenoPermissions, - pub flags: flags::DenoFlags, - pub metrics: Metrics, - pub modules: Mutex, - pub worker_channels: Mutex, - pub global_timer: Mutex, - pub workers: Mutex, - pub start_time: Instant, - pub resource: resources::Resource, -} - -impl IsolateState { - pub fn new(flags: flags::DenoFlags, argv_rest: Vec) -> Self { - let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); - - let (worker_in_tx, worker_in_rx) = async_mpsc::channel::(1); - let (worker_out_tx, worker_out_rx) = async_mpsc::channel::(1); - let internal_channels = (worker_out_tx, worker_in_rx); - let external_channels = (worker_in_tx, worker_out_rx); - let resource = resources::add_worker(external_channels); - - Self { - dir: deno_dir::DenoDir::new(custom_root).unwrap(), - argv: argv_rest, - permissions: DenoPermissions::from_flags(&flags), - flags, - metrics: Metrics::default(), - modules: Mutex::new(Modules::new()), - worker_channels: Mutex::new(internal_channels), - global_timer: Mutex::new(GlobalTimer::new()), - workers: Mutex::new(UserWorkerTable::new()), - start_time: Instant::now(), - resource, - } - } - - /// Read main module from argv - pub fn main_module(&self) -> Option { - if self.argv.len() <= 1 { - None - } else { - let specifier = self.argv[1].clone(); - let referrer = "."; - match self.dir.resolve_module_url(&specifier, referrer) { - Ok(url) => Some(url.to_string()), - Err(e) => { - debug!("Potentially swallowed error {}", e); - None - } - } - } - } - - #[inline] - pub fn check_read(&self, filename: &str) -> DenoResult<()> { - self.permissions.check_read(filename) - } - - #[inline] - pub fn check_write(&self, filename: &str) -> DenoResult<()> { - self.permissions.check_write(filename) - } - - #[inline] - pub fn check_env(&self) -> DenoResult<()> { - self.permissions.check_env() - } - - #[inline] - pub fn check_net(&self, filename: &str) -> DenoResult<()> { - self.permissions.check_net(filename) - } - - #[inline] - pub fn check_run(&self) -> DenoResult<()> { - self.permissions.check_run() - } - - #[cfg(test)] - pub fn mock() -> IsolateState { - let argv = vec![String::from("./deno"), String::from("hello.js")]; - // For debugging: argv.push_back(String::from("-D")); - let (flags, rest_argv) = flags::set_flags(argv).unwrap(); - IsolateState::new(flags, rest_argv) - } - - pub fn metrics_op_dispatched( - &self, - bytes_sent_control: usize, - bytes_sent_data: usize, - ) { - self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); - self - .metrics - .bytes_sent_control - .fetch_add(bytes_sent_control, Ordering::SeqCst); - self - .metrics - .bytes_sent_data - .fetch_add(bytes_sent_data, Ordering::SeqCst); - } - - pub fn metrics_op_completed(&self, bytes_received: usize) { - self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); - self - .metrics - .bytes_received - .fetch_add(bytes_received, Ordering::SeqCst); - } -} diff --git a/cli/main.rs b/cli/main.rs index 346bbbd89..6178dafee 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -11,7 +11,6 @@ extern crate clap; extern crate deno; mod ansi; -pub mod cli_behavior; pub mod compiler; pub mod deno_dir; pub mod errors; @@ -20,7 +19,6 @@ mod fs; mod global_timer; mod http_body; mod http_util; -pub mod isolate_state; pub mod js_errors; pub mod modules; pub mod msg; @@ -31,20 +29,19 @@ mod repl; pub mod resolve_addr; pub mod resources; mod startup_data; +pub mod state; mod tokio_util; mod tokio_write; pub mod version; pub mod worker; -use crate::cli_behavior::CliBehavior; use crate::errors::RustOrJsError; -use crate::isolate_state::IsolateState; +use crate::state::ThreadSafeState; use crate::worker::Worker; use futures::lazy; use futures::Future; use log::{LevelFilter, Metadata, Record}; use std::env; -use std::sync::Arc; static LOGGER: Logger = Logger; @@ -104,11 +101,12 @@ fn main() { let should_prefetch = flags.prefetch || flags.info; let should_display_info = flags.info; - let state = Arc::new(IsolateState::new(flags, rest_argv)); - let state_ = state.clone(); - let cli = CliBehavior::new(state_); - let mut main_worker = - Worker::new("main".to_string(), startup_data::deno_isolate_init(), cli); + let state = ThreadSafeState::new(flags, rest_argv); + let mut main_worker = Worker::new( + "main".to_string(), + startup_data::deno_isolate_init(), + state.clone(), + ); let main_future = lazy(move || { // Setup runtime. diff --git a/cli/msg.rs b/cli/msg.rs index d4f62604c..d97940a34 100644 --- a/cli/msg.rs +++ b/cli/msg.rs @@ -5,7 +5,7 @@ feature = "cargo-clippy", allow(clippy::all, clippy::pedantic) )] -use crate::isolate_state; +use crate::state; use flatbuffers; use std::sync::atomic::Ordering; @@ -13,8 +13,8 @@ use std::sync::atomic::Ordering; // build_extra/rust/run.py (for the GN+Ninja build). include!(concat!(env!("GN_OUT_DIR"), "/gen/cli/msg_generated.rs")); -impl<'a> From<&'a isolate_state::Metrics> for MetricsResArgs { - fn from(m: &'a isolate_state::Metrics) -> Self { +impl<'a> From<&'a state::Metrics> for MetricsResArgs { + fn from(m: &'a state::Metrics) -> Self { MetricsResArgs { ops_dispatched: m.ops_dispatched.load(Ordering::SeqCst) as u64, ops_completed: m.ops_completed.load(Ordering::SeqCst) as u64, diff --git a/cli/ops.rs b/cli/ops.rs index 3d43b2401..baa159a68 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -1,12 +1,10 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use atty; use crate::ansi; -use crate::cli_behavior::CliBehavior; use crate::errors; use crate::errors::{DenoError, DenoResult, ErrorKind}; use crate::fs as deno_fs; use crate::http_util; -use crate::isolate_state::IsolateState; use crate::js_errors::apply_source_map; use crate::js_errors::JSErrorColor; use crate::msg; @@ -17,6 +15,7 @@ use crate::resources; use crate::resources::table_entries; use crate::resources::Resource; use crate::startup_data; +use crate::state::ThreadSafeState; use crate::tokio_util; use crate::tokio_write; use crate::version; @@ -42,7 +41,6 @@ use std::net::Shutdown; use std::path::Path; use std::path::PathBuf; use std::process::Command; -use std::sync::Arc; use std::time::{Duration, Instant, UNIX_EPOCH}; use tokio; use tokio::net::TcpListener; @@ -62,7 +60,7 @@ pub type OpWithError = dyn Future + Send; // TODO Ideally we wouldn't have to box the OpWithError being returned. // The box is just to make it easier to get a prototype refactor working. type OpCreator = - fn(state: &Arc, base: &msg::Base<'_>, data: deno_buf) + fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf) -> Box; type OpSelector = fn(inner_type: msg::Any) -> Option; @@ -77,7 +75,7 @@ fn empty_buf() -> Buf { /// control corresponds to the first argument of Deno.core.dispatch(). /// data corresponds to the second argument of Deno.core.dispatch(). pub fn dispatch_all( - state: &Arc, + state: &ThreadSafeState, control: &[u8], zero_copy: deno_buf, op_selector: OpSelector, @@ -209,7 +207,7 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option { // If the High precision flag is not set, the // nanoseconds are rounded on 2ms. fn op_now( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -245,7 +243,7 @@ fn op_now( } fn op_is_tty( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, _data: deno_buf, ) -> Box { @@ -270,7 +268,7 @@ fn op_is_tty( } fn op_exit( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, _data: deno_buf, ) -> Box { @@ -279,7 +277,7 @@ fn op_exit( } fn op_start( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -335,7 +333,7 @@ fn op_start( } fn op_format_error( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -394,7 +392,7 @@ pub fn odd_future(err: DenoError) -> Box { // https://github.com/denoland/deno/blob/golang/os.go#L100-L154 fn op_fetch_module_meta_data( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -435,7 +433,7 @@ fn op_fetch_module_meta_data( } fn op_chdir( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -449,7 +447,7 @@ fn op_chdir( } fn op_global_timer_stop( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -462,7 +460,7 @@ fn op_global_timer_stop( } fn op_global_timer( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -495,7 +493,7 @@ fn op_global_timer( } fn op_set_env( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -511,7 +509,7 @@ fn op_set_env( } fn op_env( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -543,7 +541,7 @@ fn op_env( } fn op_permissions( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -573,7 +571,7 @@ fn op_permissions( } fn op_revoke_permission( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -596,7 +594,7 @@ fn op_revoke_permission( } fn op_fetch( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -683,7 +681,7 @@ where } fn op_make_temp_dir( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -732,7 +730,7 @@ fn op_make_temp_dir( } fn op_mkdir( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -754,7 +752,7 @@ fn op_mkdir( } fn op_chmod( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -793,7 +791,7 @@ fn op_chmod( } fn op_open( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -883,7 +881,7 @@ fn op_open( } fn op_close( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -900,7 +898,7 @@ fn op_close( } fn op_shutdown( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -926,7 +924,7 @@ fn op_shutdown( } fn op_read( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -964,7 +962,7 @@ fn op_read( } fn op_write( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1001,7 +999,7 @@ fn op_write( } fn op_seek( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1023,7 +1021,7 @@ fn op_seek( } fn op_remove( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1052,7 +1050,7 @@ fn op_remove( } fn op_copy_file( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1108,7 +1106,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 { } fn op_cwd( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1134,7 +1132,7 @@ fn op_cwd( } fn op_stat( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1186,7 +1184,7 @@ fn op_stat( } fn op_read_dir( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1247,7 +1245,7 @@ fn op_read_dir( } fn op_rename( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1267,7 +1265,7 @@ fn op_rename( } fn op_link( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1289,7 +1287,7 @@ fn op_link( } fn op_symlink( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1318,7 +1316,7 @@ fn op_symlink( } fn op_read_link( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1356,7 +1354,7 @@ fn op_read_link( } fn op_repl_start( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1387,7 +1385,7 @@ fn op_repl_start( } fn op_repl_readline( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1423,7 +1421,7 @@ fn op_repl_readline( } fn op_truncate( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1446,7 +1444,7 @@ fn op_truncate( } fn op_listen( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1508,7 +1506,7 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult { } fn op_accept( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1534,7 +1532,7 @@ fn op_accept( } fn op_dial( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1560,7 +1558,7 @@ fn op_dial( } fn op_metrics( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1584,7 +1582,7 @@ fn op_metrics( } fn op_resources( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1636,7 +1634,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio { } fn op_run( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1709,7 +1707,7 @@ fn op_run( } fn op_run_status( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1765,7 +1763,7 @@ fn op_run_status( } struct GetMessageFuture { - pub state: Arc, + pub state: ThreadSafeState, } impl Future for GetMessageFuture { @@ -1782,7 +1780,7 @@ impl Future for GetMessageFuture { /// Get message from host as guest worker fn op_worker_get_message( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1817,7 +1815,7 @@ fn op_worker_get_message( /// Post message to host as guest worker fn op_worker_post_message( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1847,7 +1845,7 @@ fn op_worker_post_message( /// Create worker as the host fn op_create_worker( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1859,16 +1857,15 @@ fn op_create_worker( Box::new(futures::future::result(move || -> OpResult { let parent_state = state.clone(); - let child_state = Arc::new(IsolateState::new( + let child_state = ThreadSafeState::new( parent_state.flags.clone(), parent_state.argv.clone(), - )); + ); let rid = child_state.resource.rid; - let behavior = CliBehavior::new(child_state); let name = format!("USER-WORKER-{}", specifier); let mut worker = - Worker::new(name, startup_data::deno_isolate_init(), behavior); + Worker::new(name, startup_data::deno_isolate_init(), child_state); js_check(worker.execute("denoMain()")); js_check(worker.execute("workerMain()")); let result = worker.execute_mod(specifier, false); @@ -1899,7 +1896,7 @@ fn op_create_worker( /// Return when the worker closes fn op_host_get_worker_closed( - state: &Arc, + state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1930,7 +1927,7 @@ fn op_host_get_worker_closed( /// Get message from guest worker as host fn op_host_get_message( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { @@ -1964,7 +1961,7 @@ fn op_host_get_message( /// Post message to guest worker as host fn op_host_post_message( - _state: &Arc, + _state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf, ) -> Box { diff --git a/cli/resources.rs b/cli/resources.rs index b1689c890..3a7121d4c 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -13,8 +13,8 @@ use crate::errors::bad_resource; use crate::errors::DenoError; use crate::errors::DenoResult; use crate::http_body::HttpBody; -use crate::isolate_state::WorkerChannels; use crate::repl::Repl; +use crate::state::WorkerChannels; use deno::Buf; diff --git a/cli/state.rs b/cli/state.rs new file mode 100644 index 000000000..09a57d985 --- /dev/null +++ b/cli/state.rs @@ -0,0 +1,192 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::deno_dir; +use crate::errors::DenoResult; +use crate::flags; +use crate::global_timer::GlobalTimer; +use crate::modules::Modules; +use crate::ops; +use crate::permissions::DenoPermissions; +use crate::resources; +use crate::resources::ResourceId; +use crate::worker::Worker; +use deno::deno_buf; +use deno::Buf; +use deno::Dispatch; +use deno::Op; +use futures::future::Shared; +use std; +use std::collections::HashMap; +use std::env; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Instant; +use tokio::sync::mpsc as async_mpsc; + +pub type WorkerSender = async_mpsc::Sender; +pub type WorkerReceiver = async_mpsc::Receiver; +pub type WorkerChannels = (WorkerSender, WorkerReceiver); +pub type UserWorkerTable = HashMap>; + +// AtomicU64 is currently unstable +#[derive(Default)] +pub struct Metrics { + pub ops_dispatched: AtomicUsize, + pub ops_completed: AtomicUsize, + pub bytes_sent_control: AtomicUsize, + pub bytes_sent_data: AtomicUsize, + pub bytes_received: AtomicUsize, + pub resolve_count: AtomicUsize, +} + +// Wrap State so that it can implement Dispatch. +pub struct ThreadSafeState(Arc); + +// Isolate cannot be passed between threads but ThreadSafeState can. +// ThreadSafeState satisfies Send and Sync. +// So any state that needs to be accessed outside the main V8 thread should be +// inside ThreadSafeState. +#[cfg_attr(feature = "cargo-clippy", allow(stutter))] +pub struct State { + pub dir: deno_dir::DenoDir, + pub argv: Vec, + pub permissions: DenoPermissions, + pub flags: flags::DenoFlags, + pub metrics: Metrics, + pub modules: Mutex, + pub worker_channels: Mutex, + pub global_timer: Mutex, + pub workers: Mutex, + pub start_time: Instant, + pub resource: resources::Resource, +} + +impl Clone for ThreadSafeState { + fn clone(&self) -> Self { + ThreadSafeState(self.0.clone()) + } +} + +impl Deref for ThreadSafeState { + type Target = Arc; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Dispatch for ThreadSafeState { + fn dispatch( + &mut self, + control: &[u8], + zero_copy: deno_buf, + ) -> (bool, Box) { + ops::dispatch_all(self, control, zero_copy, ops::op_selector_std) + } +} + +impl ThreadSafeState { + pub fn new(flags: flags::DenoFlags, argv_rest: Vec) -> Self { + let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); + + let (worker_in_tx, worker_in_rx) = async_mpsc::channel::(1); + let (worker_out_tx, worker_out_rx) = async_mpsc::channel::(1); + let internal_channels = (worker_out_tx, worker_in_rx); + let external_channels = (worker_in_tx, worker_out_rx); + let resource = resources::add_worker(external_channels); + + ThreadSafeState(Arc::new(State { + dir: deno_dir::DenoDir::new(custom_root).unwrap(), + argv: argv_rest, + permissions: DenoPermissions::from_flags(&flags), + flags, + metrics: Metrics::default(), + modules: Mutex::new(Modules::new()), + worker_channels: Mutex::new(internal_channels), + global_timer: Mutex::new(GlobalTimer::new()), + workers: Mutex::new(UserWorkerTable::new()), + start_time: Instant::now(), + resource, + })) + } + + /// Read main module from argv + pub fn main_module(&self) -> Option { + if self.argv.len() <= 1 { + None + } else { + let specifier = self.argv[1].clone(); + let referrer = "."; + match self.dir.resolve_module_url(&specifier, referrer) { + Ok(url) => Some(url.to_string()), + Err(e) => { + debug!("Potentially swallowed error {}", e); + None + } + } + } + } + + #[inline] + pub fn check_read(&self, filename: &str) -> DenoResult<()> { + self.permissions.check_read(filename) + } + + #[inline] + pub fn check_write(&self, filename: &str) -> DenoResult<()> { + self.permissions.check_write(filename) + } + + #[inline] + pub fn check_env(&self) -> DenoResult<()> { + self.permissions.check_env() + } + + #[inline] + pub fn check_net(&self, filename: &str) -> DenoResult<()> { + self.permissions.check_net(filename) + } + + #[inline] + pub fn check_run(&self) -> DenoResult<()> { + self.permissions.check_run() + } + + #[cfg(test)] + pub fn mock() -> ThreadSafeState { + let argv = vec![String::from("./deno"), String::from("hello.js")]; + // For debugging: argv.push_back(String::from("-D")); + let (flags, rest_argv) = flags::set_flags(argv).unwrap(); + ThreadSafeState::new(flags, rest_argv) + } + + pub fn metrics_op_dispatched( + &self, + bytes_sent_control: usize, + bytes_sent_data: usize, + ) { + self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); + self + .metrics + .bytes_sent_control + .fetch_add(bytes_sent_control, Ordering::SeqCst); + self + .metrics + .bytes_sent_data + .fetch_add(bytes_sent_data, Ordering::SeqCst); + } + + pub fn metrics_op_completed(&self, bytes_received: usize) { + self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); + self + .metrics + .bytes_received + .fetch_add(bytes_received, Ordering::SeqCst); + } +} + +#[test] +fn thread_safe() { + fn f(_: S) {} + f(ThreadSafeState::mock()); +} diff --git a/cli/worker.rs b/cli/worker.rs index 6609c5005..325ec46eb 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,13 +1,12 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use crate::cli_behavior::CliBehavior; use crate::compiler::compile_async; use crate::compiler::ModuleMetaData; use crate::errors::DenoError; use crate::errors::RustOrJsError; -use crate::isolate_state::IsolateState; use crate::js_errors; use crate::js_errors::JSErrorColor; use crate::msg; +use crate::state::ThreadSafeState; use crate::tokio_util; use deno; use deno::deno_mod; @@ -17,24 +16,23 @@ use futures::future::Either; use futures::Async; use futures::Future; use std::sync::atomic::Ordering; -use std::sync::Arc; /// Wraps deno::Isolate to provide source maps, ops for the CLI, and /// high-level module loading pub struct Worker { - inner: deno::Isolate, - state: Arc, + inner: deno::Isolate, + state: ThreadSafeState, } impl Worker { pub fn new( _name: String, startup_data: StartupData, - behavior: CliBehavior, + state: ThreadSafeState, ) -> Worker { - let state = behavior.state.clone(); + let state_ = state.clone(); Self { - inner: deno::Isolate::new(startup_data, behavior), + inner: deno::Isolate::new(startup_data, state_), state, } } @@ -107,7 +105,7 @@ impl Worker { js_filename: &str, is_prefetch: bool, ) -> Result<(), RustOrJsError> { - // TODO move isolate_state::execute_mod impl here. + // TODO move state::execute_mod impl here. self .execute_mod_inner(js_filename, is_prefetch) .map_err(|err| match err { @@ -204,7 +202,7 @@ impl Future for Worker { } fn fetch_module_meta_data_and_maybe_compile_async( - state: &Arc, + state: &ThreadSafeState, specifier: &str, referrer: &str, ) -> impl Future { @@ -239,7 +237,7 @@ fn fetch_module_meta_data_and_maybe_compile_async( } fn fetch_module_meta_data_and_maybe_compile( - state: &Arc, + state: &ThreadSafeState, specifier: &str, referrer: &str, ) -> Result { @@ -251,11 +249,10 @@ fn fetch_module_meta_data_and_maybe_compile( #[cfg(test)] mod tests { use super::*; - use crate::cli_behavior::CliBehavior; use crate::flags; - use crate::isolate_state::IsolateState; use crate::resources; use crate::startup_data; + use crate::state::ThreadSafeState; use crate::tokio_util; use deno::js_check; use futures::future::lazy; @@ -272,11 +269,11 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv)); + let state = ThreadSafeState::new(flags, rest_argv); let state_ = state.clone(); tokio_util::run(lazy(move || { - let cli = CliBehavior::new(state.clone()); - let mut worker = Worker::new("TEST".to_string(), StartupData::None, cli); + let mut worker = + Worker::new("TEST".to_string(), StartupData::None, state); if let Err(err) = worker.execute_mod(&filename, false) { eprintln!("execute_mod err {:?}", err); } @@ -295,11 +292,11 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv)); + let state = ThreadSafeState::new(flags, rest_argv); let state_ = state.clone(); tokio_util::run(lazy(move || { - let cli = CliBehavior::new(state.clone()); - let mut worker = Worker::new("TEST".to_string(), StartupData::None, cli); + let mut worker = + Worker::new("TEST".to_string(), StartupData::None, state); if let Err(err) = worker.execute_mod(&filename, false) { eprintln!("execute_mod err {:?}", err); } @@ -311,10 +308,9 @@ mod tests { } fn create_test_worker() -> Worker { - let state = Arc::new(IsolateState::mock()); - let cli = CliBehavior::new(state.clone()); + let state = ThreadSafeState::mock(); let mut worker = - Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), cli); + Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state); js_check(worker.execute("denoMain()")); js_check(worker.execute("workerMain()")); worker -- cgit v1.2.3