diff options
Diffstat (limited to 'runtime')
-rw-r--r-- | runtime/js/11_workers.js | 31 | ||||
-rw-r--r-- | runtime/ops/worker_host.rs | 71 |
2 files changed, 77 insertions, 25 deletions
diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index 86560b20d..c9bfc172a 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -139,7 +139,13 @@ class Worker extends EventTarget { #id = 0; #name = ""; - #terminated = false; + + // "RUNNING" | "CLOSED" | "TERMINATED" + // "TERMINATED" means that any controls or messages received will be + // discarded. "CLOSED" means that we have received a control + // indicating that the worker is no longer running, but there might + // still be messages left to receive. + #status = "RUNNING"; constructor(specifier, options = {}) { super(); @@ -243,17 +249,17 @@ } #pollControl = async () => { - while (!this.#terminated) { + while (this.#status === "RUNNING") { const [type, data] = await hostRecvCtrl(this.#id); // If terminate was called then we ignore all messages - if (this.#terminated) { + if (this.#status === "TERMINATED") { return; } switch (type) { case 1: { // TerminalError - this.#terminated = true; + this.#status = "CLOSED"; } /* falls through */ case 2: { // Error if (!this.#handleError(data)) { @@ -270,7 +276,7 @@ } case 3: { // Close log(`Host got "close" message from worker: ${this.#name}`); - this.#terminated = true; + this.#status = "CLOSED"; return; } default: { @@ -281,9 +287,11 @@ }; #pollMessages = async () => { - while (!this.terminated) { + while (this.#status !== "TERMINATED") { const data = await hostRecvMessage(this.#id); - if (data === null) break; + if (this.#status === "TERMINATED" || data === null) { + return; + } let message, transferables; try { const v = deserializeJsMessageData(data); @@ -332,13 +340,14 @@ } const { transfer } = options; const data = serializeJsMessageData(message, transfer); - if (this.#terminated) return; - hostPostMessage(this.#id, data); + if (this.#status === "RUNNING") { + hostPostMessage(this.#id, data); + } } terminate() { - if (!this.#terminated) { - this.#terminated = true; + if (this.#status !== "TERMINATED") { + this.#status = "TERMINATED"; hostTerminateWorker(this.#id); } } diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index d80a39502..829681ab6 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -67,6 +67,12 @@ pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>); pub struct WorkerThread { join_handle: JoinHandle<Result<(), AnyError>>, worker_handle: WebWorkerHandle, + + // A WorkerThread that hasn't been explicitly terminated can only be removed + // from the WorkersTable once close messages have been received for both the + // control and message channels. See `close_channel`. + ctrl_closed: bool, + message_closed: bool, } pub type WorkersTable = HashMap<WorkerId, WorkerThread>; @@ -553,6 +559,8 @@ fn op_create_worker( let worker_thread = WorkerThread { join_handle, worker_handle: worker_handle.into(), + ctrl_closed: false, + message_closed: false, }; // At this point all interactions with worker happen using thread @@ -582,19 +590,49 @@ fn op_host_terminate_worker( Ok(()) } -/// Try to remove worker from workers table - NOTE: `Worker.terminate()` -/// might have been called already meaning that we won't find worker in -/// table - in that case ignore. -fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) { +enum WorkerChannel { + Ctrl, + Messages, +} + +/// Close a worker's channel. If this results in both of a worker's channels +/// being closed, the worker will be removed from the workers table. +fn close_channel( + state: Rc<RefCell<OpState>>, + id: WorkerId, + channel: WorkerChannel, +) { + use std::collections::hash_map::Entry; + let mut s = state.borrow_mut(); let workers = s.borrow_mut::<WorkersTable>(); - if let Some(worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.terminate(); - worker_thread - .join_handle - .join() - .expect("Worker thread panicked") - .expect("Panic in worker event loop"); + + // `Worker.terminate()` might have been called already, meaning that we won't + // find the worker in the table - in that case ignore. + if let Entry::Occupied(mut entry) = workers.entry(id) { + let terminate = { + let worker_thread = entry.get_mut(); + match channel { + WorkerChannel::Ctrl => { + worker_thread.ctrl_closed = true; + worker_thread.message_closed + } + WorkerChannel::Messages => { + worker_thread.message_closed = true; + worker_thread.ctrl_closed + } + } + }; + + if terminate { + let worker_thread = entry.remove(); + worker_thread.worker_handle.terminate(); + worker_thread + .join_handle + .join() + .expect("Worker thread panicked") + .expect("Panic in worker event loop"); + } } } @@ -620,13 +658,13 @@ async fn op_host_recv_ctrl( if let Some(event) = maybe_event { // Terminal error means that worker should be removed from worker table. if let WorkerControlEvent::TerminalError(_) = &event { - try_remove_and_close(state, id); + close_channel(state, id, WorkerChannel::Ctrl); } return Ok(event); } // If there was no event from worker it means it has already been closed. - try_remove_and_close(state, id); + close_channel(state, id, WorkerChannel::Ctrl); Ok(WorkerControlEvent::Close) } @@ -646,7 +684,12 @@ async fn op_host_recv_message( return Ok(None); } }; - worker_handle.port.recv(state).await + + let ret = worker_handle.port.recv(state.clone()).await?; + if ret.is_none() { + close_channel(state, id, WorkerChannel::Messages); + } + Ok(ret) } /// Post message to guest worker as host |