diff options
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r-- | cli/ops/worker_host.rs | 64 |
1 files changed, 29 insertions, 35 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index e3571f713..47ebd9c7f 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -17,9 +17,11 @@ use deno_core::ModuleSpecifier; use deno_core::ZeroCopyBuf; use futures::future::FutureExt; use std::convert::From; +use std::rc::Rc; +use std::sync::Arc; use std::thread::JoinHandle; -pub fn init(i: &mut CoreIsolate, s: &State) { +pub fn init(i: &mut CoreIsolate, s: &Rc<State>) { i.register_op("op_create_worker", s.stateful_json_op(op_create_worker)); i.register_op( "op_host_terminate_worker", @@ -38,7 +40,7 @@ pub fn init(i: &mut CoreIsolate, s: &State) { fn create_web_worker( worker_id: u32, name: String, - global_state: GlobalState, + global_state: &Arc<GlobalState>, permissions: Permissions, specifier: ModuleSpecifier, has_deno_namespace: bool, @@ -49,7 +51,7 @@ fn create_web_worker( let mut worker = WebWorker::new( name.clone(), startup_data::deno_isolate_init(), - state, + &state, has_deno_namespace, ); @@ -84,12 +86,13 @@ fn create_web_worker( fn run_worker_thread( worker_id: u32, name: String, - global_state: GlobalState, + global_state: &Arc<GlobalState>, permissions: Permissions, specifier: ModuleSpecifier, has_deno_namespace: bool, maybe_source_code: Option<String>, ) -> Result<(JoinHandle<()>, WebWorkerHandle), ErrBox> { + let global_state = global_state.clone(); let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, ErrBox>>(1); @@ -103,7 +106,7 @@ fn run_worker_thread( let result = create_web_worker( worker_id, name, - global_state, + &global_state, permissions, specifier.clone(), has_deno_namespace, @@ -178,7 +181,7 @@ struct CreateWorkerArgs { /// Create worker as the host fn op_create_worker( - state: &State, + state: &Rc<State>, args: Value, _data: &mut [ZeroCopyBuf], ) -> Result<JsonOp, OpError> { @@ -196,12 +199,10 @@ fn op_create_worker( state.check_unstable("Worker.deno"); } let parent_state = state.clone(); - let mut state = state.borrow_mut(); let global_state = state.global_state.clone(); - let permissions = state.permissions.clone(); - let worker_id = state.next_worker_id; - state.next_worker_id += 1; - drop(state); + let permissions = state.permissions.borrow().clone(); + let worker_id = state.next_worker_id.get(); + state.next_worker_id.set(worker_id + 1); let module_specifier = ModuleSpecifier::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); @@ -209,7 +210,7 @@ fn op_create_worker( let (join_handle, worker_handle) = run_worker_thread( worker_id, worker_name, - global_state, + &global_state, permissions, module_specifier, use_deno_namespace, @@ -218,9 +219,9 @@ fn op_create_worker( .map_err(|e| OpError::other(e.to_string()))?; // At this point all interactions with worker happen using thread // safe handler returned from previous function call - let mut parent_state = parent_state.borrow_mut(); parent_state .workers + .borrow_mut() .insert(worker_id, (join_handle, worker_handle)); Ok(JsonOp::Sync(json!({ "id": worker_id }))) @@ -232,15 +233,17 @@ struct WorkerArgs { } fn op_host_terminate_worker( - state: &State, + state: &Rc<State>, args: Value, _data: &mut [ZeroCopyBuf], ) -> Result<JsonOp, OpError> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let mut state = state.borrow_mut(); - let (join_handle, worker_handle) = - state.workers.remove(&id).expect("No worker handle found"); + let (join_handle, worker_handle) = state + .workers + .borrow_mut() + .remove(&id) + .expect("No worker handle found"); worker_handle.terminate(); join_handle.join().expect("Panic in worker thread"); Ok(JsonOp::Sync(json!({}))) @@ -298,27 +301,21 @@ fn serialize_worker_event(event: WorkerEvent) -> Value { /// Get message from guest worker as host fn op_host_get_message( - state: &State, + state: &Rc<State>, args: Value, _data: &mut [ZeroCopyBuf], ) -> Result<JsonOp, OpError> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let worker_handle = { - let state_ = state.borrow(); - let (_join_handle, worker_handle) = - state_.workers.get(&id).expect("No worker handle found"); - worker_handle.clone() - }; - let state_ = state.clone(); + let state = state.clone(); + let worker_handle = state.workers.borrow()[&id].1.clone(); let op = async move { let response = match worker_handle.get_event().await? { Some(event) => { // Terminal error means that worker should be removed from worker table. if let WorkerEvent::TerminalError(_) = &event { - let mut state_ = state_.borrow_mut(); if let Some((join_handle, mut worker_handle)) = - state_.workers.remove(&id) + state.workers.borrow_mut().remove(&id) { worker_handle.sender.close_channel(); join_handle.join().expect("Worker thread panicked"); @@ -328,12 +325,10 @@ fn op_host_get_message( } None => { // Worker shuts down - let mut state_ = state_.borrow_mut(); + let mut workers = state.workers.borrow_mut(); // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called // already meaning that we won't find worker in table - in that case ignore. - if let Some((join_handle, mut worker_handle)) = - state_.workers.remove(&id) - { + if let Some((join_handle, mut worker_handle)) = workers.remove(&id) { worker_handle.sender.close_channel(); join_handle.join().expect("Worker thread panicked"); } @@ -347,7 +342,7 @@ fn op_host_get_message( /// Post message to guest worker as host fn op_host_post_message( - state: &State, + state: &Rc<State>, args: Value, data: &mut [ZeroCopyBuf], ) -> Result<JsonOp, OpError> { @@ -357,9 +352,8 @@ fn op_host_post_message( let msg = Vec::from(&*data[0]).into_boxed_slice(); debug!("post message to worker {}", id); - let state = state.borrow(); - let (_, worker_handle) = - state.workers.get(&id).expect("No worker handle found"); + let workers = state.workers.borrow(); + let worker_handle = workers[&id].1.clone(); worker_handle .post_message(msg) .map_err(|e| OpError::other(e.to_string()))?; |