diff options
Diffstat (limited to 'cli/ops/worker_host.rs')
-rw-r--r-- | cli/ops/worker_host.rs | 89 |
1 files changed, 41 insertions, 48 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index f8b3edfce..fabe0b5e8 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -4,11 +4,11 @@ use crate::deno_error::bad_resource; use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; -use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; use crate::startup_data; -use crate::state::ThreadSafeState; +use crate::state::State; use crate::web_worker::WebWorker; +use crate::worker::WorkerChannelsExternal; use deno_core::*; use futures; use futures::future::FutureExt; @@ -17,7 +17,7 @@ use std; use std::convert::From; use std::sync::atomic::Ordering; -pub fn init(i: &mut Isolate, s: &ThreadSafeState) { +pub fn init(i: &mut Isolate, s: &State) { i.register_op( "create_worker", s.core_op(json_op(s.stateful_op(op_create_worker))), @@ -48,7 +48,7 @@ struct CreateWorkerArgs { /// Create worker as the host fn op_create_worker( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -59,37 +59,31 @@ fn op_create_worker( let source_code = args.source_code.clone(); let args_name = args.name; let parent_state = state.clone(); - - let (load_sender, load_receiver) = - std::sync::mpsc::sync_channel::<JsonResult>(1); + let state = state.borrow(); + let global_state = state.global_state.clone(); + let child_permissions = state.permissions.clone(); + let referrer = state.main_module.to_string(); + drop(state); + + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::<Result<WorkerChannelsExternal, ErrBox>>(1); + + // TODO(bartlomieju): Isn't this wrong? + let result = ModuleSpecifier::resolve_url_or_path(&specifier)?; + let module_specifier = if !has_source_code { + ModuleSpecifier::resolve_import(&specifier, &referrer)? + } else { + result + }; std::thread::spawn(move || { - // TODO(bartlomieju): Isn't this wrong? - let result = ModuleSpecifier::resolve_url_or_path(&specifier); - if let Err(err) = result { - load_sender.send(Err(err.into())).unwrap(); - return; - } - - let module_specifier = if !has_source_code { - let referrer = parent_state.main_module.to_string(); - let result = ModuleSpecifier::resolve_import(&specifier, &referrer); - if let Err(err) = result { - load_sender.send(Err(err.into())).unwrap(); - return; - } - result.unwrap() - } else { - result.unwrap() - }; - - let result = ThreadSafeState::new_for_worker( - parent_state.global_state.clone(), - Some(parent_state.permissions.clone()), // by default share with parent + let result = State::new_for_worker( + global_state, + Some(child_permissions), // by default share with parent module_specifier.clone(), ); if let Err(err) = result { - load_sender.send(Err(err)).unwrap(); + handle_sender.send(Err(err)).unwrap(); return; } let child_state = result.unwrap(); @@ -109,12 +103,12 @@ fn op_create_worker( js_check(worker.execute(&script)); js_check(worker.execute("runWorkerMessageLoop()")); - let worker_id = parent_state.add_child_worker(&worker); + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); // Has provided source code, execute immediately. if has_source_code { js_check(worker.execute(&source_code)); - load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); + // FIXME(bartlomieju): runtime is not run in this case return; } @@ -128,14 +122,13 @@ fn op_create_worker( } .boxed_local(); - load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); - crate::tokio_util::run_basic(fut); }); - let r = load_receiver.recv().unwrap(); + let handle = handle_receiver.recv().unwrap()?; + let worker_id = parent_state.add_child_worker(handle); - Ok(JsonOp::Sync(r.unwrap())) + Ok(JsonOp::Sync(json!({ "id": worker_id }))) } #[derive(Deserialize)] @@ -144,16 +137,15 @@ struct WorkerArgs { } fn op_host_close_worker( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let state_ = state.clone(); + let mut state = state.borrow_mut(); - let mut workers_table = state_.workers.lock().unwrap(); - let maybe_worker_handle = workers_table.remove(&id); + let maybe_worker_handle = state.workers.remove(&id); if let Some(worker_handle) = maybe_worker_handle { let mut sender = worker_handle.sender.clone(); sender.close_channel(); @@ -173,16 +165,16 @@ struct HostGetMessageArgs { /// Get message from guest worker as host fn op_host_get_message( - state: &ThreadSafeState, + state: &State, args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostGetMessageArgs = serde_json::from_value(args)?; - let state_ = state.clone(); let id = args.id as u32; - let mut table = state_.workers.lock().unwrap(); + + let state = state.borrow(); // TODO: don't return bad resource anymore - let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?; let fut = worker_handle.get_message(); let op = async move { let maybe_buf = fut.await; @@ -198,7 +190,7 @@ struct HostPostMessageArgs { /// Post message to guest worker as host fn op_host_post_message( - state: &ThreadSafeState, + state: &State, args: Value, data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { @@ -207,9 +199,9 @@ fn op_host_post_message( let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); debug!("post message to worker {}", id); - let mut table = state.workers.lock().unwrap(); + let state = state.borrow(); // TODO: don't return bad resource anymore - let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; + let worker_handle = state.workers.get(&id).ok_or_else(bad_resource)?; let fut = worker_handle .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); @@ -218,10 +210,11 @@ fn op_host_post_message( } fn op_metrics( - state: &ThreadSafeState, + state: &State, _args: Value, _zero_copy: Option<ZeroCopyBuf>, ) -> Result<JsonOp, ErrBox> { + let state = state.borrow(); let m = &state.metrics; Ok(JsonOp::Sync(json!({ |