diff options
-rw-r--r-- | cli/compilers/compiler_worker.rs | 4 | ||||
-rw-r--r-- | cli/compilers/ts.rs | 4 | ||||
-rw-r--r-- | cli/compilers/wasm.rs | 4 | ||||
-rw-r--r-- | cli/lib.rs | 10 | ||||
-rw-r--r-- | cli/ops/web_worker.rs | 11 | ||||
-rw-r--r-- | cli/ops/worker_host.rs | 3 | ||||
-rw-r--r-- | cli/state.rs | 38 | ||||
-rw-r--r-- | cli/web_worker.rs | 8 | ||||
-rw-r--r-- | cli/worker.rs | 76 |
9 files changed, 79 insertions, 79 deletions
diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs index 87144c1d0..f76395e75 100644 --- a/cli/compilers/compiler_worker.rs +++ b/cli/compilers/compiler_worker.rs @@ -2,7 +2,6 @@ use crate::ops; use crate::state::ThreadSafeState; use crate::worker::Worker; -use crate::worker::WorkerChannels; use deno_core; use deno_core::StartupData; use std::ops::Deref; @@ -28,10 +27,9 @@ impl CompilerWorker { name: String, startup_data: StartupData, state: ThreadSafeState, - external_channels: WorkerChannels, ) -> Self { let state_ = state.clone(); - let mut worker = Worker::new(name, startup_data, state_, external_channels); + let mut worker = Worker::new(name, startup_data, state_); { let isolate = &mut worker.isolate; ops::runtime::init(isolate, &state); diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 8517c319b..c7896ec61 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -240,11 +240,10 @@ impl TsCompiler { /// Create a new V8 worker with snapshot of TS compiler and setup compiler's /// runtime. fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker { - let (int, ext) = ThreadSafeState::create_channels(); let entry_point = ModuleSpecifier::resolve_url_or_path("./__$deno$ts_compiler.ts").unwrap(); let worker_state = - ThreadSafeState::new(global_state.clone(), None, entry_point, int) + ThreadSafeState::new(global_state.clone(), None, entry_point) .expect("Unable to create worker state"); // Count how many times we start the compiler worker. @@ -257,7 +256,6 @@ impl TsCompiler { "TS".to_string(), startup_data::compiler_isolate_init(), worker_state, - ext, ); worker.execute("bootstrapTsCompilerRuntime()").unwrap(); worker diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index 0cba8118e..a3e578f01 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -45,12 +45,11 @@ pub struct WasmCompiler { impl WasmCompiler { /// Create a new V8 worker with snapshot of WASM compiler and setup compiler's runtime. fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker { - let (int, ext) = ThreadSafeState::create_channels(); let entry_point = ModuleSpecifier::resolve_url_or_path("./__$deno$wasm_compiler.ts") .unwrap(); let worker_state = - ThreadSafeState::new(global_state.clone(), None, entry_point, int) + ThreadSafeState::new(global_state.clone(), None, entry_point) .expect("Unable to create worker state"); // Count how many times we start the compiler worker. @@ -63,7 +62,6 @@ impl WasmCompiler { "WASM".to_string(), startup_data::compiler_isolate_init(), worker_state, - ext, ); worker.execute("bootstrapWasmCompilerRuntime()").unwrap(); worker diff --git a/cli/lib.rs b/cli/lib.rs index a7390b515..fee9e079f 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -122,8 +122,7 @@ fn create_main_worker( global_state: ThreadSafeGlobalState, main_module: ModuleSpecifier, ) -> MainWorker { - let (int, ext) = ThreadSafeState::create_channels(); - let state = ThreadSafeState::new(global_state, None, main_module, int) + let state = ThreadSafeState::new(global_state, None, main_module) .map_err(deno_error::print_err_and_exit) .unwrap(); @@ -136,12 +135,7 @@ fn create_main_worker( resource_table.add("stderr", Box::new(stderr)); } - MainWorker::new( - "main".to_string(), - startup_data::deno_isolate_init(), - state, - ext, - ) + MainWorker::new("main".to_string(), startup_data::deno_isolate_init(), state) } fn types_command() { diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index db7086c59..7bfc70a84 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -7,8 +7,6 @@ use crate::state::ThreadSafeState; use deno_core::*; use futures; use futures::future::FutureExt; -use futures::sink::SinkExt; -use futures::stream::StreamExt; use std; use std::convert::From; @@ -31,8 +29,8 @@ fn op_worker_get_message( ) -> Result<JsonOp, ErrBox> { let state_ = state.clone(); let op = async move { - let mut receiver = state_.worker_channels.receiver.lock().await; - let maybe_buf = receiver.next().await; + let c = state_.worker_channels_internal.lock().unwrap(); + let maybe_buf = c.as_ref().unwrap().get_message().await; debug!("op_worker_get_message"); Ok(json!({ "data": maybe_buf })) }; @@ -47,8 +45,9 @@ fn op_worker_post_message( data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let mut sender = state.worker_channels.sender.clone(); - futures::executor::block_on(sender.send(d)) + let c = state.worker_channels_internal.lock().unwrap(); + let fut = c.as_ref().unwrap().post_message(d); + futures::executor::block_on(fut) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; Ok(JsonOp::Sync(json!({}))) diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index b033b9ed9..c1dcd6aaa 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -99,12 +99,10 @@ fn op_create_worker( result.unwrap() }; - let (int, ext) = ThreadSafeState::create_channels(); let result = ThreadSafeState::new_for_worker( parent_state.global_state.clone(), Some(parent_state.permissions.clone()), // by default share with parent module_specifier.clone(), - int, ); if let Err(err) = result { load_sender.send(Err(err)).unwrap(); @@ -122,7 +120,6 @@ fn op_create_worker( worker_name.to_string(), startup_data::deno_isolate_init(), child_state, - ext, ); let script = format!("bootstrapWorkerRuntime(\"{}\")", worker_name); js_check(worker.execute(&script)); diff --git a/cli/state.rs b/cli/state.rs index 903c5871b..f09191f11 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -9,7 +9,8 @@ use crate::ops::JsonOp; use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; use crate::web_worker::WebWorker; -use crate::worker::WorkerChannels; +use crate::worker::WorkerChannelsExternal; +use crate::worker::WorkerChannelsInternal; use deno_core::Buf; use deno_core::CoreOp; use deno_core::ErrBox; @@ -36,7 +37,6 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; use std::time::Instant; -use tokio::sync::Mutex as AsyncMutex; /// Isolate cannot be passed between threads but ThreadSafeState can. /// ThreadSafeState satisfies Send and Sync. So any state that needs to be @@ -48,14 +48,13 @@ pub struct State { pub global_state: ThreadSafeGlobalState, pub permissions: Arc<Mutex<DenoPermissions>>, pub main_module: ModuleSpecifier, - // TODO(ry) rename to worker_channels_internal - pub worker_channels: WorkerChannels, /// When flags contains a `.import_map_path` option, the content of the /// import map file will be resolved and set. pub import_map: Option<ImportMap>, pub metrics: Metrics, pub global_timer: Mutex<GlobalTimer>, - pub workers: Mutex<HashMap<u32, WorkerChannels>>, + pub workers: Mutex<HashMap<u32, WorkerChannelsExternal>>, + pub worker_channels_internal: Mutex<Option<WorkerChannelsInternal>>, pub loading_workers: Mutex<HashMap<u32, mpsc::Receiver<Result<(), ErrBox>>>>, pub next_worker_id: AtomicUsize, pub start_time: Instant, @@ -222,26 +221,11 @@ impl Loader for ThreadSafeState { } impl ThreadSafeState { - pub fn create_channels() -> (WorkerChannels, WorkerChannels) { - let (in_tx, in_rx) = mpsc::channel::<Buf>(1); - let (out_tx, out_rx) = mpsc::channel::<Buf>(1); - let internal_channels = WorkerChannels { - sender: out_tx, - receiver: Arc::new(AsyncMutex::new(in_rx)), - }; - let external_channels = WorkerChannels { - sender: in_tx, - receiver: Arc::new(AsyncMutex::new(out_rx)), - }; - (internal_channels, external_channels) - } - /// If `shared_permission` is None then permissions from globa state are used. pub fn new( global_state: ThreadSafeGlobalState, shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, main_module: ModuleSpecifier, - internal_channels: WorkerChannels, ) -> Result<Self, ErrBox> { let import_map: Option<ImportMap> = match global_state.flags.import_map_path.as_ref() { @@ -265,9 +249,9 @@ impl ThreadSafeState { main_module, permissions, import_map, - worker_channels: internal_channels, metrics: Metrics::default(), global_timer: Mutex::new(GlobalTimer::new()), + worker_channels_internal: Mutex::new(None), workers: Mutex::new(HashMap::new()), loading_workers: Mutex::new(HashMap::new()), next_worker_id: AtomicUsize::new(0), @@ -286,7 +270,6 @@ impl ThreadSafeState { global_state: ThreadSafeGlobalState, shared_permissions: Option<Arc<Mutex<DenoPermissions>>>, main_module: ModuleSpecifier, - internal_channels: WorkerChannels, ) -> Result<Self, ErrBox> { let seeded_rng = match global_state.flags.seed { Some(seed) => Some(Mutex::new(StdRng::seed_from_u64(seed))), @@ -304,9 +287,9 @@ impl ThreadSafeState { main_module, permissions, import_map: None, - worker_channels: internal_channels, metrics: Metrics::default(), global_timer: Mutex::new(GlobalTimer::new()), + worker_channels_internal: Mutex::new(None), workers: Mutex::new(HashMap::new()), loading_workers: Mutex::new(HashMap::new()), next_worker_id: AtomicUsize::new(0), @@ -388,17 +371,13 @@ impl ThreadSafeState { } #[cfg(test)] - pub fn mock( - main_module: &str, - internal_channels: WorkerChannels, - ) -> ThreadSafeState { + pub fn mock(main_module: &str) -> ThreadSafeState { let module_specifier = ModuleSpecifier::resolve_url_or_path(main_module) .expect("Invalid entry module"); ThreadSafeState::new( ThreadSafeGlobalState::mock(vec!["deno".to_string()]), None, module_specifier, - internal_channels, ) .unwrap() } @@ -431,6 +410,5 @@ impl ThreadSafeState { #[test] fn thread_safe() { fn f<S: Send + Sync>(_: S) {} - let (int, _) = ThreadSafeState::create_channels(); - f(ThreadSafeState::mock("./hello.js", int)); + f(ThreadSafeState::mock("./hello.js")); } diff --git a/cli/web_worker.rs b/cli/web_worker.rs index d984c47b6..7115b9027 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -2,7 +2,6 @@ use crate::ops; use crate::state::ThreadSafeState; use crate::worker::Worker; -use crate::worker::WorkerChannels; use deno_core; use deno_core::ErrBox; use deno_core::StartupData; @@ -28,10 +27,9 @@ impl WebWorker { name: String, startup_data: StartupData, state: ThreadSafeState, - external_channels: WorkerChannels, ) -> Self { let state_ = state.clone(); - let mut worker = Worker::new(name, startup_data, state_, external_channels); + let mut worker = Worker::new(name, startup_data, state_); { let isolate = &mut worker.isolate; ops::runtime::init(isolate, &state); @@ -76,13 +74,11 @@ mod tests { use crate::tokio_util; fn create_test_worker() -> WebWorker { - let (int, ext) = ThreadSafeState::create_channels(); - let state = ThreadSafeState::mock("./hello.js", int); + let state = ThreadSafeState::mock("./hello.js"); let mut worker = WebWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state, - ext, ); worker.execute("bootstrapWorkerRuntime(\"TEST\")").unwrap(); worker.execute("runWorkerMessageLoop()").unwrap(); diff --git a/cli/worker.rs b/cli/worker.rs index b81a77a9e..1725dbe14 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -52,6 +52,51 @@ impl WorkerChannels { } } +pub struct WorkerChannelsInternal(WorkerChannels); + +impl Deref for WorkerChannelsInternal { + type Target = WorkerChannels; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for WorkerChannelsInternal { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Clone)] +pub struct WorkerChannelsExternal(WorkerChannels); + +impl Deref for WorkerChannelsExternal { + type Target = WorkerChannels; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for WorkerChannelsExternal { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) { + let (in_tx, in_rx) = mpsc::channel::<Buf>(1); + let (out_tx, out_rx) = mpsc::channel::<Buf>(1); + let internal_channels = WorkerChannelsInternal(WorkerChannels { + sender: out_tx, + receiver: Arc::new(AsyncMutex::new(in_rx)), + }); + let external_channels = WorkerChannelsExternal(WorkerChannels { + sender: in_tx, + receiver: Arc::new(AsyncMutex::new(out_rx)), + }); + (internal_channels, external_channels) +} + /// Worker is a CLI wrapper for `deno_core::Isolate`. /// /// It provides infrastructure to communicate with a worker and @@ -68,7 +113,7 @@ pub struct Worker { pub name: String, pub isolate: Box<deno_core::EsIsolate>, pub state: ThreadSafeState, - external_channels: WorkerChannels, + external_channels: WorkerChannelsExternal, } impl Worker { @@ -76,7 +121,6 @@ impl Worker { name: String, startup_data: StartupData, state: ThreadSafeState, - external_channels: WorkerChannels, ) -> Self { let mut isolate = deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); @@ -86,6 +130,12 @@ impl Worker { JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) }); + let (internal_channels, external_channels) = create_channels(); + { + let mut c = state.worker_channels_internal.lock().unwrap(); + *c = Some(internal_channels); + } + Self { name, isolate, @@ -128,7 +178,7 @@ impl Worker { } /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WorkerChannels { + pub fn thread_safe_handle(&self) -> WorkerChannelsExternal { self.external_channels.clone() } } @@ -157,10 +207,9 @@ impl MainWorker { name: String, startup_data: StartupData, state: ThreadSafeState, - external_channels: WorkerChannels, ) -> Self { let state_ = state.clone(); - let mut worker = Worker::new(name, startup_data, state_, external_channels); + let mut worker = Worker::new(name, startup_data, state_); { let op_registry = worker.isolate.op_registry.clone(); let isolate = &mut worker.isolate; @@ -233,14 +282,13 @@ mod tests { let global_state = ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new()) .unwrap(); - let (int, ext) = ThreadSafeState::create_channels(); let state = - ThreadSafeState::new(global_state, None, module_specifier.clone(), int) + ThreadSafeState::new(global_state, None, module_specifier.clone()) .unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = - MainWorker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -269,14 +317,13 @@ mod tests { let global_state = ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new()) .unwrap(); - let (int, ext) = ThreadSafeState::create_channels(); let state = - ThreadSafeState::new(global_state, None, module_specifier.clone(), int) + ThreadSafeState::new(global_state, None, module_specifier.clone()) .unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = - MainWorker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -312,19 +359,16 @@ mod tests { }; let global_state = ThreadSafeGlobalState::new(flags, Progress::new()).unwrap(); - let (int, ext) = ThreadSafeState::create_channels(); let state = ThreadSafeState::new( global_state.clone(), None, module_specifier.clone(), - int, ) .unwrap(); let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state.clone(), - ext, ); worker.execute("bootstrapMainRuntime()").unwrap(); let result = worker @@ -346,13 +390,11 @@ mod tests { } fn create_test_worker() -> MainWorker { - let (int, ext) = ThreadSafeState::create_channels(); - let state = ThreadSafeState::mock("./hello.js", int); + let state = ThreadSafeState::mock("./hello.js"); let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state, - ext, ); worker.execute("bootstrapMainRuntime()").unwrap(); worker |