summaryrefslogtreecommitdiff
path: root/ext/kv/01_db.ts
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-06-17 15:02:32 -0700
committerGitHub <noreply@github.com>2023-06-17 15:02:32 -0700
commit0773463de1ba2c316decc99ac60440f6f7a91de1 (patch)
tree6d2887554d28c178dd0eaf0f318f6c5ffa9d1a1e /ext/kv/01_db.ts
parentc8dc6b14ec5c1b6de28118ed3b07d037eaaaf702 (diff)
chore(kv) fix and re-enable queue test (#19529)
The callback draining code is no longer needed after #19513.
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r--ext/kv/01_db.ts26
1 files changed, 21 insertions, 5 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index eb103ae0c..2a3101e6c 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -250,6 +250,7 @@ class Kv {
async listenQueue(
handler: (message: unknown) => Promise<void> | void,
): Promise<void> {
+ const finishMessageOps = new Map<number, Promise<void>>();
while (!this.#closed) {
// Wait for the next message.
let next: { 0: Uint8Array; 1: number };
@@ -282,14 +283,29 @@ class Kv {
} catch (error) {
console.error("Exception in queue handler", error);
} finally {
- await core.opAsync(
- "op_kv_finish_dequeued_message",
- handleId,
- success,
- );
+ if (this.#closed) {
+ core.close(handleId);
+ } else {
+ const promise: Promise<void> = core.opAsync(
+ "op_kv_finish_dequeued_message",
+ handleId,
+ success,
+ );
+ finishMessageOps.set(handleId, promise);
+ try {
+ await promise;
+ } finally {
+ finishMessageOps.delete(handleId);
+ }
+ }
}
})();
}
+
+ for (const promise of finishMessageOps.values()) {
+ await promise;
+ }
+ finishMessageOps.clear();
}
close() {