summaryrefslogtreecommitdiff
path: root/cli/ops/workers.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops/workers.rs')
-rw-r--r--cli/ops/workers.rs73
1 files changed, 38 insertions, 35 deletions
diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs
index f6dcf8042..cf7378a91 100644
--- a/cli/ops/workers.rs
+++ b/cli/ops/workers.rs
@@ -1,5 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
+use crate::deno_error::bad_resource;
use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
@@ -11,7 +12,6 @@ use deno::*;
use futures;
use futures::Async;
use futures::Future;
-use futures::IntoFuture;
use futures::Sink;
use futures::Stream;
use std;
@@ -138,23 +138,23 @@ fn op_create_worker(
}
}
+ let (int, ext) = ThreadSafeState::create_channels();
let child_state = ThreadSafeState::new(
state.global_state.clone(),
Some(module_specifier.clone()),
include_deno_namespace,
+ int,
)?;
- let rid = child_state.rid;
let name = format!("USER-WORKER-{}", specifier);
let deno_main_call = format!("denoMain({})", include_deno_namespace);
let mut worker =
- Worker::new(name, startup_data::deno_isolate_init(), child_state);
+ Worker::new(name, startup_data::deno_isolate_init(), child_state, ext);
js_check(worker.execute(&deno_main_call));
js_check(worker.execute("workerMain()"));
let exec_cb = move |worker: Worker| {
- let mut workers_tl = parent_state.workers.lock().unwrap();
- workers_tl.insert(rid, worker.shared());
- json!(rid)
+ let worker_id = parent_state.add_child_worker(worker);
+ json!(worker_id)
};
// Has provided source code, execute immediately.
@@ -173,7 +173,7 @@ fn op_create_worker(
#[derive(Deserialize)]
struct HostGetWorkerClosedArgs {
- rid: i32,
+ id: i32,
}
/// Return when the worker closes
@@ -183,37 +183,41 @@ fn op_host_get_worker_closed(
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
-
- let rid = args.rid as u32;
- let state = state.clone();
-
- let shared_worker_future = {
- let workers_tl = state.workers.lock().unwrap();
- let worker = workers_tl.get(&rid).unwrap();
- worker.clone()
- };
-
- let op =
- shared_worker_future.then(move |_result| futures::future::ok(json!({})));
+ let id = args.id as u32;
+ let state_ = state.clone();
+ let workers_table = state.workers.lock().unwrap();
+ // TODO: handle bad worker id gracefully
+ let worker = workers_table.get(&id).unwrap();
+ let shared_worker_future = worker.clone().shared();
+
+ let op = shared_worker_future.then(move |_result| {
+ let mut workers_table = state_.workers.lock().unwrap();
+ workers_table.remove(&id);
+ futures::future::ok(json!({}))
+ });
Ok(JsonOp::Async(Box::new(op)))
}
#[derive(Deserialize)]
struct HostGetMessageArgs {
- rid: i32,
+ id: i32,
}
/// Get message from guest worker as host
fn op_host_get_message(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetMessageArgs = serde_json::from_value(args)?;
- let rid = args.rid as u32;
- let op = Worker::get_message_from_resource(rid)
+ let id = args.id as u32;
+ let mut table = state.workers.lock().unwrap();
+ // TODO: don't return bad resource anymore
+ let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
+ let op = worker
+ .get_message()
.map_err(move |_| -> ErrBox { unimplemented!() })
.and_then(move |maybe_buf| {
futures::future::ok(json!({
@@ -226,27 +230,26 @@ fn op_host_get_message(
#[derive(Deserialize)]
struct HostPostMessageArgs {
- rid: i32,
+ id: i32,
}
/// Post message to guest worker as host
fn op_host_post_message(
- _state: &ThreadSafeState,
+ state: &ThreadSafeState,
args: Value,
data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostPostMessageArgs = serde_json::from_value(args)?;
-
- let rid = args.rid as u32;
-
- let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
-
- // TODO: rename to post_message_to_child(rid, d)
- Worker::post_message_to_resource(rid, d)
- .into_future()
- .wait()
+ let id = args.id as u32;
+ let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
+
+ debug!("post message to worker {}", id);
+ let mut table = state.workers.lock().unwrap();
+ // TODO: don't return bad resource anymore
+ let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
+ worker
+ .post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
-
Ok(JsonOp::Sync(json!({})))
}