summaryrefslogtreecommitdiff
path: root/runtime/ops/worker_host.rs
diff options
context:
space:
mode:
Diffstat (limited to 'runtime/ops/worker_host.rs')
-rw-r--r--runtime/ops/worker_host.rs71
1 files changed, 57 insertions, 14 deletions
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index d80a39502..829681ab6 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -67,6 +67,12 @@ pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);
pub struct WorkerThread {
join_handle: JoinHandle<Result<(), AnyError>>,
worker_handle: WebWorkerHandle,
+
+ // A WorkerThread that hasn't been explicitly terminated can only be removed
+ // from the WorkersTable once close messages have been received for both the
+ // control and message channels. See `close_channel`.
+ ctrl_closed: bool,
+ message_closed: bool,
}
pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
@@ -553,6 +559,8 @@ fn op_create_worker(
let worker_thread = WorkerThread {
join_handle,
worker_handle: worker_handle.into(),
+ ctrl_closed: false,
+ message_closed: false,
};
// At this point all interactions with worker happen using thread
@@ -582,19 +590,49 @@ fn op_host_terminate_worker(
Ok(())
}
-/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
-/// might have been called already meaning that we won't find worker in
-/// table - in that case ignore.
-fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
+enum WorkerChannel {
+ Ctrl,
+ Messages,
+}
+
+/// Close a worker's channel. If this results in both of a worker's channels
+/// being closed, the worker will be removed from the workers table.
+fn close_channel(
+ state: Rc<RefCell<OpState>>,
+ id: WorkerId,
+ channel: WorkerChannel,
+) {
+ use std::collections::hash_map::Entry;
+
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
- if let Some(worker_thread) = workers.remove(&id) {
- worker_thread.worker_handle.terminate();
- worker_thread
- .join_handle
- .join()
- .expect("Worker thread panicked")
- .expect("Panic in worker event loop");
+
+ // `Worker.terminate()` might have been called already, meaning that we won't
+ // find the worker in the table - in that case ignore.
+ if let Entry::Occupied(mut entry) = workers.entry(id) {
+ let terminate = {
+ let worker_thread = entry.get_mut();
+ match channel {
+ WorkerChannel::Ctrl => {
+ worker_thread.ctrl_closed = true;
+ worker_thread.message_closed
+ }
+ WorkerChannel::Messages => {
+ worker_thread.message_closed = true;
+ worker_thread.ctrl_closed
+ }
+ }
+ };
+
+ if terminate {
+ let worker_thread = entry.remove();
+ worker_thread.worker_handle.terminate();
+ worker_thread
+ .join_handle
+ .join()
+ .expect("Worker thread panicked")
+ .expect("Panic in worker event loop");
+ }
}
}
@@ -620,13 +658,13 @@ async fn op_host_recv_ctrl(
if let Some(event) = maybe_event {
// Terminal error means that worker should be removed from worker table.
if let WorkerControlEvent::TerminalError(_) = &event {
- try_remove_and_close(state, id);
+ close_channel(state, id, WorkerChannel::Ctrl);
}
return Ok(event);
}
// If there was no event from worker it means it has already been closed.
- try_remove_and_close(state, id);
+ close_channel(state, id, WorkerChannel::Ctrl);
Ok(WorkerControlEvent::Close)
}
@@ -646,7 +684,12 @@ async fn op_host_recv_message(
return Ok(None);
}
};
- worker_handle.port.recv(state).await
+
+ let ret = worker_handle.port.recv(state.clone()).await?;
+ if ret.is_none() {
+ close_channel(state, id, WorkerChannel::Messages);
+ }
+ Ok(ret)
}
/// Post message to guest worker as host