diff options
author | Heyang Zhou <zhy20000919@hotmail.com> | 2023-12-14 00:58:20 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-14 00:58:20 +0800 |
commit | 10ab8c1ef1ccc93bd810c5636e2a70bb7c37e91e (patch) | |
tree | 889605d974ed6d06ee74a44c624e7a1d7c986559 /ext/kv/01_db.ts | |
parent | 76a6ea57753be420398d3eba8f313a6c98eab8c3 (diff) |
feat(unstable): append commit versionstamp to key (#21556)
Diffstat (limited to 'ext/kv/01_db.ts')
-rw-r--r-- | ext/kv/01_db.ts | 81 |
1 files changed, 47 insertions, 34 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 18d190718..1817f9f07 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -75,6 +75,7 @@ type RawValue = { }; const kvSymbol = Symbol("KvRid"); +const commitVersionstampSymbol = Symbol("KvCommitVersionstamp"); class Kv { #rid: number; @@ -94,6 +95,10 @@ class Kv { return new AtomicOperation(this.#rid); } + commitVersionstamp(): symbol { + return commitVersionstampSymbol; + } + async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) { const [entries]: [RawKvEntry[]] = await core.opAsync( "op_kv_snapshot_read", @@ -148,18 +153,10 @@ class Kv { } async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) { - value = serializeValue(value); - - const checks: Deno.AtomicCheck[] = []; - const mutations = [ - [key, "set", value, options?.expireIn], - ]; - - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, - checks, - mutations, + [], + [[key, "set", serializeValue(value), options?.expireIn]], [], ); if (versionstamp === null) throw new TypeError("Failed to set value"); @@ -167,16 +164,10 @@ class Kv { } async delete(key: Deno.KvKey) { - const checks: Deno.AtomicCheck[] = []; - const mutations = [ - [key, "delete", null, undefined], - ]; - - const result = await core.opAsync( - "op_kv_atomic_write", + const result = await doAtomicWriteInPlace( this.#rid, - checks, - mutations, + [], + [[key, "delete", null, undefined]], [], ); if (!result) throw new TypeError("Failed to set value"); @@ -251,21 +242,18 @@ class Kv { validateBackoffSchedule(opts?.backoffSchedule); } - const enqueues = [ - [ - core.serialize(message, { forStorage: true }), - opts?.delay ?? 0, - opts?.keysIfUndelivered ?? [], - opts?.backoffSchedule ?? null, - ], - ]; - - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, [], [], - enqueues, + [ + [ + core.serialize(message, { forStorage: true }), + opts?.delay ?? 0, + opts?.keysIfUndelivered ?? [], + opts?.backoffSchedule ?? null, + ], + ], ); if (versionstamp === null) throw new TypeError("Failed to enqueue value"); return { ok: true, versionstamp }; @@ -511,8 +499,7 @@ class AtomicOperation { } async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> { - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, this.#checks, this.#mutations, @@ -764,4 +751,30 @@ class KvListIterator extends AsyncIterator } } +async function doAtomicWriteInPlace( + rid: number, + checks: [Deno.KvKey, string | null][], + mutations: [Deno.KvKey, string, RawValue | null, number | undefined][], + enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][], +): Promise<string | null> { + for (const m of mutations) { + const key = m[0]; + if ( + key.length && m[1] === "set" && + key[key.length - 1] === commitVersionstampSymbol + ) { + m[0] = key.slice(0, key.length - 1); + m[1] = "setSuffixVersionstampedKey"; + } + } + + return await core.opAsync( + "op_kv_atomic_write", + rid, + checks, + mutations, + enqueues, + ); +} + export { AtomicOperation, Kv, KvListIterator, KvU64, openKv }; |