diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/kv/01_db.ts | 81 | ||||
-rw-r--r-- | ext/kv/lib.rs | 3 |
2 files changed, 50 insertions, 34 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 18d190718..1817f9f07 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -75,6 +75,7 @@ type RawValue = { }; const kvSymbol = Symbol("KvRid"); +const commitVersionstampSymbol = Symbol("KvCommitVersionstamp"); class Kv { #rid: number; @@ -94,6 +95,10 @@ class Kv { return new AtomicOperation(this.#rid); } + commitVersionstamp(): symbol { + return commitVersionstampSymbol; + } + async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) { const [entries]: [RawKvEntry[]] = await core.opAsync( "op_kv_snapshot_read", @@ -148,18 +153,10 @@ class Kv { } async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) { - value = serializeValue(value); - - const checks: Deno.AtomicCheck[] = []; - const mutations = [ - [key, "set", value, options?.expireIn], - ]; - - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, - checks, - mutations, + [], + [[key, "set", serializeValue(value), options?.expireIn]], [], ); if (versionstamp === null) throw new TypeError("Failed to set value"); @@ -167,16 +164,10 @@ class Kv { } async delete(key: Deno.KvKey) { - const checks: Deno.AtomicCheck[] = []; - const mutations = [ - [key, "delete", null, undefined], - ]; - - const result = await core.opAsync( - "op_kv_atomic_write", + const result = await doAtomicWriteInPlace( this.#rid, - checks, - mutations, + [], + [[key, "delete", null, undefined]], [], ); if (!result) throw new TypeError("Failed to set value"); @@ -251,21 +242,18 @@ class Kv { validateBackoffSchedule(opts?.backoffSchedule); } - const enqueues = [ - [ - core.serialize(message, { forStorage: true }), - opts?.delay ?? 0, - opts?.keysIfUndelivered ?? [], - opts?.backoffSchedule ?? null, - ], - ]; - - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, [], [], - enqueues, + [ + [ + core.serialize(message, { forStorage: true }), + opts?.delay ?? 0, + opts?.keysIfUndelivered ?? [], + opts?.backoffSchedule ?? null, + ], + ], ); if (versionstamp === null) throw new TypeError("Failed to enqueue value"); return { ok: true, versionstamp }; @@ -511,8 +499,7 @@ class AtomicOperation { } async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> { - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, this.#checks, this.#mutations, @@ -764,4 +751,30 @@ class KvListIterator extends AsyncIterator } } +async function doAtomicWriteInPlace( + rid: number, + checks: [Deno.KvKey, string | null][], + mutations: [Deno.KvKey, string, RawValue | null, number | undefined][], + enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][], +): Promise<string | null> { + for (const m of mutations) { + const key = m[0]; + if ( + key.length && m[1] === "set" && + key[key.length - 1] === commitVersionstampSymbol + ) { + m[0] = key.slice(0, key.length - 1); + m[1] = "setSuffixVersionstampedKey"; + } + } + + return await core.opAsync( + "op_kv_atomic_write", + rid, + checks, + mutations, + enqueues, + ); +} + export { AtomicOperation, Kv, KvListIterator, KvU64, openKv }; diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 943aae460..e78526856 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -530,6 +530,9 @@ fn mutation_from_v8( ("sum", Some(value)) => MutationKind::Sum(value.try_into()?), ("min", Some(value)) => MutationKind::Min(value.try_into()?), ("max", Some(value)) => MutationKind::Max(value.try_into()?), + ("setSuffixVersionstampedKey", Some(value)) => { + MutationKind::SetSuffixVersionstampedKey(value.try_into()?) + } (op, Some(_)) => { return Err(type_error(format!("invalid mutation '{op}' with value"))) } |