diff options
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r-- | cli/ops/workers.rs | 73 |
1 files changed, 38 insertions, 35 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index f6dcf8042..cf7378a91 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -1,5 +1,6 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::deno_error::bad_resource; use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; @@ -11,7 +12,6 @@ use deno::*; use futures; use futures::Async; use futures::Future; -use futures::IntoFuture; use futures::Sink; use futures::Stream; use std; @@ -138,23 +138,23 @@ fn op_create_worker( } } + let (int, ext) = ThreadSafeState::create_channels(); let child_state = ThreadSafeState::new( state.global_state.clone(), Some(module_specifier.clone()), include_deno_namespace, + int, )?; - let rid = child_state.rid; let name = format!("USER-WORKER-{}", specifier); let deno_main_call = format!("denoMain({})", include_deno_namespace); let mut worker = - Worker::new(name, startup_data::deno_isolate_init(), child_state); + Worker::new(name, startup_data::deno_isolate_init(), child_state, ext); js_check(worker.execute(&deno_main_call)); js_check(worker.execute("workerMain()")); let exec_cb = move |worker: Worker| { - let mut workers_tl = parent_state.workers.lock().unwrap(); - workers_tl.insert(rid, worker.shared()); - json!(rid) + let worker_id = parent_state.add_child_worker(worker); + json!(worker_id) }; // Has provided source code, execute immediately. @@ -173,7 +173,7 @@ fn op_create_worker( #[derive(Deserialize)] struct HostGetWorkerClosedArgs { - rid: i32, + id: i32, } /// Return when the worker closes @@ -183,37 +183,41 @@ fn op_host_get_worker_closed( _data: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; - let state = state.clone(); - - let shared_worker_future = { - let workers_tl = state.workers.lock().unwrap(); - let worker = workers_tl.get(&rid).unwrap(); - worker.clone() - }; - - let op = - shared_worker_future.then(move |_result| futures::future::ok(json!({}))); + let id = args.id as u32; + let state_ = state.clone(); + let workers_table = state.workers.lock().unwrap(); + // TODO: handle bad worker id gracefully + let worker = workers_table.get(&id).unwrap(); + let shared_worker_future = worker.clone().shared(); + + let op = shared_worker_future.then(move |_result| { + let mut workers_table = state_.workers.lock().unwrap(); + workers_table.remove(&id); + futures::future::ok(json!({})) + }); Ok(JsonOp::Async(Box::new(op))) } #[derive(Deserialize)] struct HostGetMessageArgs { - rid: i32, + id: i32, } /// Get message from guest worker as host fn op_host_get_message( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, _data: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostGetMessageArgs = serde_json::from_value(args)?; - let rid = args.rid as u32; - let op = Worker::get_message_from_resource(rid) + let id = args.id as u32; + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + let op = worker + .get_message() .map_err(move |_| -> ErrBox { unimplemented!() }) .and_then(move |maybe_buf| { futures::future::ok(json!({ @@ -226,27 +230,26 @@ fn op_host_get_message( #[derive(Deserialize)] struct HostPostMessageArgs { - rid: i32, + id: i32, } /// Post message to guest worker as host fn op_host_post_message( - _state: &ThreadSafeState, + state: &ThreadSafeState, args: Value, data: Option<PinnedBuf>, ) -> Result<JsonOp, ErrBox> { let args: HostPostMessageArgs = serde_json::from_value(args)?; - - let rid = args.rid as u32; - - let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - - // TODO: rename to post_message_to_child(rid, d) - Worker::post_message_to_resource(rid, d) - .into_future() - .wait() + let id = args.id as u32; + let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); + + debug!("post message to worker {}", id); + let mut table = state.workers.lock().unwrap(); + // TODO: don't return bad resource anymore + let worker = table.get_mut(&id).ok_or_else(bad_resource)?; + worker + .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; - Ok(JsonOp::Sync(json!({}))) } |