summaryrefslogtreecommitdiff
path: root/cli/state.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2019-11-04 16:38:52 +0100
committerRy Dahl <ry@tinyclouds.org>2019-11-04 10:38:52 -0500
commit0049d4e50c9dd945f25f69b08b08fbf492001f96 (patch)
tree3bcf3c50bdba9abba6b1b8ff9340f76fc19f759c /cli/state.rs
parent429439d1983269ba40141a1b42f6ac809d1e8c86 (diff)
Refactor Worker and ThreadSafeState (#3242)
* Split ThreadSafeState into State and GlobalState. State is a "local" state belonging to "Worker" while "GlobalState" is state shared by whole program. * Update "Worker" and ops to use "GlobalState" where applicable * Move and refactor "WorkerChannels" resource
Diffstat (limited to 'cli/state.rs')
-rw-r--r--cli/state.rs228
1 files changed, 53 insertions, 175 deletions
diff --git a/cli/state.rs b/cli/state.rs
index 1f44f254f..544c199b8 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -1,22 +1,15 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-use crate::compilers::CompiledModule;
-use crate::compilers::JsCompiler;
-use crate::compilers::JsonCompiler;
-use crate::compilers::TsCompiler;
-use crate::deno_dir;
use crate::deno_error::permission_denied;
-use crate::file_fetcher::SourceFileFetcher;
-use crate::flags;
+use crate::global_state::ThreadSafeGlobalState;
use crate::global_timer::GlobalTimer;
use crate::import_map::ImportMap;
-use crate::lockfile::Lockfile;
-use crate::msg;
+use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::permissions::DenoPermissions;
-use crate::progress::Progress;
use crate::resources;
use crate::resources::ResourceId;
use crate::worker::Worker;
+use crate::worker::WorkerChannels;
use deno::Buf;
use deno::CoreOp;
use deno::ErrBox;
@@ -31,31 +24,18 @@ use rand::SeedableRng;
use serde_json::Value;
use std;
use std::collections::HashMap;
-use std::env;
use std::ops::Deref;
use std::str;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
use tokio::sync::mpsc as async_mpsc;
-pub type WorkerSender = async_mpsc::Sender<Buf>;
-pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
-pub type WorkerChannels = (WorkerSender, WorkerReceiver);
+// TODO: hold references to concrete Workers instead of shared futures of
+// those workers?
pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;
-#[derive(Default)]
-pub struct Metrics {
- pub ops_dispatched: AtomicUsize,
- pub ops_completed: AtomicUsize,
- pub bytes_sent_control: AtomicUsize,
- pub bytes_sent_data: AtomicUsize,
- pub bytes_received: AtomicUsize,
- pub resolve_count: AtomicUsize,
- pub compiler_starts: AtomicUsize,
-}
-
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
/// accessed outside the main V8 thread should be inside ThreadSafeState.
@@ -63,34 +43,22 @@ pub struct ThreadSafeState(Arc<State>);
#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub struct State {
+ pub global_state: ThreadSafeGlobalState,
pub modules: Arc<Mutex<deno::Modules>>,
- pub main_module: Option<ModuleSpecifier>,
- pub dir: deno_dir::DenoDir,
- pub argv: Vec<String>,
pub permissions: DenoPermissions,
- pub flags: flags::DenoFlags,
+ pub main_module: Option<ModuleSpecifier>,
+ pub worker_channels: Mutex<WorkerChannels>,
/// When flags contains a `.import_map_path` option, the content of the
/// import map file will be resolved and set.
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
- pub worker_channels: Mutex<WorkerChannels>,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<UserWorkerTable>,
pub start_time: Instant,
/// A reference to this worker's resource.
- pub resource: resources::Resource,
- /// Reference to global progress bar.
- pub progress: Progress,
+ pub rid: ResourceId,
pub seeded_rng: Option<Mutex<StdRng>>,
-
- pub file_fetcher: SourceFileFetcher,
- pub js_compiler: JsCompiler,
- pub json_compiler: JsonCompiler,
- pub ts_compiler: TsCompiler,
-
pub include_deno_namespace: bool,
-
- pub lockfile: Option<Mutex<Lockfile>>,
}
impl Clone for ThreadSafeState {
@@ -195,158 +163,74 @@ impl Loader for ThreadSafeState {
) -> Box<deno::SourceCodeInfoFuture> {
self.metrics.resolve_count.fetch_add(1, Ordering::SeqCst);
let module_url_specified = module_specifier.to_string();
- Box::new(self.fetch_compiled_module(module_specifier).map(
- |compiled_module| deno::SourceCodeInfo {
+ let fut = self
+ .global_state
+ .fetch_compiled_module(module_specifier)
+ .map(|compiled_module| deno::SourceCodeInfo {
// Real module name, might be different from initial specifier
// due to redirections.
code: compiled_module.code,
module_url_specified,
module_url_found: compiled_module.name,
- },
- ))
+ });
+
+ Box::new(fut)
}
}
impl ThreadSafeState {
pub fn new(
- flags: flags::DenoFlags,
- argv_rest: Vec<String>,
- progress: Progress,
+ global_state: ThreadSafeGlobalState,
+ main_module: Option<ModuleSpecifier>,
include_deno_namespace: bool,
) -> Result<Self, ErrBox> {
- let custom_root = env::var("DENO_DIR").map(String::into).ok();
-
let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
- let internal_channels = (worker_out_tx, worker_in_rx);
- let external_channels = (worker_in_tx, worker_out_rx);
- let resource = resources::add_worker(external_channels);
-
- let dir = deno_dir::DenoDir::new(custom_root)?;
-
- let file_fetcher = SourceFileFetcher::new(
- dir.deps_cache.clone(),
- progress.clone(),
- !flags.reload,
- flags.cache_blacklist.clone(),
- flags.no_fetch,
- )?;
-
- let ts_compiler = TsCompiler::new(
- file_fetcher.clone(),
- dir.gen_cache.clone(),
- !flags.reload,
- flags.config_path.clone(),
- )?;
-
- let main_module: Option<ModuleSpecifier> = if argv_rest.len() <= 1 {
- None
- } else {
- let root_specifier = argv_rest[1].clone();
- Some(ModuleSpecifier::resolve_url_or_path(&root_specifier)?)
+ let internal_channels = WorkerChannels {
+ sender: worker_out_tx,
+ receiver: worker_in_rx,
};
-
- let import_map: Option<ImportMap> = match &flags.import_map_path {
- None => None,
- Some(file_path) => Some(ImportMap::load(file_path)?),
+ let external_channels = WorkerChannels {
+ sender: worker_in_tx,
+ receiver: worker_out_rx,
};
- let mut seeded_rng = None;
- if let Some(seed) = flags.seed {
- seeded_rng = Some(Mutex::new(StdRng::seed_from_u64(seed)));
- };
+ let mut table = resources::lock_resource_table();
+ let rid = table.add(Box::new(external_channels));
- let modules = Arc::new(Mutex::new(deno::Modules::new()));
+ let import_map: Option<ImportMap> =
+ match global_state.flags.import_map_path.as_ref() {
+ None => None,
+ Some(file_path) => Some(ImportMap::load(file_path)?),
+ };
- // Note: reads lazily from disk on first call to lockfile.check()
- let lockfile = if let Some(filename) = &flags.lock {
- Some(Mutex::new(Lockfile::new(filename.to_string())))
- } else {
- None
+ let seeded_rng = match global_state.flags.seed {
+ Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))),
+ None => None,
};
+ let modules = Arc::new(Mutex::new(deno::Modules::new()));
+ let permissions = global_state.permissions.clone();
+
let state = State {
- main_module,
+ global_state,
modules,
- dir,
- argv: argv_rest,
- permissions: DenoPermissions::from_flags(&flags),
- flags,
+ main_module,
+ permissions,
import_map,
- metrics: Metrics::default(),
worker_channels: Mutex::new(internal_channels),
+ metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
workers: Mutex::new(UserWorkerTable::new()),
start_time: Instant::now(),
- resource,
- progress,
+ rid,
seeded_rng,
- file_fetcher,
- ts_compiler,
- js_compiler: JsCompiler {},
- json_compiler: JsonCompiler {},
include_deno_namespace,
- lockfile,
};
Ok(ThreadSafeState(Arc::new(state)))
}
- pub fn fetch_compiled_module(
- self: &Self,
- module_specifier: &ModuleSpecifier,
- ) -> impl Future<Item = CompiledModule, Error = ErrBox> {
- let state1 = self.clone();
- let state2 = self.clone();
-
- self
- .file_fetcher
- .fetch_source_file_async(&module_specifier)
- .and_then(move |out| match out.media_type {
- msg::MediaType::Unknown => {
- state1.js_compiler.compile_async(state1.clone(), &out)
- }
- msg::MediaType::Json => {
- state1.json_compiler.compile_async(state1.clone(), &out)
- }
- msg::MediaType::TypeScript
- | msg::MediaType::TSX
- | msg::MediaType::JSX => {
- state1.ts_compiler.compile_async(state1.clone(), &out)
- }
- msg::MediaType::JavaScript => {
- if state1.ts_compiler.compile_js {
- state1.ts_compiler.compile_async(state1.clone(), &out)
- } else {
- state1.js_compiler.compile_async(state1.clone(), &out)
- }
- }
- })
- .and_then(move |compiled_module| {
- if let Some(ref lockfile) = state2.lockfile {
- let mut g = lockfile.lock().unwrap();
- if state2.flags.lock_write {
- g.insert(&compiled_module);
- } else if !g.check(&compiled_module)? {
- eprintln!(
- "Subresource integrety check failed --lock={}\n{}",
- g.filename, compiled_module.name
- );
- std::process::exit(10);
- }
- }
- Ok(compiled_module)
- })
- }
-
- /// Read main module from argv
- pub fn main_module(&self) -> Option<ModuleSpecifier> {
- match &self.main_module {
- Some(module_specifier) => Some(module_specifier.clone()),
- None => None,
- }
- }
-
#[inline]
pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> {
self.permissions.check_read(filename)
@@ -403,10 +287,17 @@ impl ThreadSafeState {
#[cfg(test)]
pub fn mock(argv: Vec<String>) -> ThreadSafeState {
+ let module_specifier = if argv.is_empty() {
+ None
+ } else {
+ let module_specifier = ModuleSpecifier::resolve_url_or_path(&argv[0])
+ .expect("Invalid entry module");
+ Some(module_specifier)
+ };
+
ThreadSafeState::new(
- flags::DenoFlags::default(),
- argv,
- Progress::new(),
+ ThreadSafeGlobalState::mock(argv),
+ module_specifier,
true,
)
.unwrap()
@@ -445,16 +336,3 @@ fn thread_safe() {
String::from("hello.js"),
]));
}
-
-#[test]
-fn import_map_given_for_repl() {
- let _result = ThreadSafeState::new(
- flags::DenoFlags {
- import_map_path: Some("import_map.json".to_string()),
- ..flags::DenoFlags::default()
- },
- vec![String::from("./deno")],
- Progress::new(),
- true,
- );
-}