diff options
Diffstat (limited to 'runtime/ops/worker_host.rs')
-rw-r--r-- | runtime/ops/worker_host.rs | 71 |
1 files changed, 57 insertions, 14 deletions
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index d80a39502..829681ab6 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -67,6 +67,12 @@ pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>); pub struct WorkerThread { join_handle: JoinHandle<Result<(), AnyError>>, worker_handle: WebWorkerHandle, + + // A WorkerThread that hasn't been explicitly terminated can only be removed + // from the WorkersTable once close messages have been received for both the + // control and message channels. See `close_channel`. + ctrl_closed: bool, + message_closed: bool, } pub type WorkersTable = HashMap<WorkerId, WorkerThread>; @@ -553,6 +559,8 @@ fn op_create_worker( let worker_thread = WorkerThread { join_handle, worker_handle: worker_handle.into(), + ctrl_closed: false, + message_closed: false, }; // At this point all interactions with worker happen using thread @@ -582,19 +590,49 @@ fn op_host_terminate_worker( Ok(()) } -/// Try to remove worker from workers table - NOTE: `Worker.terminate()` -/// might have been called already meaning that we won't find worker in -/// table - in that case ignore. -fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) { +enum WorkerChannel { + Ctrl, + Messages, +} + +/// Close a worker's channel. If this results in both of a worker's channels +/// being closed, the worker will be removed from the workers table. +fn close_channel( + state: Rc<RefCell<OpState>>, + id: WorkerId, + channel: WorkerChannel, +) { + use std::collections::hash_map::Entry; + let mut s = state.borrow_mut(); let workers = s.borrow_mut::<WorkersTable>(); - if let Some(worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.terminate(); - worker_thread - .join_handle - .join() - .expect("Worker thread panicked") - .expect("Panic in worker event loop"); + + // `Worker.terminate()` might have been called already, meaning that we won't + // find the worker in the table - in that case ignore. + if let Entry::Occupied(mut entry) = workers.entry(id) { + let terminate = { + let worker_thread = entry.get_mut(); + match channel { + WorkerChannel::Ctrl => { + worker_thread.ctrl_closed = true; + worker_thread.message_closed + } + WorkerChannel::Messages => { + worker_thread.message_closed = true; + worker_thread.ctrl_closed + } + } + }; + + if terminate { + let worker_thread = entry.remove(); + worker_thread.worker_handle.terminate(); + worker_thread + .join_handle + .join() + .expect("Worker thread panicked") + .expect("Panic in worker event loop"); + } } } @@ -620,13 +658,13 @@ async fn op_host_recv_ctrl( if let Some(event) = maybe_event { // Terminal error means that worker should be removed from worker table. if let WorkerControlEvent::TerminalError(_) = &event { - try_remove_and_close(state, id); + close_channel(state, id, WorkerChannel::Ctrl); } return Ok(event); } // If there was no event from worker it means it has already been closed. - try_remove_and_close(state, id); + close_channel(state, id, WorkerChannel::Ctrl); Ok(WorkerControlEvent::Close) } @@ -646,7 +684,12 @@ async fn op_host_recv_message( return Ok(None); } }; - worker_handle.port.recv(state).await + + let ret = worker_handle.port.recv(state.clone()).await?; + if ret.is_none() { + close_channel(state, id, WorkerChannel::Messages); + } + Ok(ret) } /// Post message to guest worker as host |