summaryrefslogtreecommitdiff
path: root/cli/ops/worker_host.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r--cli/ops/worker_host.rs64
1 files changed, 29 insertions, 35 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index e3571f713..47ebd9c7f 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -17,9 +17,11 @@ use deno_core::ModuleSpecifier;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use std::convert::From;
+use std::rc::Rc;
+use std::sync::Arc;
use std::thread::JoinHandle;
-pub fn init(i: &mut CoreIsolate, s: &State) {
+pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
i.register_op("op_create_worker", s.stateful_json_op(op_create_worker));
i.register_op(
"op_host_terminate_worker",
@@ -38,7 +40,7 @@ pub fn init(i: &mut CoreIsolate, s: &State) {
fn create_web_worker(
worker_id: u32,
name: String,
- global_state: GlobalState,
+ global_state: &Arc<GlobalState>,
permissions: Permissions,
specifier: ModuleSpecifier,
has_deno_namespace: bool,
@@ -49,7 +51,7 @@ fn create_web_worker(
let mut worker = WebWorker::new(
name.clone(),
startup_data::deno_isolate_init(),
- state,
+ &state,
has_deno_namespace,
);
@@ -84,12 +86,13 @@ fn create_web_worker(
fn run_worker_thread(
worker_id: u32,
name: String,
- global_state: GlobalState,
+ global_state: &Arc<GlobalState>,
permissions: Permissions,
specifier: ModuleSpecifier,
has_deno_namespace: bool,
maybe_source_code: Option<String>,
) -> Result<(JoinHandle<()>, WebWorkerHandle), ErrBox> {
+ let global_state = global_state.clone();
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, ErrBox>>(1);
@@ -103,7 +106,7 @@ fn run_worker_thread(
let result = create_web_worker(
worker_id,
name,
- global_state,
+ &global_state,
permissions,
specifier.clone(),
has_deno_namespace,
@@ -178,7 +181,7 @@ struct CreateWorkerArgs {
/// Create worker as the host
fn op_create_worker(
- state: &State,
+ state: &Rc<State>,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
@@ -196,12 +199,10 @@ fn op_create_worker(
state.check_unstable("Worker.deno");
}
let parent_state = state.clone();
- let mut state = state.borrow_mut();
let global_state = state.global_state.clone();
- let permissions = state.permissions.clone();
- let worker_id = state.next_worker_id;
- state.next_worker_id += 1;
- drop(state);
+ let permissions = state.permissions.borrow().clone();
+ let worker_id = state.next_worker_id.get();
+ state.next_worker_id.set(worker_id + 1);
let module_specifier = ModuleSpecifier::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
@@ -209,7 +210,7 @@ fn op_create_worker(
let (join_handle, worker_handle) = run_worker_thread(
worker_id,
worker_name,
- global_state,
+ &global_state,
permissions,
module_specifier,
use_deno_namespace,
@@ -218,9 +219,9 @@ fn op_create_worker(
.map_err(|e| OpError::other(e.to_string()))?;
// At this point all interactions with worker happen using thread
// safe handler returned from previous function call
- let mut parent_state = parent_state.borrow_mut();
parent_state
.workers
+ .borrow_mut()
.insert(worker_id, (join_handle, worker_handle));
Ok(JsonOp::Sync(json!({ "id": worker_id })))
@@ -232,15 +233,17 @@ struct WorkerArgs {
}
fn op_host_terminate_worker(
- state: &State,
+ state: &Rc<State>,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
- let mut state = state.borrow_mut();
- let (join_handle, worker_handle) =
- state.workers.remove(&id).expect("No worker handle found");
+ let (join_handle, worker_handle) = state
+ .workers
+ .borrow_mut()
+ .remove(&id)
+ .expect("No worker handle found");
worker_handle.terminate();
join_handle.join().expect("Panic in worker thread");
Ok(JsonOp::Sync(json!({})))
@@ -298,27 +301,21 @@ fn serialize_worker_event(event: WorkerEvent) -> Value {
/// Get message from guest worker as host
fn op_host_get_message(
- state: &State,
+ state: &Rc<State>,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
- let worker_handle = {
- let state_ = state.borrow();
- let (_join_handle, worker_handle) =
- state_.workers.get(&id).expect("No worker handle found");
- worker_handle.clone()
- };
- let state_ = state.clone();
+ let state = state.clone();
+ let worker_handle = state.workers.borrow()[&id].1.clone();
let op = async move {
let response = match worker_handle.get_event().await? {
Some(event) => {
// Terminal error means that worker should be removed from worker table.
if let WorkerEvent::TerminalError(_) = &event {
- let mut state_ = state_.borrow_mut();
if let Some((join_handle, mut worker_handle)) =
- state_.workers.remove(&id)
+ state.workers.borrow_mut().remove(&id)
{
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
@@ -328,12 +325,10 @@ fn op_host_get_message(
}
None => {
// Worker shuts down
- let mut state_ = state_.borrow_mut();
+ let mut workers = state.workers.borrow_mut();
// Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
// already meaning that we won't find worker in table - in that case ignore.
- if let Some((join_handle, mut worker_handle)) =
- state_.workers.remove(&id)
- {
+ if let Some((join_handle, mut worker_handle)) = workers.remove(&id) {
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
}
@@ -347,7 +342,7 @@ fn op_host_get_message(
/// Post message to guest worker as host
fn op_host_post_message(
- state: &State,
+ state: &Rc<State>,
args: Value,
data: &mut [ZeroCopyBuf],
) -> Result<JsonOp, OpError> {
@@ -357,9 +352,8 @@ fn op_host_post_message(
let msg = Vec::from(&*data[0]).into_boxed_slice();
debug!("post message to worker {}", id);
- let state = state.borrow();
- let (_, worker_handle) =
- state.workers.get(&id).expect("No worker handle found");
+ let workers = state.workers.borrow();
+ let worker_handle = workers[&id].1.clone();
worker_handle
.post_message(msg)
.map_err(|e| OpError::other(e.to_string()))?;