diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/worker_host.rs | 106 |
1 files changed, 26 insertions, 80 deletions
diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 81e8f76da..13d4fffff 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -12,18 +12,15 @@ use crate::startup_data; use crate::state::State; use crate::tokio_util::create_basic_runtime; use crate::web_worker::WebWorker; -use crate::worker::Worker; use crate::worker::WorkerEvent; use crate::worker::WorkerHandle; use deno_core::*; use futures; -use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFutureExt; -use futures::stream::StreamExt; use std; use std::convert::From; -use std::task::Poll; +use std::thread::JoinHandle; pub fn init(i: &mut Isolate, s: &State) { i.register_op( @@ -61,64 +58,6 @@ fn create_web_worker( Ok(worker) } -// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs` -pub fn run_worker_loop( - rt: &mut tokio::runtime::Runtime, - worker: &mut Worker, -) -> Result<(), ErrBox> { - let mut worker_is_ready = false; - - let fut = poll_fn(|cx| -> Poll<Result<(), ErrBox>> { - if !worker_is_ready { - match worker.poll_unpin(cx) { - Poll::Ready(r) => { - if let Err(e) = r { - let mut sender = worker.internal_channels.sender.clone(); - futures::executor::block_on(sender.send(WorkerEvent::Error(e))) - .expect("Failed to post message to host"); - } - worker_is_ready = true; - } - Poll::Pending => {} - } - } - - let maybe_msg = { - match worker.internal_channels.receiver.poll_next_unpin(cx) { - Poll::Ready(r) => match r { - Some(msg) => { - let msg_str = String::from_utf8(msg.to_vec()).unwrap(); - debug!("received message from host: {}", msg_str); - Some(msg_str) - } - None => { - debug!("channel closed by host, worker event loop shuts down"); - return Poll::Ready(Ok(())); - } - }, - Poll::Pending => None, - } - }; - - if let Some(msg) = maybe_msg { - // TODO: just add second value and then bind using rusty_v8 - // to get structured clone/transfer working - let script = format!("workerMessageRecvCallback({})", msg); - worker - .execute(&script) - .expect("Failed to execute message cb"); - // Let worker be polled again - worker_is_ready = false; - worker.waker.wake(); - } - - Poll::Pending - }); - - rt.block_on(fut) -} - -// TODO(bartlomieju): this function should probably live in `cli/web_worker.rs` // TODO(bartlomieju): check if order of actions is aligned to Worker spec fn run_worker_thread( name: String, @@ -127,14 +66,13 @@ fn run_worker_thread( specifier: ModuleSpecifier, has_source_code: bool, source_code: String, -) -> Result<WorkerHandle, ErrBox> { +) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> { let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1); let builder = std::thread::Builder::new().name(format!("deno-worker-{}", name)); - // TODO(bartlomieju): store JoinHandle as well - builder.spawn(move || { + let join_handle = builder.spawn(move || { // Any error inside this block is terminal: // - JS worker is useless - meaning it throws an exception and can't do anything else, // all action done upon it should be noops @@ -189,10 +127,11 @@ fn run_worker_thread( // TODO(bartlomieju): this thread should return result of event loop // that means that we should store JoinHandle to thread to ensure // that it actually terminates. - run_worker_loop(&mut rt, &mut worker).expect("Panic in event loop"); + rt.block_on(worker).expect("Panic in event loop"); })?; - handle_receiver.recv().unwrap() + let worker_handle = handle_receiver.recv().unwrap()?; + Ok((join_handle, worker_handle)) } #[derive(Deserialize)] @@ -230,7 +169,7 @@ fn op_create_worker( format!("USER-WORKER-{}", specifier) }); - let worker_handle = run_worker_thread( + let (join_handle, worker_handle) = run_worker_thread( worker_name, global_state, permissions, @@ -240,7 +179,12 @@ fn op_create_worker( )?; // At this point all interactions with worker happen using thread // safe handler returned from previous function call - let worker_id = parent_state.add_child_worker(worker_handle); + let mut parent_state = parent_state.borrow_mut(); + let worker_id = parent_state.next_worker_id; + parent_state.next_worker_id += 1; + parent_state + .workers + .insert(worker_id, (join_handle, worker_handle)); Ok(JsonOp::Sync(json!({ "id": worker_id }))) } @@ -258,9 +202,10 @@ fn op_host_terminate_worker( let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; let mut state = state.borrow_mut(); - let worker_handle = + let (join_handle, worker_handle) = state.workers.remove(&id).expect("No worker handle found"); worker_handle.terminate(); + join_handle.join().expect("Panic in worker thread"); Ok(JsonOp::Sync(json!({}))) } @@ -299,22 +244,22 @@ fn op_host_get_message( ) -> Result<JsonOp, ErrBox> { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let state_ = state.borrow(); - let worker_handle = state_ - .workers - .get(&id) - .expect("No worker handle found") - .clone(); + let worker_handle = { + let state_ = state.borrow(); + let (_join_handle, worker_handle) = + state_.workers.get(&id).expect("No worker handle found"); + worker_handle.clone() + }; let state_ = state.clone(); let op = async move { let response = match worker_handle.get_event().await { Some(event) => serialize_worker_event(event), None => { let mut state_ = state_.borrow_mut(); - let mut handle = + let (join_handle, mut worker_handle) = state_.workers.remove(&id).expect("No worker handle found"); - handle.sender.close_channel(); - // TODO(bartlomieju): join thread handle here + worker_handle.sender.close_channel(); + join_handle.join().expect("Worker thread panicked"); json!({ "type": "close" }) } }; @@ -335,7 +280,8 @@ fn op_host_post_message( debug!("post message to worker {}", id); let state = state.borrow(); - let worker_handle = state.workers.get(&id).expect("No worker handle found"); + let (_, worker_handle) = + state.workers.get(&id).expect("No worker handle found"); let fut = worker_handle .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string())); |