diff options
-rw-r--r-- | cli/tests/unit/kv_test.ts | 11 | ||||
-rw-r--r-- | ext/kv/01_db.ts | 26 |
2 files changed, 21 insertions, 16 deletions
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 6b19d8a27..5ed0bc644 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -1697,10 +1697,7 @@ Deno.test({ Deno.test({ name: "queue persistence with delay messages", - ignore: true, // flaky async fn() { - const dispatchedPre = Deno.metrics().opsDispatchedAsync; - const completedPre = Deno.metrics().opsCompletedAsync; const filename = await Deno.makeTempFile({ prefix: "queue_db" }); try { await Deno.remove(filename); @@ -1745,14 +1742,6 @@ Deno.test({ db.close(); await listener; } finally { - // Wait until callbacks are drained before deleting the db. - let dispatched = Deno.metrics().opsDispatchedAsync - dispatchedPre; - let completed = Deno.metrics().opsCompletedAsync - completedPre; - while (dispatched !== completed) { - dispatched = Deno.metrics().opsDispatchedAsync - dispatchedPre; - completed = Deno.metrics().opsCompletedAsync - completedPre; - await sleep(100); - } try { await Deno.remove(filename); } catch { diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index eb103ae0c..2a3101e6c 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -250,6 +250,7 @@ class Kv { async listenQueue( handler: (message: unknown) => Promise<void> | void, ): Promise<void> { + const finishMessageOps = new Map<number, Promise<void>>(); while (!this.#closed) { // Wait for the next message. let next: { 0: Uint8Array; 1: number }; @@ -282,14 +283,29 @@ class Kv { } catch (error) { console.error("Exception in queue handler", error); } finally { - await core.opAsync( - "op_kv_finish_dequeued_message", - handleId, - success, - ); + if (this.#closed) { + core.close(handleId); + } else { + const promise: Promise<void> = core.opAsync( + "op_kv_finish_dequeued_message", + handleId, + success, + ); + finishMessageOps.set(handleId, promise); + try { + await promise; + } finally { + finishMessageOps.delete(handleId); + } + } } })(); } + + for (const promise of finishMessageOps.values()) { + await promise; + } + finishMessageOps.clear(); } close() { |