diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/ops/worker_host.rs | 92 |
1 files changed, 51 insertions, 41 deletions
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index 23ffcd8b1..bd1f1e3f5 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -17,6 +17,8 @@ use deno_core::futures::future::LocalFutureObj; use deno_core::op; use deno_core::serde::Deserialize; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_core::OpState; @@ -27,7 +29,6 @@ use std::collections::HashMap; use std::rc::Rc; use std::sync::atomic::AtomicI32; use std::sync::Arc; -use std::thread::JoinHandle; pub struct CreateWebWorkerArgs { pub name: String, @@ -66,9 +67,8 @@ pub struct FormatJsErrorFnHolder(Option<Arc<FormatJsErrorFn>>); pub struct PreloadModuleCbHolder(Arc<PreloadModuleCb>); pub struct WorkerThread { - // It's an Option so we can take the value before dropping the WorkerThread. - join_handle: Option<JoinHandle<Result<(), AnyError>>>, worker_handle: WebWorkerHandle, + cancel_handle: Rc<CancelHandle>, // A WorkerThread that hasn't been explicitly terminated can only be removed // from the WorkersTable once close messages have been received for both the @@ -78,30 +78,16 @@ pub struct WorkerThread { } impl WorkerThread { - fn terminate(mut self) { - self.worker_handle.clone().terminate(); - self - .join_handle - .take() - .unwrap() - .join() - .expect("Worker thread panicked") - .expect("Panic in worker event loop"); - - // Optimization so the Drop impl doesn't try to terminate the worker handle - // again. - self.ctrl_closed = true; - self.message_closed = true; + fn terminate(self) { + // Cancel recv ops when terminating the worker, so they don't show up as + // pending ops. + self.cancel_handle.cancel(); } } impl Drop for WorkerThread { fn drop(&mut self) { - // If either of the channels is closed, the worker thread has at least - // started closing, and its event loop won't start another run. - if !(self.ctrl_closed || self.message_closed) { - self.worker_handle.clone().terminate(); - } + self.worker_handle.clone().terminate(); } } @@ -217,7 +203,7 @@ fn op_create_worker( std::thread::Builder::new().name(format!("{}", worker_id)); // Spawn it - let join_handle = thread_builder.spawn(move || { + thread_builder.spawn(move || { // Any error inside this block is terminal: // - JS worker is useless - meaning it throws an exception and can't do anything else, // all action done upon it should be noops @@ -256,8 +242,8 @@ fn op_create_worker( let worker_handle = handle_receiver.recv().unwrap()?; let worker_thread = WorkerThread { - join_handle: Some(join_handle), worker_handle: worker_handle.into(), + cancel_handle: CancelHandle::new_rc(), ctrl_closed: false, message_closed: false, }; @@ -330,30 +316,41 @@ async fn op_host_recv_ctrl( state: Rc<RefCell<OpState>>, id: WorkerId, ) -> Result<WorkerControlEvent, AnyError> { - let worker_handle = { + let (worker_handle, cancel_handle) = { let state = state.borrow(); let workers_table = state.borrow::<WorkersTable>(); let maybe_handle = workers_table.get(&id); if let Some(handle) = maybe_handle { - handle.worker_handle.clone() + (handle.worker_handle.clone(), handle.cancel_handle.clone()) } else { // If handle was not found it means worker has already shutdown return Ok(WorkerControlEvent::Close); } }; - let maybe_event = worker_handle.get_control_event().await?; - if let Some(event) = maybe_event { - // Terminal error means that worker should be removed from worker table. - if let WorkerControlEvent::TerminalError(_) = &event { + let maybe_event = worker_handle + .get_control_event() + .or_cancel(cancel_handle) + .await; + match maybe_event { + Ok(Ok(Some(event))) => { + // Terminal error means that worker should be removed from worker table. + if let WorkerControlEvent::TerminalError(_) = &event { + close_channel(state, id, WorkerChannel::Ctrl); + } + Ok(event) + } + Ok(Ok(None)) => { + // If there was no event from worker it means it has already been closed. close_channel(state, id, WorkerChannel::Ctrl); + Ok(WorkerControlEvent::Close) + } + Ok(Err(err)) => Err(err), + Err(_) => { + // The worker was terminated. + Ok(WorkerControlEvent::Close) } - return Ok(event); } - - // If there was no event from worker it means it has already been closed. - close_channel(state, id, WorkerChannel::Ctrl); - Ok(WorkerControlEvent::Close) } #[op] @@ -361,23 +358,36 @@ async fn op_host_recv_message( state: Rc<RefCell<OpState>>, id: WorkerId, ) -> Result<Option<JsMessageData>, AnyError> { - let worker_handle = { + let (worker_handle, cancel_handle) = { let s = state.borrow(); let workers_table = s.borrow::<WorkersTable>(); let maybe_handle = workers_table.get(&id); if let Some(handle) = maybe_handle { - handle.worker_handle.clone() + (handle.worker_handle.clone(), handle.cancel_handle.clone()) } else { // If handle was not found it means worker has already shutdown return Ok(None); } }; - let ret = worker_handle.port.recv(state.clone()).await?; - if ret.is_none() { - close_channel(state, id, WorkerChannel::Messages); + let ret = worker_handle + .port + .recv(state.clone()) + .or_cancel(cancel_handle) + .await; + match ret { + Ok(Ok(ret)) => { + if ret.is_none() { + close_channel(state, id, WorkerChannel::Messages); + } + Ok(ret) + } + Ok(Err(err)) => Err(err), + Err(_) => { + // The worker was terminated. + Ok(None) + } } - Ok(ret) } /// Post message to guest worker as host |