-rw-r--r-- | Cargo.lock | 72
-rw-r--r-- | Cargo.toml | 4
-rw-r--r-- | cli/build.rs | 2
-rw-r--r-- | cli/tests/unit/kv_test.ts | 351
-rw-r--r-- | ext/kv/Cargo.toml | 5
-rw-r--r-- | ext/kv/README.md | 73
-rw-r--r-- | ext/kv/build.rs | 19
-rw-r--r-- | ext/kv/codec.rs | 543
-rw-r--r-- | ext/kv/dynamic.rs | 44
-rw-r--r-- | ext/kv/interface.rs | 315
-rw-r--r-- | ext/kv/lib.rs | 232
-rw-r--r-- | ext/kv/proto/datapath.proto | 97
-rw-r--r-- | ext/kv/proto/mod.rs | 7
-rw-r--r-- | ext/kv/remote.rs | 590
-rw-r--r-- | ext/kv/sqlite.rs | 1057
-rw-r--r-- | runtime/build.rs | 2
-rw-r--r-- | runtime/errors.rs | 4
-rw-r--r-- | runtime/web_worker.rs | 14
-rw-r--r-- | runtime/worker.rs | 10
-rw-r--r-- | test_util/Cargo.toml | 1
-rw-r--r-- | test_util/build.rs | 22
-rw-r--r-- | test_util/src/kv_remote.rs | 7
-rw-r--r-- | test_util/src/lib.rs | 19
23 files changed, 558 insertions, 2932 deletions
diff --git a/Cargo.lock b/Cargo.lock index eb9b7cdf3..138d0cd4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1412,8 +1412,13 @@ dependencies = [ "base64 0.21.4", "chrono", "deno_core", + "deno_fetch", "deno_node", + "deno_tls", "deno_unsync 0.1.1", + "denokv_proto", + "denokv_remote", + "denokv_sqlite", "hex", "log", "num-bigint", @@ -1777,6 +1782,64 @@ dependencies = [ ] [[package]] +name = "denokv_proto" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8952fb8c38c1dcd796d49b00030afb74aa184160ae86817b72a32a994c8e16f0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "num-bigint", + "prost", + "prost-build", + "serde", + "uuid 1.5.0", +] + +[[package]] +name = "denokv_remote" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edfc8447324d783b01e215bd5040ff9149c34d9715c7b7b5080dd648ebf1148a" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "chrono", + "denokv_proto", + "log", + "prost", + "rand 0.8.5", + "reqwest", + "serde", + "serde_json", + "tokio", + "url", + "uuid 1.5.0", +] + +[[package]] +name = "denokv_sqlite" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ec76b691ff069f14e56e3e053c2b2163540b27e4b60179f2b120064a7e4960d" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "denokv_proto", + "futures", + "log", + "num-bigint", + "rand 0.8.5", + "rusqlite", + "serde_json", + "tokio", + "uuid 1.5.0", +] + +[[package]] name = "der" version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -5621,6 +5684,7 @@ dependencies = [ "base64 0.21.4", "bytes", "console_static_text", + "denokv_proto", "fastwebsockets", "flate2", "futures", @@ -5743,9 +5807,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -5773,9 +5837,9 @@ dependencies = [ [[package]] name = "tokio-metrics" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4b2fc67d5dec41db679b9b052eb572269616926040b7831e32c8a152df77b84" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" dependencies = [ "futures-util", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index e859641dd..1f136401e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,10 @@ test_util = { path = "./test_util" } deno_lockfile = "0.17.2" deno_media_type = { version = "0.1.1", features = ["module_specifier"] } +denokv_proto = "0.2.1" +denokv_sqlite = "0.2.1" +denokv_remote = "0.2.3" + # exts deno_broadcast_channel = { version = "0.115.0", path = "./ext/broadcast_channel" } deno_cache = { version = "0.53.0", path = "./ext/cache" } diff --git a/cli/build.rs b/cli/build.rs index f01c006be..e6b9dc0a4 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -381,7 +381,7 @@ fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput { deno_net::deno_net::init_ops::<PermissionsContainer>(None, None), deno_tls::deno_tls::init_ops(), deno_kv::deno_kv::init_ops(SqliteDbHandler::<PermissionsContainer>::new( - None, + None, None, )), deno_napi::deno_napi::init_ops::<PermissionsContainer>(), 
deno_http::deno_http::init_ops::<DefaultHttpPropertyExtractor>(), diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 7cb8ebccf..4e3ce5385 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -55,9 +55,7 @@ function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void> | void) { // https://github.com/denoland/deno/issues/18363 ignore: Deno.build.os === "darwin" && isCI, async fn() { - const db: Deno.Kv = await Deno.openKv( - ":memory:", - ); + const db: Deno.Kv = await Deno.openKv(":memory:"); try { await fn(db); } finally { @@ -73,14 +71,14 @@ function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) { // https://github.com/denoland/deno/issues/18363 ignore: Deno.build.os === "darwin" && isCI, async fn() { - const db: Deno.Kv = await Deno.openKv( - ":memory:", - ); + const db: Deno.Kv = await Deno.openKv(":memory:"); await fn(db); }, }); } +const ZERO_VERSIONSTAMP = "00000000000000000000"; + dbTest("basic read-write-delete and versionstamps", async (db) => { const result1 = await db.get(["a"]); assertEquals(result1.key, ["a"]); @@ -89,17 +87,19 @@ dbTest("basic read-write-delete and versionstamps", async (db) => { const setRes = await db.set(["a"], "b"); assert(setRes.ok); - assertEquals(setRes.versionstamp, "00000000000000010000"); + assert(setRes.versionstamp > ZERO_VERSIONSTAMP); const result2 = await db.get(["a"]); assertEquals(result2.key, ["a"]); assertEquals(result2.value, "b"); - assertEquals(result2.versionstamp, "00000000000000010000"); + assertEquals(result2.versionstamp, setRes.versionstamp); - await db.set(["a"], "c"); + const setRes2 = await db.set(["a"], "c"); + assert(setRes2.ok); + assert(setRes2.versionstamp > setRes.versionstamp); const result3 = await db.get(["a"]); assertEquals(result3.key, ["a"]); assertEquals(result3.value, "c"); - assertEquals(result3.versionstamp, "00000000000000020000"); + assertEquals(result3.versionstamp, setRes2.versionstamp); await db.delete(["a"]); const result4 = await db.get(["a"]); @@ -230,17 +230,18 @@ dbTest("compare and mutate", async (db) => { await db.set(["t"], "1"); const currentValue = await db.get(["t"]); - assertEquals(currentValue.versionstamp, "00000000000000010000"); + assert(currentValue.versionstamp); + assert(currentValue.versionstamp > ZERO_VERSIONSTAMP); let res = await db.atomic() .check({ key: ["t"], versionstamp: currentValue.versionstamp }) .set(currentValue.key, "2") .commit(); assert(res.ok); - assertEquals(res.versionstamp, "00000000000000020000"); + assert(res.versionstamp > currentValue.versionstamp); const newValue = await db.get(["t"]); - assertEquals(newValue.versionstamp, "00000000000000020000"); + assertEquals(newValue.versionstamp, res.versionstamp); assertEquals(newValue.value, "2"); res = await db.atomic() @@ -250,7 +251,7 @@ dbTest("compare and mutate", async (db) => { assert(!res.ok); const newValue2 = await db.get(["t"]); - assertEquals(newValue2.versionstamp, "00000000000000020000"); + assertEquals(newValue2.versionstamp, newValue.versionstamp); assertEquals(newValue2.value, "2"); }); @@ -260,9 +261,10 @@ dbTest("compare and mutate not exists", async (db) => { .set(["t"], "1") .commit(); assert(res.ok); + assert(res.versionstamp > ZERO_VERSIONSTAMP); const newValue = await db.get(["t"]); - assertEquals(newValue.versionstamp, "00000000000000010000"); + assertEquals(newValue.versionstamp, res.versionstamp); assertEquals(newValue.value, "1"); res = await db.atomic() @@ -303,13 +305,17 @@ dbTest("atomic mutation helper (max)", async (db) => { }); 
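The recurring change in kv_test.ts above swaps hard-coded versionstamps such as "00000000000000010000" for relative comparisons, presumably because the new denokv_sqlite backend derives versionstamps from an RNG (note the new versionstamp_rng_seed parameter threaded through SqliteDbHandler::new) rather than a counter starting at a fixed value. A minimal sketch of the new assertion pattern, reusing the dbTest, assert, and assertEquals helpers already present in this file:

const ZERO_VERSIONSTAMP = "00000000000000000000";

dbTest("versionstamps are ordered, not hard-coded", async (db) => {
  const first = await db.set(["k"], 1);
  assert(first.ok);
  // Versionstamps are fixed-width hex strings, so plain string
  // comparison matches commit order.
  assert(first.versionstamp > ZERO_VERSIONSTAMP);
  const second = await db.set(["k"], 2);
  assert(second.versionstamp > first.versionstamp);
  // A read observes the versionstamp of the write that produced it.
  assertEquals((await db.get(["k"])).versionstamp, second.versionstamp);
});
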
dbTest("compare multiple and mutate", async (db) => { - await db.set(["t1"], "1"); - await db.set(["t2"], "2"); + const setRes1 = await db.set(["t1"], "1"); + const setRes2 = await db.set(["t2"], "2"); + assert(setRes1.ok); + assert(setRes1.versionstamp > ZERO_VERSIONSTAMP); + assert(setRes2.ok); + assert(setRes2.versionstamp > ZERO_VERSIONSTAMP); const currentValue1 = await db.get(["t1"]); - assertEquals(currentValue1.versionstamp, "00000000000000010000"); + assertEquals(currentValue1.versionstamp, setRes1.versionstamp); const currentValue2 = await db.get(["t2"]); - assertEquals(currentValue2.versionstamp, "00000000000000020000"); + assertEquals(currentValue2.versionstamp, setRes2.versionstamp); const res = await db.atomic() .check({ key: ["t1"], versionstamp: currentValue1.versionstamp }) @@ -318,12 +324,13 @@ dbTest("compare multiple and mutate", async (db) => { .set(currentValue2.key, "4") .commit(); assert(res.ok); + assert(res.versionstamp > setRes2.versionstamp); const newValue1 = await db.get(["t1"]); - assertEquals(newValue1.versionstamp, "00000000000000030000"); + assertEquals(newValue1.versionstamp, res.versionstamp); assertEquals(newValue1.value, "3"); const newValue2 = await db.get(["t2"]); - assertEquals(newValue2.versionstamp, "00000000000000030000"); + assertEquals(newValue2.versionstamp, res.versionstamp); assertEquals(newValue2.value, "4"); // just one of the two checks failed @@ -336,10 +343,10 @@ dbTest("compare multiple and mutate", async (db) => { assert(!res2.ok); const newValue3 = await db.get(["t1"]); - assertEquals(newValue3.versionstamp, "00000000000000030000"); + assertEquals(newValue3.versionstamp, res.versionstamp); assertEquals(newValue3.value, "3"); const newValue4 = await db.get(["t2"]); - assertEquals(newValue4.versionstamp, "00000000000000030000"); + assertEquals(newValue4.versionstamp, res.versionstamp); assertEquals(newValue4.value, "4"); }); @@ -635,8 +642,8 @@ async function collect<T>( return entries; } -async function setupData(db: Deno.Kv) { - await db.atomic() +async function setupData(db: Deno.Kv): Promise<string> { + const res = await db.atomic() .set(["a"], -1) .set(["a", "a"], 0) .set(["a", "b"], 1) @@ -646,27 +653,29 @@ async function setupData(db: Deno.Kv) { .set(["b"], 99) .set(["b", "a"], 100) .commit(); + assert(res.ok); + return res.versionstamp; } dbTest("get many", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await db.getMany([["b", "a"], ["a"], ["c"]]); assertEquals(entries, [ - { key: ["b", "a"], value: 100, versionstamp: "00000000000000010000" }, - { key: ["a"], value: -1, versionstamp: "00000000000000010000" }, + { key: ["b", "a"], value: 100, versionstamp }, + { key: ["a"], value: -1, versionstamp }, { key: ["c"], value: null, versionstamp: null }, ]); }); dbTest("list prefix", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect(db.list({ prefix: ["a"] })); assertEquals(entries, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { 
key: ["a", "e"], value: 4, versionstamp }, ]); }); @@ -680,12 +689,12 @@ dbTest("list prefix empty", async (db) => { }); dbTest("list prefix with start", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect(db.list({ prefix: ["a"], start: ["a", "c"] })); assertEquals(entries, [ - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "e"], value: 4, versionstamp }, ]); }); @@ -696,11 +705,11 @@ dbTest("list prefix with start empty", async (db) => { }); dbTest("list prefix with end", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect(db.list({ prefix: ["a"], end: ["a", "c"] })); assertEquals(entries, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, ]); }); @@ -711,35 +720,34 @@ dbTest("list prefix with end empty", async (db) => { }); dbTest("list prefix with empty prefix", async (db) => { - await db.set(["a"], 1); + const res = await db.set(["a"], 1); const entries = await collect(db.list({ prefix: [] })); assertEquals(entries, [ - { key: ["a"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a"], value: 1, versionstamp: res.versionstamp }, ]); }); dbTest("list prefix reverse", async (db) => { - await setupData(db); - + const versionstamp = await setupData(db); const entries = await collect(db.list({ prefix: ["a"] }, { reverse: true })); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "a"], value: 0, versionstamp }, ]); }); dbTest("list prefix reverse with start", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ prefix: ["a"], start: ["a", "c"] }, { reverse: true }), ); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); }); @@ -752,13 +760,13 @@ dbTest("list prefix reverse with start empty", async (db) => { }); dbTest("list prefix reverse with end", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ prefix: ["a"], end: ["a", "c"] }, { reverse: true }), ); assertEquals(entries, [ - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + 
{ key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "a"], value: 0, versionstamp }, ]); }); @@ -771,83 +779,82 @@ dbTest("list prefix reverse with end empty", async (db) => { }); dbTest("list prefix limit", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect(db.list({ prefix: ["a"] }, { limit: 2 })); assertEquals(entries, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, ]); }); dbTest("list prefix limit reverse", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ prefix: ["a"] }, { limit: 2, reverse: true }), ); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, ]); }); dbTest("list prefix with small batch size", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect(db.list({ prefix: ["a"] }, { batchSize: 2 })); assertEquals(entries, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "e"], value: 4, versionstamp }, ]); }); dbTest("list prefix with small batch size reverse", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ prefix: ["a"] }, { batchSize: 2, reverse: true }), ); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "a"], value: 0, versionstamp }, ]); }); dbTest("list prefix with small batch size and limit", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3 }), ); assertEquals(entries, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); }); dbTest("list prefix with small batch size and limit reverse", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); 
const entries = await collect( db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3, reverse: true }), ); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); }); dbTest("list prefix with manual cursor", async (db) => { - await setupData(db); - + const versionstamp = await setupData(db); const iterator = db.list({ prefix: ["a"] }, { limit: 2 }); const values = await collect(iterator); assertEquals(values, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, ]); const cursor = iterator.cursor; @@ -856,20 +863,20 @@ dbTest("list prefix with manual cursor", async (db) => { const iterator2 = db.list({ prefix: ["a"] }, { cursor }); const values2 = await collect(iterator2); assertEquals(values2, [ - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "e"], value: 4, versionstamp }, ]); }); dbTest("list prefix with manual cursor reverse", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const iterator = db.list({ prefix: ["a"] }, { limit: 2, reverse: true }); const values = await collect(iterator); assertEquals(values, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, ]); const cursor = iterator.cursor; @@ -878,57 +885,57 @@ dbTest("list prefix with manual cursor reverse", async (db) => { const iterator2 = db.list({ prefix: ["a"] }, { cursor, reverse: true }); const values2 = await collect(iterator2); assertEquals(values2, [ - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "a"], value: 0, versionstamp }, ]); }); dbTest("list range", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ start: ["a", "a"], end: ["a", "z"] }), ); assertEquals(entries, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "e"], value: 4, versionstamp }, ]); }); dbTest("list range reverse", async (db) 
=> { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ start: ["a", "a"], end: ["a", "z"] }, { reverse: true }), ); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "a"], value: 0, versionstamp }, ]); }); dbTest("list range with limit", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ start: ["a", "a"], end: ["a", "z"] }, { limit: 3 }), ); assertEquals(entries, [ - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); }); dbTest("list range with limit reverse", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ start: ["a", "a"], end: ["a", "z"] }, { @@ -937,46 +944,46 @@ dbTest("list range with limit reverse", async (db) => { }), ); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); }); dbTest("list range nesting", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect(db.list({ start: ["a"], end: ["a", "d"] })); assertEquals(entries, [ - { key: ["a"], value: -1, versionstamp: "00000000000000010000" }, - { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a"], value: -1, versionstamp }, + { key: ["a", "a"], value: 0, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); }); dbTest("list range short", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const entries = await collect( db.list({ start: ["a", "b"], end: ["a", "d"] }), ); assertEquals(entries, [ - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); }); dbTest("list range with manual cursor", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, { limit: 2, }); const entries = await collect(iterator); assertEquals(entries, [ - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, - 
{ key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, + { key: ["a", "b"], value: 1, versionstamp }, + { key: ["a", "c"], value: 2, versionstamp }, ]); const cursor = iterator.cursor; @@ -985,13 +992,13 @@ dbTest("list range with manual cursor", async (db) => { }); const entries2 = await collect(iterator2); assertEquals(entries2, [ - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, + { key: ["a", "d"], value: 3, versionstamp }, + { key: ["a", "e"], value: 4, versionstamp }, ]); }); dbTest("list range with manual cursor reverse", async (db) => { - await setupData(db); + const versionstamp = await setupData(db); const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, { limit: 2, @@ -999,8 +1006,8 @@ dbTest("list range with manual cursor reverse", async (db) => { }); const entries = await collect(iterator); assertEquals(entries, [ - { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" }, - { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" }, + { key: ["a", "e"], value: 4, versionstamp }, + { key: ["a", "d"], value: 3, versionstamp }, ]); const cursor = iterator.cursor; @@ -1010,8 +1017,8 @@ dbTest("list range with manual cursor reverse", async (db) => { }); const entries2 = await collect(iterator2); assertEquals(entries2, [ - { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" }, - { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" }, + { key: ["a", "c"], value: 2, versionstamp }, + { key: ["a", "b"], value: 1, versionstamp }, ]); }); @@ -1110,12 +1117,12 @@ dbTest("key size limit", async (db) => { const lastValidKey = new Uint8Array(2046).fill(1); const firstInvalidKey = new Uint8Array(2047).fill(1); - await db.set([lastValidKey], 1); + const res = await db.set([lastValidKey], 1); assertEquals(await db.get([lastValidKey]), { key: [lastValidKey], value: 1, - versionstamp: "00000000000000010000", + versionstamp: res.versionstamp, }); await assertRejects( @@ -1135,11 +1142,11 @@ dbTest("value size limit", async (db) => { const lastValidValue = new Uint8Array(65536); const firstInvalidValue = new Uint8Array(65537); - await db.set(["a"], lastValidValue); + const res = await db.set(["a"], lastValidValue); assertEquals(await db.get(["a"]), { key: ["a"], value: lastValidValue, - versionstamp: "00000000000000010000", + versionstamp: res.versionstamp, }); await assertRejects( @@ -1415,21 +1422,17 @@ for (const { name, value } of VALUE_CASES) { queueTest(`listenQueue and enqueue ${name}`, async (db) => { const numEnqueues = 10; let count = 0; - const promises: Deferred<void>[] = []; - const dequeuedMessages: unknown[] = []; + const promises: Deferred<unknown>[] = []; const listeners: Promise<void>[] = []; - listeners.push(db.listenQueue((msg) => { - dequeuedMessages.push(msg); - promises[count++].resolve(); + listeners.push(db.listenQueue((msg: unknown) => { + promises[count++].resolve(msg); })); try { for (let i = 0; i < numEnqueues; i++) { promises.push(deferred()); await db.enqueue(value); } - for (let i = 0; i < numEnqueues; i++) { - await promises[i]; - } + const dequeuedMessages = await Promise.all(promises); for (let i = 0; i < numEnqueues; i++) { assertEquals(dequeuedMessages[i], value); } @@ -1445,7 +1448,7 @@ for (const { name, value } of VALUE_CASES) { queueTest("queue mixed types", async (db) => { let promise: Deferred<void>; let dequeuedMessage: unknown = null; - const listener = db.listenQueue((msg) => { + const 
listener = db.listenQueue((msg: unknown) => { dequeuedMessage = msg; promise.resolve(); }); @@ -2066,25 +2069,16 @@ Deno.test({ const db = await Deno.openKv( "http://localhost:4545/kv_remote_authorize_invalid_format", ); - let ok = false; - try { - await db.set(["some-key"], 1); - } catch (e) { - if ( - e.name === "TypeError" && - e.message.startsWith("Metadata error: Failed to decode metadata: ") - ) { - ok = true; - } else { - throw e; - } - } finally { - db.close(); - } - if (!ok) { - throw new Error("did not get expected error"); - } + await assertRejects( + async () => { + await db.set(["some-key"], 1); + }, + Error, + "Failed to parse metadata: ", + ); + + db.close(); }, }); @@ -2094,24 +2088,15 @@ Deno.test({ const db = await Deno.openKv( "http://localhost:4545/kv_remote_authorize_invalid_version", ); - let ok = false; - try { - await db.set(["some-key"], 1); - } catch (e) { - if ( - e.name === "TypeError" && - e.message === "Metadata error: Unsupported metadata version: 2" - ) { - ok = true; - } else { - throw e; - } - } finally { - db.close(); - } - if (!ok) { - throw new Error("did not get expected error"); - } + await assertRejects( + async () => { + await db.set(["some-key"], 1); + }, + Error, + "Failed to parse metadata: unsupported metadata version: 1000", + ); + + db.close(); }, }); diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml index 47bf11177..c85d5f7a8 100644 --- a/ext/kv/Cargo.toml +++ b/ext/kv/Cargo.toml @@ -19,8 +19,13 @@ async-trait.workspace = true base64.workspace = true chrono.workspace = true deno_core.workspace = true +deno_fetch.workspace = true deno_node.workspace = true +deno_tls.workspace = true deno_unsync = "0.1.1" +denokv_proto.workspace = true +denokv_remote.workspace = true +denokv_sqlite.workspace = true hex.workspace = true log.workspace = true num-bigint.workspace = true diff --git a/ext/kv/README.md b/ext/kv/README.md index f5c2de9ed..21c8e9e72 100644 --- a/ext/kv/README.md +++ b/ext/kv/README.md @@ -8,74 +8,19 @@ please read the [manual](https://deno.land/manual/runtime/kv). Deno KV has a pluggable storage interface that supports multiple backends: - SQLite - backed by a local SQLite database. This backend is suitable for - development and is the default when running locally. + development and is the default when running locally. It is implemented in the + [denokv_sqlite crate](https://github.com/denoland/denokv/blob/main/sqlite). - Remote - backed by a remote service that implements the [KV Connect](#kv-connect) protocol, for example [Deno Deploy](https://deno.com/deploy). -Additional backends can be added by implementing the `DatabaseHandler` trait. +Additional backends can be added by implementing the `Database` trait. ## KV Connect -The KV Connect protocol has separate control and data planes to maximize -throughput and minimize latency. _Metadata Exchange_ and _Data Path_ are the two -sub-protocols that are used when talking to a KV Connect-compatible service. - -### Metadata Exchange - -To connect to a KV Connect service, the user provides an HTTP or HTTPS URL to -`Deno.openKv`. A background task is then spawned to periodically make HTTP POST -requests to the provided URL to refresh database metadata. - -The HTTP `Authorization` header is included and have the format -`Bearer <access-token>`. The `<access-token>` is a static token issued by the -service provider. For Deno Deploy, this is the personal access token generated -from the dashboard. You can specify the access token with the environment -variable `DENO_KV_ACCESS_TOKEN`. 
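The README paragraphs being removed here describe the client side of the metadata exchange: Deno.openKv is handed an HTTP(S) URL and authenticates with the static token from the DENO_KV_ACCESS_TOKEN environment variable. That behavior is unchanged by this PR; only the protocol documentation moves to the denokv repository. A hedged usage sketch (the URL is a placeholder, not a real endpoint):

// Run with something like:
//   DENO_KV_ACCESS_TOKEN=<token> deno run --unstable --allow-env --allow-net main.ts
const db = await Deno.openKv("https://example.com/kv/connect"); // placeholder URL
await db.set(["greeting"], "hello");
console.log(await db.get(["greeting"]));
db.close();
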
- -Request body is currently unused. The response is a JSON message that satisfies -the [JSON Schema](https://json-schema.org/) definition in -`cli/schemas/kv-metadata-exchange-response.v1.json`. - -Semantics of the response fields: - -- `version`: Protocol version. The only supported value is `1`. -- `databaseId`: UUID of the database. -- `endpoints`: Data plane endpoints that can serve requests to the database, - along with their consistency levels. -- `token`: An ephemeral authentication token that must be included in all - requests to the data plane. This value is an opaque string and the client - should not depend on its format. -- `expiresAt`: The time at which the token expires. Encoded as an ISO 8601 - string. - -### Data Path - -After the first metadata exchange has completed, the client can talk to the data -plane endpoints listed in the `endpoints` field using a Protobuf-over-HTTP -protocol called the _Data Path_. The Protobuf messages are defined in -`proto/datapath.proto`. - -Two sub-endpoints are available under a data plane endpoint URL: - -- `POST /snapshot_read`: Used for read operations: `kv.get()` and - `kv.getMany()`. - - **Request type**: `SnapshotRead` - - **Response type**: `SnapshotReadOutput` -- `POST /atomic_write`: Used for write operations: `kv.set()` and - `kv.atomic().commit()`. - - **Request type**: `AtomicWrite` - - **Response type**: `AtomicWriteOutput` - -An HTTP `Authorization` header in the format `Bearer <ephemeral-token>` must be -included in all requests to the data plane. The value of `<ephemeral-token>` is -the `token` field from the metadata exchange response. - -### Error handling - -All non-client errors (i.e. network errors and HTTP 5xx status codes) are -handled by retrying the request. Randomized exponential backoff is applied to -each retry. - -Client errors cannot be recovered by retrying. A JavaScript exception is -generated for each of those errors. +The KV Connect protocol allows the Deno CLI to communicate with a remote KV +database. The +[specification for the protocol](https://github.com/denoland/denokv/blob/main/proto/kv-connect.md), +and the +[protobuf definitions](https://github.com/denoland/denokv/blob/main/proto/schema/datapath.proto) +can be found in the `denokv` repository, under the `proto` directory. diff --git a/ext/kv/build.rs b/ext/kv/build.rs deleted file mode 100644 index eba8a20f7..000000000 --- a/ext/kv/build.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use std::env; -use std::io; -use std::path::PathBuf; - -fn main() -> io::Result<()> { - println!("cargo:rerun-if-changed=./proto"); - - let descriptor_path = - PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin"); - - prost_build::Config::new() - .file_descriptor_set_path(&descriptor_path) - .compile_well_known_types() - .compile_protos(&["proto/datapath.proto"], &["proto/"])?; - - Ok(()) -} diff --git a/ext/kv/codec.rs b/ext/kv/codec.rs deleted file mode 100644 index 522c2e9bc..000000000 --- a/ext/kv/codec.rs +++ /dev/null @@ -1,543 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
-// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/main/foundationdb/src/tuple/pack.rs - -use crate::Key; -use crate::KeyPart; - -//const NIL: u8 = 0x00; -const BYTES: u8 = 0x01; -const STRING: u8 = 0x02; -//const NESTED: u8 = 0x05; -const NEGINTSTART: u8 = 0x0b; -const INTZERO: u8 = 0x14; -const POSINTEND: u8 = 0x1d; -//const FLOAT: u8 = 0x20; -const DOUBLE: u8 = 0x21; -const FALSE: u8 = 0x26; -const TRUE: u8 = 0x27; - -const ESCAPE: u8 = 0xff; - -const CANONICAL_NAN_POS: u64 = 0x7ff8000000000000u64; -const CANONICAL_NAN_NEG: u64 = 0xfff8000000000000u64; - -pub fn canonicalize_f64(n: f64) -> f64 { - if n.is_nan() { - if n.is_sign_negative() { - f64::from_bits(CANONICAL_NAN_NEG) - } else { - f64::from_bits(CANONICAL_NAN_POS) - } - } else { - n - } -} - -pub fn encode_key(key: &Key) -> std::io::Result<Vec<u8>> { - let mut output: Vec<u8> = vec![]; - for part in &key.0 { - match part { - KeyPart::String(key) => { - output.push(STRING); - escape_raw_bytes_into(&mut output, key.as_bytes()); - output.push(0); - } - KeyPart::Int(key) => { - bigint::encode_into(&mut output, key)?; - } - KeyPart::Float(key) => { - double::encode_into(&mut output, *key); - } - KeyPart::Bytes(key) => { - output.push(BYTES); - escape_raw_bytes_into(&mut output, key); - output.push(0); - } - KeyPart::False => { - output.push(FALSE); - } - KeyPart::True => { - output.push(TRUE); - } - } - } - Ok(output) -} - -pub fn decode_key(mut bytes: &[u8]) -> std::io::Result<Key> { - let mut key = Key(vec![]); - while !bytes.is_empty() { - let tag = bytes[0]; - bytes = &bytes[1..]; - - let next_bytes = match tag { - self::STRING => { - let (next_bytes, data) = parse_slice(bytes)?; - let data = String::from_utf8(data).map_err(|_| { - std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid utf8") - })?; - key.0.push(KeyPart::String(data)); - next_bytes - } - self::NEGINTSTART..=self::POSINTEND => { - let (next_bytes, data) = bigint::decode_from(bytes, tag)?; - key.0.push(KeyPart::Int(data)); - next_bytes - } - self::DOUBLE => { - let (next_bytes, data) = double::decode_from(bytes)?; - key.0.push(KeyPart::Float(data)); - next_bytes - } - self::BYTES => { - let (next_bytes, data) = parse_slice(bytes)?; - key.0.push(KeyPart::Bytes(data)); - next_bytes - } - self::FALSE => { - key.0.push(KeyPart::False); - bytes - } - self::TRUE => { - key.0.push(KeyPart::True); - bytes - } - _ => { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "invalid tag", - )) - } - }; - - bytes = next_bytes; - } - Ok(key) -} - -fn escape_raw_bytes_into(out: &mut Vec<u8>, x: &[u8]) { - for &b in x { - out.push(b); - if b == 0 { - out.push(ESCAPE); - } - } -} - -mod bigint { - use num_bigint::BigInt; - use num_bigint::Sign; - - use super::parse_byte; - use super::parse_bytes; - const MAX_SZ: usize = 8; - - // Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/7415e116d5d96c2630976058de28e439eed7e809/foundationdb/src/tuple/pack.rs#L575 - pub fn encode_into(out: &mut Vec<u8>, key: &BigInt) -> std::io::Result<()> { - if key.sign() == Sign::NoSign { - out.push(super::INTZERO); - return Ok(()); - } - let (sign, mut bytes) = key.to_bytes_be(); - let n = bytes.len(); - match sign { - Sign::Minus => { - if n <= MAX_SZ { - out.push(super::INTZERO - n as u8); - } else { - out.extend_from_slice(&[super::NEGINTSTART, bigint_n(n)? 
^ 0xff]); - } - invert(&mut bytes); - out.extend_from_slice(&bytes); - } - Sign::NoSign => unreachable!(), - Sign::Plus => { - if n <= MAX_SZ { - out.push(super::INTZERO + n as u8); - } else { - out.extend_from_slice(&[super::POSINTEND, bigint_n(n)?]); - } - out.extend_from_slice(&bytes); - } - } - Ok(()) - } - - pub fn decode_from( - input: &[u8], - tag: u8, - ) -> std::io::Result<(&[u8], BigInt)> { - if super::INTZERO <= tag && tag <= super::INTZERO + MAX_SZ as u8 { - let n = (tag - super::INTZERO) as usize; - let (input, bytes) = parse_bytes(input, n)?; - Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes))) - } else if super::INTZERO - MAX_SZ as u8 <= tag && tag < super::INTZERO { - let n = (super::INTZERO - tag) as usize; - let (input, bytes) = parse_bytes(input, n)?; - Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes)))) - } else if tag == super::NEGINTSTART { - let (input, raw_length) = parse_byte(input)?; - let n = usize::from(raw_length ^ 0xff); - let (input, bytes) = parse_bytes(input, n)?; - Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes)))) - } else if tag == super::POSINTEND { - let (input, raw_length) = parse_byte(input)?; - let n: usize = usize::from(raw_length); - let (input, bytes) = parse_bytes(input, n)?; - Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes))) - } else { - Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("unknown bigint tag: {}", tag), - )) - } - } - - fn invert(bytes: &mut [u8]) { - // The ones' complement of a binary number is defined as the value - // obtained by inverting all the bits in the binary representation - // of the number (swapping 0s for 1s and vice versa). - for byte in bytes.iter_mut() { - *byte = !*byte; - } - } - - fn inverted(bytes: &[u8]) -> Vec<u8> { - // The ones' complement of a binary number is defined as the value - // obtained by inverting all the bits in the binary representation - // of the number (swapping 0s for 1s and vice versa). - bytes.iter().map(|byte| !*byte).collect() - } - - fn bigint_n(n: usize) -> std::io::Result<u8> { - u8::try_from(n).map_err(|_| { - std::io::Error::new( - std::io::ErrorKind::InvalidInput, - "BigUint requires more than 255 bytes to be represented", - ) - }) - } -} - -mod double { - macro_rules! 
sign_bit { - ($type:ident) => { - (1 << (std::mem::size_of::<$type>() * 8 - 1)) - }; - } - - fn f64_to_ux_be_bytes(f: f64) -> [u8; 8] { - let u = if f.is_sign_negative() { - f.to_bits() ^ ::std::u64::MAX - } else { - f.to_bits() ^ sign_bit!(u64) - }; - u.to_be_bytes() - } - - pub fn encode_into(out: &mut Vec<u8>, x: f64) { - out.push(super::DOUBLE); - out.extend_from_slice(&f64_to_ux_be_bytes(super::canonicalize_f64(x))); - } - - pub fn decode_from(input: &[u8]) -> std::io::Result<(&[u8], f64)> { - let (input, bytes) = super::parse_bytes(input, 8)?; - let mut arr = [0u8; 8]; - arr.copy_from_slice(bytes); - let u = u64::from_be_bytes(arr); - Ok(( - input, - f64::from_bits(if (u & sign_bit!(u64)) == 0 { - u ^ ::std::u64::MAX - } else { - u ^ sign_bit!(u64) - }), - )) - } -} - -#[inline] -fn parse_bytes(input: &[u8], num: usize) -> std::io::Result<(&[u8], &[u8])> { - if input.len() < num { - Err(std::io::ErrorKind::UnexpectedEof.into()) - } else { - Ok((&input[num..], &input[..num])) - } -} - -#[inline] -fn parse_byte(input: &[u8]) -> std::io::Result<(&[u8], u8)> { - if input.is_empty() { - Err(std::io::ErrorKind::UnexpectedEof.into()) - } else { - Ok((&input[1..], input[0])) - } -} - -fn parse_slice(input: &[u8]) -> std::io::Result<(&[u8], Vec<u8>)> { - let mut output: Vec<u8> = Vec::new(); - let mut i = 0usize; - - while i < input.len() { - let byte = input[i]; - i += 1; - - if byte == 0 { - if input.get(i).copied() == Some(ESCAPE) { - output.push(0); - i += 1; - continue; - } else { - return Ok((&input[i..], output)); - } - } - - output.push(byte); - } - - Err(std::io::ErrorKind::UnexpectedEof.into()) -} - -#[cfg(test)] -mod tests { - use num_bigint::BigInt; - use std::cmp::Ordering; - - use crate::Key; - use crate::KeyPart; - - use super::decode_key; - use super::encode_key; - - fn roundtrip(key: Key) { - let bytes = encode_key(&key).unwrap(); - let decoded = decode_key(&bytes).unwrap(); - assert_eq!(&key, &decoded); - assert_eq!(format!("{:?}", key), format!("{:?}", decoded)); - } - - fn check_order(a: Key, b: Key, expected: Ordering) { - let a_bytes = encode_key(&a).unwrap(); - let b_bytes = encode_key(&b).unwrap(); - - assert_eq!(a.cmp(&b), expected); - assert_eq!(a_bytes.cmp(&b_bytes), expected); - } - - fn check_bijection(key: Key, serialized: &[u8]) { - let bytes = encode_key(&key).unwrap(); - assert_eq!(&bytes[..], serialized); - let decoded = decode_key(serialized).unwrap(); - assert_eq!(&key, &decoded); - } - - #[test] - fn simple_roundtrip() { - roundtrip(Key(vec![ - KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00]), - KeyPart::String("foo".to_string()), - KeyPart::Float(-f64::NAN), - KeyPart::Float(-f64::INFINITY), - KeyPart::Float(-42.1), - KeyPart::Float(-0.0), - KeyPart::Float(0.0), - KeyPart::Float(42.1), - KeyPart::Float(f64::INFINITY), - KeyPart::Float(f64::NAN), - KeyPart::Int(BigInt::from(-10000)), - KeyPart::Int(BigInt::from(-1)), - KeyPart::Int(BigInt::from(0)), - KeyPart::Int(BigInt::from(1)), - KeyPart::Int(BigInt::from(10000)), - KeyPart::False, - KeyPart::True, - ])); - } - - #[test] - #[rustfmt::skip] - fn order_bytes() { - check_order( - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), - Ordering::Equal, - ); - - check_order( - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]), - Ordering::Less, - ); - - check_order( - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 
0x01])]), - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), - Ordering::Greater, - ); - - check_order( - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]), - Ordering::Less, - ); - - check_order( - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]), - Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]), - Ordering::Greater, - ); - } - - #[test] - #[rustfmt::skip] - fn order_tags() { - check_order( - Key(vec![KeyPart::Bytes(vec![])]), - Key(vec![KeyPart::String("".into())]), - Ordering::Less, - ); - - check_order( - Key(vec![KeyPart::String("".into())]), - Key(vec![KeyPart::Int(BigInt::from(0))]), - Ordering::Less, - ); - - check_order( - Key(vec![KeyPart::Int(BigInt::from(0))]), - Key(vec![KeyPart::Float(0.0)]), - Ordering::Less, - ); - - check_order( - Key(vec![KeyPart::Float(0.0)]), - Key(vec![KeyPart::False]), - Ordering::Less, - ); - - check_order( - Key(vec![KeyPart::False]), - Key(vec![KeyPart::True]), - Ordering::Less, - ); - - check_order( - Key(vec![KeyPart::True]), - Key(vec![KeyPart::Bytes(vec![])]), - Ordering::Greater, - ); - } - - #[test] - #[rustfmt::skip] - fn order_floats() { - check_order( - Key(vec![KeyPart::Float(-f64::NAN)]), - Key(vec![KeyPart::Float(-f64::INFINITY)]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Float(-f64::INFINITY)]), - Key(vec![KeyPart::Float(-10.0)]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Float(-10.0)]), - Key(vec![KeyPart::Float(-0.0)]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Float(-0.0)]), - Key(vec![KeyPart::Float(0.0)]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Float(0.0)]), - Key(vec![KeyPart::Float(10.0)]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Float(10.0)]), - Key(vec![KeyPart::Float(f64::INFINITY)]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Float(f64::INFINITY)]), - Key(vec![KeyPart::Float(f64::NAN)]), - Ordering::Less, - ); - } - - #[test] - #[rustfmt::skip] - fn order_ints() { - check_order( - Key(vec![KeyPart::Int(BigInt::from(-10000))]), - Key(vec![KeyPart::Int(BigInt::from(-100))]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Int(BigInt::from(-100))]), - Key(vec![KeyPart::Int(BigInt::from(-1))]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Int(BigInt::from(-1))]), - Key(vec![KeyPart::Int(BigInt::from(0))]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Int(BigInt::from(0))]), - Key(vec![KeyPart::Int(BigInt::from(1))]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Int(BigInt::from(1))]), - Key(vec![KeyPart::Int(BigInt::from(100))]), - Ordering::Less, - ); - check_order( - Key(vec![KeyPart::Int(BigInt::from(100))]), - Key(vec![KeyPart::Int(BigInt::from(10000))]), - Ordering::Less, - ); - } - - #[test] - #[rustfmt::skip] - fn float_canonicalization() { - let key1 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000001))]); - let key2 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000002))]); - - assert_eq!(key1, key2); - assert_eq!(encode_key(&key1).unwrap(), encode_key(&key2).unwrap()); - } - - #[test] - #[rustfmt::skip] - fn explicit_bijection() { - // string - check_bijection( - Key(vec![KeyPart::String("hello".into())]), - &[0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00], - ); - - // zero byte escape - check_bijection( - Key(vec![KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08])]), - &[0x01, 0x01, 0x02, 0x00, 0xff, 
0x07, 0x08, 0x00], - ); - - // array - check_bijection( - Key(vec![ - KeyPart::String("hello".into()), - KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08]), - ]), - &[ - 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, /* string */ - 0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00, /* bytes */ - ], - ); - } -} diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs index 9084cc1bf..b772d26b8 100644 --- a/ext/kv/dynamic.rs +++ b/ext/kv/dynamic.rs @@ -7,17 +7,17 @@ use crate::remote::RemoteDbHandlerPermissions; use crate::sqlite::SqliteDbHandler; use crate::sqlite::SqliteDbHandlerPermissions; use crate::AtomicWrite; -use crate::CommitResult; use crate::Database; use crate::DatabaseHandler; use crate::QueueMessageHandle; use crate::ReadRange; -use crate::ReadRangeOutput; use crate::SnapshotReadOptions; use async_trait::async_trait; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::OpState; +use denokv_proto::CommitResult; +use denokv_proto::ReadRangeOutput; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>, @@ -34,15 +34,20 @@ impl MultiBackendDbHandler { P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static, >( default_storage_dir: Option<std::path::PathBuf>, + versionstamp_rng_seed: Option<u64>, + http_options: crate::remote::HttpOptions, ) -> Self { Self::new(vec![ ( &["https://", "http://"], - Box::new(crate::remote::RemoteDbHandler::<P>::new()), + Box::new(crate::remote::RemoteDbHandler::<P>::new(http_options)), ), ( &[""], - Box::new(SqliteDbHandler::<P>::new(default_storage_dir)), + Box::new(SqliteDbHandler::<P>::new( + default_storage_dir, + versionstamp_rng_seed, + )), ), ]) } @@ -118,20 +123,17 @@ where pub trait DynamicDb { async fn dyn_snapshot_read( &self, - state: Rc<RefCell<OpState>>, requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError>; async fn dyn_atomic_write( &self, - state: Rc<RefCell<OpState>>, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError>; async fn dyn_dequeue_next_message( &self, - state: Rc<RefCell<OpState>>, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>; fn dyn_close(&self); @@ -143,26 +145,23 @@ impl Database for Box<dyn DynamicDb> { async fn snapshot_read( &self, - state: Rc<RefCell<OpState>>, requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - (**self).dyn_snapshot_read(state, requests, options).await + (**self).dyn_snapshot_read(requests, options).await } async fn atomic_write( &self, - state: Rc<RefCell<OpState>>, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - (**self).dyn_atomic_write(state, write).await + (**self).dyn_atomic_write(write).await } async fn dequeue_next_message( &self, - state: Rc<RefCell<OpState>>, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> { - (**self).dyn_dequeue_next_message(state).await + (**self).dyn_dequeue_next_message().await } fn close(&self) { @@ -178,28 +177,25 @@ where { async fn dyn_snapshot_read( &self, - state: Rc<RefCell<OpState>>, requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - Ok(self.snapshot_read(state, requests, options).await?) + Ok(self.snapshot_read(requests, options).await?) } async fn dyn_atomic_write( &self, - state: Rc<RefCell<OpState>>, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - Ok(self.atomic_write(state, write).await?) + Ok(self.atomic_write(write).await?) 
} async fn dyn_dequeue_next_message( &self, - state: Rc<RefCell<OpState>>, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> { Ok( self - .dequeue_next_message(state) + .dequeue_next_message() .await? .map(|x| Box::new(x) as Box<dyn QueueMessageHandle>), ) @@ -209,13 +205,3 @@ where self.close() } } - -#[async_trait(?Send)] -impl QueueMessageHandle for Box<dyn QueueMessageHandle> { - async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> { - (**self).take_payload().await - } - async fn finish(&self, success: bool) -> Result<(), AnyError> { - (**self).finish(success).await - } -} diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs index 1acf3ce16..d7aa68368 100644 --- a/ext/kv/interface.rs +++ b/ext/kv/interface.rs @@ -1,16 +1,12 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::cell::RefCell; -use std::cmp::Ordering; -use std::num::NonZeroU32; use std::rc::Rc; use async_trait::async_trait; use deno_core::error::AnyError; use deno_core::OpState; -use num_bigint::BigInt; - -use crate::codec::canonicalize_f64; +use denokv_proto::Database; #[async_trait(?Send)] pub trait DatabaseHandler { @@ -22,312 +18,3 @@ pub trait DatabaseHandler { path: Option<String>, ) -> Result<Self::DB, AnyError>; } - -#[async_trait(?Send)] -pub trait Database { - type QMH: QueueMessageHandle + 'static; - - async fn snapshot_read( - &self, - state: Rc<RefCell<OpState>>, - requests: Vec<ReadRange>, - options: SnapshotReadOptions, - ) -> Result<Vec<ReadRangeOutput>, AnyError>; - - async fn atomic_write( - &self, - state: Rc<RefCell<OpState>>, - write: AtomicWrite, - ) -> Result<Option<CommitResult>, AnyError>; - - async fn dequeue_next_message( - &self, - state: Rc<RefCell<OpState>>, - ) -> Result<Option<Self::QMH>, AnyError>; - - fn close(&self); -} - -#[async_trait(?Send)] -pub trait QueueMessageHandle { - async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError>; - async fn finish(&self, success: bool) -> Result<(), AnyError>; -} - -/// Options for a snapshot read. -pub struct SnapshotReadOptions { - pub consistency: Consistency, -} - -/// The consistency of a read. -#[derive(Eq, PartialEq, Copy, Clone, Debug)] -pub enum Consistency { - Strong, - Eventual, -} - -/// A key is for a KV pair. It is a vector of KeyParts. -/// -/// The ordering of the keys is defined by the ordering of the KeyParts. The -/// first KeyPart is the most significant, and the last KeyPart is the least -/// significant. -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)] -pub struct Key(pub Vec<KeyPart>); - -/// A key part is single item in a key. It can be a boolean, a double float, a -/// variable precision signed integer, a UTF-8 string, or an arbitrary byte -/// array. -/// -/// The ordering of a KeyPart is dependent on the type of the KeyPart. -/// -/// Between different types, the ordering is as follows: arbitrary byte array < -/// UTF-8 string < variable precision signed integer < double float < false < true. -/// -/// Within a type, the ordering is as follows: -/// - For a **boolean**, false is less than true. -/// - For a **double float**, the ordering must follow -NaN < -Infinity < -100.0 < -1.0 < -0.5 < -0.0 < 0.0 < 0.5 < 1.0 < 100.0 < Infinity < NaN. -/// - For a **variable precision signed integer**, the ordering must follow mathematical ordering. -/// - For a **UTF-8 string**, the ordering must follow the UTF-8 byte ordering. -/// - For an **arbitrary byte array**, the ordering must follow the byte ordering. 
-/// -/// This means that the key part `1.0` is less than the key part `2.0`, but is -/// greater than the key part `0n`, because `1.0` is a double float and `0n` -/// is a variable precision signed integer, and the ordering types obviously has -/// precedence over the ordering within a type. -#[derive(Clone, Debug)] -pub enum KeyPart { - Bytes(Vec<u8>), - String(String), - Int(BigInt), - Float(f64), - False, - True, -} - -impl KeyPart { - fn tag_ordering(&self) -> u8 { - match self { - KeyPart::Bytes(_) => 0, - KeyPart::String(_) => 1, - KeyPart::Int(_) => 2, - KeyPart::Float(_) => 3, - KeyPart::False => 4, - KeyPart::True => 5, - } - } -} - -impl Eq for KeyPart {} - -impl PartialEq for KeyPart { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == Ordering::Equal - } -} - -impl Ord for KeyPart { - fn cmp(&self, other: &Self) -> Ordering { - match (self, other) { - (KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2), - (KeyPart::String(s1), KeyPart::String(s2)) => { - s1.as_bytes().cmp(s2.as_bytes()) - } - (KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2), - (KeyPart::Float(f1), KeyPart::Float(f2)) => { - canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2)) - } - _ => self.tag_ordering().cmp(&other.tag_ordering()), - } - } -} - -impl PartialOrd for KeyPart { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -/// A request to read a range of keys from the database. If `end` is `None`, -/// then the range is from `start` shall also be used as the end of the range. -/// -/// The range is inclusive of the start and exclusive of the end. The start may -/// not be greater than the end. -/// -/// The range is limited to `limit` number of entries. -pub struct ReadRange { - pub start: Vec<u8>, - pub end: Vec<u8>, - pub limit: NonZeroU32, - pub reverse: bool, -} - -/// A response to a `ReadRange` request. -pub struct ReadRangeOutput { - pub entries: Vec<KvEntry>, -} - -/// A versionstamp is a 10 byte array that is used to represent the version of -/// a key in the database. -type Versionstamp = [u8; 10]; - -/// A key-value entry with a versionstamp. -pub struct KvEntry { - pub key: Vec<u8>, - pub value: Value, - pub versionstamp: Versionstamp, -} - -/// A serialized value for a KV pair as stored in the database. All values -/// **can** be serialized into the V8 representation, but not all values are. -/// -/// The V8 representation is an opaque byte array that is only meaningful to -/// the V8 engine. It is guaranteed to be backwards compatible. Because this -/// representation is opaque, it is not possible to inspect or modify the value -/// without deserializing it. -/// -/// The inability to inspect or modify the value without deserializing it means -/// that these values can not be quickly modified when performing atomic -/// read-modify-write operations on the database (because the database may not -/// have the ability to deserialize the V8 value into a modifiable value). -/// -/// Because of this constraint, there are more specialized representations for -/// certain types of values that can be used in atomic read-modify-write -/// operations. These specialized representations are: -/// -/// - **Bytes**: an arbitrary byte array. -/// - **U64**: a 64-bit unsigned integer. -pub enum Value { - V8(Vec<u8>), - Bytes(Vec<u8>), - U64(u64), -} - -/// A request to perform an atomic check-modify-write operation on the database. -/// -/// The operation is performed atomically, meaning that the operation will -/// either succeed or fail. 
-
-/// A request to read a range of keys from the database. If `end` is `None`,
-/// then `start` shall also be used as the end of the range.
-///
-/// The range is inclusive of the start and exclusive of the end. The start
-/// may not be greater than the end.
-///
-/// The range is limited to `limit` number of entries.
-pub struct ReadRange {
-  pub start: Vec<u8>,
-  pub end: Vec<u8>,
-  pub limit: NonZeroU32,
-  pub reverse: bool,
-}
-
-/// A response to a `ReadRange` request.
-pub struct ReadRangeOutput {
-  pub entries: Vec<KvEntry>,
-}
-
-/// A versionstamp is a 10-byte array that is used to represent the version of
-/// a key in the database.
-type Versionstamp = [u8; 10];
-
-/// A key-value entry with a versionstamp.
-pub struct KvEntry {
-  pub key: Vec<u8>,
-  pub value: Value,
-  pub versionstamp: Versionstamp,
-}
-
-/// A serialized value for a KV pair as stored in the database. All values
-/// **can** be serialized into the V8 representation, but not all values are.
-///
-/// The V8 representation is an opaque byte array that is only meaningful to
-/// the V8 engine. It is guaranteed to be backwards compatible. Because this
-/// representation is opaque, it is not possible to inspect or modify the value
-/// without deserializing it.
-///
-/// The inability to inspect or modify the value without deserializing it means
-/// that these values cannot be quickly modified when performing atomic
-/// read-modify-write operations on the database (because the database may not
-/// have the ability to deserialize the V8 value into a modifiable value).
-///
-/// Because of this constraint, there are more specialized representations for
-/// certain types of values that can be used in atomic read-modify-write
-/// operations. These specialized representations are:
-///
-/// - **Bytes**: an arbitrary byte array.
-/// - **U64**: a 64-bit unsigned integer.
-pub enum Value {
-  V8(Vec<u8>),
-  Bytes(Vec<u8>),
-  U64(u64),
-}
-
-/// A request to perform an atomic check-modify-write operation on the database.
-///
-/// The operation is performed atomically, meaning that the operation will
-/// either succeed or fail. If the operation fails, then the database will be
-/// left in the same state as before the operation was attempted. If the
-/// operation succeeds, then the database will be left in a new state.
-///
-/// The operation is performed by first checking the database for the current
-/// state of the keys, defined by the `checks` field. If the current state of
-/// the keys does not match the expected state, then the operation fails. If
-/// the current state of the keys matches the expected state, then the
-/// mutations are applied to the database.
-///
-/// All checks and mutations are performed atomically.
-///
-/// The mutations are performed in the order that they are specified in the
-/// `mutations` field. The order of checks is not specified, and is also not
-/// important because this ordering is unobservable.
-pub struct AtomicWrite {
-  pub checks: Vec<KvCheck>,
-  pub mutations: Vec<KvMutation>,
-  pub enqueues: Vec<Enqueue>,
-}
-
-/// A request to perform a check on a key in the database. The check is not
-/// performed on the value of the key, but rather on the versionstamp of the
-/// key.
-pub struct KvCheck {
-  pub key: Vec<u8>,
-  pub versionstamp: Option<Versionstamp>,
-}
-
-/// A request to perform a mutation on a key in the database. The mutation is
-/// performed on the value of the key.
-///
-/// The type of mutation is specified by the `kind` field. The action performed
-/// by each mutation kind is specified in the docs for [MutationKind].
-pub struct KvMutation {
-  pub key: Vec<u8>,
-  pub kind: MutationKind,
-  pub expire_at: Option<u64>,
-}
-
-/// A request to enqueue a message to the database. This message is delivered
-/// to a listener of the queue at least once.
-///
-/// ## Retry
-///
-/// When the delivery of a message fails, it is retried a finite number of
-/// times. Each retry happens after a backoff period. The backoff periods are
-/// specified by the `backoff_schedule` field, in milliseconds. If unspecified,
-/// the default backoff schedule of the platform (CLI or Deploy) is used.
-///
-/// If all retry attempts fail, the message is written to the KV store under
-/// all keys specified in `keys_if_undelivered`.
-pub struct Enqueue {
-  pub payload: Vec<u8>,
-  pub delay_ms: u64,
-  pub keys_if_undelivered: Vec<Vec<u8>>,
-  pub backoff_schedule: Option<Vec<u32>>,
-}
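
Taken together, `AtomicWrite`, `KvCheck`, and `KvMutation` express optimistic concurrency. A minimal sketch of the resulting check-then-mutate pattern (a hypothetical helper; `read_versionstamp` stands in for a versionstamp returned by an earlier snapshot read):

    // Increment a counter, but only if nobody wrote to the key since we
    // last read it; the commit reports a check failure (None) otherwise.
    fn increment_if_unchanged(read_versionstamp: Versionstamp) -> AtomicWrite {
      AtomicWrite {
        checks: vec![KvCheck {
          // Key bytes shown raw for brevity; real keys go through the
          // key codec (see `encode_key`).
          key: b"counter".to_vec(),
          versionstamp: Some(read_versionstamp),
        }],
        // Applied, in order, only if every check passes.
        mutations: vec![KvMutation {
          key: b"counter".to_vec(),
          kind: MutationKind::Sum(Value::U64(1)),
          expire_at: None,
        }],
        enqueues: vec![],
      }
    }
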
-
-/// The type of mutation to perform on a key in the database.
-///
-/// ## Set
-///
-/// The set mutation sets the value of the key to the specified value. It
-/// discards the previous value of the key, if any.
-///
-/// This operation supports all [Value] types.
-///
-/// ## Delete
-///
-/// The delete mutation deletes the value of the key.
-///
-/// ## Sum
-///
-/// The sum mutation adds the specified value to the existing value of the key.
-///
-/// This operation supports only values of type [Value::U64]. The existing
-/// value in the database must match the type of the value specified in the
-/// mutation. If the key does not exist in the database, then the value
-/// specified in the mutation is used as the new value of the key.
-///
-/// ## Min
-///
-/// The min mutation sets the value of the key to the minimum of the existing
-/// value of the key and the specified value.
-///
-/// This operation supports only values of type [Value::U64]. The existing
-/// value in the database must match the type of the value specified in the
-/// mutation. If the key does not exist in the database, then the value
-/// specified in the mutation is used as the new value of the key.
-///
-/// ## Max
-///
-/// The max mutation sets the value of the key to the maximum of the existing
-/// value of the key and the specified value.
-///
-/// This operation supports only values of type [Value::U64]. The existing
-/// value in the database must match the type of the value specified in the
-/// mutation. If the key does not exist in the database, then the value
-/// specified in the mutation is used as the new value of the key.
-pub enum MutationKind {
-  Set(Value),
-  Delete,
-  Sum(Value),
-  Min(Value),
-  Max(Value),
-}
-
-impl MutationKind {
-  pub fn value(&self) -> Option<&Value> {
-    match self {
-      MutationKind::Set(value) => Some(value),
-      MutationKind::Sum(value) => Some(value),
-      MutationKind::Min(value) => Some(value),
-      MutationKind::Max(value) => Some(value),
-      MutationKind::Delete => None,
-    }
-  }
-}
-
-/// The result of a successful commit of an atomic write operation.
-pub struct CommitResult {
-  /// The new versionstamp of the data that was committed.
-  pub versionstamp: Versionstamp,
-}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index fb68596fa..9e2273108 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -1,9 +1,7 @@
 // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
 
-pub mod codec;
 pub mod dynamic;
 mod interface;
-mod proto;
 pub mod remote;
 pub mod sqlite;
 mod time;
@@ -12,11 +10,12 @@ use std::borrow::Cow;
 use std::cell::RefCell;
 use std::num::NonZeroU32;
 use std::rc::Rc;
+use std::time::Duration;
 
 use base64::prelude::BASE64_URL_SAFE;
 use base64::Engine;
-use codec::decode_key;
-use codec::encode_key;
+use chrono::DateTime;
+use chrono::Utc;
 use deno_core::anyhow::Context;
 use deno_core::error::get_custom_error_class;
 use deno_core::error::type_error;
@@ -30,8 +29,26 @@ use deno_core::OpState;
 use deno_core::Resource;
 use deno_core::ResourceId;
 use deno_core::ToJsBuffer;
+use denokv_proto::decode_key;
+use denokv_proto::encode_key;
+use denokv_proto::AtomicWrite;
+use denokv_proto::Check;
+use denokv_proto::Consistency;
+use denokv_proto::Database;
+use denokv_proto::Enqueue;
+use denokv_proto::Key;
+use denokv_proto::KeyPart;
+use denokv_proto::KvEntry;
+use denokv_proto::KvValue;
+use denokv_proto::Mutation;
+use denokv_proto::MutationKind;
+use denokv_proto::QueueMessageHandle;
+use denokv_proto::ReadRange;
+use denokv_proto::SnapshotReadOptions;
+use log::debug;
 use serde::Deserialize;
 use serde::Serialize;
+use time::utc_now;
 
 pub use crate::interface::*;
 
@@ -110,30 +127,26 @@ where
 
 type KvKey = Vec<AnyValue>;
 
-impl From<AnyValue> for KeyPart {
-  fn from(value: AnyValue) -> Self {
-    match value {
-      AnyValue::Bool(false) => KeyPart::False,
-      AnyValue::Bool(true) => KeyPart::True,
-      AnyValue::Number(n) => KeyPart::Float(n),
-      AnyValue::BigInt(n) => KeyPart::Int(n),
-      AnyValue::String(s) => KeyPart::String(s),
-      AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
-      AnyValue::RustBuffer(_) => unreachable!(),
-    }
+fn key_part_from_v8(value: AnyValue) -> KeyPart {
+  match value {
+    AnyValue::Bool(false) => KeyPart::False,
+    AnyValue::Bool(true) => KeyPart::True,
+    AnyValue::Number(n) => KeyPart::Float(n),
+    AnyValue::BigInt(n) => KeyPart::Int(n),
+    AnyValue::String(s) => KeyPart::String(s),
+    AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
+    AnyValue::RustBuffer(_) => unreachable!(),
   }
 }
 
-impl From<KeyPart> for AnyValue {
-  fn from(value: KeyPart) -> Self {
-    match value {
-      KeyPart::False => AnyValue::Bool(false),
-      KeyPart::True => AnyValue::Bool(true),
-      KeyPart::Float(n) => AnyValue::Number(n),
-      KeyPart::Int(n) => 
AnyValue::BigInt(n), - KeyPart::String(s) => AnyValue::String(s), - KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()), - } +fn key_part_to_v8(value: KeyPart) -> AnyValue { + match value { + KeyPart::False => AnyValue::Bool(false), + KeyPart::True => AnyValue::Bool(true), + KeyPart::Float(n) => AnyValue::Number(n), + KeyPart::Int(n) => AnyValue::BigInt(n), + KeyPart::String(s) => AnyValue::String(s), + KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()), } } @@ -153,25 +166,25 @@ enum ToV8Value { U64(BigInt), } -impl TryFrom<FromV8Value> for Value { +impl TryFrom<FromV8Value> for KvValue { type Error = AnyError; fn try_from(value: FromV8Value) -> Result<Self, AnyError> { Ok(match value { - FromV8Value::V8(buf) => Value::V8(buf.to_vec()), - FromV8Value::Bytes(buf) => Value::Bytes(buf.to_vec()), + FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()), + FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()), FromV8Value::U64(n) => { - Value::U64(num_bigint::BigInt::from(n).try_into()?) + KvValue::U64(num_bigint::BigInt::from(n).try_into()?) } }) } } -impl From<Value> for ToV8Value { - fn from(value: Value) -> Self { +impl From<KvValue> for ToV8Value { + fn from(value: KvValue) -> Self { match value { - Value::V8(buf) => ToV8Value::V8(buf.into()), - Value::Bytes(buf) => ToV8Value::Bytes(buf.into()), - Value::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()), + KvValue::V8(buf) => ToV8Value::V8(buf.into()), + KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()), + KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()), } } } @@ -190,7 +203,7 @@ impl TryFrom<KvEntry> for ToV8KvEntry { key: decode_key(&entry.key)? .0 .into_iter() - .map(Into::into) + .map(key_part_to_v8) .collect(), value: entry.value.into(), versionstamp: hex::encode(entry.versionstamp).into(), @@ -282,8 +295,7 @@ where let opts = SnapshotReadOptions { consistency: consistency.into(), }; - let output_ranges = - db.snapshot_read(state.clone(), read_ranges, opts).await?; + let output_ranges = db.snapshot_read(read_ranges, opts).await?; let output_ranges = output_ranges .into_iter() .map(|x| { @@ -302,7 +314,7 @@ struct QueueMessageResource<QPH: QueueMessageHandle + 'static> { impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> { fn name(&self) -> Cow<str> { - "queue_message".into() + "queueMessage".into() } } @@ -331,7 +343,7 @@ where resource.db.clone() }; - let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else { + let Some(mut handle) = db.dequeue_next_message().await? else { return Ok(None); }; let payload = handle.take_payload().await?.into(); @@ -361,81 +373,81 @@ where .map_err(|_| type_error("Queue message not found"))? 
.handle
  };
-  handle.finish(success).await
+  // If we fail to finish the message, there is not much we can do, and the
+  // message will be retried anyway, so we just ignore the error
+  if let Err(err) = handle.finish(success).await {
+    debug!("Failed to finish dequeued message: {}", err);
+  };
+  Ok(())
 }
 
 type V8KvCheck = (KvKey, Option<ByteString>);
 
-impl TryFrom<V8KvCheck> for KvCheck {
-  type Error = AnyError;
-  fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
-    let versionstamp = match value.1 {
-      Some(data) => {
-        let mut out = [0u8; 10];
-        hex::decode_to_slice(data, &mut out)
-          .map_err(|_| type_error("invalid versionstamp"))?;
-        Some(out)
-      }
-      None => None,
-    };
-    Ok(KvCheck {
-      key: encode_v8_key(value.0)?,
-      versionstamp,
-    })
-  }
+fn check_from_v8(value: V8KvCheck) -> Result<Check, AnyError> {
+  let versionstamp = match value.1 {
+    Some(data) => {
+      let mut out = [0u8; 10];
+      hex::decode_to_slice(data, &mut out)
+        .map_err(|_| type_error("invalid versionstamp"))?;
+      Some(out)
+    }
+    None => None,
+  };
+  Ok(Check {
+    key: encode_v8_key(value.0)?,
+    versionstamp,
+  })
 }
 
 type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
 
-impl TryFrom<(V8KvMutation, u64)> for KvMutation {
-  type Error = AnyError;
-  fn try_from(
-    (value, current_timestamp): (V8KvMutation, u64),
-  ) -> Result<Self, AnyError> {
-    let key = encode_v8_key(value.0)?;
-    let kind = match (value.1.as_str(), value.2) {
-      ("set", Some(value)) => MutationKind::Set(value.try_into()?),
-      ("delete", None) => MutationKind::Delete,
-      ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
-      ("min", Some(value)) => MutationKind::Min(value.try_into()?),
-      ("max", Some(value)) => MutationKind::Max(value.try_into()?),
-      (op, Some(_)) => {
-        return Err(type_error(format!("invalid mutation '{op}' with value")))
-      }
-      (op, None) => {
-        return Err(type_error(format!(
-          "invalid mutation '{op}' without value"
-        )))
-      }
-    };
-    Ok(KvMutation {
-      key,
-      kind,
-      expire_at: value.3.map(|expire_in| current_timestamp + expire_in),
-    })
-  }
+fn mutation_from_v8(
+  (value, current_timestamp): (V8KvMutation, DateTime<Utc>),
+) -> Result<Mutation, AnyError> {
+  let key = encode_v8_key(value.0)?;
+  let kind = match (value.1.as_str(), value.2) {
+    ("set", Some(value)) => MutationKind::Set(value.try_into()?),
+    ("delete", None) => MutationKind::Delete,
+    ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
+    ("min", Some(value)) => MutationKind::Min(value.try_into()?),
+    ("max", Some(value)) => MutationKind::Max(value.try_into()?),
+    (op, Some(_)) => {
+      return Err(type_error(format!("invalid mutation '{op}' with value")))
+    }
+    (op, None) => {
+      return Err(type_error(format!("invalid mutation '{op}' without value")))
+    }
+  };
+  Ok(Mutation {
+    key,
+    kind,
+    expire_at: value
+      .3
+      .map(|expire_in| current_timestamp + Duration::from_millis(expire_in)),
+  })
 }
 
 type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
 
-impl TryFrom<V8Enqueue> for Enqueue {
-  type Error = AnyError;
-  fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
-    Ok(Enqueue {
-      payload: value.0.to_vec(),
-      delay_ms: value.1,
-      keys_if_undelivered: value
-        .2
-        .into_iter()
-        .map(encode_v8_key)
-        .collect::<std::io::Result<_>>()?,
-      backoff_schedule: value.3,
-    })
-  }
+fn enqueue_from_v8(
+  value: V8Enqueue,
+  current_timestamp: DateTime<Utc>,
+) -> Result<Enqueue, AnyError> {
+  Ok(Enqueue {
+    payload: value.0.to_vec(),
+    deadline: current_timestamp
+      + chrono::Duration::milliseconds(value.1 as i64),
+    keys_if_undelivered: value
+      .2
+ .into_iter() + .map(encode_v8_key) + .collect::<std::io::Result<_>>()?, + backoff_schedule: value.3, + }) } fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> { - encode_key(&Key(key.into_iter().map(From::from).collect())) + encode_key(&Key(key.into_iter().map(key_part_from_v8).collect())) } enum RawSelector { @@ -610,7 +622,7 @@ async fn op_kv_atomic_write<DBH>( where DBH: DatabaseHandler + 'static, { - let current_timestamp = time::utc_now().timestamp_millis() as u64; + let current_timestamp = utc_now(); let db = { let state = state.borrow(); let resource = @@ -631,17 +643,17 @@ where let checks = checks .into_iter() - .map(TryInto::try_into) - .collect::<Result<Vec<KvCheck>, AnyError>>() + .map(check_from_v8) + .collect::<Result<Vec<Check>, AnyError>>() .with_context(|| "invalid check")?; let mutations = mutations .into_iter() - .map(|mutation| TryFrom::try_from((mutation, current_timestamp))) - .collect::<Result<Vec<KvMutation>, AnyError>>() + .map(|mutation| mutation_from_v8((mutation, current_timestamp))) + .collect::<Result<Vec<Mutation>, AnyError>>() .with_context(|| "invalid mutation")?; let enqueues = enqueues .into_iter() - .map(TryInto::try_into) + .map(|e| enqueue_from_v8(e, current_timestamp)) .collect::<Result<Vec<Enqueue>, AnyError>>() .with_context(|| "invalid enqueue")?; @@ -690,7 +702,7 @@ where enqueues, }; - let result = db.atomic_write(state.clone(), atomic_write).await?; + let result = db.atomic_write(atomic_write).await?; Ok(result.map(|res| hex::encode(res.versionstamp))) } @@ -732,11 +744,11 @@ fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> { } } -fn check_value_size(value: &Value) -> Result<usize, AnyError> { +fn check_value_size(value: &KvValue) -> Result<usize, AnyError> { let payload = match value { - Value::Bytes(x) => x, - Value::V8(x) => x, - Value::U64(_) => return Ok(8), + KvValue::Bytes(x) => x, + KvValue::V8(x) => x, + KvValue::U64(_) => return Ok(8), }; if payload.len() > MAX_VALUE_SIZE_BYTES { diff --git a/ext/kv/proto/datapath.proto b/ext/kv/proto/datapath.proto deleted file mode 100644 index 59793000b..000000000 --- a/ext/kv/proto/datapath.proto +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
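
A note for reading the deleted listing below: the `VE_LE64` value encoding was a plain little-endian `u64`. A round-trip sketch, independent of the generated protobuf code:

    fn le64_round_trip() {
      let n: u64 = 42;
      // On the wire this was carried as KvValue { data, encoding: VE_LE64 }.
      let data = n.to_le_bytes().to_vec();
      let back = u64::from_le_bytes(<[u8; 8]>::try_from(&data[..]).unwrap());
      assert_eq!(back, n);
    }
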
- -syntax = "proto3"; - -package datapath; - -message SnapshotRead { - repeated ReadRange ranges = 1; -} - -message SnapshotReadOutput { - repeated ReadRangeOutput ranges = 1; - bool read_disabled = 2; - repeated string regions_if_read_disabled = 3; - bool read_is_strongly_consistent = 4; - string primary_if_not_strongly_consistent = 5; -} - -message ReadRange { - bytes start = 1; - bytes end = 2; - int32 limit = 3; - bool reverse = 4; -} - -message ReadRangeOutput { - repeated KvEntry values = 1; -} - -message AtomicWrite { - repeated KvCheck kv_checks = 1; - repeated KvMutation kv_mutations = 2; - repeated Enqueue enqueues = 3; -} - -message AtomicWriteOutput { - AtomicWriteStatus status = 1; - bytes versionstamp = 2; - string primary_if_write_disabled = 3; -} - -message KvCheck { - bytes key = 1; - bytes versionstamp = 2; // 10-byte raw versionstamp -} - -message KvMutation { - bytes key = 1; - KvValue value = 2; - KvMutationType mutation_type = 3; - int64 expire_at_ms = 4; -} - -message KvValue { - bytes data = 1; - KvValueEncoding encoding = 2; -} - -message KvEntry { - bytes key = 1; - bytes value = 2; - KvValueEncoding encoding = 3; - bytes versionstamp = 4; -} - -enum KvMutationType { - M_UNSPECIFIED = 0; - M_SET = 1; - M_CLEAR = 2; - M_SUM = 3; - M_MAX = 4; - M_MIN = 5; -} - -enum KvValueEncoding { - VE_UNSPECIFIED = 0; - VE_V8 = 1; - VE_LE64 = 2; - VE_BYTES = 3; -} - -enum AtomicWriteStatus { - AW_UNSPECIFIED = 0; - AW_SUCCESS = 1; - AW_CHECK_FAILURE = 2; - AW_UNSUPPORTED_WRITE = 3; - AW_USAGE_LIMIT_EXCEEDED = 4; - AW_WRITE_DISABLED = 5; - AW_QUEUE_BACKLOG_LIMIT_EXCEEDED = 6; -} - -message Enqueue { - bytes payload = 1; - int64 deadline_ms = 2; - repeated bytes kv_keys_if_undelivered = 3; - repeated uint32 backoff_schedule = 4; -} diff --git a/ext/kv/proto/mod.rs b/ext/kv/proto/mod.rs deleted file mode 100644 index d258a0551..000000000 --- a/ext/kv/proto/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -// Generated code, disable lints -#[allow(clippy::all, non_snake_case)] -pub mod datapath { - include!(concat!(env!("OUT_DIR"), "/datapath.rs")); -} diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 0a061b35b..7cac6b9c3 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -1,43 +1,42 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
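
For embedders, a minimal sketch of a `RemoteDbHandlerPermissions` implementation (the `check_env` signature appears just below; `check_net_url` is assumed to match the call site in `PermissionChecker` further down, and an allow-everything policy is for illustration only):

    struct AllowAll;

    impl RemoteDbHandlerPermissions for AllowAll {
      fn check_env(&mut self, _var: &str) -> Result<(), AnyError> {
        Ok(())
      }
      fn check_net_url(
        &mut self,
        _url: &Url,
        _api_name: &str,
      ) -> Result<(), AnyError> {
        Ok(())
      }
    }
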
use std::cell::RefCell; -use std::fmt; -use std::io::Write; use std::marker::PhantomData; use std::rc::Rc; use std::sync::Arc; -use std::time::Duration; -use crate::proto::datapath as pb; -use crate::AtomicWrite; -use crate::CommitResult; -use crate::Database; use crate::DatabaseHandler; -use crate::KvEntry; -use crate::MutationKind; -use crate::QueueMessageHandle; -use crate::ReadRange; -use crate::ReadRangeOutput; -use crate::SnapshotReadOptions; use anyhow::Context; use async_trait::async_trait; -use chrono::DateTime; -use chrono::Utc; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::TryFutureExt; -use deno_core::unsync::JoinHandle; use deno_core::OpState; -use prost::Message; -use rand::Rng; -use serde::Deserialize; -use termcolor::Ansi; -use termcolor::Color; -use termcolor::ColorSpec; -use termcolor::WriteColor; -use tokio::sync::watch; +use deno_fetch::create_http_client; +use deno_fetch::CreateHttpClientOptions; +use deno_tls::rustls::RootCertStore; +use deno_tls::Proxy; +use deno_tls::RootCertStoreProvider; +use denokv_remote::MetadataEndpoint; +use denokv_remote::Remote; use url::Url; -use uuid::Uuid; + +#[derive(Clone)] +pub struct HttpOptions { + pub user_agent: String, + pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>, + pub proxy: Option<Proxy>, + pub unsafely_ignore_certificate_errors: Option<Vec<String>>, + pub client_cert_chain_and_key: Option<(String, String)>, +} + +impl HttpOptions { + pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, AnyError> { + Ok(match &self.root_cert_store_provider { + Some(provider) => Some(provider.get_or_try_init()?.clone()), + None => None, + }) + } +} pub trait RemoteDbHandlerPermissions { fn check_env(&mut self, var: &str) -> Result<(), AnyError>; @@ -49,50 +48,39 @@ pub trait RemoteDbHandlerPermissions { } pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> { + http_options: HttpOptions, _p: std::marker::PhantomData<P>, } impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> { - pub fn new() -> Self { - Self { _p: PhantomData } - } -} - -impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> { - fn default() -> Self { - Self::new() + pub fn new(http_options: HttpOptions) -> Self { + Self { + http_options, + _p: PhantomData, + } } } -#[derive(Deserialize)] -struct VersionInfo { - version: u64, -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -#[allow(dead_code)] -struct DatabaseMetadata { - version: u64, - database_id: Uuid, - endpoints: Vec<EndpointInfo>, - token: String, - expires_at: DateTime<Utc>, +pub struct PermissionChecker<P: RemoteDbHandlerPermissions> { + state: Rc<RefCell<OpState>>, + _permissions: PhantomData<P>, } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct EndpointInfo { - pub url: String, - - // Using `String` instead of an enum, so that parsing doesn't - // break if more consistency levels are added. 
- pub consistency: String, +impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions + for PermissionChecker<P> +{ + fn check_net_url(&self, url: &Url) -> Result<(), anyhow::Error> { + let mut state = self.state.borrow_mut(); + let permissions = state.borrow_mut::<P>(); + permissions.check_net_url(url, "Deno.openKv") + } } #[async_trait(?Send)] -impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> { - type DB = RemoteDb<P>; +impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler + for RemoteDbHandler<P> +{ + type DB = Remote<PermissionChecker<P>>; async fn open( &self, @@ -122,470 +110,36 @@ impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> { "Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account." })?; - let refresher = MetadataRefresher::new(url, access_token); - - let db = RemoteDb { - client: reqwest::Client::new(), - refresher, - _p: PhantomData, - }; - Ok(db) - } -} - -pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> { - client: reqwest::Client, - refresher: MetadataRefresher, - _p: std::marker::PhantomData<P>, -} - -pub struct DummyQueueMessageHandle {} - -#[async_trait(?Send)] -impl QueueMessageHandle for DummyQueueMessageHandle { - async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> { - unimplemented!() - } - - async fn finish(&self, _success: bool) -> Result<(), AnyError> { - unimplemented!() - } -} - -#[async_trait(?Send)] -impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> { - type QMH = DummyQueueMessageHandle; - - async fn snapshot_read( - &self, - state: Rc<RefCell<OpState>>, - requests: Vec<ReadRange>, - _options: SnapshotReadOptions, - ) -> Result<Vec<ReadRangeOutput>, AnyError> { - let req = pb::SnapshotRead { - ranges: requests - .into_iter() - .map(|r| pb::ReadRange { - start: r.start, - end: r.end, - limit: r.limit.get() as _, - reverse: r.reverse, - }) - .collect(), - }; - - let res: pb::SnapshotReadOutput = call_remote::<P, _, _>( - &state, - &self.refresher, - &self.client, - "snapshot_read", - &req, - ) - .await?; - - if res.read_disabled { - return Err(type_error("Reads are disabled for this database.")); - } - - let out = res - .ranges - .into_iter() - .map(|r| { - Ok(ReadRangeOutput { - entries: r - .values - .into_iter() - .map(|e| { - let encoding = e.encoding(); - Ok(KvEntry { - key: e.key, - value: decode_value(e.value, encoding)?, - versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?, - }) - }) - .collect::<Result<_, AnyError>>()?, - }) - }) - .collect::<Result<Vec<_>, AnyError>>()?; - Ok(out) - } - - async fn atomic_write( - &self, - state: Rc<RefCell<OpState>>, - write: AtomicWrite, - ) -> Result<Option<CommitResult>, AnyError> { - if !write.enqueues.is_empty() { - return Err(type_error("Enqueue operations are not supported yet.")); - } - - let req = pb::AtomicWrite { - kv_checks: write - .checks - .into_iter() - .map(|x| { - Ok(pb::KvCheck { - key: x.key, - versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(), - }) - }) - .collect::<anyhow::Result<_>>()?, - kv_mutations: write.mutations.into_iter().map(encode_mutation).collect(), - enqueues: vec![], - }; - - let res: pb::AtomicWriteOutput = call_remote::<P, _, _>( - &state, - &self.refresher, - &self.client, - "atomic_write", - &req, - ) - .await?; - match res.status() { - pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult { - versionstamp: if res.versionstamp.is_empty() { - Default::default() - } else { - 
res.versionstamp[..].try_into()? - }, - })), - pb::AtomicWriteStatus::AwCheckFailure => Ok(None), - pb::AtomicWriteStatus::AwUnsupportedWrite => { - Err(type_error("Unsupported write")) - } - pb::AtomicWriteStatus::AwUsageLimitExceeded => { - Err(type_error("The database usage limit has been exceeded.")) - } - pb::AtomicWriteStatus::AwWriteDisabled => { - // TODO: Auto retry - Err(type_error("Writes are disabled for this database.")) - } - pb::AtomicWriteStatus::AwUnspecified => { - Err(type_error("Unspecified error")) - } - pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => { - Err(type_error("Queue backlog limit exceeded")) - } - } - } - - async fn dequeue_next_message( - &self, - _state: Rc<RefCell<OpState>>, - ) -> Result<Option<Self::QMH>, AnyError> { - let msg = "Deno.Kv.listenQueue is not supported for remote KV databases"; - eprintln!("{}", yellow(msg)); - deno_core::futures::future::pending().await - } - - fn close(&self) {} -} - -fn yellow<S: AsRef<str>>(s: S) -> impl fmt::Display { - if std::env::var_os("NO_COLOR").is_some() { - return String::from(s.as_ref()); - } - let mut style_spec = ColorSpec::new(); - style_spec.set_fg(Some(Color::Yellow)); - let mut v = Vec::new(); - let mut ansi_writer = Ansi::new(&mut v); - ansi_writer.set_color(&style_spec).unwrap(); - ansi_writer.write_all(s.as_ref().as_bytes()).unwrap(); - ansi_writer.reset().unwrap(); - String::from_utf8_lossy(&v).into_owned() -} - -fn decode_value( - value: Vec<u8>, - encoding: pb::KvValueEncoding, -) -> anyhow::Result<crate::Value> { - match encoding { - pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)), - pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)), - pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes( - <[u8; 8]>::try_from(&value[..])?, - ))), - pb::KvValueEncoding::VeUnspecified => { - Err(anyhow::anyhow!("Unspecified value encoding, cannot decode")) - } - } -} - -fn encode_value(value: crate::Value) -> pb::KvValue { - match value { - crate::Value::V8(data) => pb::KvValue { - data, - encoding: pb::KvValueEncoding::VeV8 as _, - }, - crate::Value::Bytes(data) => pb::KvValue { - data, - encoding: pb::KvValueEncoding::VeBytes as _, - }, - crate::Value::U64(x) => pb::KvValue { - data: x.to_le_bytes().to_vec(), - encoding: pb::KvValueEncoding::VeLe64 as _, - }, - } -} - -fn encode_mutation(m: crate::KvMutation) -> pb::KvMutation { - let key = m.key; - let expire_at_ms = - m.expire_at.and_then(|x| i64::try_from(x).ok()).unwrap_or(0); - - match m.kind { - MutationKind::Set(x) => pb::KvMutation { - key, - value: Some(encode_value(x)), - mutation_type: pb::KvMutationType::MSet as _, - expire_at_ms, - }, - MutationKind::Delete => pb::KvMutation { - key, - value: Some(encode_value(crate::Value::Bytes(vec![]))), - mutation_type: pb::KvMutationType::MClear as _, - expire_at_ms, - }, - MutationKind::Max(x) => pb::KvMutation { - key, - value: Some(encode_value(x)), - mutation_type: pb::KvMutationType::MMax as _, - expire_at_ms, - }, - MutationKind::Min(x) => pb::KvMutation { - key, - value: Some(encode_value(x)), - mutation_type: pb::KvMutationType::MMin as _, - expire_at_ms, - }, - MutationKind::Sum(x) => pb::KvMutation { - key, - value: Some(encode_value(x)), - mutation_type: pb::KvMutationType::MSum as _, - expire_at_ms, - }, - } -} - -#[derive(Clone)] -enum MetadataState { - Ready(Arc<DatabaseMetadata>), - Invalid(String), - Pending, -} - -struct MetadataRefresher { - metadata_rx: watch::Receiver<MetadataState>, - handle: JoinHandle<()>, -} - -impl MetadataRefresher { - pub 
fn new(url: String, access_token: String) -> Self { - let (tx, rx) = watch::channel(MetadataState::Pending); - let handle = - deno_core::unsync::spawn(metadata_refresh_task(url, access_token, tx)); - Self { - handle, - metadata_rx: rx, - } - } -} - -impl Drop for MetadataRefresher { - fn drop(&mut self) { - self.handle.abort(); - } -} - -async fn metadata_refresh_task( - metadata_url: String, - access_token: String, - tx: watch::Sender<MetadataState>, -) { - let client = reqwest::Client::new(); - loop { - let mut attempt = 0u64; - let metadata = loop { - match fetch_metadata(&client, &metadata_url, &access_token).await { - Ok(Ok(x)) => break x, - Ok(Err(e)) => { - if tx.send(MetadataState::Invalid(e)).is_err() { - return; - } - } - Err(e) => { - log::error!("Failed to fetch database metadata: {}", e); - } - } - randomized_exponential_backoff(Duration::from_secs(5), attempt).await; - attempt += 1; + let metadata_endpoint = MetadataEndpoint { + url: parsed_url.clone(), + access_token: access_token.clone(), }; - let ms_until_expire = u64::try_from( - metadata - .expires_at - .timestamp_millis() - .saturating_sub(crate::time::utc_now().timestamp_millis()), - ) - .unwrap_or_default(); - - // Refresh 10 minutes before expiry - // In case of buggy clocks, don't refresh more than once per minute - let interval = Duration::from_millis(ms_until_expire) - .saturating_sub(Duration::from_secs(600)) - .max(Duration::from_secs(60)); - - if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() { - return; - } - - tokio::time::sleep(interval).await; - } -} - -async fn fetch_metadata( - client: &reqwest::Client, - metadata_url: &str, - access_token: &str, -) -> anyhow::Result<Result<DatabaseMetadata, String>> { - let res = client - .post(metadata_url) - .header("authorization", format!("Bearer {}", access_token)) - .send() - .await?; - - if !res.status().is_success() { - if res.status().is_client_error() { - return Ok(Err(format!( - "Client error while fetching metadata: {:?} {}", - res.status(), - res.text().await? - ))); - } else { - anyhow::bail!( - "remote returned error: {:?} {}", - res.status(), - res.text().await? 
- ); - } - } - - let res = res.bytes().await?; - let version_info: VersionInfo = match serde_json::from_slice(&res) { - Ok(x) => x, - Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))), - }; - if version_info.version > 1 { - return Ok(Err(format!( - "Unsupported metadata version: {}", - version_info.version - ))); - } - - Ok( - serde_json::from_slice(&res) - .map_err(|e| format!("Failed to decode metadata: {}", e)), - ) -} - -async fn randomized_exponential_backoff(base: Duration, attempt: u64) { - let attempt = attempt.min(12); - let delay = base.as_millis() as u64 + (2 << attempt); - let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1); - tokio::time::sleep(std::time::Duration::from_millis(delay)).await; -} - -async fn call_remote< - P: RemoteDbHandlerPermissions + 'static, - T: Message, - R: Message + Default, ->( - state: &RefCell<OpState>, - refresher: &MetadataRefresher, - client: &reqwest::Client, - method: &str, - req: &T, -) -> anyhow::Result<R> { - let mut attempt = 0u64; - let res = loop { - let mut metadata_rx = refresher.metadata_rx.clone(); - let metadata = loop { - match &*metadata_rx.borrow() { - MetadataState::Pending => {} - MetadataState::Ready(x) => break x.clone(), - MetadataState::Invalid(e) => { - return Err(type_error(format!("Metadata error: {}", e))) - } - } - // `unwrap()` never fails because `tx` is owned by the task held by `refresher`. - metadata_rx.changed().await.unwrap(); - }; - let Some(sc_endpoint) = metadata - .endpoints - .iter() - .find(|x| x.consistency == "strong") - else { - return Err(type_error( - "No strong consistency endpoint is available for this database", - )); + let options = &self.http_options; + let client = create_http_client( + &options.user_agent, + CreateHttpClientOptions { + root_cert_store: options.root_cert_store()?, + ca_certs: vec![], + proxy: options.proxy.clone(), + unsafely_ignore_certificate_errors: options + .unsafely_ignore_certificate_errors + .clone(), + client_cert_chain_and_key: options.client_cert_chain_and_key.clone(), + pool_max_idle_per_host: None, + pool_idle_timeout: None, + http1: true, + http2: true, + }, + )?; + + let permissions = PermissionChecker { + state: state.clone(), + _permissions: PhantomData, }; - let full_url = format!("{}/{}", sc_endpoint.url, method); - { - let parsed_url = Url::parse(&full_url)?; - let mut state = state.borrow_mut(); - let permissions = state.borrow_mut::<P>(); - permissions.check_net_url(&parsed_url, "Deno.Kv")?; - } - - let res = client - .post(&full_url) - .header("x-transaction-domain-id", metadata.database_id.to_string()) - .header("authorization", format!("Bearer {}", metadata.token)) - .body(req.encode_to_vec()) - .send() - .map_err(anyhow::Error::from) - .and_then(|x| async move { - if x.status().is_success() { - Ok(Ok(x.bytes().await?)) - } else if x.status().is_client_error() { - Ok(Err((x.status(), x.text().await?))) - } else { - Err(anyhow::anyhow!( - "server error ({:?}): {}", - x.status(), - x.text().await? 
- )) - } - }) - .await; - - match res { - Ok(x) => break x, - Err(e) => { - log::error!("retryable error in {}: {}", method, e); - randomized_exponential_backoff(Duration::from_millis(0), attempt).await; - attempt += 1; - } - } - }; - - let res = match res { - Ok(x) => x, - Err((status, message)) => { - return Err(type_error(format!( - "client error in {} (status {:?}): {}", - method, status, message - ))) - } - }; + let remote = Remote::new(client, permissions, metadata_endpoint); - match R::decode(&*res) { - Ok(x) => Ok(x), - Err(e) => Err(type_error(format!( - "failed to decode response from {}: {}", - method, e - ))), + Ok(remote) } } diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 327091f05..b4e251f96 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -1,176 +1,37 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use std::borrow::Cow; use std::cell::RefCell; use std::collections::HashMap; use std::env::current_dir; -use std::future::Future; use std::io::ErrorKind; use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; -use std::rc::Weak; use std::sync::Arc; use std::sync::Mutex; -use std::time::Duration; -use std::time::SystemTime; +use std::sync::OnceLock; use async_trait::async_trait; -use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures; -use deno_core::futures::FutureExt; -use deno_core::unsync::spawn; use deno_core::unsync::spawn_blocking; -use deno_core::AsyncRefCell; use deno_core::OpState; use deno_node::PathClean; -use rand::Rng; -use rusqlite::params; +pub use denokv_sqlite::TypeError; +use rand::RngCore; +use rand::SeedableRng; use rusqlite::OpenFlags; -use rusqlite::OptionalExtension; -use rusqlite::Transaction; -use tokio::sync::broadcast; -use tokio::sync::broadcast::error::RecvError; -use tokio::sync::mpsc; -use tokio::sync::watch; -use tokio::sync::OnceCell; -use tokio::sync::OwnedSemaphorePermit; -use tokio::sync::Semaphore; -use uuid::Uuid; +use tokio::sync::Notify; -use crate::AtomicWrite; -use crate::CommitResult; -use crate::Database; use crate::DatabaseHandler; -use crate::KvEntry; -use crate::MutationKind; -use crate::QueueMessageHandle; -use crate::ReadRange; -use crate::ReadRangeOutput; -use crate::SnapshotReadOptions; -use crate::Value; -const STATEMENT_INC_AND_GET_DATA_VERSION: &str = - "update data_version set version = version + 1 where k = 0 returning version"; -const STATEMENT_KV_RANGE_SCAN: &str = - "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k asc limit ?"; -const STATEMENT_KV_RANGE_SCAN_REVERSE: &str = - "select k, v, v_encoding, version from kv where k >= ? and k < ? 
order by k desc limit ?"; -const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str = - "select v, v_encoding from kv where k = ?"; -const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str = - "select version from kv where k = ?"; -const STATEMENT_KV_POINT_SET: &str = - "insert into kv (k, v, v_encoding, version, expiration_ms) values (:k, :v, :v_encoding, :version, :expiration_ms) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version, expiration_ms = :expiration_ms"; -const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?"; - -const STATEMENT_QUEUE_ADD_READY: &str = "insert into queue (ts, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)"; -const STATEMENT_QUEUE_GET_NEXT_READY: &str = "select ts, id, data, backoff_schedule, keys_if_undelivered from queue where ts <= ? order by ts limit 100"; -const STATEMENT_QUEUE_GET_EARLIEST_READY: &str = - "select ts from queue order by ts limit 1"; -const STATEMENT_QUEUE_REMOVE_READY: &str = "delete from queue where id = ?"; -const STATEMENT_QUEUE_ADD_RUNNING: &str = "insert into queue_running (deadline, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)"; -const STATEMENT_QUEUE_REMOVE_RUNNING: &str = - "delete from queue_running where id = ?"; -const STATEMENT_QUEUE_GET_RUNNING_BY_ID: &str = "select deadline, id, data, backoff_schedule, keys_if_undelivered from queue_running where id = ?"; -const STATEMENT_QUEUE_GET_RUNNING: &str = - "select id from queue_running order by deadline limit 100"; - -const STATEMENT_CREATE_MIGRATION_TABLE: &str = " -create table if not exists migration_state( - k integer not null primary key, - version integer not null -) -"; - -const MIGRATIONS: [&str; 3] = [ - " -create table data_version ( - k integer primary key, - version integer not null -); -insert into data_version (k, version) values (0, 0); -create table kv ( - k blob primary key, - v blob not null, - v_encoding integer not null, - version integer not null -) without rowid; -", - " -create table queue ( - ts integer not null, - id text not null, - data blob not null, - backoff_schedule text not null, - keys_if_undelivered blob not null, - - primary key (ts, id) -); -create table queue_running( - deadline integer not null, - id text not null, - data blob not null, - backoff_schedule text not null, - keys_if_undelivered blob not null, - - primary key (deadline, id) -); -", - " -alter table kv add column seq integer not null default 0; -alter table data_version add column seq integer not null default 0; -alter table kv add column expiration_ms integer not null default -1; -create index kv_expiration_ms_idx on kv (expiration_ms); -", -]; - -const DISPATCH_CONCURRENCY_LIMIT: usize = 100; -const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1000, 5000, 30000, 60000]; - -const ERROR_USING_CLOSED_DATABASE: &str = "Attempted to use a closed database"; - -#[derive(Clone)] -struct ProtectedConn { - guard: Rc<AsyncRefCell<()>>, - conn: Arc<Mutex<Option<rusqlite::Connection>>>, -} - -#[derive(Clone)] -struct WeakProtectedConn { - guard: Weak<AsyncRefCell<()>>, - conn: std::sync::Weak<Mutex<Option<rusqlite::Connection>>>, -} - -impl ProtectedConn { - fn new(conn: rusqlite::Connection) -> Self { - Self { - guard: Rc::new(AsyncRefCell::new(())), - conn: Arc::new(Mutex::new(Some(conn))), - } - } - - fn downgrade(&self) -> WeakProtectedConn { - WeakProtectedConn { - guard: Rc::downgrade(&self.guard), - conn: Arc::downgrade(&self.conn), - } - } -} - -impl WeakProtectedConn { - fn upgrade(&self) -> Option<ProtectedConn> { 
- let guard = self.guard.upgrade()?; - let conn = self.conn.upgrade()?; - Some(ProtectedConn { guard, conn }) - } -} +static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> = + OnceLock::new(); pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> { pub default_storage_dir: Option<PathBuf>, + versionstamp_rng_seed: Option<u64>, _permissions: PhantomData<P>, } @@ -180,9 +41,13 @@ pub trait SqliteDbHandlerPermissions { } impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> { - pub fn new(default_storage_dir: Option<PathBuf>) -> Self { + pub fn new( + default_storage_dir: Option<PathBuf>, + versionstamp_rng_seed: Option<u64>, + ) -> Self { Self { default_storage_dir, + versionstamp_rng_seed, _permissions: PhantomData, } } @@ -190,7 +55,7 @@ impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> { #[async_trait(?Send)] impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { - type DB = SqliteDb; + type DB = denokv_sqlite::Sqlite; async fn open( &self, @@ -218,866 +83,61 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { } } - let (conn, queue_waker_key) = sqlite_retry_loop(|| { - let path = path.clone(); - let default_storage_dir = self.default_storage_dir.clone(); - async move { - spawn_blocking(move || { - let (conn, queue_waker_key) = - match (path.as_deref(), &default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - (rusqlite::Connection::open_in_memory()?, None) - } - (Some(path), _) => { - let flags = - OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - let resolved_path = canonicalize_path(&PathBuf::from(path))?; - ( - rusqlite::Connection::open_with_flags(path, flags)?, - Some(resolved_path), - ) - } - (None, Some(path)) => { - std::fs::create_dir_all(path)?; - let path = path.join("kv.sqlite3"); - (rusqlite::Connection::open(path.clone())?, Some(path)) - } - }; - - conn.pragma_update(None, "journal_mode", "wal")?; - - Ok::<_, AnyError>((conn, queue_waker_key)) - }) - .await - .unwrap() - } - }) - .await?; - let conn = ProtectedConn::new(conn); - SqliteDb::run_tx(conn.clone(), |tx| { - tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; - - let current_version: usize = tx - .query_row( - "select version from migration_state where k = 0", - [], - |row| row.get(0), - ) - .optional()? 
- .unwrap_or(0); - - for (i, migration) in MIGRATIONS.iter().enumerate() { - let version = i + 1; - if version > current_version { - tx.execute_batch(migration)?; - tx.execute( - "replace into migration_state (k, version) values(?, ?)", - [&0, &version], - )?; - } - } - - tx.commit()?; - - Ok(()) - }) - .await?; - - let expiration_watcher = spawn(watch_expiration(conn.clone())); - - Ok(SqliteDb { - conn, - queue: OnceCell::new(), - queue_waker_key, - expiration_watcher, - }) - } -} - -pub struct SqliteDb { - conn: ProtectedConn, - queue: OnceCell<SqliteQueue>, - queue_waker_key: Option<PathBuf>, - expiration_watcher: deno_core::unsync::JoinHandle<()>, -} - -impl Drop for SqliteDb { - fn drop(&mut self) { - self.close(); - } -} - -async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>( - mut f: impl FnMut() -> Fut, -) -> Result<R, AnyError> { - loop { - match f().await { - Ok(x) => return Ok(x), - Err(e) => { - if let Some(x) = e.downcast_ref::<rusqlite::Error>() { - if x.sqlite_error_code() == Some(rusqlite::ErrorCode::DatabaseBusy) { - log::debug!("kv: Database is busy, retrying"); - tokio::time::sleep(Duration::from_millis( - rand::thread_rng().gen_range(5..20), - )) - .await; - continue; - } - } - return Err(e); - } - } - } -} - -impl SqliteDb { - async fn run_tx<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError> - where - F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) - + Clone - + Send - + 'static, - R: Send + 'static, - { - sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await - } - - async fn run_tx_inner<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError> - where - F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>) - + Send - + 'static, - R: Send + 'static, - { - // `run_tx` runs in an asynchronous context. First acquire the async lock to - // coordinate with other async invocations. - let _guard_holder = conn.guard.borrow_mut().await; - - // Then, take the synchronous lock. This operation is guaranteed to success without waiting, - // unless the database is being closed. - let db = conn.conn.clone(); - spawn_blocking(move || { - let mut db = db.try_lock().ok(); - let Some(db) = db.as_mut().and_then(|x| x.as_mut()) else { - return Err(type_error(ERROR_USING_CLOSED_DATABASE)); - }; - let result = match db.transaction() { - Ok(tx) => f(tx), - Err(e) => Err(e.into()), - }; - result - }) - .await - .unwrap() - } -} - -pub struct DequeuedMessage { - conn: WeakProtectedConn, - id: String, - payload: Option<Vec<u8>>, - waker_tx: broadcast::Sender<()>, - _permit: OwnedSemaphorePermit, -} - -#[async_trait(?Send)] -impl QueueMessageHandle for DequeuedMessage { - async fn finish(&self, success: bool) -> Result<(), AnyError> { - let Some(conn) = self.conn.upgrade() else { - return Ok(()); - }; - let id = self.id.clone(); - let requeued = SqliteDb::run_tx(conn, move |tx| { - let requeued = { - if success { - let changed = tx - .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)? - .execute([&id])?; - assert!(changed <= 1); - false - } else { - SqliteQueue::requeue_message(&id, &tx)? - } - }; - tx.commit()?; - Ok(requeued) - }) - .await; - let requeued = match requeued { - Ok(x) => x, - Err(e) => { - // Silently ignore the error if the database has been closed - // This message will be delivered on the next run - if is_conn_closed_error(&e) { - return Ok(()); - } - return Err(e); - } - }; - if requeued { - // If the message was requeued, wake up the dequeue loop. 
- let _ = self.waker_tx.send(()); - } - Ok(()) - } - - async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> { - self - .payload - .take() - .ok_or_else(|| type_error("Payload already consumed")) - } -} - -type DequeueReceiver = mpsc::Receiver<(Vec<u8>, String)>; - -struct SqliteQueue { - conn: ProtectedConn, - dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>, - concurrency_limiter: Arc<Semaphore>, - waker_tx: broadcast::Sender<()>, - shutdown_tx: watch::Sender<()>, -} - -impl SqliteQueue { - fn new( - conn: ProtectedConn, - waker_tx: broadcast::Sender<()>, - waker_rx: broadcast::Receiver<()>, - ) -> Self { - let conn_clone = conn.clone(); - let (shutdown_tx, shutdown_rx) = watch::channel::<()>(()); - let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64); - - spawn(async move { - // Oneshot requeue of all inflight messages. - if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await { - // Exit the dequeue loop cleanly if the database has been closed. - if is_conn_closed_error(&e) { - return; - } - panic!("kv: Error in requeue_inflight_messages: {}", e); - } - - // Continuous dequeue loop. - if let Err(e) = - Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) - .await - { - // Exit the dequeue loop cleanly if the database has been closed. - if is_conn_closed_error(&e) { - return; - } - panic!("kv: Error in dequeue_loop: {}", e); - } - }); - - Self { - conn: conn_clone, - dequeue_rx: Rc::new(AsyncRefCell::new(dequeue_rx)), - waker_tx, - shutdown_tx, - concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)), - } - } - - async fn dequeue(&self) -> Result<Option<DequeuedMessage>, AnyError> { - // Wait for the next message to be available from dequeue_rx. - let (payload, id) = { - let mut queue_rx = self.dequeue_rx.borrow_mut().await; - let Some(msg) = queue_rx.recv().await else { - return Ok(None); - }; - msg - }; - - let permit = self.concurrency_limiter.clone().acquire_owned().await?; - - Ok(Some(DequeuedMessage { - conn: self.conn.downgrade(), - id, - payload: Some(payload), - waker_tx: self.waker_tx.clone(), - _permit: permit, - })) - } - - fn shutdown(&self) { - let _ = self.shutdown_tx.send(()); - } - - async fn dequeue_loop( - conn: ProtectedConn, - dequeue_tx: mpsc::Sender<(Vec<u8>, String)>, - mut shutdown_rx: watch::Receiver<()>, - mut waker_rx: broadcast::Receiver<()>, - ) -> Result<(), AnyError> { - loop { - let messages = SqliteDb::run_tx(conn.clone(), move |tx| { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - - let messages = tx - .prepare_cached(STATEMENT_QUEUE_GET_NEXT_READY)? - .query_map([now], |row| { - let ts: u64 = row.get(0)?; - let id: String = row.get(1)?; - let data: Vec<u8> = row.get(2)?; - let backoff_schedule: String = row.get(3)?; - let keys_if_undelivered: String = row.get(4)?; - Ok((ts, id, data, backoff_schedule, keys_if_undelivered)) - })? - .collect::<Result<Vec<_>, rusqlite::Error>>()?; - - for (ts, id, data, backoff_schedule, keys_if_undelivered) in &messages { - let changed = tx - .prepare_cached(STATEMENT_QUEUE_REMOVE_READY)? 
- .execute(params![id])?; - assert_eq!(changed, 1); - - let changed = - tx.prepare_cached(STATEMENT_QUEUE_ADD_RUNNING)?.execute( - params![ts, id, &data, &backoff_schedule, &keys_if_undelivered], - )?; - assert_eq!(changed, 1); - } - tx.commit()?; - - Ok( - messages - .into_iter() - .map(|(_, id, data, _, _)| (id, data)) - .collect::<Vec<_>>(), - ) - }) - .await?; - - let busy = !messages.is_empty(); - - for (id, data) in messages { - if dequeue_tx.send((data, id)).await.is_err() { - // Queue receiver was dropped. Stop the dequeue loop. - return Ok(()); - } - } - - if !busy { - // There's nothing to dequeue right now; sleep until one of the - // following happens: - // - It's time to dequeue the next message based on its timestamp - // - A new message is added to the queue - // - The database is closed - let sleep_fut = { - match Self::get_earliest_ready_ts(conn.clone()).await? { - Some(ts) => { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - if ts <= now { - continue; - } - tokio::time::sleep(Duration::from_millis(ts - now)).boxed() - } - None => futures::future::pending().boxed(), - } - }; - tokio::select! { - _ = sleep_fut => {} - x = waker_rx.recv() => { - if let Err(RecvError::Closed) = x {return Ok(());} - }, - _ = shutdown_rx.changed() => return Ok(()) - } - } - } - } - - async fn get_earliest_ready_ts( - conn: ProtectedConn, - ) -> Result<Option<u64>, AnyError> { - SqliteDb::run_tx(conn.clone(), move |tx| { - let ts = tx - .prepare_cached(STATEMENT_QUEUE_GET_EARLIEST_READY)? - .query_row([], |row| { - let ts: u64 = row.get(0)?; - Ok(ts) - }) - .optional()?; - Ok(ts) - }) - .await - } - - async fn requeue_inflight_messages( - conn: ProtectedConn, - ) -> Result<(), AnyError> { - loop { - let done = SqliteDb::run_tx(conn.clone(), move |tx| { - let entries = tx - .prepare_cached(STATEMENT_QUEUE_GET_RUNNING)? - .query_map([], |row| { - let id: String = row.get(0)?; - Ok(id) - })? - .collect::<Result<Vec<_>, rusqlite::Error>>()?; - for id in &entries { - Self::requeue_message(id, &tx)?; - } - tx.commit()?; - Ok(entries.is_empty()) - }) - .await?; - if done { - return Ok(()); - } - } - } - - fn requeue_message( - id: &str, - tx: &rusqlite::Transaction<'_>, - ) -> Result<bool, AnyError> { - let Some((_, id, data, backoff_schedule, keys_if_undelivered)) = tx - .prepare_cached(STATEMENT_QUEUE_GET_RUNNING_BY_ID)? - .query_row([id], |row| { - let deadline: u64 = row.get(0)?; - let id: String = row.get(1)?; - let data: Vec<u8> = row.get(2)?; - let backoff_schedule: String = row.get(3)?; - let keys_if_undelivered: String = row.get(4)?; - Ok((deadline, id, data, backoff_schedule, keys_if_undelivered)) - }) - .optional()? - else { - return Ok(false); - }; - - let backoff_schedule = { - let backoff_schedule = - serde_json::from_str::<Option<Vec<u64>>>(&backoff_schedule)?; - backoff_schedule.unwrap_or_default() - }; - - let mut requeued = false; - if !backoff_schedule.is_empty() { - // Requeue based on backoff schedule - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let new_ts = now + backoff_schedule[0]; - let new_backoff_schedule = serde_json::to_string(&backoff_schedule[1..])?; - let changed = tx - .prepare_cached(STATEMENT_QUEUE_ADD_READY)? - .execute(params![ - new_ts, - id, - &data, - &new_backoff_schedule, - &keys_if_undelivered - ]) - .unwrap(); - assert_eq!(changed, 1); - requeued = true; - } else if !keys_if_undelivered.is_empty() { - // No more requeues. 
Insert the message into the undelivered queue. - let keys_if_undelivered = - serde_json::from_str::<Vec<Vec<u8>>>(&keys_if_undelivered)?; - - let version: i64 = tx - .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? - .query_row([], |row| row.get(0))?; - - for key in keys_if_undelivered { - let changed = tx - .prepare_cached(STATEMENT_KV_POINT_SET)? - .execute(params![key, &data, &VALUE_ENCODING_V8, &version, -1i64])?; - assert_eq!(changed, 1); - } - } - - // Remove from running - let changed = tx - .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)? - .execute(params![id])?; - assert_eq!(changed, 1); - - Ok(requeued) - } -} - -async fn watch_expiration(db: ProtectedConn) { - loop { - // Scan for expired keys - let res = SqliteDb::run_tx(db.clone(), move |tx| { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - tx.prepare_cached( - "delete from kv where expiration_ms >= 0 and expiration_ms <= ?", - )? - .execute(params![now])?; - tx.commit()?; - Ok(()) - }) - .await; - if let Err(e) = res { - eprintln!("kv: Error in expiration watcher: {}", e); - } - let sleep_duration = - Duration::from_secs_f64(60.0 + rand::thread_rng().gen_range(0.0..30.0)); - tokio::time::sleep(sleep_duration).await; - } -} - -#[async_trait(?Send)] -impl Database for SqliteDb { - type QMH = DequeuedMessage; - - async fn snapshot_read( - &self, - _state: Rc<RefCell<OpState>>, - requests: Vec<ReadRange>, - _options: SnapshotReadOptions, - ) -> Result<Vec<ReadRangeOutput>, AnyError> { - let requests = Arc::new(requests); - Self::run_tx(self.conn.clone(), move |tx| { - let mut responses = Vec::with_capacity(requests.len()); - for request in &*requests { - let mut stmt = tx.prepare_cached(if request.reverse { - STATEMENT_KV_RANGE_SCAN_REVERSE - } else { - STATEMENT_KV_RANGE_SCAN - })?; - let entries = stmt - .query_map( - ( - request.start.as_slice(), - request.end.as_slice(), - request.limit.get(), - ), - |row| { - let key: Vec<u8> = row.get(0)?; - let value: Vec<u8> = row.get(1)?; - let encoding: i64 = row.get(2)?; - - let value = decode_value(value, encoding); - - let version: i64 = row.get(3)?; - Ok(KvEntry { - key, - value, - versionstamp: version_to_versionstamp(version), - }) - }, - )? - .collect::<Result<Vec<_>, rusqlite::Error>>()?; - responses.push(ReadRangeOutput { entries }); - } - - Ok(responses) - }) - .await - } - - async fn atomic_write( - &self, - state: Rc<RefCell<OpState>>, - write: AtomicWrite, - ) -> Result<Option<CommitResult>, AnyError> { - let write = Arc::new(write); - let (has_enqueues, commit_result) = - Self::run_tx(self.conn.clone(), move |tx| { - for check in &write.checks { - let real_versionstamp = tx - .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? - .query_row([check.key.as_slice()], |row| row.get(0)) - .optional()? - .map(version_to_versionstamp); - if real_versionstamp != check.versionstamp { - return Ok((false, None)); - } - } - - let version: i64 = tx - .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? - .query_row([], |row| row.get(0))?; - - for mutation in &write.mutations { - match &mutation.kind { - MutationKind::Set(value) => { - let (value, encoding) = encode_value(value); - let changed = - tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![ - mutation.key, - value, - &encoding, - &version, - mutation - .expire_at - .and_then(|x| i64::try_from(x).ok()) - .unwrap_or(-1i64) - ])?; - assert_eq!(changed, 1) - } - MutationKind::Delete => { - let changed = tx - .prepare_cached(STATEMENT_KV_POINT_DELETE)? 
- .execute(params![mutation.key])?; - assert!(changed == 0 || changed == 1) - } - MutationKind::Sum(operand) => { - mutate_le64( - &tx, - &mutation.key, - "sum", - operand, - version, - |a, b| a.wrapping_add(b), - )?; + let path = path.clone(); + let default_storage_dir = self.default_storage_dir.clone(); + let (conn, queue_waker_key) = spawn_blocking(move || { + denokv_sqlite::sqlite_retry_loop(|| { + let (conn, queue_waker_key) = + match (path.as_deref(), &default_storage_dir) { + (Some(":memory:"), _) | (None, None) => { + (rusqlite::Connection::open_in_memory()?, None) } - MutationKind::Min(operand) => { - mutate_le64( - &tx, - &mutation.key, - "min", - operand, - version, - |a, b| a.min(b), - )?; + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + let resolved_path = canonicalize_path(&PathBuf::from(path))?; + ( + rusqlite::Connection::open_with_flags(path, flags)?, + Some(resolved_path), + ) } - MutationKind::Max(operand) => { - mutate_le64( - &tx, - &mutation.key, - "max", - operand, - version, - |a, b| a.max(b), - )?; + (None, Some(path)) => { + std::fs::create_dir_all(path)?; + let path = path.join("kv.sqlite3"); + (rusqlite::Connection::open(path.clone())?, Some(path)) } - } - } - - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - - let has_enqueues = !write.enqueues.is_empty(); - for enqueue in &write.enqueues { - let id = Uuid::new_v4().to_string(); - let backoff_schedule = serde_json::to_string( - &enqueue - .backoff_schedule - .as_deref() - .or_else(|| Some(&DEFAULT_BACKOFF_SCHEDULE[..])), - )?; - let keys_if_undelivered = - serde_json::to_string(&enqueue.keys_if_undelivered)?; + }; - let changed = - tx.prepare_cached(STATEMENT_QUEUE_ADD_READY)? - .execute(params![ - now + enqueue.delay_ms, - id, - &enqueue.payload, - &backoff_schedule, - &keys_if_undelivered - ])?; - assert_eq!(changed, 1) - } - - tx.commit()?; - let new_versionstamp = version_to_versionstamp(version); - - Ok(( - has_enqueues, - Some(CommitResult { - versionstamp: new_versionstamp, - }), - )) - }) - .await?; - - if has_enqueues { - match self.queue.get() { - Some(queue) => { - let _ = queue.waker_tx.send(()); - } - None => { - if let Some(waker_key) = &self.queue_waker_key { - let (waker_tx, _) = - shared_queue_waker_channel(waker_key, state.clone()); - let _ = waker_tx.send(()); - } - } - } - } - Ok(commit_result) - } + conn.pragma_update(None, "journal_mode", "wal")?; - async fn dequeue_next_message( - &self, - state: Rc<RefCell<OpState>>, - ) -> Result<Option<Self::QMH>, AnyError> { - let queue = self - .queue - .get_or_init(|| async move { - let (waker_tx, waker_rx) = { - match &self.queue_waker_key { - Some(waker_key) => { - shared_queue_waker_channel(waker_key, state.clone()) - } - None => broadcast::channel(1), - } - }; - SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx) + Ok::<_, AnyError>((conn, queue_waker_key)) }) - .await; - let handle = queue.dequeue().await?; - Ok(handle) - } - - fn close(&self) { - if let Some(queue) = self.queue.get() { - queue.shutdown(); - } - - self.expiration_watcher.abort(); - - // The above `abort()` operation is asynchronous. It's not - // guaranteed that the sqlite connection will be closed immediately. - // So here we synchronously take the conn mutex and drop the connection. 
- // - // This blocks the event loop if the connection is still being used, - // but ensures correctness - deleting the database file after calling - // the `close` method will always work. - self.conn.conn.lock().unwrap().take(); - } -} - -/// Mutates a LE64 value in the database, defaulting to setting it to the -/// operand if it doesn't exist. -fn mutate_le64( - tx: &Transaction, - key: &[u8], - op_name: &str, - operand: &Value, - new_version: i64, - mutate: impl FnOnce(u64, u64) -> u64, -) -> Result<(), AnyError> { - let Value::U64(operand) = *operand else { - return Err(type_error(format!( - "Failed to perform '{op_name}' mutation on a non-U64 operand" - ))); - }; - - let old_value = tx - .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)? - .query_row([key], |row| { - let value: Vec<u8> = row.get(0)?; - let encoding: i64 = row.get(1)?; - - let value = decode_value(value, encoding); - Ok(value) }) - .optional()?; - - let new_value = match old_value { - Some(Value::U64(old_value) ) => mutate(old_value, operand), - Some(_) => return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 value in the database"))), - None => operand, - }; - - let new_value = Value::U64(new_value); - let (new_value, encoding) = encode_value(&new_value); - - let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![ - key, - &new_value[..], - encoding, - new_version, - -1i64, - ])?; - assert_eq!(changed, 1); - - Ok(()) -} - -fn version_to_versionstamp(version: i64) -> [u8; 10] { - let mut versionstamp = [0; 10]; - versionstamp[..8].copy_from_slice(&version.to_be_bytes()); - versionstamp -} + .await + .unwrap()?; -const VALUE_ENCODING_V8: i64 = 1; -const VALUE_ENCODING_LE64: i64 = 2; -const VALUE_ENCODING_BYTES: i64 = 3; + let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key { + QUEUE_WAKER_MAP + .get_or_init(Default::default) + .lock() + .unwrap() + .entry(queue_waker_key) + .or_default() + .clone() + } else { + Arc::new(Notify::new()) + }; -fn decode_value(value: Vec<u8>, encoding: i64) -> crate::Value { - match encoding { - VALUE_ENCODING_V8 => crate::Value::V8(value), - VALUE_ENCODING_BYTES => crate::Value::Bytes(value), - VALUE_ENCODING_LE64 => { - let mut buf = [0; 8]; - buf.copy_from_slice(&value); - crate::Value::U64(u64::from_le_bytes(buf)) - } - _ => todo!(), - } -} + let versionstamp_rng: Box<dyn RngCore + Send> = + match &self.versionstamp_rng_seed { + Some(seed) => Box::new(rand::rngs::StdRng::seed_from_u64(*seed)), + None => Box::new(rand::rngs::StdRng::from_entropy()), + }; -fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) { - match value { - crate::Value::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8), - crate::Value::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES), - crate::Value::U64(value) => { - let mut buf = [0; 8]; - buf.copy_from_slice(&value.to_le_bytes()); - (Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64) - } + denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng) } } -pub struct QueueWaker { - wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>, -} - -fn shared_queue_waker_channel( - waker_key: &Path, - state: Rc<RefCell<OpState>>, -) -> (broadcast::Sender<()>, broadcast::Receiver<()>) { - let mut state = state.borrow_mut(); - let waker = { - let waker = state.try_borrow_mut::<QueueWaker>(); - match waker { - Some(waker) => waker, - None => { - let waker = QueueWaker { - wakers_tx: HashMap::new(), - }; - state.put::<QueueWaker>(waker); - state.borrow_mut::<QueueWaker>() - } - } - }; - - 
let waker_tx = waker - .wakers_tx - .entry(waker_key.to_path_buf()) - .or_insert_with(|| { - let (waker_tx, _) = broadcast::channel(1); - waker_tx - }); - - (waker_tx.clone(), waker_tx.subscribe()) -} - /// Same as Path::canonicalize, but also handles non-existing paths. fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> { let path = path.to_path_buf().clean(); @@ -1106,8 +166,3 @@ fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> { } } } - -fn is_conn_closed_error(e: &AnyError) -> bool { - get_custom_error_class(e) == Some("TypeError") - && e.to_string() == ERROR_USING_CLOSED_DATABASE -} diff --git a/runtime/build.rs b/runtime/build.rs index dec687b6e..ce1896e6f 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -222,7 +222,7 @@ mod startup_snapshot { deno_tls::deno_tls::init_ops_and_esm(), deno_kv::deno_kv::init_ops_and_esm(deno_kv::sqlite::SqliteDbHandler::< Permissions, - >::new(None)), + >::new(None, None)), deno_napi::deno_napi::init_ops_and_esm::<Permissions>(), deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(), deno_io::deno_io::init_ops_and_esm(Default::default()), diff --git a/runtime/errors.rs b/runtime/errors.rs index e6ae14abb..47925fe5c 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -212,6 +212,10 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { .map(get_url_parse_error_class) }) .or_else(|| { + e.downcast_ref::<deno_kv::sqlite::TypeError>() + .map(|_| "TypeError") + }) + .or_else(|| { #[cfg(unix)] let maybe_get_nix_error_class = || e.downcast_ref::<nix::Error>().map(get_nix_error_class); diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index c1fe5a619..de69ce43b 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -436,7 +436,19 @@ impl WebWorker { ), deno_tls::deno_tls::init_ops_and_esm(), deno_kv::deno_kv::init_ops_and_esm( - MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(None), + MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>( + None, + options.seed, + deno_kv::remote::HttpOptions { + user_agent: options.bootstrap.user_agent.clone(), + root_cert_store_provider: options.root_cert_store_provider.clone(), + unsafely_ignore_certificate_errors: options + .unsafely_ignore_certificate_errors + .clone(), + client_cert_chain_and_key: None, + proxy: None, + }, + ), ), deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(), deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(), diff --git a/runtime/worker.rs b/runtime/worker.rs index e8d9ca6bc..f0fc25aa2 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -261,6 +261,16 @@ impl MainWorker { deno_kv::deno_kv::init_ops_and_esm( MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>( options.origin_storage_dir.clone(), + options.seed, + deno_kv::remote::HttpOptions { + user_agent: options.bootstrap.user_agent.clone(), + root_cert_store_provider: options.root_cert_store_provider.clone(), + unsafely_ignore_certificate_errors: options + .unsafely_ignore_certificate_errors + .clone(), + client_cert_chain_and_key: None, + proxy: None, + }, ), ), deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(), diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml index f48fce0b3..72126ae66 100644 --- a/test_util/Cargo.toml +++ b/test_util/Cargo.toml @@ -19,6 +19,7 @@ async-stream = "0.3.3" base64.workspace = true bytes.workspace = true console_static_text.workspace = true +denokv_proto.workspace = true fastwebsockets = { workspace = true, features = 
["upgrade"] } flate2.workspace = true futures.workspace = true diff --git a/test_util/build.rs b/test_util/build.rs deleted file mode 100644 index 420abd0a1..000000000 --- a/test_util/build.rs +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use std::env; -use std::io; -use std::path::PathBuf; - -fn main() -> io::Result<()> { - println!("cargo:rerun-if-changed=../ext/kv/proto"); - - let descriptor_path = - PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin"); - - prost_build::Config::new() - .file_descriptor_set_path(&descriptor_path) - .compile_well_known_types() - .compile_protos( - &["../ext/kv/proto/datapath.proto"], - &["../ext/kv/proto/"], - )?; - - Ok(()) -} diff --git a/test_util/src/kv_remote.rs b/test_util/src/kv_remote.rs deleted file mode 100644 index d258a0551..000000000 --- a/test_util/src/kv_remote.rs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -// Generated code, disable lints -#[allow(clippy::all, non_snake_case)] -pub mod datapath { - include!(concat!(env!("OUT_DIR"), "/datapath.rs")); -} diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index b7106a2b3..692a6a08c 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -4,6 +4,12 @@ use anyhow::anyhow; use base64::prelude::BASE64_STANDARD; use base64::Engine; +use denokv_proto::datapath::AtomicWrite; +use denokv_proto::datapath::AtomicWriteOutput; +use denokv_proto::datapath::AtomicWriteStatus; +use denokv_proto::datapath::ReadRangeOutput; +use denokv_proto::datapath::SnapshotRead; +use denokv_proto::datapath::SnapshotReadOutput; use futures::Future; use futures::FutureExt; use futures::Stream; @@ -18,12 +24,6 @@ use hyper::Body; use hyper::Request; use hyper::Response; use hyper::StatusCode; -use kv_remote::datapath::AtomicWrite; -use kv_remote::datapath::AtomicWriteOutput; -use kv_remote::datapath::AtomicWriteStatus; -use kv_remote::datapath::ReadRangeOutput; -use kv_remote::datapath::SnapshotRead; -use kv_remote::datapath::SnapshotReadOutput; use npm::CUSTOM_NPM_PACKAGE_CACHE; use once_cell::sync::Lazy; use pretty_assertions::assert_eq; @@ -70,7 +70,6 @@ pub mod assertions; mod builders; pub mod factory; mod fs; -mod kv_remote; pub mod lsp; mod npm; pub mod pty; @@ -1206,7 +1205,7 @@ async fn main_server( .header("content-type", "application/json") .body(Body::from( serde_json::json!({ - "version": 2, + "version": 1000, "databaseId": KV_DATABASE_ID, "endpoints": [ { @@ -1268,9 +1267,7 @@ async fn main_server( .map(|_| ReadRangeOutput { values: vec![] }) .collect(), read_disabled: false, - regions_if_read_disabled: vec![], read_is_strongly_consistent: true, - primary_if_not_strongly_consistent: "".into(), } .encode_to_vec(), )) @@ -1311,7 +1308,7 @@ async fn main_server( AtomicWriteOutput { status: AtomicWriteStatus::AwSuccess.into(), versionstamp: vec![0u8; 10], - primary_if_write_disabled: "".into(), + failed_checks: vec![], } .encode_to_vec(), )) |