diff options
author | Igor Zinkovsky <igor@deno.com> | 2023-06-17 15:02:32 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-17 15:02:32 -0700 |
commit | 0773463de1ba2c316decc99ac60440f6f7a91de1 (patch) | |
tree | 6d2887554d28c178dd0eaf0f318f6c5ffa9d1a1e /ext/kv/01_db.ts | |
parent | c8dc6b14ec5c1b6de28118ed3b07d037eaaaf702 (diff) |
chore(kv) fix and re-enable queue test (#19529)
The callback draining code is no longer needed after #19513.
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r-- | ext/kv/01_db.ts | 26 |
1 files changed, 21 insertions, 5 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index eb103ae0c..2a3101e6c 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -250,6 +250,7 @@ class Kv { async listenQueue( handler: (message: unknown) => Promise<void> | void, ): Promise<void> { + const finishMessageOps = new Map<number, Promise<void>>(); while (!this.#closed) { // Wait for the next message. let next: { 0: Uint8Array; 1: number }; @@ -282,14 +283,29 @@ class Kv { } catch (error) { console.error("Exception in queue handler", error); } finally { - await core.opAsync( - "op_kv_finish_dequeued_message", - handleId, - success, - ); + if (this.#closed) { + core.close(handleId); + } else { + const promise: Promise<void> = core.opAsync( + "op_kv_finish_dequeued_message", + handleId, + success, + ); + finishMessageOps.set(handleId, promise); + try { + await promise; + } finally { + finishMessageOps.delete(handleId); + } + } } })(); } + + for (const promise of finishMessageOps.values()) { + await promise; + } + finishMessageOps.clear(); } close() { |