diff options
author | Ryan Dahl <ry@tinyclouds.org> | 2020-09-10 09:57:45 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-10 09:57:45 -0400 |
commit | 7c2e7c660804afca823d60e6496aa853f75db16c (patch) | |
tree | b7746b181c1564c6b1abd2e906662f9e6b008417 /cli/ops/worker_host.rs | |
parent | 6f70e6e72ba2d5c1de7495adac37c1e4f4e86b24 (diff) |
Use gotham-like state for ops (#7385)
Provides a concrete state type that can be dynamically added. This is necessary for op crates.
* renames BasicState to OpState
* async ops take `Rc<RefCell<OpState>>`
* sync ops take `&mut OpState`
* removes `OpRegistry`, `OpRouter` traits
* `get_error_class_fn` moved to OpState
* ResourceTable moved to OpState
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r-- | cli/ops/worker_host.rs | 89 |
1 files changed, 51 insertions, 38 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 83f84064b..158865abc 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -5,7 +5,6 @@ use crate::global_state::GlobalState; use crate::ops::io::get_stdio; use crate::permissions::Permissions; use crate::startup_data; -use crate::state::State; use crate::tokio_util::create_basic_runtime; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; @@ -13,21 +12,26 @@ use crate::worker::WorkerEvent; use deno_core::BufVec; use deno_core::ErrBox; use deno_core::ModuleSpecifier; -use deno_core::OpRegistry; +use deno_core::OpState; use deno_core::ZeroCopyBuf; use futures::future::FutureExt; use serde_derive::Deserialize; use serde_json::Value; +use std::cell::RefCell; use std::convert::From; use std::rc::Rc; use std::sync::Arc; use std::thread::JoinHandle; -pub fn init(s: &Rc<State>) { - s.register_op_json_sync("op_create_worker", op_create_worker); - s.register_op_json_sync("op_host_terminate_worker", op_host_terminate_worker); - s.register_op_json_sync("op_host_post_message", op_host_post_message); - s.register_op_json_async("op_host_get_message", op_host_get_message); +pub fn init(rt: &mut deno_core::JsRuntime) { + super::reg_json_sync(rt, "op_create_worker", op_create_worker); + super::reg_json_sync( + rt, + "op_host_terminate_worker", + op_host_terminate_worker, + ); + super::reg_json_sync(rt, "op_host_post_message", op_host_post_message); + super::reg_json_async(rt, "op_host_get_message", op_host_get_message); } fn create_web_worker( @@ -38,27 +42,31 @@ fn create_web_worker( specifier: ModuleSpecifier, has_deno_namespace: bool, ) -> Result<WebWorker, ErrBox> { - let state = - State::new_for_worker(global_state, Some(permissions), specifier)?; + let cli_state = crate::state::State::new_for_worker( + global_state, + Some(permissions), + specifier, + )?; let mut worker = WebWorker::new( name.clone(), startup_data::deno_isolate_init(), - &state, + &cli_state, has_deno_namespace, ); if has_deno_namespace { - let mut resource_table = state.resource_table.borrow_mut(); + let state = worker.isolate.op_state(); + let mut state = state.borrow_mut(); let (stdin, stdout, stderr) = get_stdio(); if let Some(stream) = stdin { - resource_table.add("stdin", Box::new(stream)); + state.resource_table.add("stdin", Box::new(stream)); } if let Some(stream) = stdout { - resource_table.add("stdout", Box::new(stream)); + state.resource_table.add("stdout", Box::new(stream)); } if let Some(stream) = stderr { - resource_table.add("stderr", Box::new(stream)); + state.resource_table.add("stderr", Box::new(stream)); } } @@ -172,10 +180,11 @@ struct CreateWorkerArgs { /// Create worker as the host fn op_create_worker( - state: &State, + state: &mut OpState, args: Value, _data: &mut [ZeroCopyBuf], ) -> Result<Value, ErrBox> { + let cli_state = super::cli_state(state); let args: CreateWorkerArgs = serde_json::from_value(args)?; let specifier = args.specifier.clone(); @@ -187,12 +196,12 @@ fn op_create_worker( let args_name = args.name; let use_deno_namespace = args.use_deno_namespace; if use_deno_namespace { - state.check_unstable("Worker.deno"); + cli_state.check_unstable("Worker.deno"); } - let global_state = state.global_state.clone(); - let permissions = state.permissions.borrow().clone(); - let worker_id = state.next_worker_id.get(); - state.next_worker_id.set(worker_id + 1); + let global_state = cli_state.global_state.clone(); + let permissions = cli_state.permissions.borrow().clone(); + let worker_id = cli_state.next_worker_id.get(); + cli_state.next_worker_id.set(worker_id + 1); let module_specifier = ModuleSpecifier::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); @@ -208,7 +217,8 @@ fn op_create_worker( )?; // At this point all interactions with worker happen using thread // safe handler returned from previous function call - state + let cli_state = super::cli_state(state); + cli_state .workers .borrow_mut() .insert(worker_id, (join_handle, worker_handle)); @@ -222,13 +232,14 @@ struct WorkerArgs { } fn op_host_terminate_worker( - state: &State, + state: &mut OpState, args: Value, _data: &mut [ZeroCopyBuf], ) -> Result<Value, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let (join_handle, worker_handle) = state + let cli_state = super::cli_state(state); + let (join_handle, worker_handle) = cli_state .workers .borrow_mut() .remove(&id) @@ -290,40 +301,41 @@ fn serialize_worker_event(event: WorkerEvent) -> Value { /// Get message from guest worker as host async fn op_host_get_message( - state: Rc<State>, + state: Rc<RefCell<OpState>>, args: Value, _zero_copy: BufVec, ) -> Result<Value, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let state = state.clone(); + let cli_state = super::cli_state2(&state); - let workers_table = state.workers.borrow(); - let maybe_handle = workers_table.get(&id); - let worker_handle = if let Some(handle) = maybe_handle { - handle.1.clone() - } else { - // If handle was not found it means worker has already shutdown - return Ok(json!({ "type": "close" })); + let worker_handle = { + let workers_table = cli_state.workers.borrow(); + let maybe_handle = workers_table.get(&id); + if let Some(handle) = maybe_handle { + handle.1.clone() + } else { + // If handle was not found it means worker has already shutdown + return Ok(json!({ "type": "close" })); + } }; - drop(workers_table); let response = match worker_handle.get_event().await? { Some(event) => { // Terminal error means that worker should be removed from worker table. if let WorkerEvent::TerminalError(_) = &event { if let Some((join_handle, mut worker_handle)) = - state.workers.borrow_mut().remove(&id) + cli_state.workers.borrow_mut().remove(&id) { worker_handle.sender.close_channel(); join_handle.join().expect("Worker thread panicked"); - } + }; } serialize_worker_event(event) } None => { // Worker shuts down - let mut workers = state.workers.borrow_mut(); + let mut workers = cli_state.workers.borrow_mut(); // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called // already meaning that we won't find worker in table - in that case ignore. if let Some((join_handle, mut worker_handle)) = workers.remove(&id) { @@ -338,7 +350,7 @@ async fn op_host_get_message( /// Post message to guest worker as host fn op_host_post_message( - state: &State, + state: &mut OpState, args: Value, data: &mut [ZeroCopyBuf], ) -> Result<Value, ErrBox> { @@ -348,7 +360,8 @@ fn op_host_post_message( let msg = Vec::from(&*data[0]).into_boxed_slice(); debug!("post message to worker {}", id); - let workers = state.workers.borrow(); + let cli_state = super::cli_state(state); + let workers = cli_state.workers.borrow(); let worker_handle = workers[&id].1.clone(); worker_handle.post_message(msg)?; Ok(json!({})) |