diff options
Diffstat (limited to 'cli/state.rs')
-rw-r--r-- | cli/state.rs | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/cli/state.rs b/cli/state.rs new file mode 100644 index 000000000..09a57d985 --- /dev/null +++ b/cli/state.rs @@ -0,0 +1,192 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::deno_dir; +use crate::errors::DenoResult; +use crate::flags; +use crate::global_timer::GlobalTimer; +use crate::modules::Modules; +use crate::ops; +use crate::permissions::DenoPermissions; +use crate::resources; +use crate::resources::ResourceId; +use crate::worker::Worker; +use deno::deno_buf; +use deno::Buf; +use deno::Dispatch; +use deno::Op; +use futures::future::Shared; +use std; +use std::collections::HashMap; +use std::env; +use std::ops::Deref; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Instant; +use tokio::sync::mpsc as async_mpsc; + +pub type WorkerSender = async_mpsc::Sender<Buf>; +pub type WorkerReceiver = async_mpsc::Receiver<Buf>; +pub type WorkerChannels = (WorkerSender, WorkerReceiver); +pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>; + +// AtomicU64 is currently unstable +#[derive(Default)] +pub struct Metrics { + pub ops_dispatched: AtomicUsize, + pub ops_completed: AtomicUsize, + pub bytes_sent_control: AtomicUsize, + pub bytes_sent_data: AtomicUsize, + pub bytes_received: AtomicUsize, + pub resolve_count: AtomicUsize, +} + +// Wrap State so that it can implement Dispatch. +pub struct ThreadSafeState(Arc<State>); + +// Isolate cannot be passed between threads but ThreadSafeState can. +// ThreadSafeState satisfies Send and Sync. +// So any state that needs to be accessed outside the main V8 thread should be +// inside ThreadSafeState. +#[cfg_attr(feature = "cargo-clippy", allow(stutter))] +pub struct State { + pub dir: deno_dir::DenoDir, + pub argv: Vec<String>, + pub permissions: DenoPermissions, + pub flags: flags::DenoFlags, + pub metrics: Metrics, + pub modules: Mutex<Modules>, + pub worker_channels: Mutex<WorkerChannels>, + pub global_timer: Mutex<GlobalTimer>, + pub workers: Mutex<UserWorkerTable>, + pub start_time: Instant, + pub resource: resources::Resource, +} + +impl Clone for ThreadSafeState { + fn clone(&self) -> Self { + ThreadSafeState(self.0.clone()) + } +} + +impl Deref for ThreadSafeState { + type Target = Arc<State>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Dispatch for ThreadSafeState { + fn dispatch( + &mut self, + control: &[u8], + zero_copy: deno_buf, + ) -> (bool, Box<Op>) { + ops::dispatch_all(self, control, zero_copy, ops::op_selector_std) + } +} + +impl ThreadSafeState { + pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self { + let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); + + let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1); + let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1); + let internal_channels = (worker_out_tx, worker_in_rx); + let external_channels = (worker_in_tx, worker_out_rx); + let resource = resources::add_worker(external_channels); + + ThreadSafeState(Arc::new(State { + dir: deno_dir::DenoDir::new(custom_root).unwrap(), + argv: argv_rest, + permissions: DenoPermissions::from_flags(&flags), + flags, + metrics: Metrics::default(), + modules: Mutex::new(Modules::new()), + worker_channels: Mutex::new(internal_channels), + global_timer: Mutex::new(GlobalTimer::new()), + workers: Mutex::new(UserWorkerTable::new()), + start_time: Instant::now(), + resource, + })) + } + + /// Read main module from argv + pub fn main_module(&self) -> Option<String> { + if self.argv.len() <= 1 { + None + } else { + let specifier = self.argv[1].clone(); + let referrer = "."; + match self.dir.resolve_module_url(&specifier, referrer) { + Ok(url) => Some(url.to_string()), + Err(e) => { + debug!("Potentially swallowed error {}", e); + None + } + } + } + } + + #[inline] + pub fn check_read(&self, filename: &str) -> DenoResult<()> { + self.permissions.check_read(filename) + } + + #[inline] + pub fn check_write(&self, filename: &str) -> DenoResult<()> { + self.permissions.check_write(filename) + } + + #[inline] + pub fn check_env(&self) -> DenoResult<()> { + self.permissions.check_env() + } + + #[inline] + pub fn check_net(&self, filename: &str) -> DenoResult<()> { + self.permissions.check_net(filename) + } + + #[inline] + pub fn check_run(&self) -> DenoResult<()> { + self.permissions.check_run() + } + + #[cfg(test)] + pub fn mock() -> ThreadSafeState { + let argv = vec![String::from("./deno"), String::from("hello.js")]; + // For debugging: argv.push_back(String::from("-D")); + let (flags, rest_argv) = flags::set_flags(argv).unwrap(); + ThreadSafeState::new(flags, rest_argv) + } + + pub fn metrics_op_dispatched( + &self, + bytes_sent_control: usize, + bytes_sent_data: usize, + ) { + self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); + self + .metrics + .bytes_sent_control + .fetch_add(bytes_sent_control, Ordering::SeqCst); + self + .metrics + .bytes_sent_data + .fetch_add(bytes_sent_data, Ordering::SeqCst); + } + + pub fn metrics_op_completed(&self, bytes_received: usize) { + self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); + self + .metrics + .bytes_received + .fetch_add(bytes_received, Ordering::SeqCst); + } +} + +#[test] +fn thread_safe() { + fn f<S: Send + Sync>(_: S) {} + f(ThreadSafeState::mock()); +} |