diff options
| author | Igor Zinkovsky <igor@deno.com> | 2023-12-12 22:51:23 -0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-12-12 22:51:23 -0800 |
| commit | 86769b0d1cfb7341df31278913524492d5f9ab52 (patch) | |
| tree | dba094702ec2c4ad19a59f61151a777fd3f3e0b9 /ext/kv | |
| parent | 0ceae7a490d933144f63be5bc9d5ba0b27bf5c6e (diff) | |
feat(ext/kv) add backoffSchedule to enqueue (#21474)
Also reduces the time to run `kv_queue_undelivered_test.ts` test from
100 seconds down to 3 seconds.
closes #21437
Diffstat (limited to 'ext/kv')
| -rw-r--r-- | ext/kv/01_db.ts | 36 | ||||
| -rw-r--r-- | ext/kv/lib.rs | 3 |
2 files changed, 35 insertions, 4 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 2b0e141f8..18d190718 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -43,6 +43,20 @@ function validateQueueDelay(delay: number) { } } +const maxQueueBackoffIntervals = 5; +const maxQueueBackoffInterval = 60 * 60 * 1000; + +function validateBackoffSchedule(backoffSchedule: number[]) { + if (backoffSchedule.length > maxQueueBackoffIntervals) { + throw new TypeError("invalid backoffSchedule"); + } + for (const interval of backoffSchedule) { + if (interval < 0 || interval > maxQueueBackoffInterval || isNaN(interval)) { + throw new TypeError("invalid backoffSchedule"); + } + } +} + interface RawKvEntry { key: Deno.KvKey; value: RawValue; @@ -224,18 +238,25 @@ class Kv { async enqueue( message: unknown, - opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + opts?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ) { if (opts?.delay !== undefined) { validateQueueDelay(opts?.delay); } + if (opts?.backoffSchedule !== undefined) { + validateBackoffSchedule(opts?.backoffSchedule); + } const enqueues = [ [ core.serialize(message, { forStorage: true }), opts?.delay ?? 0, opts?.keysIfUndelivered ?? [], - null, + opts?.backoffSchedule ?? null, ], ]; @@ -468,16 +489,23 @@ class AtomicOperation { enqueue( message: unknown, - opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + opts?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ): this { if (opts?.delay !== undefined) { validateQueueDelay(opts?.delay); } + if (opts?.backoffSchedule !== undefined) { + validateBackoffSchedule(opts?.backoffSchedule); + } this.#enqueues.push([ core.serialize(message, { forStorage: true }), opts?.delay ?? 0, opts?.keysIfUndelivered ?? [], - null, + opts?.backoffSchedule ?? null, ]); return this; } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 456a1ebf7..943aae460 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -802,6 +802,9 @@ where for enqueue in &enqueues { total_payload_size += check_enqueue_payload_size(&enqueue.payload)?; + if let Some(schedule) = enqueue.backoff_schedule.as_ref() { + total_payload_size += 4 * schedule.len(); + } } if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES { |
