summaryrefslogtreecommitdiff
path: root/cli/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/state.rs')
-rw-r--r--cli/state.rs70
1 files changed, 38 insertions, 32 deletions
diff --git a/cli/state.rs b/cli/state.rs
index 8d14042aa..edfac72c0 100644
--- a/cli/state.rs
+++ b/cli/state.rs
@@ -6,8 +6,6 @@ use crate::import_map::ImportMap;
use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::permissions::DenoPermissions;
-use crate::resources;
-use crate::resources::ResourceId;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
use deno::Buf;
@@ -17,7 +15,6 @@ use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
-use futures::future::Shared;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
@@ -26,15 +23,12 @@ use std;
use std::collections::HashMap;
use std::ops::Deref;
use std::str;
+use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
-use tokio::sync::mpsc as async_mpsc;
-
-// TODO: hold references to concrete Workers instead of shared futures of
-// those workers?
-pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;
+use tokio::sync::mpsc;
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
@@ -53,10 +47,9 @@ pub struct State {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
- pub workers: Mutex<UserWorkerTable>,
+ pub workers: Mutex<HashMap<u32, Worker>>,
+ pub next_worker_id: AtomicUsize,
pub start_time: Instant,
- /// A reference to this worker's resource.
- pub rid: ResourceId,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
}
@@ -179,25 +172,26 @@ impl Loader for ThreadSafeState {
}
impl ThreadSafeState {
- pub fn new(
- global_state: ThreadSafeGlobalState,
- main_module: Option<ModuleSpecifier>,
- include_deno_namespace: bool,
- ) -> Result<Self, ErrBox> {
- let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
- let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
+ pub fn create_channels() -> (WorkerChannels, WorkerChannels) {
+ let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
+ let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannels {
- sender: worker_out_tx,
- receiver: worker_in_rx,
+ sender: out_tx,
+ receiver: in_rx,
};
let external_channels = WorkerChannels {
- sender: worker_in_tx,
- receiver: worker_out_rx,
+ sender: in_tx,
+ receiver: out_rx,
};
+ (internal_channels, external_channels)
+ }
- let mut table = resources::lock_resource_table();
- let rid = table.add("worker", Box::new(external_channels));
-
+ pub fn new(
+ global_state: ThreadSafeGlobalState,
+ main_module: Option<ModuleSpecifier>,
+ include_deno_namespace: bool,
+ internal_channels: WorkerChannels,
+ ) -> Result<Self, ErrBox> {
let import_map: Option<ImportMap> =
match global_state.flags.import_map_path.as_ref() {
None => None,
@@ -221,9 +215,9 @@ impl ThreadSafeState {
worker_channels: Mutex::new(internal_channels),
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
- workers: Mutex::new(UserWorkerTable::new()),
+ workers: Mutex::new(HashMap::new()),
+ next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
- rid,
seeded_rng,
include_deno_namespace,
};
@@ -231,6 +225,13 @@ impl ThreadSafeState {
Ok(ThreadSafeState(Arc::new(state)))
}
+ pub fn add_child_worker(&self, worker: Worker) -> u32 {
+ let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
+ let mut workers_tl = self.workers.lock().unwrap();
+ workers_tl.insert(worker_id, worker);
+ worker_id
+ }
+
#[inline]
pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> {
self.permissions.check_read(filename)
@@ -286,7 +287,10 @@ impl ThreadSafeState {
}
#[cfg(test)]
- pub fn mock(argv: Vec<String>) -> ThreadSafeState {
+ pub fn mock(
+ argv: Vec<String>,
+ internal_channels: WorkerChannels,
+ ) -> ThreadSafeState {
let module_specifier = if argv.is_empty() {
None
} else {
@@ -299,6 +303,7 @@ impl ThreadSafeState {
ThreadSafeGlobalState::mock(argv),
module_specifier,
true,
+ internal_channels,
)
.unwrap()
}
@@ -331,8 +336,9 @@ impl ThreadSafeState {
#[test]
fn thread_safe() {
fn f<S: Send + Sync>(_: S) {}
- f(ThreadSafeState::mock(vec![
- String::from("./deno"),
- String::from("hello.js"),
- ]));
+ let (int, _) = ThreadSafeState::create_channels();
+ f(ThreadSafeState::mock(
+ vec![String::from("./deno"), String::from("hello.js")],
+ int,
+ ));
}