diff options
Diffstat (limited to 'cli/state.rs')
-rw-r--r-- | cli/state.rs | 70 |
1 files changed, 38 insertions, 32 deletions
diff --git a/cli/state.rs b/cli/state.rs index 8d14042aa..edfac72c0 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -6,8 +6,6 @@ use crate::import_map::ImportMap; use crate::metrics::Metrics; use crate::ops::JsonOp; use crate::permissions::DenoPermissions; -use crate::resources; -use crate::resources::ResourceId; use crate::worker::Worker; use crate::worker::WorkerChannels; use deno::Buf; @@ -17,7 +15,6 @@ use deno::Loader; use deno::ModuleSpecifier; use deno::Op; use deno::PinnedBuf; -use futures::future::Shared; use futures::Future; use rand::rngs::StdRng; use rand::SeedableRng; @@ -26,15 +23,12 @@ use std; use std::collections::HashMap; use std::ops::Deref; use std::str; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; use std::time::Instant; -use tokio::sync::mpsc as async_mpsc; - -// TODO: hold references to concrete Workers instead of shared futures of -// those workers? -pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>; +use tokio::sync::mpsc; /// Isolate cannot be passed between threads but ThreadSafeState can. /// ThreadSafeState satisfies Send and Sync. So any state that needs to be @@ -53,10 +47,9 @@ pub struct State { pub import_map: Option<ImportMap>, pub metrics: Metrics, pub global_timer: Mutex<GlobalTimer>, - pub workers: Mutex<UserWorkerTable>, + pub workers: Mutex<HashMap<u32, Worker>>, + pub next_worker_id: AtomicUsize, pub start_time: Instant, - /// A reference to this worker's resource. - pub rid: ResourceId, pub seeded_rng: Option<Mutex<StdRng>>, pub include_deno_namespace: bool, } @@ -179,25 +172,26 @@ impl Loader for ThreadSafeState { } impl ThreadSafeState { - pub fn new( - global_state: ThreadSafeGlobalState, - main_module: Option<ModuleSpecifier>, - include_deno_namespace: bool, - ) -> Result<Self, ErrBox> { - let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1); - let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1); + pub fn create_channels() -> (WorkerChannels, WorkerChannels) { + let (in_tx, in_rx) = mpsc::channel::<Buf>(1); + let (out_tx, out_rx) = mpsc::channel::<Buf>(1); let internal_channels = WorkerChannels { - sender: worker_out_tx, - receiver: worker_in_rx, + sender: out_tx, + receiver: in_rx, }; let external_channels = WorkerChannels { - sender: worker_in_tx, - receiver: worker_out_rx, + sender: in_tx, + receiver: out_rx, }; + (internal_channels, external_channels) + } - let mut table = resources::lock_resource_table(); - let rid = table.add("worker", Box::new(external_channels)); - + pub fn new( + global_state: ThreadSafeGlobalState, + main_module: Option<ModuleSpecifier>, + include_deno_namespace: bool, + internal_channels: WorkerChannels, + ) -> Result<Self, ErrBox> { let import_map: Option<ImportMap> = match global_state.flags.import_map_path.as_ref() { None => None, @@ -221,9 +215,9 @@ impl ThreadSafeState { worker_channels: Mutex::new(internal_channels), metrics: Metrics::default(), global_timer: Mutex::new(GlobalTimer::new()), - workers: Mutex::new(UserWorkerTable::new()), + workers: Mutex::new(HashMap::new()), + next_worker_id: AtomicUsize::new(0), start_time: Instant::now(), - rid, seeded_rng, include_deno_namespace, }; @@ -231,6 +225,13 @@ impl ThreadSafeState { Ok(ThreadSafeState(Arc::new(state))) } + pub fn add_child_worker(&self, worker: Worker) -> u32 { + let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32; + let mut workers_tl = self.workers.lock().unwrap(); + workers_tl.insert(worker_id, worker); + worker_id + } + #[inline] pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> { self.permissions.check_read(filename) @@ -286,7 +287,10 @@ impl ThreadSafeState { } #[cfg(test)] - pub fn mock(argv: Vec<String>) -> ThreadSafeState { + pub fn mock( + argv: Vec<String>, + internal_channels: WorkerChannels, + ) -> ThreadSafeState { let module_specifier = if argv.is_empty() { None } else { @@ -299,6 +303,7 @@ impl ThreadSafeState { ThreadSafeGlobalState::mock(argv), module_specifier, true, + internal_channels, ) .unwrap() } @@ -331,8 +336,9 @@ impl ThreadSafeState { #[test] fn thread_safe() { fn f<S: Send + Sync>(_: S) {} - f(ThreadSafeState::mock(vec![ - String::from("./deno"), - String::from("hello.js"), - ])); + let (int, _) = ThreadSafeState::create_channels(); + f(ThreadSafeState::mock( + vec![String::from("./deno"), String::from("hello.js")], + int, + )); } |