summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/kv/01_db.ts36
-rw-r--r--ext/kv/lib.rs3
2 files changed, 35 insertions, 4 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index 2b0e141f8..18d190718 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -43,6 +43,20 @@ function validateQueueDelay(delay: number) {
}
}
+const maxQueueBackoffIntervals = 5;
+const maxQueueBackoffInterval = 60 * 60 * 1000;
+
+function validateBackoffSchedule(backoffSchedule: number[]) {
+ if (backoffSchedule.length > maxQueueBackoffIntervals) {
+ throw new TypeError("invalid backoffSchedule");
+ }
+ for (const interval of backoffSchedule) {
+ if (interval < 0 || interval > maxQueueBackoffInterval || isNaN(interval)) {
+ throw new TypeError("invalid backoffSchedule");
+ }
+ }
+}
+
interface RawKvEntry {
key: Deno.KvKey;
value: RawValue;
@@ -224,18 +238,25 @@ class Kv {
async enqueue(
message: unknown,
- opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
+ opts?: {
+ delay?: number;
+ keysIfUndelivered?: Deno.KvKey[];
+ backoffSchedule?: number[];
+ },
) {
if (opts?.delay !== undefined) {
validateQueueDelay(opts?.delay);
}
+ if (opts?.backoffSchedule !== undefined) {
+ validateBackoffSchedule(opts?.backoffSchedule);
+ }
const enqueues = [
[
core.serialize(message, { forStorage: true }),
opts?.delay ?? 0,
opts?.keysIfUndelivered ?? [],
- null,
+ opts?.backoffSchedule ?? null,
],
];
@@ -468,16 +489,23 @@ class AtomicOperation {
enqueue(
message: unknown,
- opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] },
+ opts?: {
+ delay?: number;
+ keysIfUndelivered?: Deno.KvKey[];
+ backoffSchedule?: number[];
+ },
): this {
if (opts?.delay !== undefined) {
validateQueueDelay(opts?.delay);
}
+ if (opts?.backoffSchedule !== undefined) {
+ validateBackoffSchedule(opts?.backoffSchedule);
+ }
this.#enqueues.push([
core.serialize(message, { forStorage: true }),
opts?.delay ?? 0,
opts?.keysIfUndelivered ?? [],
- null,
+ opts?.backoffSchedule ?? null,
]);
return this;
}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index 456a1ebf7..943aae460 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -802,6 +802,9 @@ where
for enqueue in &enqueues {
total_payload_size += check_enqueue_payload_size(&enqueue.payload)?;
+ if let Some(schedule) = enqueue.backoff_schedule.as_ref() {
+ total_payload_size += 4 * schedule.len();
+ }
}
if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES {