summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/integration/run_tests.rs5
-rw-r--r--cli/tests/testdata/worker_message_before_close.js16
-rw-r--r--cli/tests/testdata/worker_message_before_close.js.out8
-rw-r--r--cli/tests/testdata/workers/message_before_close.js6
-rw-r--r--runtime/js/11_workers.js31
-rw-r--r--runtime/ops/worker_host.rs71
6 files changed, 112 insertions, 25 deletions
diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs
index 981b85199..2c0a16582 100644
--- a/cli/tests/integration/run_tests.rs
+++ b/cli/tests/integration/run_tests.rs
@@ -1177,6 +1177,11 @@ itest!(worker_close_race {
output: "worker_close_race.js.out",
});
+itest!(worker_message_before_close {
+ args: "run --quiet --reload --allow-read worker_message_before_close.js",
+ output: "worker_message_before_close.js.out",
+});
+
#[test]
fn no_validate_asm() {
let output = util::deno_cmd()
diff --git a/cli/tests/testdata/worker_message_before_close.js b/cli/tests/testdata/worker_message_before_close.js
new file mode 100644
index 000000000..6a9149af1
--- /dev/null
+++ b/cli/tests/testdata/worker_message_before_close.js
@@ -0,0 +1,16 @@
+for (let i = 0; i < 4; i++) {
+ const worker = new Worker(
+ new URL("./workers/message_before_close.js", import.meta.url).href,
+ { type: "module", name: String(i) },
+ );
+
+ worker.addEventListener("message", (message) => {
+ // Only print responses after all reception logs.
+ setTimeout(() => {
+ console.log("response from worker %d received", message.data);
+ }, 500);
+ });
+ worker.postMessage(i);
+}
+
+export {};
diff --git a/cli/tests/testdata/worker_message_before_close.js.out b/cli/tests/testdata/worker_message_before_close.js.out
new file mode 100644
index 000000000..f1cc558a2
--- /dev/null
+++ b/cli/tests/testdata/worker_message_before_close.js.out
@@ -0,0 +1,8 @@
+message received in worker 0
+message received in worker 1
+message received in worker 2
+message received in worker 3
+response from worker 0 received
+response from worker 1 received
+response from worker 2 received
+response from worker 3 received
diff --git a/cli/tests/testdata/workers/message_before_close.js b/cli/tests/testdata/workers/message_before_close.js
new file mode 100644
index 000000000..0213abcb3
--- /dev/null
+++ b/cli/tests/testdata/workers/message_before_close.js
@@ -0,0 +1,6 @@
+self.onmessage = (params) => {
+ const workerId = params.data;
+ console.log("message received in worker %d", workerId);
+ self.postMessage(workerId);
+ self.close();
+};
diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js
index 86560b20d..c9bfc172a 100644
--- a/runtime/js/11_workers.js
+++ b/runtime/js/11_workers.js
@@ -139,7 +139,13 @@
class Worker extends EventTarget {
#id = 0;
#name = "";
- #terminated = false;
+
+ // "RUNNING" | "CLOSED" | "TERMINATED"
+ // "TERMINATED" means that any controls or messages received will be
+ // discarded. "CLOSED" means that we have received a control
+ // indicating that the worker is no longer running, but there might
+ // still be messages left to receive.
+ #status = "RUNNING";
constructor(specifier, options = {}) {
super();
@@ -243,17 +249,17 @@
}
#pollControl = async () => {
- while (!this.#terminated) {
+ while (this.#status === "RUNNING") {
const [type, data] = await hostRecvCtrl(this.#id);
// If terminate was called then we ignore all messages
- if (this.#terminated) {
+ if (this.#status === "TERMINATED") {
return;
}
switch (type) {
case 1: { // TerminalError
- this.#terminated = true;
+ this.#status = "CLOSED";
} /* falls through */
case 2: { // Error
if (!this.#handleError(data)) {
@@ -270,7 +276,7 @@
}
case 3: { // Close
log(`Host got "close" message from worker: ${this.#name}`);
- this.#terminated = true;
+ this.#status = "CLOSED";
return;
}
default: {
@@ -281,9 +287,11 @@
};
#pollMessages = async () => {
- while (!this.terminated) {
+ while (this.#status !== "TERMINATED") {
const data = await hostRecvMessage(this.#id);
- if (data === null) break;
+ if (this.#status === "TERMINATED" || data === null) {
+ return;
+ }
let message, transferables;
try {
const v = deserializeJsMessageData(data);
@@ -332,13 +340,14 @@
}
const { transfer } = options;
const data = serializeJsMessageData(message, transfer);
- if (this.#terminated) return;
- hostPostMessage(this.#id, data);
+ if (this.#status === "RUNNING") {
+ hostPostMessage(this.#id, data);
+ }
}
terminate() {
- if (!this.#terminated) {
- this.#terminated = true;
+ if (this.#status !== "TERMINATED") {
+ this.#status = "TERMINATED";
hostTerminateWorker(this.#id);
}
}
diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs
index d80a39502..829681ab6 100644
--- a/runtime/ops/worker_host.rs
+++ b/runtime/ops/worker_host.rs
@@ -67,6 +67,12 @@ pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);
pub struct WorkerThread {
join_handle: JoinHandle<Result<(), AnyError>>,
worker_handle: WebWorkerHandle,
+
+ // A WorkerThread that hasn't been explicitly terminated can only be removed
+ // from the WorkersTable once close messages have been received for both the
+ // control and message channels. See `close_channel`.
+ ctrl_closed: bool,
+ message_closed: bool,
}
pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
@@ -553,6 +559,8 @@ fn op_create_worker(
let worker_thread = WorkerThread {
join_handle,
worker_handle: worker_handle.into(),
+ ctrl_closed: false,
+ message_closed: false,
};
// At this point all interactions with worker happen using thread
@@ -582,19 +590,49 @@ fn op_host_terminate_worker(
Ok(())
}
-/// Try to remove worker from workers table - NOTE: `Worker.terminate()`
-/// might have been called already meaning that we won't find worker in
-/// table - in that case ignore.
-fn try_remove_and_close(state: Rc<RefCell<OpState>>, id: WorkerId) {
+enum WorkerChannel {
+ Ctrl,
+ Messages,
+}
+
+/// Close a worker's channel. If this results in both of a worker's channels
+/// being closed, the worker will be removed from the workers table.
+fn close_channel(
+ state: Rc<RefCell<OpState>>,
+ id: WorkerId,
+ channel: WorkerChannel,
+) {
+ use std::collections::hash_map::Entry;
+
let mut s = state.borrow_mut();
let workers = s.borrow_mut::<WorkersTable>();
- if let Some(worker_thread) = workers.remove(&id) {
- worker_thread.worker_handle.terminate();
- worker_thread
- .join_handle
- .join()
- .expect("Worker thread panicked")
- .expect("Panic in worker event loop");
+
+ // `Worker.terminate()` might have been called already, meaning that we won't
+ // find the worker in the table - in that case ignore.
+ if let Entry::Occupied(mut entry) = workers.entry(id) {
+ let terminate = {
+ let worker_thread = entry.get_mut();
+ match channel {
+ WorkerChannel::Ctrl => {
+ worker_thread.ctrl_closed = true;
+ worker_thread.message_closed
+ }
+ WorkerChannel::Messages => {
+ worker_thread.message_closed = true;
+ worker_thread.ctrl_closed
+ }
+ }
+ };
+
+ if terminate {
+ let worker_thread = entry.remove();
+ worker_thread.worker_handle.terminate();
+ worker_thread
+ .join_handle
+ .join()
+ .expect("Worker thread panicked")
+ .expect("Panic in worker event loop");
+ }
}
}
@@ -620,13 +658,13 @@ async fn op_host_recv_ctrl(
if let Some(event) = maybe_event {
// Terminal error means that worker should be removed from worker table.
if let WorkerControlEvent::TerminalError(_) = &event {
- try_remove_and_close(state, id);
+ close_channel(state, id, WorkerChannel::Ctrl);
}
return Ok(event);
}
// If there was no event from worker it means it has already been closed.
- try_remove_and_close(state, id);
+ close_channel(state, id, WorkerChannel::Ctrl);
Ok(WorkerControlEvent::Close)
}
@@ -646,7 +684,12 @@ async fn op_host_recv_message(
return Ok(None);
}
};
- worker_handle.port.recv(state).await
+
+ let ret = worker_handle.port.recv(state.clone()).await?;
+ if ret.is_none() {
+ close_channel(state, id, WorkerChannel::Messages);
+ }
+ Ok(ret)
}
/// Post message to guest worker as host