summaryrefslogtreecommitdiff
path: root/cli/ops
diff options
context:
space:
mode:
Diffstat (limited to 'cli/ops')
-rw-r--r--cli/ops/worker_host.rs106
1 files changed, 26 insertions, 80 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs
index 81e8f76da..13d4fffff 100644
--- a/cli/ops/worker_host.rs
+++ b/cli/ops/worker_host.rs
@@ -12,18 +12,15 @@ use crate::startup_data;
use crate::state::State;
use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
-use crate::worker::Worker;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::*;
use futures;
-use futures::future::poll_fn;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
-use futures::stream::StreamExt;
use std;
use std::convert::From;
-use std::task::Poll;
+use std::thread::JoinHandle;
pub fn init(i: &mut Isolate, s: &State) {
i.register_op(
@@ -61,64 +58,6 @@ fn create_web_worker(
Ok(worker)
}
-// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
-pub fn run_worker_loop(
- rt: &mut tokio::runtime::Runtime,
- worker: &mut Worker,
-) -> Result<(), ErrBox> {
- let mut worker_is_ready = false;
-
- let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> {
- if !worker_is_ready {
- match worker.poll_unpin(cx) {
- Poll::Ready(r) => {
- if let Err(e) = r {
- let mut sender = worker.internal_channels.sender.clone();
- futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
- .expect("Failed to post message to host");
- }
- worker_is_ready = true;
- }
- Poll::Pending => {}
- }
- }
-
- let maybe_msg = {
- match worker.internal_channels.receiver.poll_next_unpin(cx) {
- Poll::Ready(r) => match r {
- Some(msg) => {
- let msg_str = String::from_utf8(msg.to_vec()).unwrap();
- debug!("received message from host: {}", msg_str);
- Some(msg_str)
- }
- None => {
- debug!("channel closed by host, worker event loop shuts down");
- return Poll::Ready(Ok(()));
- }
- },
- Poll::Pending => None,
- }
- };
-
- if let Some(msg) = maybe_msg {
- // TODO: just add second value and then bind using rusty_v8
- // to get structured clone/transfer working
- let script = format!("workerMessageRecvCallback({})", msg);
- worker
- .execute(&script)
- .expect("Failed to execute message cb");
- // Let worker be polled again
- worker_is_ready = false;
- worker.waker.wake();
- }
-
- Poll::Pending
- });
-
- rt.block_on(fut)
-}
-
-// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs`
// TODO(bartlomieju): check if order of actions is aligned to Worker spec
fn run_worker_thread(
name: String,
@@ -127,14 +66,13 @@ fn run_worker_thread(
specifier: ModuleSpecifier,
has_source_code: bool,
source_code: String,
-) -> Result<WorkerHandle, ErrBox> {
+) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
let builder =
std::thread::Builder::new().name(format!("deno-worker-{}", name));
- // TODO(bartlomieju): store JoinHandle as well
- builder.spawn(move || {
+ let join_handle = builder.spawn(move || {
// Any error inside this block is terminal:
// - JS worker is useless - meaning it throws an exception and can't do anything else,
// all action done upon it should be noops
@@ -189,10 +127,11 @@ fn run_worker_thread(
// TODO(bartlomieju): this thread should return result of event loop
// that means that we should store JoinHandle to thread to ensure
// that it actually terminates.
- run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop");
+ rt.block_on(worker).expect("Panic in event loop");
})?;
- handle_receiver.recv().unwrap()
+ let worker_handle = handle_receiver.recv().unwrap()?;
+ Ok((join_handle, worker_handle))
}
#[derive(Deserialize)]
@@ -230,7 +169,7 @@ fn op_create_worker(
format!("USER-WORKER-{}", specifier)
});
- let worker_handle = run_worker_thread(
+ let (join_handle, worker_handle) = run_worker_thread(
worker_name,
global_state,
permissions,
@@ -240,7 +179,12 @@ fn op_create_worker(
)?;
// At this point all interactions with worker happen using thread
// safe handler returned from previous function call
- let worker_id = parent_state.add_child_worker(worker_handle);
+ let mut parent_state = parent_state.borrow_mut();
+ let worker_id = parent_state.next_worker_id;
+ parent_state.next_worker_id += 1;
+ parent_state
+ .workers
+ .insert(worker_id, (join_handle, worker_handle));
Ok(JsonOp::Sync(json!({ "id": worker_id })))
}
@@ -258,9 +202,10 @@ fn op_host_terminate_worker(
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let mut state = state.borrow_mut();
- let worker_handle =
+ let (join_handle, worker_handle) =
state.workers.remove(&id).expect("No worker handle found");
worker_handle.terminate();
+ join_handle.join().expect("Panic in worker thread");
Ok(JsonOp::Sync(json!({})))
}
@@ -299,22 +244,22 @@ fn op_host_get_message(
) -> Result<JsonOp, ErrBox> {
let args: WorkerArgs = serde_json::from_value(args)?;
let id = args.id as u32;
- let state_ = state.borrow();
- let worker_handle = state_
- .workers
- .get(&id)
- .expect("No worker handle found")
- .clone();
+ let worker_handle = {
+ let state_ = state.borrow();
+ let (_join_handle, worker_handle) =
+ state_.workers.get(&id).expect("No worker handle found");
+ worker_handle.clone()
+ };
let state_ = state.clone();
let op = async move {
let response = match worker_handle.get_event().await {
Some(event) => serialize_worker_event(event),
None => {
let mut state_ = state_.borrow_mut();
- let mut handle =
+ let (join_handle, mut worker_handle) =
state_.workers.remove(&id).expect("No worker handle found");
- handle.sender.close_channel();
- // TODO(bartlomieju): join thread handle here
+ worker_handle.sender.close_channel();
+ join_handle.join().expect("Worker thread panicked");
json!({ "type": "close" })
}
};
@@ -335,7 +280,8 @@ fn op_host_post_message(
debug!("post message to worker {}", id);
let state = state.borrow();
- let worker_handle = state.workers.get(&id).expect("No worker handle found");
+ let (_, worker_handle) =
+ state.workers.get(&id).expect("No worker handle found");
let fut = worker_handle
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()));