summaryrefslogtreecommitdiff
path: root/ext/kv/01_db.ts
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-09-26 20:06:57 -0700
committerGitHub <noreply@github.com>2023-09-26 20:06:57 -0700
commitf0a022bed4c8ca46c472c6361f1793cb29f73480 (patch)
tree4f88ffcd77e94292a8696111ba6828a197eb153e /ext/kv/01_db.ts
parentb433133a1ff1c329b05ce024d8db5fefb8f7b431 (diff)
fix(kv_queues): graceful shutdown (#20627)
This fixes the `TypeError: Database closed` error during shutdown.
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r--ext/kv/01_db.ts47
1 files changed, 17 insertions, 30 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index e934a3b6d..6e8a571f0 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -61,7 +61,6 @@ const kvSymbol = Symbol("KvRid");
class Kv {
#rid: number;
- #closed: boolean;
constructor(rid: number = undefined, symbol: symbol = undefined) {
if (kvSymbol !== symbol) {
@@ -70,7 +69,6 @@ class Kv {
);
}
this.#rid = rid;
- this.#closed = false;
}
atomic() {
@@ -251,20 +249,14 @@ class Kv {
handler: (message: unknown) => Promise<void> | void,
): Promise<void> {
const finishMessageOps = new Map<number, Promise<void>>();
- while (!this.#closed) {
+ while (true) {
// Wait for the next message.
- let next: { 0: Uint8Array; 1: number };
- try {
- next = await core.opAsync(
- "op_kv_dequeue_next_message",
- this.#rid,
- );
- } catch (error) {
- if (this.#closed) {
- break;
- } else {
- throw error;
- }
+ const next: { 0: Uint8Array; 1: number } = await core.opAsync(
+ "op_kv_dequeue_next_message",
+ this.#rid,
+ );
+ if (next === null) {
+ break;
}
// Deserialize the payload.
@@ -283,20 +275,16 @@ class Kv {
} catch (error) {
console.error("Exception in queue handler", error);
} finally {
- if (this.#closed) {
- core.close(handleId);
- } else {
- const promise: Promise<void> = core.opAsync(
- "op_kv_finish_dequeued_message",
- handleId,
- success,
- );
- finishMessageOps.set(handleId, promise);
- try {
- await promise;
- } finally {
- finishMessageOps.delete(handleId);
- }
+ const promise: Promise<void> = core.opAsync(
+ "op_kv_finish_dequeued_message",
+ handleId,
+ success,
+ );
+ finishMessageOps.set(handleId, promise);
+ try {
+ await promise;
+ } finally {
+ finishMessageOps.delete(handleId);
}
}
})();
@@ -310,7 +298,6 @@ class Kv {
close() {
core.close(this.#rid);
- this.#closed = true;
}
}