summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-06-17 15:02:32 -0700
committerGitHub <noreply@github.com>2023-06-17 15:02:32 -0700
commit0773463de1ba2c316decc99ac60440f6f7a91de1 (patch)
tree6d2887554d28c178dd0eaf0f318f6c5ffa9d1a1e
parentc8dc6b14ec5c1b6de28118ed3b07d037eaaaf702 (diff)
chore(kv) fix and re-enable queue test (#19529)
The callback draining code is no longer needed after #19513.
-rw-r--r--cli/tests/unit/kv_test.ts11
-rw-r--r--ext/kv/01_db.ts26
2 files changed, 21 insertions, 16 deletions
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index 6b19d8a27..5ed0bc644 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -1697,10 +1697,7 @@ Deno.test({
Deno.test({
name: "queue persistence with delay messages",
- ignore: true, // flaky
async fn() {
- const dispatchedPre = Deno.metrics().opsDispatchedAsync;
- const completedPre = Deno.metrics().opsCompletedAsync;
const filename = await Deno.makeTempFile({ prefix: "queue_db" });
try {
await Deno.remove(filename);
@@ -1745,14 +1742,6 @@ Deno.test({
db.close();
await listener;
} finally {
- // Wait until callbacks are drained before deleting the db.
- let dispatched = Deno.metrics().opsDispatchedAsync - dispatchedPre;
- let completed = Deno.metrics().opsCompletedAsync - completedPre;
- while (dispatched !== completed) {
- dispatched = Deno.metrics().opsDispatchedAsync - dispatchedPre;
- completed = Deno.metrics().opsCompletedAsync - completedPre;
- await sleep(100);
- }
try {
await Deno.remove(filename);
} catch {
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index eb103ae0c..2a3101e6c 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -250,6 +250,7 @@ class Kv {
async listenQueue(
handler: (message: unknown) => Promise<void> | void,
): Promise<void> {
+ const finishMessageOps = new Map<number, Promise<void>>();
while (!this.#closed) {
// Wait for the next message.
let next: { 0: Uint8Array; 1: number };
@@ -282,14 +283,29 @@ class Kv {
} catch (error) {
console.error("Exception in queue handler", error);
} finally {
- await core.opAsync(
- "op_kv_finish_dequeued_message",
- handleId,
- success,
- );
+ if (this.#closed) {
+ core.close(handleId);
+ } else {
+ const promise: Promise<void> = core.opAsync(
+ "op_kv_finish_dequeued_message",
+ handleId,
+ success,
+ );
+ finishMessageOps.set(handleId, promise);
+ try {
+ await promise;
+ } finally {
+ finishMessageOps.delete(handleId);
+ }
+ }
}
})();
}
+
+ for (const promise of finishMessageOps.values()) {
+ await promise;
+ }
+ finishMessageOps.clear();
}
close() {