diff options
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r-- | cli/ops/worker_host.rs | 108 |
1 files changed, 63 insertions, 45 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 4cd55fef1..35ec11223 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -1,5 +1,5 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use super::dispatch_json::{Deserialize, JsonOp, Value}; +use super::dispatch_json::{Deserialize, Value}; use crate::fmt_errors::JSError; use crate::global_state::GlobalState; use crate::ops::io::get_stdio; @@ -10,29 +10,37 @@ use crate::tokio_util::create_basic_runtime; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; use crate::worker::WorkerEvent; +use deno_core::BufVec; use deno_core::CoreIsolate; use deno_core::ErrBox; use deno_core::ModuleSpecifier; +use deno_core::ResourceTable; use deno_core::ZeroCopyBuf; use futures::future::FutureExt; +use std::cell::RefCell; use std::convert::From; use std::rc::Rc; use std::sync::Arc; use std::thread::JoinHandle; pub fn init(i: &mut CoreIsolate, s: &Rc<State>) { - i.register_op("op_create_worker", s.stateful_json_op(op_create_worker)); + let t = &CoreIsolate::state(i).borrow().resource_table.clone(); + + i.register_op( + "op_create_worker", + s.stateful_json_op_sync(t, op_create_worker), + ); i.register_op( "op_host_terminate_worker", - s.stateful_json_op(op_host_terminate_worker), + s.stateful_json_op_sync(t, op_host_terminate_worker), ); i.register_op( "op_host_post_message", - s.stateful_json_op(op_host_post_message), + s.stateful_json_op_sync(t, op_host_post_message), ); i.register_op( "op_host_get_message", - s.stateful_json_op(op_host_get_message), + s.stateful_json_op_async(t, op_host_get_message), ); } @@ -180,10 +188,11 @@ struct CreateWorkerArgs { /// Create worker as the host fn op_create_worker( - state: &Rc<State>, + state: &State, + _resource_table: &mut ResourceTable, args: Value, _data: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { +) -> Result<Value, ErrBox> { let args: CreateWorkerArgs = serde_json::from_value(args)?; let specifier = args.specifier.clone(); @@ -197,7 +206,6 @@ fn op_create_worker( if use_deno_namespace { state.check_unstable("Worker.deno"); } - let parent_state = state.clone(); let global_state = state.global_state.clone(); let permissions = state.permissions.borrow().clone(); let worker_id = state.next_worker_id.get(); @@ -217,12 +225,12 @@ fn op_create_worker( )?; // At this point all interactions with worker happen using thread // safe handler returned from previous function call - parent_state + state .workers .borrow_mut() .insert(worker_id, (join_handle, worker_handle)); - Ok(JsonOp::Sync(json!({ "id": worker_id }))) + Ok(json!({ "id": worker_id })) } #[derive(Deserialize)] @@ -231,10 +239,11 @@ struct WorkerArgs { } fn op_host_terminate_worker( - state: &Rc<State>, + state: &State, + _resource_table: &mut ResourceTable, args: Value, _data: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { +) -> Result<Value, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; let (join_handle, worker_handle) = state @@ -244,7 +253,7 @@ fn op_host_terminate_worker( .expect("No worker handle found"); worker_handle.terminate(); join_handle.join().expect("Panic in worker thread"); - Ok(JsonOp::Sync(json!({}))) + Ok(json!({})) } fn serialize_worker_event(event: WorkerEvent) -> Value { @@ -298,52 +307,61 @@ fn serialize_worker_event(event: WorkerEvent) -> Value { } /// Get message from guest worker as host -fn op_host_get_message( - state: &Rc<State>, +async fn op_host_get_message( + state: Rc<State>, + _resource_table: Rc<RefCell<ResourceTable>>, args: Value, - _data: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { + _zero_copy: BufVec, +) -> Result<Value, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; let state = state.clone(); - let worker_handle = state.workers.borrow()[&id].1.clone(); - let op = async move { - let response = match worker_handle.get_event().await? { - Some(event) => { - // Terminal error means that worker should be removed from worker table. - if let WorkerEvent::TerminalError(_) = &event { - if let Some((join_handle, mut worker_handle)) = - state.workers.borrow_mut().remove(&id) - { - worker_handle.sender.close_channel(); - join_handle.join().expect("Worker thread panicked"); - } - } - serialize_worker_event(event) - } - None => { - // Worker shuts down - let mut workers = state.workers.borrow_mut(); - // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called - // already meaning that we won't find worker in table - in that case ignore. - if let Some((join_handle, mut worker_handle)) = workers.remove(&id) { + + let workers_table = state.workers.borrow(); + let maybe_handle = workers_table.get(&id); + let worker_handle = if let Some(handle) = maybe_handle { + handle.1.clone() + } else { + // If handle was not found it means worker has already shutdown + return Ok(json!({ "type": "close" })); + }; + drop(workers_table); + + let response = match worker_handle.get_event().await? { + Some(event) => { + // Terminal error means that worker should be removed from worker table. + if let WorkerEvent::TerminalError(_) = &event { + if let Some((join_handle, mut worker_handle)) = + state.workers.borrow_mut().remove(&id) + { worker_handle.sender.close_channel(); join_handle.join().expect("Worker thread panicked"); } - json!({ "type": "close" }) } - }; - Ok(response) + serialize_worker_event(event) + } + None => { + // Worker shuts down + let mut workers = state.workers.borrow_mut(); + // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called + // already meaning that we won't find worker in table - in that case ignore. + if let Some((join_handle, mut worker_handle)) = workers.remove(&id) { + worker_handle.sender.close_channel(); + join_handle.join().expect("Worker thread panicked"); + } + json!({ "type": "close" }) + } }; - Ok(JsonOp::Async(op.boxed_local())) + Ok(response) } /// Post message to guest worker as host fn op_host_post_message( - state: &Rc<State>, + state: &State, + _resource_table: &mut ResourceTable, args: Value, data: &mut [ZeroCopyBuf], -) -> Result<JsonOp, ErrBox> { +) -> Result<Value, ErrBox> { assert_eq!(data.len(), 1, "Invalid number of arguments"); let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; @@ -353,5 +371,5 @@ fn op_host_post_message( let workers = state.workers.borrow(); let worker_handle = workers[&id].1.clone(); worker_handle.post_message(msg)?; - Ok(JsonOp::Sync(json!({}))) + Ok(json!({})) } |