summaryrefslogtreecommitdiff
path: root/cli/ops/worker_host.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r--cli/ops/worker_host.rs108
1 files changed, 63 insertions, 45 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index 4cd55fef1..35ec11223 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -1,5 +1,5 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
-use super::dispatch_json::{Deserialize, JsonOp, Value};
+use super::dispatch_json::{Deserialize, Value};
use crate::fmt_errors::JSError;
use crate::global_state::GlobalState;
use crate::ops::io::get_stdio;
@@ -10,29 +10,37 @@ use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::worker::WorkerEvent;
+use deno_core::BufVec;
use deno_core::CoreIsolate;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
+use deno_core::ResourceTable;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
+use std::cell::RefCell;
use std::convert::From;
use std::rc::Rc;
use std::sync::Arc;
use std::thread::JoinHandle;
pub fn init(i: &mut CoreIsolate, s: &Rc<State>) {
- i.register_op("op_create_worker", s.stateful_json_op(op_create_worker));
+ let t = &CoreIsolate::state(i).borrow().resource_table.clone();
+
+ i.register_op(
+ "op_create_worker",
+ s.stateful_json_op_sync(t, op_create_worker),
+ );
i.register_op(
"op_host_terminate_worker",
- s.stateful_json_op(op_host_terminate_worker),
+ s.stateful_json_op_sync(t, op_host_terminate_worker),
);
i.register_op(
"op_host_post_message",
- s.stateful_json_op(op_host_post_message),
+ s.stateful_json_op_sync(t, op_host_post_message),
);
i.register_op(
"op_host_get_message",
- s.stateful_json_op(op_host_get_message),
+ s.stateful_json_op_async(t, op_host_get_message),
);
}
@@ -180,10 +188,11 @@ struct CreateWorkerArgs {
/// Create worker as the host
fn op_create_worker(
- state: &Rc<State>,
+ state: &State,
+ _resource_table: &mut ResourceTable,
args: Value,
_data: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+) -> Result<Value, ErrBox> {
let args: CreateWorkerArgs = serde_json::from_value(args)?;
let specifier = args.specifier.clone();
@@ -197,7 +206,6 @@ fn op_create_worker(
if use_deno_namespace {
state.check_unstable("Worker.deno");
}
- let parent_state = state.clone();
let global_state = state.global_state.clone();
let permissions = state.permissions.borrow().clone();
let worker_id = state.next_worker_id.get();
@@ -217,12 +225,12 @@ fn op_create_worker(
)?;
// At this point all interactions with worker happen using thread
// safe handler returned from previous function call
- parent_state
+ state
.workers
.borrow_mut()
.insert(worker_id, (join_handle, worker_handle));
- Ok(JsonOp::Sync(json!({ "id": worker_id })))
+ Ok(json!({ "id": worker_id }))
}
#[derive(Deserialize)]
@@ -231,10 +239,11 @@ struct WorkerArgs {
}
fn op_host_terminate_worker(
- state: &Rc<State>,
+ state: &State,
+ _resource_table: &mut ResourceTable,
args: Value,
_data: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+) -> Result<Value, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let (join_handle, worker_handle) = state
@@ -244,7 +253,7 @@ fn op_host_terminate_worker(
.expect("No worker handle found");
worker_handle.terminate();
join_handle.join().expect("Panic in worker thread");
- Ok(JsonOp::Sync(json!({})))
+ Ok(json!({}))
}
fn serialize_worker_event(event: WorkerEvent) -> Value {
@@ -298,52 +307,61 @@ fn serialize_worker_event(event: WorkerEvent) -> Value {
}
/// Get message from guest worker as host
-fn op_host_get_message(
- state: &Rc<State>,
+async fn op_host_get_message(
+ state: Rc<State>,
+ _resource_table: Rc<RefCell<ResourceTable>>,
args: Value,
- _data: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+ _zero_copy: BufVec,
+) -> Result<Value, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state = state.clone();
- let worker_handle = state.workers.borrow()[&id].1.clone();
- let op = async move {
- let response = match worker_handle.get_event().await? {
- Some(event) => {
- // Terminal error means that worker should be removed from worker table.
- if let WorkerEvent::TerminalError(_) = &event {
- if let Some((join_handle, mut worker_handle)) =
- state.workers.borrow_mut().remove(&id)
- {
- worker_handle.sender.close_channel();
- join_handle.join().expect("Worker thread panicked");
- }
- }
- serialize_worker_event(event)
- }
- None => {
- // Worker shuts down
- let mut workers = state.workers.borrow_mut();
- // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
- // already meaning that we won't find worker in table - in that case ignore.
- if let Some((join_handle, mut worker_handle)) = workers.remove(&id) {
+
+ let workers_table = state.workers.borrow();
+ let maybe_handle = workers_table.get(&id);
+ let worker_handle = if let Some(handle) = maybe_handle {
+ handle.1.clone()
+ } else {
+ // If handle was not found it means worker has already shutdown
+ return Ok(json!({ "type": "close" }));
+ };
+ drop(workers_table);
+
+ let response = match worker_handle.get_event().await? {
+ Some(event) => {
+ // Terminal error means that worker should be removed from worker table.
+ if let WorkerEvent::TerminalError(_) = &event {
+ if let Some((join_handle, mut worker_handle)) =
+ state.workers.borrow_mut().remove(&id)
+ {
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
}
- json!({ "type": "close" })
}
- };
- Ok(response)
+ serialize_worker_event(event)
+ }
+ None => {
+ // Worker shuts down
+ let mut workers = state.workers.borrow_mut();
+ // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
+ // already meaning that we won't find worker in table - in that case ignore.
+ if let Some((join_handle, mut worker_handle)) = workers.remove(&id) {
+ worker_handle.sender.close_channel();
+ join_handle.join().expect("Worker thread panicked");
+ }
+ json!({ "type": "close" })
+ }
};
- Ok(JsonOp::Async(op.boxed_local()))
+ Ok(response)
}
/// Post message to guest worker as host
fn op_host_post_message(
- state: &Rc<State>,
+ state: &State,
+ _resource_table: &mut ResourceTable,
args: Value,
data: &mut [ZeroCopyBuf],
-) -> Result<JsonOp, ErrBox> {
+) -> Result<Value, ErrBox> {
assert_eq!(data.len(), 1, "Invalid number of arguments");
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
@@ -353,5 +371,5 @@ fn op_host_post_message(
let workers = state.workers.borrow();
let worker_handle = workers[&id].1.clone();
worker_handle.post_message(msg)?;
- Ok(JsonOp::Sync(json!({})))
+ Ok(json!({}))
}