diff options
Diffstat (limited to 'cli/ops')
-rw-r--r-- | cli/ops/web_worker.rs | 44 | ||||
-rw-r--r-- | cli/ops/worker_host.rs | 67 |
2 files changed, 92 insertions, 19 deletions
diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index dd7b6e34a..8cade7d40 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -3,10 +3,10 @@ use super::dispatch_json::{JsonOp, Value}; use crate::op_error::OpError; use crate::ops::json_op; use crate::state::State; +use crate::web_worker::WebWorkerHandle; use crate::worker::WorkerEvent; use deno_core::*; use futures::channel::mpsc; -use futures::sink::SinkExt; use std::convert::From; pub fn web_worker_op<D>( @@ -25,7 +25,32 @@ where -> Result<JsonOp, OpError> { dispatcher(&sender, args, zero_copy) } } -pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) { +pub fn web_worker_op2<D>( + handle: WebWorkerHandle, + sender: mpsc::Sender<WorkerEvent>, + dispatcher: D, +) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError> +where + D: Fn( + WebWorkerHandle, + &mpsc::Sender<WorkerEvent>, + Value, + Option<ZeroCopyBuf>, + ) -> Result<JsonOp, OpError>, +{ + move |args: Value, + zero_copy: Option<ZeroCopyBuf>| + -> Result<JsonOp, OpError> { + dispatcher(handle.clone(), &sender, args, zero_copy) + } +} + +pub fn init( + i: &mut Isolate, + s: &State, + sender: &mpsc::Sender<WorkerEvent>, + handle: WebWorkerHandle, +) { i.register_op( "op_worker_post_message", s.core_op(json_op(web_worker_op( @@ -35,7 +60,11 @@ pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) { ); i.register_op( "op_worker_close", - s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))), + s.core_op(json_op(web_worker_op2( + handle, + sender.clone(), + op_worker_close, + ))), ); } @@ -47,18 +76,23 @@ fn op_worker_post_message( ) -> Result<JsonOp, OpError> { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let mut sender = sender.clone(); - let fut = sender.send(WorkerEvent::Message(d)); - futures::executor::block_on(fut).expect("Failed to post message to host"); + sender + .try_send(WorkerEvent::Message(d)) + .expect("Failed to post message to host"); Ok(JsonOp::Sync(json!({}))) } /// Notify host that guest worker closes fn op_worker_close( + handle: WebWorkerHandle, sender: &mpsc::Sender<WorkerEvent>, _args: Value, _data: Option<ZeroCopyBuf>, ) -> Result<JsonOp, OpError> { let mut sender = sender.clone(); + // Notify parent that we're finished sender.close_channel(); + // Terminate execution of current worker + handle.terminate(); Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index ee448eb83..a04250099 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -1,7 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use crate::fmt_errors::JSError; -use crate::futures::SinkExt; use crate::global_state::GlobalState; use crate::op_error::OpError; use crate::permissions::DenoPermissions; @@ -9,11 +8,10 @@ use crate::startup_data; use crate::state::State; use crate::tokio_util::create_basic_runtime; use crate::web_worker::WebWorker; +use crate::web_worker::WebWorkerHandle; use crate::worker::WorkerEvent; -use crate::worker::WorkerHandle; use deno_core::*; use futures::future::FutureExt; -use futures::future::TryFutureExt; use std::convert::From; use std::thread::JoinHandle; @@ -58,9 +56,9 @@ fn run_worker_thread( specifier: ModuleSpecifier, has_source_code: bool, source_code: String, -) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> { +) -> Result<(JoinHandle<()>, WebWorkerHandle), ErrBox> { let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1); + std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, ErrBox>>(1); let builder = std::thread::Builder::new().name(format!("deno-worker-{}", name)); @@ -78,6 +76,7 @@ fn run_worker_thread( } let mut worker = result.unwrap(); + let name = worker.name.to_string(); // Send thread safe handle to newly created worker to host thread handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); drop(handle_sender); @@ -109,7 +108,8 @@ fn run_worker_thread( if let Err(e) = result { let mut sender = worker.internal_channels.sender.clone(); - futures::executor::block_on(sender.send(WorkerEvent::Error(e))) + sender + .try_send(WorkerEvent::TerminalError(e)) .expect("Failed to post message to host"); // Failure to execute script is a terminal error, bye, bye. @@ -120,6 +120,7 @@ fn run_worker_thread( // that means that we should store JoinHandle to thread to ensure // that it actually terminates. rt.block_on(worker).expect("Panic in event loop"); + debug!("Worker thread shuts down {}", &name); })?; let worker_handle = handle_receiver.recv().unwrap()?; @@ -205,6 +206,28 @@ fn op_host_terminate_worker( fn serialize_worker_event(event: WorkerEvent) -> Value { match event { WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }), + WorkerEvent::TerminalError(error) => { + let mut serialized_error = json!({ + "type": "terminalError", + "error": { + "message": error.to_string(), + } + }); + + if let Ok(js_error) = error.downcast::<JSError>() { + serialized_error = json!({ + "type": "terminalError", + "error": { + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + } + }); + } + + serialized_error + } WorkerEvent::Error(error) => { let mut serialized_error = json!({ "type": "error", @@ -247,13 +270,30 @@ fn op_host_get_message( let state_ = state.clone(); let op = async move { let response = match worker_handle.get_event().await { - Some(event) => serialize_worker_event(event), + Some(event) => { + // Terminal error means that worker should be removed from worker table. + if let WorkerEvent::TerminalError(_) = &event { + let mut state_ = state_.borrow_mut(); + if let Some((join_handle, mut worker_handle)) = + state_.workers.remove(&id) + { + worker_handle.sender.close_channel(); + join_handle.join().expect("Worker thread panicked"); + } + } + serialize_worker_event(event) + } None => { + // Worker shuts down let mut state_ = state_.borrow_mut(); - let (join_handle, mut worker_handle) = - state_.workers.remove(&id).expect("No worker handle found"); - worker_handle.sender.close_channel(); - join_handle.join().expect("Worker thread panicked"); + // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called + // already meaning that we won't find worker in table - in that case ignore. + if let Some((join_handle, mut worker_handle)) = + state_.workers.remove(&id) + { + worker_handle.sender.close_channel(); + join_handle.join().expect("Worker thread panicked"); + } json!({ "type": "close" }) } }; @@ -276,9 +316,8 @@ fn op_host_post_message( let state = state.borrow(); let (_, worker_handle) = state.workers.get(&id).expect("No worker handle found"); - let fut = worker_handle + worker_handle .post_message(msg) - .map_err(|e| OpError::other(e.to_string())); - futures::executor::block_on(fut)?; + .map_err(|e| OpError::other(e.to_string()))?; Ok(JsonOp::Sync(json!({}))) } |