summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHeyang Zhou <zhy20000919@hotmail.com>2023-12-14 00:58:20 +0800
committerGitHub <noreply@github.com>2023-12-14 00:58:20 +0800
commit10ab8c1ef1ccc93bd810c5636e2a70bb7c37e91e (patch)
tree889605d974ed6d06ee74a44c624e7a1d7c986559
parent76a6ea57753be420398d3eba8f313a6c98eab8c3 (diff)
feat(unstable): append commit versionstamp to key (#21556)
-rw-r--r--cli/tests/unit/kv_test.ts32
-rw-r--r--cli/tsc/dts/lib.deno.unstable.d.ts16
-rw-r--r--ext/kv/01_db.ts81
-rw-r--r--ext/kv/lib.rs3
4 files changed, 97 insertions, 35 deletions
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index f46099ed1..28c913f21 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -2216,3 +2216,35 @@ dbTest("key watch", async (db) => {
await work;
await reader.cancel();
});
+
+dbTest("set with key versionstamp suffix", async (db) => {
+ const result1 = await Array.fromAsync(db.list({ prefix: ["a"] }));
+ assertEquals(result1, []);
+
+ const setRes1 = await db.set(["a", db.commitVersionstamp()], "b");
+ assert(setRes1.ok);
+ assert(setRes1.versionstamp > ZERO_VERSIONSTAMP);
+
+ const result2 = await Array.fromAsync(db.list({ prefix: ["a"] }));
+ assertEquals(result2.length, 1);
+ assertEquals(result2[0].key[1], setRes1.versionstamp);
+ assertEquals(result2[0].value, "b");
+ assertEquals(result2[0].versionstamp, setRes1.versionstamp);
+
+ const setRes2 = await db.atomic().set(["a", db.commitVersionstamp()], "c")
+ .commit();
+ assert(setRes2.ok);
+ assert(setRes2.versionstamp > setRes1.versionstamp);
+
+ const result3 = await Array.fromAsync(db.list({ prefix: ["a"] }));
+ assertEquals(result3.length, 2);
+ assertEquals(result3[1].key[1], setRes2.versionstamp);
+ assertEquals(result3[1].value, "c");
+ assertEquals(result3[1].versionstamp, setRes2.versionstamp);
+
+ await assertRejects(
+ async () => await db.set(["a", db.commitVersionstamp(), "a"], "x"),
+ TypeError,
+ "expected string, number, bigint, ArrayBufferView, boolean",
+ );
+});
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index 7778870e1..f1c239ed2 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -1488,7 +1488,13 @@ declare namespace Deno {
*
* @category KV
*/
- export type KvKeyPart = Uint8Array | string | number | bigint | boolean;
+ export type KvKeyPart =
+ | Uint8Array
+ | string
+ | number
+ | bigint
+ | boolean
+ | symbol;
/** **UNSTABLE**: New API, yet to be vetted.
*
@@ -2099,6 +2105,14 @@ declare namespace Deno {
*/
close(): void;
+ /**
+ * Get a symbol that represents the versionstamp of the current atomic
+ * operation. This symbol can be used as the last part of a key in
+ * `.set()`, both directly on the `Kv` object and on an `AtomicOperation`
+ * object created from this `Kv` instance.
+ */
+ commitVersionstamp(): symbol;
+
[Symbol.dispose](): void;
}
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index 18d190718..1817f9f07 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -75,6 +75,7 @@ type RawValue = {
};
const kvSymbol = Symbol("KvRid");
+const commitVersionstampSymbol = Symbol("KvCommitVersionstamp");
class Kv {
#rid: number;
@@ -94,6 +95,10 @@ class Kv {
return new AtomicOperation(this.#rid);
}
+ commitVersionstamp(): symbol {
+ return commitVersionstampSymbol;
+ }
+
async get(key: Deno.KvKey, opts?: { consistency?: Deno.KvConsistencyLevel }) {
const [entries]: [RawKvEntry[]] = await core.opAsync(
"op_kv_snapshot_read",
@@ -148,18 +153,10 @@ class Kv {
}
async set(key: Deno.KvKey, value: unknown, options?: { expireIn?: number }) {
- value = serializeValue(value);
-
- const checks: Deno.AtomicCheck[] = [];
- const mutations = [
- [key, "set", value, options?.expireIn],
- ];
-
- const versionstamp = await core.opAsync(
- "op_kv_atomic_write",
+ const versionstamp = await doAtomicWriteInPlace(
this.#rid,
- checks,
- mutations,
+ [],
+ [[key, "set", serializeValue(value), options?.expireIn]],
[],
);
if (versionstamp === null) throw new TypeError("Failed to set value");
@@ -167,16 +164,10 @@ class Kv {
}
async delete(key: Deno.KvKey) {
- const checks: Deno.AtomicCheck[] = [];
- const mutations = [
- [key, "delete", null, undefined],
- ];
-
- const result = await core.opAsync(
- "op_kv_atomic_write",
+ const result = await doAtomicWriteInPlace(
this.#rid,
- checks,
- mutations,
+ [],
+ [[key, "delete", null, undefined]],
[],
);
if (!result) throw new TypeError("Failed to set value");
@@ -251,21 +242,18 @@ class Kv {
validateBackoffSchedule(opts?.backoffSchedule);
}
- const enqueues = [
- [
- core.serialize(message, { forStorage: true }),
- opts?.delay ?? 0,
- opts?.keysIfUndelivered ?? [],
- opts?.backoffSchedule ?? null,
- ],
- ];
-
- const versionstamp = await core.opAsync(
- "op_kv_atomic_write",
+ const versionstamp = await doAtomicWriteInPlace(
this.#rid,
[],
[],
- enqueues,
+ [
+ [
+ core.serialize(message, { forStorage: true }),
+ opts?.delay ?? 0,
+ opts?.keysIfUndelivered ?? [],
+ opts?.backoffSchedule ?? null,
+ ],
+ ],
);
if (versionstamp === null) throw new TypeError("Failed to enqueue value");
return { ok: true, versionstamp };
@@ -511,8 +499,7 @@ class AtomicOperation {
}
async commit(): Promise<Deno.KvCommitResult | Deno.KvCommitError> {
- const versionstamp = await core.opAsync(
- "op_kv_atomic_write",
+ const versionstamp = await doAtomicWriteInPlace(
this.#rid,
this.#checks,
this.#mutations,
@@ -764,4 +751,30 @@ class KvListIterator extends AsyncIterator
}
}
+async function doAtomicWriteInPlace(
+ rid: number,
+ checks: [Deno.KvKey, string | null][],
+ mutations: [Deno.KvKey, string, RawValue | null, number | undefined][],
+ enqueues: [Uint8Array, number, Deno.KvKey[], number[] | null][],
+): Promise<string | null> {
+ for (const m of mutations) {
+ const key = m[0];
+ if (
+ key.length && m[1] === "set" &&
+ key[key.length - 1] === commitVersionstampSymbol
+ ) {
+ m[0] = key.slice(0, key.length - 1);
+ m[1] = "setSuffixVersionstampedKey";
+ }
+ }
+
+ return await core.opAsync(
+ "op_kv_atomic_write",
+ rid,
+ checks,
+ mutations,
+ enqueues,
+ );
+}
+
export { AtomicOperation, Kv, KvListIterator, KvU64, openKv };
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index 943aae460..e78526856 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -530,6 +530,9 @@ fn mutation_from_v8(
("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
("min", Some(value)) => MutationKind::Min(value.try_into()?),
("max", Some(value)) => MutationKind::Max(value.try_into()?),
+ ("setSuffixVersionstampedKey", Some(value)) => {
+ MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
+ }
(op, Some(_)) => {
return Err(type_error(format!("invalid mutation '{op}' with value")))
}