diff options
author | Igor Zinkovsky <igor@deno.com> | 2023-09-26 20:06:57 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-26 20:06:57 -0700 |
commit | f0a022bed4c8ca46c472c6361f1793cb29f73480 (patch) | |
tree | 4f88ffcd77e94292a8696111ba6828a197eb153e /ext/kv/01_db.ts | |
parent | b433133a1ff1c329b05ce024d8db5fefb8f7b431 (diff) |
fix(kv_queues): graceful shutdown (#20627)
This fixes the `TypeError: Database closed` error during shutdown.
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r-- | ext/kv/01_db.ts | 47 |
1 files changed, 17 insertions, 30 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index e934a3b6d..6e8a571f0 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -61,7 +61,6 @@ const kvSymbol = Symbol("KvRid"); class Kv { #rid: number; - #closed: boolean; constructor(rid: number = undefined, symbol: symbol = undefined) { if (kvSymbol !== symbol) { @@ -70,7 +69,6 @@ class Kv { ); } this.#rid = rid; - this.#closed = false; } atomic() { @@ -251,20 +249,14 @@ class Kv { handler: (message: unknown) => Promise<void> | void, ): Promise<void> { const finishMessageOps = new Map<number, Promise<void>>(); - while (!this.#closed) { + while (true) { // Wait for the next message. - let next: { 0: Uint8Array; 1: number }; - try { - next = await core.opAsync( - "op_kv_dequeue_next_message", - this.#rid, - ); - } catch (error) { - if (this.#closed) { - break; - } else { - throw error; - } + const next: { 0: Uint8Array; 1: number } = await core.opAsync( + "op_kv_dequeue_next_message", + this.#rid, + ); + if (next === null) { + break; } // Deserialize the payload. @@ -283,20 +275,16 @@ class Kv { } catch (error) { console.error("Exception in queue handler", error); } finally { - if (this.#closed) { - core.close(handleId); - } else { - const promise: Promise<void> = core.opAsync( - "op_kv_finish_dequeued_message", - handleId, - success, - ); - finishMessageOps.set(handleId, promise); - try { - await promise; - } finally { - finishMessageOps.delete(handleId); - } + const promise: Promise<void> = core.opAsync( + "op_kv_finish_dequeued_message", + handleId, + success, + ); + finishMessageOps.set(handleId, promise); + try { + await promise; + } finally { + finishMessageOps.delete(handleId); } } })(); @@ -310,7 +298,6 @@ class Kv { close() { core.close(this.#rid); - this.#closed = true; } } |