From 0773463de1ba2c316decc99ac60440f6f7a91de1 Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Sat, 17 Jun 2023 15:02:32 -0700 Subject: chore(kv) fix and re-enable queue test (#19529) The callback draining code is no longer needed after #19513. --- ext/kv/01_db.ts | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'ext') diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index eb103ae0c..2a3101e6c 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -250,6 +250,7 @@ class Kv { async listenQueue( handler: (message: unknown) => Promise | void, ): Promise { + const finishMessageOps = new Map>(); while (!this.#closed) { // Wait for the next message. let next: { 0: Uint8Array; 1: number }; @@ -282,14 +283,29 @@ class Kv { } catch (error) { console.error("Exception in queue handler", error); } finally { - await core.opAsync( - "op_kv_finish_dequeued_message", - handleId, - success, - ); + if (this.#closed) { + core.close(handleId); + } else { + const promise: Promise = core.opAsync( + "op_kv_finish_dequeued_message", + handleId, + success, + ); + finishMessageOps.set(handleId, promise); + try { + await promise; + } finally { + finishMessageOps.delete(handleId); + } + } } })(); } + + for (const promise of finishMessageOps.values()) { + await promise; + } + finishMessageOps.clear(); } close() { -- cgit v1.2.3