diff options
Diffstat (limited to 'cli/worker.rs')
-rw-r--r-- | cli/worker.rs | 76 |
1 files changed, 59 insertions, 17 deletions
diff --git a/cli/worker.rs b/cli/worker.rs index b81a77a9e..1725dbe14 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -52,6 +52,51 @@ impl WorkerChannels { } } +pub struct WorkerChannelsInternal(WorkerChannels); + +impl Deref for WorkerChannelsInternal { + type Target = WorkerChannels; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for WorkerChannelsInternal { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Clone)] +pub struct WorkerChannelsExternal(WorkerChannels); + +impl Deref for WorkerChannelsExternal { + type Target = WorkerChannels; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for WorkerChannelsExternal { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +fn create_channels() -> (WorkerChannelsInternal, WorkerChannelsExternal) { + let (in_tx, in_rx) = mpsc::channel::<Buf>(1); + let (out_tx, out_rx) = mpsc::channel::<Buf>(1); + let internal_channels = WorkerChannelsInternal(WorkerChannels { + sender: out_tx, + receiver: Arc::new(AsyncMutex::new(in_rx)), + }); + let external_channels = WorkerChannelsExternal(WorkerChannels { + sender: in_tx, + receiver: Arc::new(AsyncMutex::new(out_rx)), + }); + (internal_channels, external_channels) +} + /// Worker is a CLI wrapper for `deno_core::Isolate`. /// /// It provides infrastructure to communicate with a worker and @@ -68,7 +113,7 @@ pub struct Worker { pub name: String, pub isolate: Box<deno_core::EsIsolate>, pub state: ThreadSafeState, - external_channels: WorkerChannels, + external_channels: WorkerChannelsExternal, } impl Worker { @@ -76,7 +121,6 @@ impl Worker { name: String, startup_data: StartupData, state: ThreadSafeState, - external_channels: WorkerChannels, ) -> Self { let mut isolate = deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); @@ -86,6 +130,12 @@ impl Worker { JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) }); + let (internal_channels, external_channels) = create_channels(); + { + let mut c = state.worker_channels_internal.lock().unwrap(); + *c = Some(internal_channels); + } + Self { name, isolate, @@ -128,7 +178,7 @@ impl Worker { } /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WorkerChannels { + pub fn thread_safe_handle(&self) -> WorkerChannelsExternal { self.external_channels.clone() } } @@ -157,10 +207,9 @@ impl MainWorker { name: String, startup_data: StartupData, state: ThreadSafeState, - external_channels: WorkerChannels, ) -> Self { let state_ = state.clone(); - let mut worker = Worker::new(name, startup_data, state_, external_channels); + let mut worker = Worker::new(name, startup_data, state_); { let op_registry = worker.isolate.op_registry.clone(); let isolate = &mut worker.isolate; @@ -233,14 +282,13 @@ mod tests { let global_state = ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new()) .unwrap(); - let (int, ext) = ThreadSafeState::create_channels(); let state = - ThreadSafeState::new(global_state, None, module_specifier.clone(), int) + ThreadSafeState::new(global_state, None, module_specifier.clone()) .unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = - MainWorker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -269,14 +317,13 @@ mod tests { let global_state = ThreadSafeGlobalState::new(flags::DenoFlags::default(), Progress::new()) .unwrap(); - let (int, ext) = ThreadSafeState::create_channels(); let state = - ThreadSafeState::new(global_state, None, module_specifier.clone(), int) + ThreadSafeState::new(global_state, None, module_specifier.clone()) .unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = - MainWorker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -312,19 +359,16 @@ mod tests { }; let global_state = ThreadSafeGlobalState::new(flags, Progress::new()).unwrap(); - let (int, ext) = ThreadSafeState::create_channels(); let state = ThreadSafeState::new( global_state.clone(), None, module_specifier.clone(), - int, ) .unwrap(); let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state.clone(), - ext, ); worker.execute("bootstrapMainRuntime()").unwrap(); let result = worker @@ -346,13 +390,11 @@ mod tests { } fn create_test_worker() -> MainWorker { - let (int, ext) = ThreadSafeState::create_channels(); - let state = ThreadSafeState::mock("./hello.js", int); + let state = ThreadSafeState::mock("./hello.js"); let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state, - ext, ); worker.execute("bootstrapMainRuntime()").unwrap(); worker |