diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2020-02-08 20:34:31 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-08 20:34:31 +0100 |
commit | cdba5ab6fc633606aaa6f95d0825832c3ac6fe5c (patch) | |
tree | e8dee2801e14b65b2da6aca62e39cd3d3ac2a786 /cli/state.rs | |
parent | 619a24390ff15d5ea5e577a4d0391823f94e8592 (diff) |
refactor: rename ThreadSafeState, use RefCell for mutable state (#3931)
* rename ThreadSafeState to State
* State stores InnerState wrapped in Rc and RefCell
Diffstat (limited to 'cli/state.rs')
-rw-r--r-- | cli/state.rs | 163 |
1 files changed, 69 insertions, 94 deletions
diff --git a/cli/state.rs b/cli/state.rs index 86479d707..4e2f47e62 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -8,7 +8,6 @@ use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; -use crate::web_worker::WebWorker; use crate::worker::WorkerChannelsExternal; use crate::worker::WorkerChannelsInternal; use deno_core::Buf; @@ -19,68 +18,53 @@ use deno_core::ModuleSpecifier; use deno_core::Op; use deno_core::ResourceTable; use deno_core::ZeroCopyBuf; -use futures::channel::mpsc; use futures::future::FutureExt; use futures::future::TryFutureExt; use rand::rngs::StdRng; use rand::SeedableRng; use serde_json::Value; use std; +use std::cell::RefCell; use std::collections::HashMap; use std::ops::Deref; use std::path::Path; use std::pin::Pin; +use std::rc::Rc; use std::str; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::sync::Mutex; -use std::sync::MutexGuard; use std::time::Instant; -/// Isolate cannot be passed between threads but ThreadSafeState can. -/// ThreadSafeState satisfies Send and Sync. So any state that needs to be -/// accessed outside the main V8 thread should be inside ThreadSafeState. -pub struct ThreadSafeState(Arc<State>); +#[derive(Clone)] +pub struct State(Rc<RefCell<StateInner>>); + +impl Deref for State { + type Target = Rc<RefCell<StateInner>>; + fn deref(&self) -> &Self::Target { + &self.0 + } +} #[cfg_attr(feature = "cargo-clippy", allow(stutter))] -pub struct State { +pub struct StateInner { pub global_state: GlobalState, - pub permissions: Arc<Mutex<DenoPermissions>>, + pub permissions: DenoPermissions, pub main_module: ModuleSpecifier, /// When flags contains a `.import_map_path` option, the content of the /// import map file will be resolved and set. pub import_map: Option<ImportMap>, pub metrics: Metrics, - pub global_timer: Mutex<GlobalTimer>, - pub workers: Mutex<HashMap<u32, WorkerChannelsExternal>>, - pub worker_channels_internal: Mutex<Option<WorkerChannelsInternal>>, - pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>, + pub global_timer: GlobalTimer, + pub workers: HashMap<u32, WorkerChannelsExternal>, + pub worker_channels_internal: Option<WorkerChannelsInternal>, pub next_worker_id: AtomicUsize, pub start_time: Instant, - pub seeded_rng: Option<Mutex<StdRng>>, - pub resource_table: Mutex<ResourceTable>, + pub seeded_rng: Option<StdRng>, + pub resource_table: ResourceTable, pub target_lib: TargetLib, } -impl Clone for ThreadSafeState { - fn clone(&self) -> Self { - ThreadSafeState(self.0.clone()) - } -} - -impl Deref for ThreadSafeState { - type Target = Arc<State>; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl ThreadSafeState { - pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> { - self.resource_table.lock().unwrap() - } - +impl State { /// Wrap core `OpDispatcher` to collect metrics. pub fn core_op<D>( &self, @@ -130,7 +114,7 @@ impl ThreadSafeState { dispatcher: D, ) -> impl Fn(i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>> where - D: Fn(&ThreadSafeState, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>, + D: Fn(&State, i32, Option<ZeroCopyBuf>) -> Pin<Box<MinimalOp>>, { let state = self.clone(); @@ -149,11 +133,7 @@ impl ThreadSafeState { dispatcher: D, ) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox> where - D: Fn( - &ThreadSafeState, - Value, - Option<ZeroCopyBuf>, - ) -> Result<JsonOp, ErrBox>, + D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, ErrBox>, { let state = self.clone(); @@ -163,7 +143,7 @@ impl ThreadSafeState { } } -impl Loader for ThreadSafeState { +impl Loader for State { fn resolve( &self, specifier: &str, @@ -171,7 +151,7 @@ impl Loader for ThreadSafeState { is_main: bool, ) -> Result<ModuleSpecifier, ErrBox> { if !is_main { - if let Some(import_map) = &self.import_map { + if let Some(import_map) = &self.borrow().import_map { let result = import_map.resolve(specifier, referrer)?; if let Some(r) = result { return Ok(r); @@ -198,11 +178,12 @@ impl Loader for ThreadSafeState { } } + let state = self.borrow(); // TODO(bartlomieju): incrementing resolve_count here has no sense... - self.metrics.resolve_count.fetch_add(1, Ordering::SeqCst); + state.metrics.resolve_count.fetch_add(1, Ordering::SeqCst); let module_url_specified = module_specifier.to_string(); - let global_state = self.global_state.clone(); - let target_lib = self.target_lib.clone(); + let global_state = state.global_state.clone(); + let target_lib = state.target_lib.clone(); let fut = async move { let compiled_module = global_state .fetch_compiled_module(module_specifier, maybe_referrer, target_lib) @@ -220,11 +201,11 @@ impl Loader for ThreadSafeState { } } -impl ThreadSafeState { +impl State { /// If `shared_permission` is None then permissions from globa state are used. pub fn new( global_state: GlobalState, - shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, + shared_permissions: Option<DenoPermissions>, main_module: ModuleSpecifier, ) -> Result<Self, ErrBox> { let import_map: Option<ImportMap> = @@ -234,116 +215,114 @@ impl ThreadSafeState { }; let seeded_rng = match global_state.flags.seed { - Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), + Some(seed) => Some(StdRng::seed_from_u64(seed)), None => None, }; let permissions = if let Some(perm) = shared_permissions { perm } else { - Arc::new(Mutex::new(global_state.permissions.clone())) + global_state.permissions.clone() }; - let state = State { + let state = Rc::new(RefCell::new(StateInner { global_state, main_module, permissions, import_map, metrics: Metrics::default(), - global_timer: Mutex::new(GlobalTimer::new()), - worker_channels_internal: Mutex::new(None), - workers: Mutex::new(HashMap::new()), - loading_workers: Mutex::new(HashMap::new()), + global_timer: GlobalTimer::new(), + worker_channels_internal: None, + workers: HashMap::new(), next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), seeded_rng, - resource_table: Mutex::new(ResourceTable::default()), + resource_table: ResourceTable::default(), target_lib: TargetLib::Main, - }; + })); - Ok(ThreadSafeState(Arc::new(state))) + Ok(Self(state)) } /// If `shared_permission` is None then permissions from globa state are used. pub fn new_for_worker( global_state: GlobalState, - shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, + shared_permissions: Option<DenoPermissions>, main_module: ModuleSpecifier, ) -> Result<Self, ErrBox> { let seeded_rng = match global_state.flags.seed { - Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), + Some(seed) => Some(StdRng::seed_from_u64(seed)), None => None, }; let permissions = if let Some(perm) = shared_permissions { perm } else { - Arc::new(Mutex::new(global_state.permissions.clone())) + global_state.permissions.clone() }; - let state = State { + let state = Rc::new(RefCell::new(StateInner { global_state, main_module, permissions, import_map: None, metrics: Metrics::default(), - global_timer: Mutex::new(GlobalTimer::new()), - worker_channels_internal: Mutex::new(None), - workers: Mutex::new(HashMap::new()), - loading_workers: Mutex::new(HashMap::new()), + global_timer: GlobalTimer::new(), + worker_channels_internal: None, + workers: HashMap::new(), next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), seeded_rng, - resource_table: Mutex::new(ResourceTable::default()), + resource_table: ResourceTable::default(), target_lib: TargetLib::Worker, - }; + })); - Ok(ThreadSafeState(Arc::new(state))) + Ok(Self(state)) } - pub fn add_child_worker(&self, worker: &WebWorker) -> u32 { - let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; - let handle = worker.thread_safe_handle(); - let mut workers_tl = self.workers.lock().unwrap(); - workers_tl.insert(worker_id, handle); + pub fn add_child_worker(&self, handle: WorkerChannelsExternal) -> u32 { + let mut inner_state = self.borrow_mut(); + let worker_id = + inner_state.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; + inner_state.workers.insert(worker_id, handle); worker_id } #[inline] pub fn check_read(&self, path: &Path) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_read(path) + self.borrow().permissions.check_read(path) } #[inline] pub fn check_write(&self, path: &Path) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_write(path) + self.borrow().permissions.check_write(path) } #[inline] pub fn check_env(&self) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_env() + self.borrow().permissions.check_env() } #[inline] pub fn check_net(&self, hostname: &str, port: u16) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_net(hostname, port) + self.borrow().permissions.check_net(hostname, port) } #[inline] pub fn check_net_url(&self, url: &url::Url) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_net_url(url) + self.borrow().permissions.check_net_url(url) } #[inline] pub fn check_run(&self) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_run() + self.borrow().permissions.check_run() } #[inline] pub fn check_plugin(&self, filename: &Path) -> Result<(), ErrBox> { - self.permissions.lock().unwrap().check_plugin(filename) + self.borrow().permissions.check_plugin(filename) } pub fn check_dyn_import( @@ -371,10 +350,10 @@ impl ThreadSafeState { } #[cfg(test)] - pub fn mock(main_module: &str) -> ThreadSafeState { + pub fn mock(main_module: &str) -> State { let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module) .expect("Invalid entry module"); - ThreadSafeState::new( + State::new( GlobalState::mock(vec!["deno".to_string()]), None, module_specifier, @@ -387,28 +366,24 @@ impl ThreadSafeState { bytes_sent_control: usize, bytes_sent_data: usize, ) { - self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); - self + let state = self.borrow(); + state.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); + state .metrics .bytes_sent_control .fetch_add(bytes_sent_control, Ordering::SeqCst); - self + state .metrics .bytes_sent_data .fetch_add(bytes_sent_data, Ordering::SeqCst); } pub fn metrics_op_completed(&self, bytes_received: usize) { - self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); - self + let state = self.borrow(); + state.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); + state .metrics .bytes_received .fetch_add(bytes_received, Ordering::SeqCst); } } - -#[test] -fn thread_safe() { - fn f<S: Send + Sync>(_: S) {} - f(ThreadSafeState::mock("./hello.js")); -} |