summaryrefslogtreecommitdiff
path: root/cli/ops/worker_host.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2020-09-10 09:57:45 -0400
committerGitHub <noreply@github.com>2020-09-10 09:57:45 -0400
commit7c2e7c660804afca823d60e6496aa853f75db16c (patch)
treeb7746b181c1564c6b1abd2e906662f9e6b008417 /cli/ops/worker_host.rs
parent6f70e6e72ba2d5c1de7495adac37c1e4f4e86b24 (diff)
Use gotham-like state for ops (#7385)
Provides a concrete state type that can be dynamically added. This is necessary for op crates. * renames BasicState to OpState * async ops take `Rc<RefCell<OpState>>` * sync ops take `&mut OpState` * removes `OpRegistry`, `OpRouter` traits * `get_error_class_fn` moved to OpState * ResourceTable moved to OpState
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r--cli/ops/worker_host.rs89
1 files changed, 51 insertions, 38 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index 83f84064b..158865abc 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -5,7 +5,6 @@ use crate::global_state::GlobalState;
use crate::ops::io::get_stdio;
use crate::permissions::Permissions;
use crate::startup_data;
-use crate::state::State;
use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
@@ -13,21 +12,26 @@ use crate::worker::WorkerEvent;
use deno_core::BufVec;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
-use deno_core::OpRegistry;
+use deno_core::OpState;
use deno_core::ZeroCopyBuf;
use futures::future::FutureExt;
use serde_derive::Deserialize;
use serde_json::Value;
+use std::cell::RefCell;
use std::convert::From;
use std::rc::Rc;
use std::sync::Arc;
use std::thread::JoinHandle;
-pub fn init(s: &Rc<State>) {
- s.register_op_json_sync("op_create_worker", op_create_worker);
- s.register_op_json_sync("op_host_terminate_worker", op_host_terminate_worker);
- s.register_op_json_sync("op_host_post_message", op_host_post_message);
- s.register_op_json_async("op_host_get_message", op_host_get_message);
+pub fn init(rt: &mut deno_core::JsRuntime) {
+ super::reg_json_sync(rt, "op_create_worker", op_create_worker);
+ super::reg_json_sync(
+ rt,
+ "op_host_terminate_worker",
+ op_host_terminate_worker,
+ );
+ super::reg_json_sync(rt, "op_host_post_message", op_host_post_message);
+ super::reg_json_async(rt, "op_host_get_message", op_host_get_message);
}
fn create_web_worker(
@@ -38,27 +42,31 @@ fn create_web_worker(
specifier: ModuleSpecifier,
has_deno_namespace: bool,
) -> Result<WebWorker, ErrBox> {
- let state =
- State::new_for_worker(global_state, Some(permissions), specifier)?;
+ let cli_state = crate::state::State::new_for_worker(
+ global_state,
+ Some(permissions),
+ specifier,
+ )?;
let mut worker = WebWorker::new(
name.clone(),
startup_data::deno_isolate_init(),
- &state,
+ &cli_state,
has_deno_namespace,
);
if has_deno_namespace {
- let mut resource_table = state.resource_table.borrow_mut();
+ let state = worker.isolate.op_state();
+ let mut state = state.borrow_mut();
let (stdin, stdout, stderr) = get_stdio();
if let Some(stream) = stdin {
- resource_table.add("stdin", Box::new(stream));
+ state.resource_table.add("stdin", Box::new(stream));
}
if let Some(stream) = stdout {
- resource_table.add("stdout", Box::new(stream));
+ state.resource_table.add("stdout", Box::new(stream));
}
if let Some(stream) = stderr {
- resource_table.add("stderr", Box::new(stream));
+ state.resource_table.add("stderr", Box::new(stream));
}
}
@@ -172,10 +180,11 @@ struct CreateWorkerArgs {
/// Create worker as the host
fn op_create_worker(
- state: &State,
+ state: &mut OpState,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<Value, ErrBox> {
+ let cli_state = super::cli_state(state);
let args: CreateWorkerArgs = serde_json::from_value(args)?;
let specifier = args.specifier.clone();
@@ -187,12 +196,12 @@ fn op_create_worker(
let args_name = args.name;
let use_deno_namespace = args.use_deno_namespace;
if use_deno_namespace {
- state.check_unstable("Worker.deno");
+ cli_state.check_unstable("Worker.deno");
}
- let global_state = state.global_state.clone();
- let permissions = state.permissions.borrow().clone();
- let worker_id = state.next_worker_id.get();
- state.next_worker_id.set(worker_id + 1);
+ let global_state = cli_state.global_state.clone();
+ let permissions = cli_state.permissions.borrow().clone();
+ let worker_id = cli_state.next_worker_id.get();
+ cli_state.next_worker_id.set(worker_id + 1);
let module_specifier = ModuleSpecifier::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_else(|| "".to_string());
@@ -208,7 +217,8 @@ fn op_create_worker(
)?;
// At this point all interactions with worker happen using thread
// safe handler returned from previous function call
- state
+ let cli_state = super::cli_state(state);
+ cli_state
.workers
.borrow_mut()
.insert(worker_id, (join_handle, worker_handle));
@@ -222,13 +232,14 @@ struct WorkerArgs {
}
fn op_host_terminate_worker(
- state: &State,
+ state: &mut OpState,
args: Value,
_data: &mut [ZeroCopyBuf],
) -> Result<Value, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
- let (join_handle, worker_handle) = state
+ let cli_state = super::cli_state(state);
+ let (join_handle, worker_handle) = cli_state
.workers
.borrow_mut()
.remove(&id)
@@ -290,40 +301,41 @@ fn serialize_worker_event(event: WorkerEvent) -> Value {
/// Get message from guest worker as host
async fn op_host_get_message(
- state: Rc<State>,
+ state: Rc<RefCell<OpState>>,
args: Value,
_zero_copy: BufVec,
) -> Result<Value, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
- let state = state.clone();
+ let cli_state = super::cli_state2(&state);
- let workers_table = state.workers.borrow();
- let maybe_handle = workers_table.get(&id);
- let worker_handle = if let Some(handle) = maybe_handle {
- handle.1.clone()
- } else {
- // If handle was not found it means worker has already shutdown
- return Ok(json!({ "type": "close" }));
+ let worker_handle = {
+ let workers_table = cli_state.workers.borrow();
+ let maybe_handle = workers_table.get(&id);
+ if let Some(handle) = maybe_handle {
+ handle.1.clone()
+ } else {
+ // If handle was not found it means worker has already shutdown
+ return Ok(json!({ "type": "close" }));
+ }
};
- drop(workers_table);
let response = match worker_handle.get_event().await? {
Some(event) => {
// Terminal error means that worker should be removed from worker table.
if let WorkerEvent::TerminalError(_) = &event {
if let Some((join_handle, mut worker_handle)) =
- state.workers.borrow_mut().remove(&id)
+ cli_state.workers.borrow_mut().remove(&id)
{
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
- }
+ };
}
serialize_worker_event(event)
}
None => {
// Worker shuts down
- let mut workers = state.workers.borrow_mut();
+ let mut workers = cli_state.workers.borrow_mut();
// Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
// already meaning that we won't find worker in table - in that case ignore.
if let Some((join_handle, mut worker_handle)) = workers.remove(&id) {
@@ -338,7 +350,7 @@ async fn op_host_get_message(
/// Post message to guest worker as host
fn op_host_post_message(
- state: &State,
+ state: &mut OpState,
args: Value,
data: &mut [ZeroCopyBuf],
) -> Result<Value, ErrBox> {
@@ -348,7 +360,8 @@ fn op_host_post_message(
let msg = Vec::from(&*data[0]).into_boxed_slice();
debug!("post message to worker {}", id);
- let workers = state.workers.borrow();
+ let cli_state = super::cli_state(state);
+ let workers = cli_state.workers.borrow();
let worker_handle = workers[&id].1.clone();
worker_handle.post_message(msg)?;
Ok(json!({}))