summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/kv/01_db.ts26
1 files changed, 21 insertions, 5 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index eb103ae0c..2a3101e6c 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -250,6 +250,7 @@ class Kv {
async listenQueue(
handler: (message: unknown) => Promise<void> | void,
): Promise<void> {
+ const finishMessageOps = new Map<number, Promise<void>>();
while (!this.#closed) {
// Wait for the next message.
let next: { 0: Uint8Array; 1: number };
@@ -282,14 +283,29 @@ class Kv {
} catch (error) {
console.error("Exception in queue handler", error);
} finally {
- await core.opAsync(
- "op_kv_finish_dequeued_message",
- handleId,
- success,
- );
+ if (this.#closed) {
+ core.close(handleId);
+ } else {
+ const promise: Promise<void> = core.opAsync(
+ "op_kv_finish_dequeued_message",
+ handleId,
+ success,
+ );
+ finishMessageOps.set(handleId, promise);
+ try {
+ await promise;
+ } finally {
+ finishMessageOps.delete(handleId);
+ }
+ }
}
})();
}
+
+ for (const promise of finishMessageOps.values()) {
+ await promise;
+ }
+ finishMessageOps.clear();
}
close() {