diff options
author | Heyang Zhou <zhy20000919@hotmail.com> | 2023-12-14 00:58:20 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-14 00:58:20 +0800 |
commit | 10ab8c1ef1ccc93bd810c5636e2a70bb7c37e91e (patch) | |
tree | 889605d974ed6d06ee74a44c624e7a1d7c986559 | |
parent | 76a6ea57753be420398d3eba8f313a6c98eab8c3 (diff) |
feat(unstable): append commit versionstamp to key (#21556)
-rw-r--r-- | cli/tests/unit/kv_test.ts | 32 | ||||
-rw-r--r-- | cli/tsc/dts/lib.deno.unstable.d.ts | 16 | ||||
-rw-r--r-- | ext/kv/01_db.ts | 81 | ||||
-rw-r--r-- | ext/kv/lib.rs | 3 |
4 files changed, 97 insertions, 35 deletions
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index f46099ed1..28c913f21 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -2216,3 +2216,35 @@ dbTest("key watch", async (db) => { await work; await reader.cancel(); }); + +dbTest("set with key versionstamp suffix", async (db) => { + const result1 = await Array.fromAsync(db.list({ prefix: ["a"] })); + assertEquals(result1, []); + + const setRes1 = await db.set(["a", db.commitVersionstamp()], "b"); + assert(setRes1.ok); + assert(setRes1.versionstamp > ZERO_VERSIONSTAMP); + + const result2 = await Array.fromAsync(db.list({ prefix: ["a"] })); + assertEquals(result2.length, 1); + assertEquals(result2[0].key[1], setRes1.versionstamp); + assertEquals(result2[0].value, "b"); + assertEquals(result2[0].versionstamp, setRes1.versionstamp); + + const setRes2 = await db.atomic().set(["a", db.commitVersionstamp()], "c") + .commit(); + assert(setRes2.ok); + assert(setRes2.versionstamp > setRes1.versionstamp); + + const result3 = await Array.fromAsync(db.list({ prefix: ["a"] })); + assertEquals(result3.length, 2); + assertEquals(result3[1].key[1], setRes2.versionstamp); + assertEquals(result3[1].value, "c"); + assertEquals(result3[1].versionstamp, setRes2.versionstamp); + + await assertRejects( + async () => await db.set(["a", db.commitVersionstamp(), "a"], "x"), + TypeError, + "expected string, number, bigint, ArrayBufferView, boolean", + ); +}); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 7778870e1..f1c239ed2 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1488,7 +1488,13 @@ declare namespace Deno { * * @category KV */ - export type KvKeyPart = Uint8Array | string | number | bigint | boolean; + export type KvKeyPart = + | Uint8Array + | string + | number + | bigint + | boolean + | symbol; /** **UNSTABLE**: New API, yet to be vetted. * @@ -2099,6 +2105,14 @@ declare namespace Deno { */ close(): void; + /** + * Get a symbol that represents the versionstamp of the current atomic + * operation. This symbol can be used as the last part of a key in + * `.set()`, both directly on the `Kv` object and on an `AtomicOperation` + * object created from this `Kv` instance. + */ + commitVersionstamp(): symbol; + [Symbol.dispose](): void; } diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 18d190718..1817f9f07 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -75,6 +75,7 @@ type RawValue = { }; const kvSymbol = Symbol("KvRid"); +const commitVersionstampSymbol = Symbol("KvCommitVersionstamp"); class Kv { #rid: number; @@ -94,6 +95,10 @@ class Kv { return new AtomicOperation(this.#rid); } + commitVersionstamp(): symbol { + return commitVersionstampSymbol; + } + async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) { const [entries]: [RawKvEntry[]] = await core.opAsync( "op_kv_snapshot_read", @@ -148,18 +153,10 @@ class Kv { } async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) { - value = serializeValue(value); - - const checks: Deno.AtomicCheck[] = []; - const mutations = [ - [key, "set", value, options?.expireIn], - ]; - - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, - checks, - mutations, + [], + [[key, "set", serializeValue(value), options?.expireIn]], [], ); if (versionstamp === null) throw new TypeError("Failed to set value"); @@ -167,16 +164,10 @@ class Kv { } async delete(key: Deno.KvKey) { - const checks: Deno.AtomicCheck[] = []; - const mutations = [ - [key, "delete", null, undefined], - ]; - - const result = await core.opAsync( - "op_kv_atomic_write", + const result = await doAtomicWriteInPlace( this.#rid, - checks, - mutations, + [], + [[key, "delete", null, undefined]], [], ); if (!result) throw new TypeError("Failed to set value"); @@ -251,21 +242,18 @@ class Kv { validateBackoffSchedule(opts?.backoffSchedule); } - const enqueues = [ - [ - core.serialize(message, { forStorage: true }), - opts?.delay ?? 0, - opts?.keysIfUndelivered ?? [], - opts?.backoffSchedule ?? null, - ], - ]; - - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, [], [], - enqueues, + [ + [ + core.serialize(message, { forStorage: true }), + opts?.delay ?? 0, + opts?.keysIfUndelivered ?? [], + opts?.backoffSchedule ?? null, + ], + ], ); if (versionstamp === null) throw new TypeError("Failed to enqueue value"); return { ok: true, versionstamp }; @@ -511,8 +499,7 @@ class AtomicOperation { } async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> { - const versionstamp = await core.opAsync( - "op_kv_atomic_write", + const versionstamp = await doAtomicWriteInPlace( this.#rid, this.#checks, this.#mutations, @@ -764,4 +751,30 @@ class KvListIterator extends AsyncIterator } } +async function doAtomicWriteInPlace( + rid: number, + checks: [Deno.KvKey, string | null][], + mutations: [Deno.KvKey, string, RawValue | null, number | undefined][], + enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][], +): Promise<string | null> { + for (const m of mutations) { + const key = m[0]; + if ( + key.length && m[1] === "set" && + key[key.length - 1] === commitVersionstampSymbol + ) { + m[0] = key.slice(0, key.length - 1); + m[1] = "setSuffixVersionstampedKey"; + } + } + + return await core.opAsync( + "op_kv_atomic_write", + rid, + checks, + mutations, + enqueues, + ); +} + export { AtomicOperation, Kv, KvListIterator, KvU64, openKv }; diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 943aae460..e78526856 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -530,6 +530,9 @@ fn mutation_from_v8( ("sum", Some(value)) => MutationKind::Sum(value.try_into()?), ("min", Some(value)) => MutationKind::Min(value.try_into()?), ("max", Some(value)) => MutationKind::Max(value.try_into()?), + ("setSuffixVersionstampedKey", Some(value)) => { + MutationKind::SetSuffixVersionstampedKey(value.try_into()?) + } (op, Some(_)) => { return Err(type_error(format!("invalid mutation '{op}' with value"))) } |