diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/kv/01_db.ts | 36 | ||||
-rw-r--r-- | ext/kv/lib.rs | 3 |
2 files changed, 35 insertions, 4 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 2b0e141f8..18d190718 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -43,6 +43,20 @@ function validateQueueDelay(delay: number) { } } +const maxQueueBackoffIntervals = 5; +const maxQueueBackoffInterval = 60 * 60 * 1000; + +function validateBackoffSchedule(backoffSchedule: number[]) { + if (backoffSchedule.length > maxQueueBackoffIntervals) { + throw new TypeError("invalid backoffSchedule"); + } + for (const interval of backoffSchedule) { + if (interval < 0 || interval > maxQueueBackoffInterval || isNaN(interval)) { + throw new TypeError("invalid backoffSchedule"); + } + } +} + interface RawKvEntry { key: Deno.KvKey; value: RawValue; @@ -224,18 +238,25 @@ class Kv { async enqueue( message: unknown, - opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + opts?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ) { if (opts?.delay !== undefined) { validateQueueDelay(opts?.delay); } + if (opts?.backoffSchedule !== undefined) { + validateBackoffSchedule(opts?.backoffSchedule); + } const enqueues = [ [ core.serialize(message, { forStorage: true }), opts?.delay ?? 0, opts?.keysIfUndelivered ?? [], - null, + opts?.backoffSchedule ?? null, ], ]; @@ -468,16 +489,23 @@ class AtomicOperation { enqueue( message: unknown, - opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + opts?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ): this { if (opts?.delay !== undefined) { validateQueueDelay(opts?.delay); } + if (opts?.backoffSchedule !== undefined) { + validateBackoffSchedule(opts?.backoffSchedule); + } this.#enqueues.push([ core.serialize(message, { forStorage: true }), opts?.delay ?? 0, opts?.keysIfUndelivered ?? [], - null, + opts?.backoffSchedule ?? null, ]); return this; } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 456a1ebf7..943aae460 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -802,6 +802,9 @@ where for enqueue in &enqueues { total_payload_size += check_enqueue_payload_size(&enqueue.payload)?; + if let Some(schedule) = enqueue.backoff_schedule.as_ref() { + total_payload_size += 4 * schedule.len(); + } } if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES { |