summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock72
-rw-r--r--Cargo.toml4
-rw-r--r--cli/build.rs2
-rw-r--r--cli/tests/unit/kv_test.ts351
-rw-r--r--ext/kv/Cargo.toml5
-rw-r--r--ext/kv/README.md73
-rw-r--r--ext/kv/build.rs19
-rw-r--r--ext/kv/codec.rs543
-rw-r--r--ext/kv/dynamic.rs44
-rw-r--r--ext/kv/interface.rs315
-rw-r--r--ext/kv/lib.rs232
-rw-r--r--ext/kv/proto/datapath.proto97
-rw-r--r--ext/kv/proto/mod.rs7
-rw-r--r--ext/kv/remote.rs590
-rw-r--r--ext/kv/sqlite.rs1057
-rw-r--r--runtime/build.rs2
-rw-r--r--runtime/errors.rs4
-rw-r--r--runtime/web_worker.rs14
-rw-r--r--runtime/worker.rs10
-rw-r--r--test_util/Cargo.toml1
-rw-r--r--test_util/build.rs22
-rw-r--r--test_util/src/kv_remote.rs7
-rw-r--r--test_util/src/lib.rs19
23 files changed, 558 insertions, 2932 deletions
diff --git a/Cargo.lock b/Cargo.lock
index eb9b7cdf3..138d0cd4a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1412,8 +1412,13 @@ dependencies = [
"base64 0.21.4",
"chrono",
"deno_core",
+ "deno_fetch",
"deno_node",
+ "deno_tls",
"deno_unsync 0.1.1",
+ "denokv_proto",
+ "denokv_remote",
+ "denokv_sqlite",
"hex",
"log",
"num-bigint",
@@ -1777,6 +1782,64 @@ dependencies = [
]
[[package]]
+name = "denokv_proto"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8952fb8c38c1dcd796d49b00030afb74aa184160ae86817b72a32a994c8e16f0"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "chrono",
+ "num-bigint",
+ "prost",
+ "prost-build",
+ "serde",
+ "uuid 1.5.0",
+]
+
+[[package]]
+name = "denokv_remote"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "edfc8447324d783b01e215bd5040ff9149c34d9715c7b7b5080dd648ebf1148a"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "bytes",
+ "chrono",
+ "denokv_proto",
+ "log",
+ "prost",
+ "rand 0.8.5",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "tokio",
+ "url",
+ "uuid 1.5.0",
+]
+
+[[package]]
+name = "denokv_sqlite"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ec76b691ff069f14e56e3e053c2b2163540b27e4b60179f2b120064a7e4960d"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "chrono",
+ "denokv_proto",
+ "futures",
+ "log",
+ "num-bigint",
+ "rand 0.8.5",
+ "rusqlite",
+ "serde_json",
+ "tokio",
+ "uuid 1.5.0",
+]
+
+[[package]]
name = "der"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5621,6 +5684,7 @@ dependencies = [
"base64 0.21.4",
"bytes",
"console_static_text",
+ "denokv_proto",
"fastwebsockets",
"flate2",
"futures",
@@ -5743,9 +5807,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.32.0"
+version = "1.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
+checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
dependencies = [
"backtrace",
"bytes",
@@ -5773,9 +5837,9 @@ dependencies = [
[[package]]
name = "tokio-metrics"
-version = "0.3.0"
+version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d4b2fc67d5dec41db679b9b052eb572269616926040b7831e32c8a152df77b84"
+checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112"
dependencies = [
"futures-util",
"pin-project-lite",
diff --git a/Cargo.toml b/Cargo.toml
index e859641dd..1f136401e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -48,6 +48,10 @@ test_util = { path = "./test_util" }
deno_lockfile = "0.17.2"
deno_media_type = { version = "0.1.1", features = ["module_specifier"] }
+denokv_proto = "0.2.1"
+denokv_sqlite = "0.2.1"
+denokv_remote = "0.2.3"
+
# exts
deno_broadcast_channel = { version = "0.115.0", path = "./ext/broadcast_channel" }
deno_cache = { version = "0.53.0", path = "./ext/cache" }
diff --git a/cli/build.rs b/cli/build.rs
index f01c006be..e6b9dc0a4 100644
--- a/cli/build.rs
+++ b/cli/build.rs
@@ -381,7 +381,7 @@ fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput {
deno_net::deno_net::init_ops::<PermissionsContainer>(None, None),
deno_tls::deno_tls::init_ops(),
deno_kv::deno_kv::init_ops(SqliteDbHandler::<PermissionsContainer>::new(
- None,
+ None, None,
)),
deno_napi::deno_napi::init_ops::<PermissionsContainer>(),
deno_http::deno_http::init_ops::<DefaultHttpPropertyExtractor>(),
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index 7cb8ebccf..4e3ce5385 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -55,9 +55,7 @@ function dbTest(name: string, fn: (db: Deno.Kv) => Promise<void> | void) {
// https://github.com/denoland/deno/issues/18363
ignore: Deno.build.os === "darwin" && isCI,
async fn() {
- const db: Deno.Kv = await Deno.openKv(
- ":memory:",
- );
+ const db: Deno.Kv = await Deno.openKv(":memory:");
try {
await fn(db);
} finally {
@@ -73,14 +71,14 @@ function queueTest(name: string, fn: (db: Deno.Kv) => Promise<void>) {
// https://github.com/denoland/deno/issues/18363
ignore: Deno.build.os === "darwin" && isCI,
async fn() {
- const db: Deno.Kv = await Deno.openKv(
- ":memory:",
- );
+ const db: Deno.Kv = await Deno.openKv(":memory:");
await fn(db);
},
});
}
+const ZERO_VERSIONSTAMP = "00000000000000000000";
+
dbTest("basic read-write-delete and versionstamps", async (db) => {
const result1 = await db.get(["a"]);
assertEquals(result1.key, ["a"]);
@@ -89,17 +87,19 @@ dbTest("basic read-write-delete and versionstamps", async (db) => {
const setRes = await db.set(["a"], "b");
assert(setRes.ok);
- assertEquals(setRes.versionstamp, "00000000000000010000");
+ assert(setRes.versionstamp > ZERO_VERSIONSTAMP);
const result2 = await db.get(["a"]);
assertEquals(result2.key, ["a"]);
assertEquals(result2.value, "b");
- assertEquals(result2.versionstamp, "00000000000000010000");
+ assertEquals(result2.versionstamp, setRes.versionstamp);
- await db.set(["a"], "c");
+ const setRes2 = await db.set(["a"], "c");
+ assert(setRes2.ok);
+ assert(setRes2.versionstamp > setRes.versionstamp);
const result3 = await db.get(["a"]);
assertEquals(result3.key, ["a"]);
assertEquals(result3.value, "c");
- assertEquals(result3.versionstamp, "00000000000000020000");
+ assertEquals(result3.versionstamp, setRes2.versionstamp);
await db.delete(["a"]);
const result4 = await db.get(["a"]);
@@ -230,17 +230,18 @@ dbTest("compare and mutate", async (db) => {
await db.set(["t"], "1");
const currentValue = await db.get(["t"]);
- assertEquals(currentValue.versionstamp, "00000000000000010000");
+ assert(currentValue.versionstamp);
+ assert(currentValue.versionstamp > ZERO_VERSIONSTAMP);
let res = await db.atomic()
.check({ key: ["t"], versionstamp: currentValue.versionstamp })
.set(currentValue.key, "2")
.commit();
assert(res.ok);
- assertEquals(res.versionstamp, "00000000000000020000");
+ assert(res.versionstamp > currentValue.versionstamp);
const newValue = await db.get(["t"]);
- assertEquals(newValue.versionstamp, "00000000000000020000");
+ assertEquals(newValue.versionstamp, res.versionstamp);
assertEquals(newValue.value, "2");
res = await db.atomic()
@@ -250,7 +251,7 @@ dbTest("compare and mutate", async (db) => {
assert(!res.ok);
const newValue2 = await db.get(["t"]);
- assertEquals(newValue2.versionstamp, "00000000000000020000");
+ assertEquals(newValue2.versionstamp, newValue.versionstamp);
assertEquals(newValue2.value, "2");
});
@@ -260,9 +261,10 @@ dbTest("compare and mutate not exists", async (db) => {
.set(["t"], "1")
.commit();
assert(res.ok);
+ assert(res.versionstamp > ZERO_VERSIONSTAMP);
const newValue = await db.get(["t"]);
- assertEquals(newValue.versionstamp, "00000000000000010000");
+ assertEquals(newValue.versionstamp, res.versionstamp);
assertEquals(newValue.value, "1");
res = await db.atomic()
@@ -303,13 +305,17 @@ dbTest("atomic mutation helper (max)", async (db) => {
});
dbTest("compare multiple and mutate", async (db) => {
- await db.set(["t1"], "1");
- await db.set(["t2"], "2");
+ const setRes1 = await db.set(["t1"], "1");
+ const setRes2 = await db.set(["t2"], "2");
+ assert(setRes1.ok);
+ assert(setRes1.versionstamp > ZERO_VERSIONSTAMP);
+ assert(setRes2.ok);
+ assert(setRes2.versionstamp > ZERO_VERSIONSTAMP);
const currentValue1 = await db.get(["t1"]);
- assertEquals(currentValue1.versionstamp, "00000000000000010000");
+ assertEquals(currentValue1.versionstamp, setRes1.versionstamp);
const currentValue2 = await db.get(["t2"]);
- assertEquals(currentValue2.versionstamp, "00000000000000020000");
+ assertEquals(currentValue2.versionstamp, setRes2.versionstamp);
const res = await db.atomic()
.check({ key: ["t1"], versionstamp: currentValue1.versionstamp })
@@ -318,12 +324,13 @@ dbTest("compare multiple and mutate", async (db) => {
.set(currentValue2.key, "4")
.commit();
assert(res.ok);
+ assert(res.versionstamp > setRes2.versionstamp);
const newValue1 = await db.get(["t1"]);
- assertEquals(newValue1.versionstamp, "00000000000000030000");
+ assertEquals(newValue1.versionstamp, res.versionstamp);
assertEquals(newValue1.value, "3");
const newValue2 = await db.get(["t2"]);
- assertEquals(newValue2.versionstamp, "00000000000000030000");
+ assertEquals(newValue2.versionstamp, res.versionstamp);
assertEquals(newValue2.value, "4");
// just one of the two checks failed
@@ -336,10 +343,10 @@ dbTest("compare multiple and mutate", async (db) => {
assert(!res2.ok);
const newValue3 = await db.get(["t1"]);
- assertEquals(newValue3.versionstamp, "00000000000000030000");
+ assertEquals(newValue3.versionstamp, res.versionstamp);
assertEquals(newValue3.value, "3");
const newValue4 = await db.get(["t2"]);
- assertEquals(newValue4.versionstamp, "00000000000000030000");
+ assertEquals(newValue4.versionstamp, res.versionstamp);
assertEquals(newValue4.value, "4");
});
@@ -635,8 +642,8 @@ async function collect<T>(
return entries;
}
-async function setupData(db: Deno.Kv) {
- await db.atomic()
+async function setupData(db: Deno.Kv): Promise<string> {
+ const res = await db.atomic()
.set(["a"], -1)
.set(["a", "a"], 0)
.set(["a", "b"], 1)
@@ -646,27 +653,29 @@ async function setupData(db: Deno.Kv) {
.set(["b"], 99)
.set(["b", "a"], 100)
.commit();
+ assert(res.ok);
+ return res.versionstamp;
}
dbTest("get many", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await db.getMany([["b", "a"], ["a"], ["c"]]);
assertEquals(entries, [
- { key: ["b", "a"], value: 100, versionstamp: "00000000000000010000" },
- { key: ["a"], value: -1, versionstamp: "00000000000000010000" },
+ { key: ["b", "a"], value: 100, versionstamp },
+ { key: ["a"], value: -1, versionstamp },
{ key: ["c"], value: null, versionstamp: null },
]);
});
dbTest("list prefix", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }));
assertEquals(entries, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "e"], value: 4, versionstamp },
]);
});
@@ -680,12 +689,12 @@ dbTest("list prefix empty", async (db) => {
});
dbTest("list prefix with start", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"], start: ["a", "c"] }));
assertEquals(entries, [
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "e"], value: 4, versionstamp },
]);
});
@@ -696,11 +705,11 @@ dbTest("list prefix with start empty", async (db) => {
});
dbTest("list prefix with end", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"], end: ["a", "c"] }));
assertEquals(entries, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
]);
});
@@ -711,35 +720,34 @@ dbTest("list prefix with end empty", async (db) => {
});
dbTest("list prefix with empty prefix", async (db) => {
- await db.set(["a"], 1);
+ const res = await db.set(["a"], 1);
const entries = await collect(db.list({ prefix: [] }));
assertEquals(entries, [
- { key: ["a"], value: 1, versionstamp: "00000000000000010000" },
+ { key: ["a"], value: 1, versionstamp: res.versionstamp },
]);
});
dbTest("list prefix reverse", async (db) => {
- await setupData(db);
-
+ const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { reverse: true }));
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list prefix reverse with start", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], start: ["a", "c"] }, { reverse: true }),
);
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
});
@@ -752,13 +760,13 @@ dbTest("list prefix reverse with start empty", async (db) => {
});
dbTest("list prefix reverse with end", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"], end: ["a", "c"] }, { reverse: true }),
);
assertEquals(entries, [
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "a"], value: 0, versionstamp },
]);
});
@@ -771,83 +779,82 @@ dbTest("list prefix reverse with end empty", async (db) => {
});
dbTest("list prefix limit", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { limit: 2 }));
assertEquals(entries, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
]);
});
dbTest("list prefix limit reverse", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { limit: 2, reverse: true }),
);
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
]);
});
dbTest("list prefix with small batch size", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(db.list({ prefix: ["a"] }, { batchSize: 2 }));
assertEquals(entries, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list prefix with small batch size reverse", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, reverse: true }),
);
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list prefix with small batch size and limit", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3 }),
);
assertEquals(entries, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list prefix with small batch size and limit reverse", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ prefix: ["a"] }, { batchSize: 2, limit: 3, reverse: true }),
);
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list prefix with manual cursor", async (db) => {
- await setupData(db);
-
+ const versionstamp = await setupData(db);
const iterator = db.list({ prefix: ["a"] }, { limit: 2 });
const values = await collect(iterator);
assertEquals(values, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
]);
const cursor = iterator.cursor;
@@ -856,20 +863,20 @@ dbTest("list prefix with manual cursor", async (db) => {
const iterator2 = db.list({ prefix: ["a"] }, { cursor });
const values2 = await collect(iterator2);
assertEquals(values2, [
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list prefix with manual cursor reverse", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const iterator = db.list({ prefix: ["a"] }, { limit: 2, reverse: true });
const values = await collect(iterator);
assertEquals(values, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
]);
const cursor = iterator.cursor;
@@ -878,57 +885,57 @@ dbTest("list prefix with manual cursor reverse", async (db) => {
const iterator2 = db.list({ prefix: ["a"] }, { cursor, reverse: true });
const values2 = await collect(iterator2);
assertEquals(values2, [
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list range", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }),
);
assertEquals(entries, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list range reverse", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, { reverse: true }),
);
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "a"], value: 0, versionstamp },
]);
});
dbTest("list range with limit", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, { limit: 3 }),
);
assertEquals(entries, [
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range with limit reverse", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "a"], end: ["a", "z"] }, {
@@ -937,46 +944,46 @@ dbTest("list range with limit reverse", async (db) => {
}),
);
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range nesting", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(db.list({ start: ["a"], end: ["a", "d"] }));
assertEquals(entries, [
- { key: ["a"], value: -1, versionstamp: "00000000000000010000" },
- { key: ["a", "a"], value: 0, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a"], value: -1, versionstamp },
+ { key: ["a", "a"], value: 0, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range short", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const entries = await collect(
db.list({ start: ["a", "b"], end: ["a", "d"] }),
);
assertEquals(entries, [
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
});
dbTest("list range with manual cursor", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
limit: 2,
});
const entries = await collect(iterator);
assertEquals(entries, [
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
+ { key: ["a", "b"], value: 1, versionstamp },
+ { key: ["a", "c"], value: 2, versionstamp },
]);
const cursor = iterator.cursor;
@@ -985,13 +992,13 @@ dbTest("list range with manual cursor", async (db) => {
});
const entries2 = await collect(iterator2);
assertEquals(entries2, [
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
+ { key: ["a", "d"], value: 3, versionstamp },
+ { key: ["a", "e"], value: 4, versionstamp },
]);
});
dbTest("list range with manual cursor reverse", async (db) => {
- await setupData(db);
+ const versionstamp = await setupData(db);
const iterator = db.list({ start: ["a", "b"], end: ["a", "z"] }, {
limit: 2,
@@ -999,8 +1006,8 @@ dbTest("list range with manual cursor reverse", async (db) => {
});
const entries = await collect(iterator);
assertEquals(entries, [
- { key: ["a", "e"], value: 4, versionstamp: "00000000000000010000" },
- { key: ["a", "d"], value: 3, versionstamp: "00000000000000010000" },
+ { key: ["a", "e"], value: 4, versionstamp },
+ { key: ["a", "d"], value: 3, versionstamp },
]);
const cursor = iterator.cursor;
@@ -1010,8 +1017,8 @@ dbTest("list range with manual cursor reverse", async (db) => {
});
const entries2 = await collect(iterator2);
assertEquals(entries2, [
- { key: ["a", "c"], value: 2, versionstamp: "00000000000000010000" },
- { key: ["a", "b"], value: 1, versionstamp: "00000000000000010000" },
+ { key: ["a", "c"], value: 2, versionstamp },
+ { key: ["a", "b"], value: 1, versionstamp },
]);
});
@@ -1110,12 +1117,12 @@ dbTest("key size limit", async (db) => {
const lastValidKey = new Uint8Array(2046).fill(1);
const firstInvalidKey = new Uint8Array(2047).fill(1);
- await db.set([lastValidKey], 1);
+ const res = await db.set([lastValidKey], 1);
assertEquals(await db.get([lastValidKey]), {
key: [lastValidKey],
value: 1,
- versionstamp: "00000000000000010000",
+ versionstamp: res.versionstamp,
});
await assertRejects(
@@ -1135,11 +1142,11 @@ dbTest("value size limit", async (db) => {
const lastValidValue = new Uint8Array(65536);
const firstInvalidValue = new Uint8Array(65537);
- await db.set(["a"], lastValidValue);
+ const res = await db.set(["a"], lastValidValue);
assertEquals(await db.get(["a"]), {
key: ["a"],
value: lastValidValue,
- versionstamp: "00000000000000010000",
+ versionstamp: res.versionstamp,
});
await assertRejects(
@@ -1415,21 +1422,17 @@ for (const { name, value } of VALUE_CASES) {
queueTest(`listenQueue and enqueue ${name}`, async (db) => {
const numEnqueues = 10;
let count = 0;
- const promises: Deferred<void>[] = [];
- const dequeuedMessages: unknown[] = [];
+ const promises: Deferred<unknown>[] = [];
const listeners: Promise<void>[] = [];
- listeners.push(db.listenQueue((msg) => {
- dequeuedMessages.push(msg);
- promises[count++].resolve();
+ listeners.push(db.listenQueue((msg: unknown) => {
+ promises[count++].resolve(msg);
}));
try {
for (let i = 0; i < numEnqueues; i++) {
promises.push(deferred());
await db.enqueue(value);
}
- for (let i = 0; i < numEnqueues; i++) {
- await promises[i];
- }
+ const dequeuedMessages = await Promise.all(promises);
for (let i = 0; i < numEnqueues; i++) {
assertEquals(dequeuedMessages[i], value);
}
@@ -1445,7 +1448,7 @@ for (const { name, value } of VALUE_CASES) {
queueTest("queue mixed types", async (db) => {
let promise: Deferred<void>;
let dequeuedMessage: unknown = null;
- const listener = db.listenQueue((msg) => {
+ const listener = db.listenQueue((msg: unknown) => {
dequeuedMessage = msg;
promise.resolve();
});
@@ -2066,25 +2069,16 @@ Deno.test({
const db = await Deno.openKv(
"http://localhost:4545/kv_remote_authorize_invalid_format",
);
- let ok = false;
- try {
- await db.set(["some-key"], 1);
- } catch (e) {
- if (
- e.name === "TypeError" &&
- e.message.startsWith("Metadata error: Failed to decode metadata: ")
- ) {
- ok = true;
- } else {
- throw e;
- }
- } finally {
- db.close();
- }
- if (!ok) {
- throw new Error("did not get expected error");
- }
+ await assertRejects(
+ async () => {
+ await db.set(["some-key"], 1);
+ },
+ Error,
+ "Failed to parse metadata: ",
+ );
+
+ db.close();
},
});
@@ -2094,24 +2088,15 @@ Deno.test({
const db = await Deno.openKv(
"http://localhost:4545/kv_remote_authorize_invalid_version",
);
- let ok = false;
- try {
- await db.set(["some-key"], 1);
- } catch (e) {
- if (
- e.name === "TypeError" &&
- e.message === "Metadata error: Unsupported metadata version: 2"
- ) {
- ok = true;
- } else {
- throw e;
- }
- } finally {
- db.close();
- }
- if (!ok) {
- throw new Error("did not get expected error");
- }
+ await assertRejects(
+ async () => {
+ await db.set(["some-key"], 1);
+ },
+ Error,
+ "Failed to parse metadata: unsupported metadata version: 1000",
+ );
+
+ db.close();
},
});
diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml
index 47bf11177..c85d5f7a8 100644
--- a/ext/kv/Cargo.toml
+++ b/ext/kv/Cargo.toml
@@ -19,8 +19,13 @@ async-trait.workspace = true
base64.workspace = true
chrono.workspace = true
deno_core.workspace = true
+deno_fetch.workspace = true
deno_node.workspace = true
+deno_tls.workspace = true
deno_unsync = "0.1.1"
+denokv_proto.workspace = true
+denokv_remote.workspace = true
+denokv_sqlite.workspace = true
hex.workspace = true
log.workspace = true
num-bigint.workspace = true
diff --git a/ext/kv/README.md b/ext/kv/README.md
index f5c2de9ed..21c8e9e72 100644
--- a/ext/kv/README.md
+++ b/ext/kv/README.md
@@ -8,74 +8,19 @@ please read the [manual](https://deno.land/manual/runtime/kv).
Deno KV has a pluggable storage interface that supports multiple backends:
- SQLite - backed by a local SQLite database. This backend is suitable for
- development and is the default when running locally.
+ development and is the default when running locally. It is implemented in the
+ [denokv_sqlite crate](https://github.com/denoland/denokv/blob/main/sqlite).
- Remote - backed by a remote service that implements the
[KV Connect](#kv-connect) protocol, for example
[Deno Deploy](https://deno.com/deploy).
-Additional backends can be added by implementing the `DatabaseHandler` trait.
+Additional backends can be added by implementing the `Database` trait.
## KV Connect
-The KV Connect protocol has separate control and data planes to maximize
-throughput and minimize latency. _Metadata Exchange_ and _Data Path_ are the two
-sub-protocols that are used when talking to a KV Connect-compatible service.
-
-### Metadata Exchange
-
-To connect to a KV Connect service, the user provides an HTTP or HTTPS URL to
-`Deno.openKv`. A background task is then spawned to periodically make HTTP POST
-requests to the provided URL to refresh database metadata.
-
-The HTTP `Authorization` header is included and have the format
-`Bearer <access-token>`. The `<access-token>` is a static token issued by the
-service provider. For Deno Deploy, this is the personal access token generated
-from the dashboard. You can specify the access token with the environment
-variable `DENO_KV_ACCESS_TOKEN`.
-
-Request body is currently unused. The response is a JSON message that satisfies
-the [JSON Schema](https://json-schema.org/) definition in
-`cli/schemas/kv-metadata-exchange-response.v1.json`.
-
-Semantics of the response fields:
-
-- `version`: Protocol version. The only supported value is `1`.
-- `databaseId`: UUID of the database.
-- `endpoints`: Data plane endpoints that can serve requests to the database,
- along with their consistency levels.
-- `token`: An ephemeral authentication token that must be included in all
- requests to the data plane. This value is an opaque string and the client
- should not depend on its format.
-- `expiresAt`: The time at which the token expires. Encoded as an ISO 8601
- string.
-
-### Data Path
-
-After the first metadata exchange has completed, the client can talk to the data
-plane endpoints listed in the `endpoints` field using a Protobuf-over-HTTP
-protocol called the _Data Path_. The Protobuf messages are defined in
-`proto/datapath.proto`.
-
-Two sub-endpoints are available under a data plane endpoint URL:
-
-- `POST /snapshot_read`: Used for read operations: `kv.get()` and
- `kv.getMany()`.
- - **Request type**: `SnapshotRead`
- - **Response type**: `SnapshotReadOutput`
-- `POST /atomic_write`: Used for write operations: `kv.set()` and
- `kv.atomic().commit()`.
- - **Request type**: `AtomicWrite`
- - **Response type**: `AtomicWriteOutput`
-
-An HTTP `Authorization` header in the format `Bearer <ephemeral-token>` must be
-included in all requests to the data plane. The value of `<ephemeral-token>` is
-the `token` field from the metadata exchange response.
-
-### Error handling
-
-All non-client errors (i.e. network errors and HTTP 5xx status codes) are
-handled by retrying the request. Randomized exponential backoff is applied to
-each retry.
-
-Client errors cannot be recovered by retrying. A JavaScript exception is
-generated for each of those errors.
+The KV Connect protocol allows the Deno CLI to communicate with a remote KV
+database. The
+[specification for the protocol](https://github.com/denoland/denokv/blob/main/proto/kv-connect.md),
+and the
+[protobuf definitions](https://github.com/denoland/denokv/blob/main/proto/schema/datapath.proto)
+can be found in the `denokv` repository, under the `proto` directory.
diff --git a/ext/kv/build.rs b/ext/kv/build.rs
deleted file mode 100644
index eba8a20f7..000000000
--- a/ext/kv/build.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use std::env;
-use std::io;
-use std::path::PathBuf;
-
-fn main() -> io::Result<()> {
- println!("cargo:rerun-if-changed=./proto");
-
- let descriptor_path =
- PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
-
- prost_build::Config::new()
- .file_descriptor_set_path(&descriptor_path)
- .compile_well_known_types()
- .compile_protos(&["proto/datapath.proto"], &["proto/"])?;
-
- Ok(())
-}
diff --git a/ext/kv/codec.rs b/ext/kv/codec.rs
deleted file mode 100644
index 522c2e9bc..000000000
--- a/ext/kv/codec.rs
+++ /dev/null
@@ -1,543 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-// Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/main/foundationdb/src/tuple/pack.rs
-
-use crate::Key;
-use crate::KeyPart;
-
-//const NIL: u8 = 0x00;
-const BYTES: u8 = 0x01;
-const STRING: u8 = 0x02;
-//const NESTED: u8 = 0x05;
-const NEGINTSTART: u8 = 0x0b;
-const INTZERO: u8 = 0x14;
-const POSINTEND: u8 = 0x1d;
-//const FLOAT: u8 = 0x20;
-const DOUBLE: u8 = 0x21;
-const FALSE: u8 = 0x26;
-const TRUE: u8 = 0x27;
-
-const ESCAPE: u8 = 0xff;
-
-const CANONICAL_NAN_POS: u64 = 0x7ff8000000000000u64;
-const CANONICAL_NAN_NEG: u64 = 0xfff8000000000000u64;
-
-pub fn canonicalize_f64(n: f64) -> f64 {
- if n.is_nan() {
- if n.is_sign_negative() {
- f64::from_bits(CANONICAL_NAN_NEG)
- } else {
- f64::from_bits(CANONICAL_NAN_POS)
- }
- } else {
- n
- }
-}
-
-pub fn encode_key(key: &Key) -> std::io::Result<Vec<u8>> {
- let mut output: Vec<u8> = vec![];
- for part in &key.0 {
- match part {
- KeyPart::String(key) => {
- output.push(STRING);
- escape_raw_bytes_into(&mut output, key.as_bytes());
- output.push(0);
- }
- KeyPart::Int(key) => {
- bigint::encode_into(&mut output, key)?;
- }
- KeyPart::Float(key) => {
- double::encode_into(&mut output, *key);
- }
- KeyPart::Bytes(key) => {
- output.push(BYTES);
- escape_raw_bytes_into(&mut output, key);
- output.push(0);
- }
- KeyPart::False => {
- output.push(FALSE);
- }
- KeyPart::True => {
- output.push(TRUE);
- }
- }
- }
- Ok(output)
-}
-
-pub fn decode_key(mut bytes: &[u8]) -> std::io::Result<Key> {
- let mut key = Key(vec![]);
- while !bytes.is_empty() {
- let tag = bytes[0];
- bytes = &bytes[1..];
-
- let next_bytes = match tag {
- self::STRING => {
- let (next_bytes, data) = parse_slice(bytes)?;
- let data = String::from_utf8(data).map_err(|_| {
- std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid utf8")
- })?;
- key.0.push(KeyPart::String(data));
- next_bytes
- }
- self::NEGINTSTART..=self::POSINTEND => {
- let (next_bytes, data) = bigint::decode_from(bytes, tag)?;
- key.0.push(KeyPart::Int(data));
- next_bytes
- }
- self::DOUBLE => {
- let (next_bytes, data) = double::decode_from(bytes)?;
- key.0.push(KeyPart::Float(data));
- next_bytes
- }
- self::BYTES => {
- let (next_bytes, data) = parse_slice(bytes)?;
- key.0.push(KeyPart::Bytes(data));
- next_bytes
- }
- self::FALSE => {
- key.0.push(KeyPart::False);
- bytes
- }
- self::TRUE => {
- key.0.push(KeyPart::True);
- bytes
- }
- _ => {
- return Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- "invalid tag",
- ))
- }
- };
-
- bytes = next_bytes;
- }
- Ok(key)
-}
-
-fn escape_raw_bytes_into(out: &mut Vec<u8>, x: &[u8]) {
- for &b in x {
- out.push(b);
- if b == 0 {
- out.push(ESCAPE);
- }
- }
-}
-
-mod bigint {
- use num_bigint::BigInt;
- use num_bigint::Sign;
-
- use super::parse_byte;
- use super::parse_bytes;
- const MAX_SZ: usize = 8;
-
- // Ported from https://github.com/foundationdb-rs/foundationdb-rs/blob/7415e116d5d96c2630976058de28e439eed7e809/foundationdb/src/tuple/pack.rs#L575
- pub fn encode_into(out: &mut Vec<u8>, key: &BigInt) -> std::io::Result<()> {
- if key.sign() == Sign::NoSign {
- out.push(super::INTZERO);
- return Ok(());
- }
- let (sign, mut bytes) = key.to_bytes_be();
- let n = bytes.len();
- match sign {
- Sign::Minus => {
- if n <= MAX_SZ {
- out.push(super::INTZERO - n as u8);
- } else {
- out.extend_from_slice(&[super::NEGINTSTART, bigint_n(n)? ^ 0xff]);
- }
- invert(&mut bytes);
- out.extend_from_slice(&bytes);
- }
- Sign::NoSign => unreachable!(),
- Sign::Plus => {
- if n <= MAX_SZ {
- out.push(super::INTZERO + n as u8);
- } else {
- out.extend_from_slice(&[super::POSINTEND, bigint_n(n)?]);
- }
- out.extend_from_slice(&bytes);
- }
- }
- Ok(())
- }
-
- pub fn decode_from(
- input: &[u8],
- tag: u8,
- ) -> std::io::Result<(&[u8], BigInt)> {
- if super::INTZERO <= tag && tag <= super::INTZERO + MAX_SZ as u8 {
- let n = (tag - super::INTZERO) as usize;
- let (input, bytes) = parse_bytes(input, n)?;
- Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
- } else if super::INTZERO - MAX_SZ as u8 <= tag && tag < super::INTZERO {
- let n = (super::INTZERO - tag) as usize;
- let (input, bytes) = parse_bytes(input, n)?;
- Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
- } else if tag == super::NEGINTSTART {
- let (input, raw_length) = parse_byte(input)?;
- let n = usize::from(raw_length ^ 0xff);
- let (input, bytes) = parse_bytes(input, n)?;
- Ok((input, BigInt::from_bytes_be(Sign::Minus, &inverted(bytes))))
- } else if tag == super::POSINTEND {
- let (input, raw_length) = parse_byte(input)?;
- let n: usize = usize::from(raw_length);
- let (input, bytes) = parse_bytes(input, n)?;
- Ok((input, BigInt::from_bytes_be(Sign::Plus, bytes)))
- } else {
- Err(std::io::Error::new(
- std::io::ErrorKind::InvalidData,
- format!("unknown bigint tag: {}", tag),
- ))
- }
- }
-
- fn invert(bytes: &mut [u8]) {
- // The ones' complement of a binary number is defined as the value
- // obtained by inverting all the bits in the binary representation
- // of the number (swapping 0s for 1s and vice versa).
- for byte in bytes.iter_mut() {
- *byte = !*byte;
- }
- }
-
- fn inverted(bytes: &[u8]) -> Vec<u8> {
- // The ones' complement of a binary number is defined as the value
- // obtained by inverting all the bits in the binary representation
- // of the number (swapping 0s for 1s and vice versa).
- bytes.iter().map(|byte| !*byte).collect()
- }
-
- fn bigint_n(n: usize) -> std::io::Result<u8> {
- u8::try_from(n).map_err(|_| {
- std::io::Error::new(
- std::io::ErrorKind::InvalidInput,
- "BigUint requires more than 255 bytes to be represented",
- )
- })
- }
-}
-
-mod double {
- macro_rules! sign_bit {
- ($type:ident) => {
- (1 << (std::mem::size_of::<$type>() * 8 - 1))
- };
- }
-
- fn f64_to_ux_be_bytes(f: f64) -> [u8; 8] {
- let u = if f.is_sign_negative() {
- f.to_bits() ^ ::std::u64::MAX
- } else {
- f.to_bits() ^ sign_bit!(u64)
- };
- u.to_be_bytes()
- }
-
- pub fn encode_into(out: &mut Vec<u8>, x: f64) {
- out.push(super::DOUBLE);
- out.extend_from_slice(&f64_to_ux_be_bytes(super::canonicalize_f64(x)));
- }
-
- pub fn decode_from(input: &[u8]) -> std::io::Result<(&[u8], f64)> {
- let (input, bytes) = super::parse_bytes(input, 8)?;
- let mut arr = [0u8; 8];
- arr.copy_from_slice(bytes);
- let u = u64::from_be_bytes(arr);
- Ok((
- input,
- f64::from_bits(if (u & sign_bit!(u64)) == 0 {
- u ^ ::std::u64::MAX
- } else {
- u ^ sign_bit!(u64)
- }),
- ))
- }
-}
-
-#[inline]
-fn parse_bytes(input: &[u8], num: usize) -> std::io::Result<(&[u8], &[u8])> {
- if input.len() < num {
- Err(std::io::ErrorKind::UnexpectedEof.into())
- } else {
- Ok((&input[num..], &input[..num]))
- }
-}
-
-#[inline]
-fn parse_byte(input: &[u8]) -> std::io::Result<(&[u8], u8)> {
- if input.is_empty() {
- Err(std::io::ErrorKind::UnexpectedEof.into())
- } else {
- Ok((&input[1..], input[0]))
- }
-}
-
-fn parse_slice(input: &[u8]) -> std::io::Result<(&[u8], Vec<u8>)> {
- let mut output: Vec<u8> = Vec::new();
- let mut i = 0usize;
-
- while i < input.len() {
- let byte = input[i];
- i += 1;
-
- if byte == 0 {
- if input.get(i).copied() == Some(ESCAPE) {
- output.push(0);
- i += 1;
- continue;
- } else {
- return Ok((&input[i..], output));
- }
- }
-
- output.push(byte);
- }
-
- Err(std::io::ErrorKind::UnexpectedEof.into())
-}
-
-#[cfg(test)]
-mod tests {
- use num_bigint::BigInt;
- use std::cmp::Ordering;
-
- use crate::Key;
- use crate::KeyPart;
-
- use super::decode_key;
- use super::encode_key;
-
- fn roundtrip(key: Key) {
- let bytes = encode_key(&key).unwrap();
- let decoded = decode_key(&bytes).unwrap();
- assert_eq!(&key, &decoded);
- assert_eq!(format!("{:?}", key), format!("{:?}", decoded));
- }
-
- fn check_order(a: Key, b: Key, expected: Ordering) {
- let a_bytes = encode_key(&a).unwrap();
- let b_bytes = encode_key(&b).unwrap();
-
- assert_eq!(a.cmp(&b), expected);
- assert_eq!(a_bytes.cmp(&b_bytes), expected);
- }
-
- fn check_bijection(key: Key, serialized: &[u8]) {
- let bytes = encode_key(&key).unwrap();
- assert_eq!(&bytes[..], serialized);
- let decoded = decode_key(serialized).unwrap();
- assert_eq!(&key, &decoded);
- }
-
- #[test]
- fn simple_roundtrip() {
- roundtrip(Key(vec![
- KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00]),
- KeyPart::String("foo".to_string()),
- KeyPart::Float(-f64::NAN),
- KeyPart::Float(-f64::INFINITY),
- KeyPart::Float(-42.1),
- KeyPart::Float(-0.0),
- KeyPart::Float(0.0),
- KeyPart::Float(42.1),
- KeyPart::Float(f64::INFINITY),
- KeyPart::Float(f64::NAN),
- KeyPart::Int(BigInt::from(-10000)),
- KeyPart::Int(BigInt::from(-1)),
- KeyPart::Int(BigInt::from(0)),
- KeyPart::Int(BigInt::from(1)),
- KeyPart::Int(BigInt::from(10000)),
- KeyPart::False,
- KeyPart::True,
- ]));
- }
-
- #[test]
- #[rustfmt::skip]
- fn order_bytes() {
- check_order(
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
- Ordering::Equal,
- );
-
- check_order(
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
- Ordering::Less,
- );
-
- check_order(
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x01])]),
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
- Ordering::Greater,
- );
-
- check_order(
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
- Ordering::Less,
- );
-
- check_order(
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00, 0x00])]),
- Key(vec![KeyPart::Bytes(vec![0, 1, 2, 3, 0xff, 0x00, 0xff, 0x00])]),
- Ordering::Greater,
- );
- }
-
- #[test]
- #[rustfmt::skip]
- fn order_tags() {
- check_order(
- Key(vec![KeyPart::Bytes(vec![])]),
- Key(vec![KeyPart::String("".into())]),
- Ordering::Less,
- );
-
- check_order(
- Key(vec![KeyPart::String("".into())]),
- Key(vec![KeyPart::Int(BigInt::from(0))]),
- Ordering::Less,
- );
-
- check_order(
- Key(vec![KeyPart::Int(BigInt::from(0))]),
- Key(vec![KeyPart::Float(0.0)]),
- Ordering::Less,
- );
-
- check_order(
- Key(vec![KeyPart::Float(0.0)]),
- Key(vec![KeyPart::False]),
- Ordering::Less,
- );
-
- check_order(
- Key(vec![KeyPart::False]),
- Key(vec![KeyPart::True]),
- Ordering::Less,
- );
-
- check_order(
- Key(vec![KeyPart::True]),
- Key(vec![KeyPart::Bytes(vec![])]),
- Ordering::Greater,
- );
- }
-
- #[test]
- #[rustfmt::skip]
- fn order_floats() {
- check_order(
- Key(vec![KeyPart::Float(-f64::NAN)]),
- Key(vec![KeyPart::Float(-f64::INFINITY)]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Float(-f64::INFINITY)]),
- Key(vec![KeyPart::Float(-10.0)]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Float(-10.0)]),
- Key(vec![KeyPart::Float(-0.0)]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Float(-0.0)]),
- Key(vec![KeyPart::Float(0.0)]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Float(0.0)]),
- Key(vec![KeyPart::Float(10.0)]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Float(10.0)]),
- Key(vec![KeyPart::Float(f64::INFINITY)]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Float(f64::INFINITY)]),
- Key(vec![KeyPart::Float(f64::NAN)]),
- Ordering::Less,
- );
- }
-
- #[test]
- #[rustfmt::skip]
- fn order_ints() {
- check_order(
- Key(vec![KeyPart::Int(BigInt::from(-10000))]),
- Key(vec![KeyPart::Int(BigInt::from(-100))]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Int(BigInt::from(-100))]),
- Key(vec![KeyPart::Int(BigInt::from(-1))]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Int(BigInt::from(-1))]),
- Key(vec![KeyPart::Int(BigInt::from(0))]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Int(BigInt::from(0))]),
- Key(vec![KeyPart::Int(BigInt::from(1))]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Int(BigInt::from(1))]),
- Key(vec![KeyPart::Int(BigInt::from(100))]),
- Ordering::Less,
- );
- check_order(
- Key(vec![KeyPart::Int(BigInt::from(100))]),
- Key(vec![KeyPart::Int(BigInt::from(10000))]),
- Ordering::Less,
- );
- }
-
- #[test]
- #[rustfmt::skip]
- fn float_canonicalization() {
- let key1 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000001))]);
- let key2 = Key(vec![KeyPart::Float(f64::from_bits(0x7ff8000000000002))]);
-
- assert_eq!(key1, key2);
- assert_eq!(encode_key(&key1).unwrap(), encode_key(&key2).unwrap());
- }
-
- #[test]
- #[rustfmt::skip]
- fn explicit_bijection() {
- // string
- check_bijection(
- Key(vec![KeyPart::String("hello".into())]),
- &[0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00],
- );
-
- // zero byte escape
- check_bijection(
- Key(vec![KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08])]),
- &[0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00],
- );
-
- // array
- check_bijection(
- Key(vec![
- KeyPart::String("hello".into()),
- KeyPart::Bytes(vec![0x01, 0x02, 0x00, 0x07, 0x08]),
- ]),
- &[
- 0x02, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x00, /* string */
- 0x01, 0x01, 0x02, 0x00, 0xff, 0x07, 0x08, 0x00, /* bytes */
- ],
- );
- }
-}
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
index 9084cc1bf..b772d26b8 100644
--- a/ext/kv/dynamic.rs
+++ b/ext/kv/dynamic.rs
@@ -7,17 +7,17 @@ use crate::remote::RemoteDbHandlerPermissions;
use crate::sqlite::SqliteDbHandler;
use crate::sqlite::SqliteDbHandlerPermissions;
use crate::AtomicWrite;
-use crate::CommitResult;
use crate::Database;
use crate::DatabaseHandler;
use crate::QueueMessageHandle;
use crate::ReadRange;
-use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::OpState;
+use denokv_proto::CommitResult;
+use denokv_proto::ReadRangeOutput;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
@@ -34,15 +34,20 @@ impl MultiBackendDbHandler {
P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
>(
default_storage_dir: Option<std::path::PathBuf>,
+ versionstamp_rng_seed: Option<u64>,
+ http_options: crate::remote::HttpOptions,
) -> Self {
Self::new(vec![
(
&["https://", "http://"],
- Box::new(crate::remote::RemoteDbHandler::<P>::new()),
+ Box::new(crate::remote::RemoteDbHandler::<P>::new(http_options)),
),
(
&[""],
- Box::new(SqliteDbHandler::<P>::new(default_storage_dir)),
+ Box::new(SqliteDbHandler::<P>::new(
+ default_storage_dir,
+ versionstamp_rng_seed,
+ )),
),
])
}
@@ -118,20 +123,17 @@ where
pub trait DynamicDb {
async fn dyn_snapshot_read(
&self,
- state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn dyn_atomic_write(
&self,
- state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
async fn dyn_dequeue_next_message(
&self,
- state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
fn dyn_close(&self);
@@ -143,26 +145,23 @@ impl Database for Box<dyn DynamicDb> {
async fn snapshot_read(
&self,
- state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- (**self).dyn_snapshot_read(state, requests, options).await
+ (**self).dyn_snapshot_read(requests, options).await
}
async fn atomic_write(
&self,
- state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- (**self).dyn_atomic_write(state, write).await
+ (**self).dyn_atomic_write(write).await
}
async fn dequeue_next_message(
&self,
- state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
- (**self).dyn_dequeue_next_message(state).await
+ (**self).dyn_dequeue_next_message().await
}
fn close(&self) {
@@ -178,28 +177,25 @@ where
{
async fn dyn_snapshot_read(
&self,
- state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- Ok(self.snapshot_read(state, requests, options).await?)
+ Ok(self.snapshot_read(requests, options).await?)
}
async fn dyn_atomic_write(
&self,
- state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- Ok(self.atomic_write(state, write).await?)
+ Ok(self.atomic_write(write).await?)
}
async fn dyn_dequeue_next_message(
&self,
- state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
Ok(
self
- .dequeue_next_message(state)
+ .dequeue_next_message()
.await?
.map(|x| Box::new(x) as Box<dyn QueueMessageHandle>),
)
@@ -209,13 +205,3 @@ where
self.close()
}
}
-
-#[async_trait(?Send)]
-impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
- async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
- (**self).take_payload().await
- }
- async fn finish(&self, success: bool) -> Result<(), AnyError> {
- (**self).finish(success).await
- }
-}
diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs
index 1acf3ce16..d7aa68368 100644
--- a/ext/kv/interface.rs
+++ b/ext/kv/interface.rs
@@ -1,16 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
-use std::cmp::Ordering;
-use std::num::NonZeroU32;
use std::rc::Rc;
use async_trait::async_trait;
use deno_core::error::AnyError;
use deno_core::OpState;
-use num_bigint::BigInt;
-
-use crate::codec::canonicalize_f64;
+use denokv_proto::Database;
#[async_trait(?Send)]
pub trait DatabaseHandler {
@@ -22,312 +18,3 @@ pub trait DatabaseHandler {
path: Option<String>,
) -> Result<Self::DB, AnyError>;
}
-
-#[async_trait(?Send)]
-pub trait Database {
- type QMH: QueueMessageHandle + 'static;
-
- async fn snapshot_read(
- &self,
- state: Rc<RefCell<OpState>>,
- requests: Vec<ReadRange>,
- options: SnapshotReadOptions,
- ) -> Result<Vec<ReadRangeOutput>, AnyError>;
-
- async fn atomic_write(
- &self,
- state: Rc<RefCell<OpState>>,
- write: AtomicWrite,
- ) -> Result<Option<CommitResult>, AnyError>;
-
- async fn dequeue_next_message(
- &self,
- state: Rc<RefCell<OpState>>,
- ) -> Result<Option<Self::QMH>, AnyError>;
-
- fn close(&self);
-}
-
-#[async_trait(?Send)]
-pub trait QueueMessageHandle {
- async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError>;
- async fn finish(&self, success: bool) -> Result<(), AnyError>;
-}
-
-/// Options for a snapshot read.
-pub struct SnapshotReadOptions {
- pub consistency: Consistency,
-}
-
-/// The consistency of a read.
-#[derive(Eq, PartialEq, Copy, Clone, Debug)]
-pub enum Consistency {
- Strong,
- Eventual,
-}
-
-/// A key is for a KV pair. It is a vector of KeyParts.
-///
-/// The ordering of the keys is defined by the ordering of the KeyParts. The
-/// first KeyPart is the most significant, and the last KeyPart is the least
-/// significant.
-#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug)]
-pub struct Key(pub Vec<KeyPart>);
-
-/// A key part is single item in a key. It can be a boolean, a double float, a
-/// variable precision signed integer, a UTF-8 string, or an arbitrary byte
-/// array.
-///
-/// The ordering of a KeyPart is dependent on the type of the KeyPart.
-///
-/// Between different types, the ordering is as follows: arbitrary byte array <
-/// UTF-8 string < variable precision signed integer < double float < false < true.
-///
-/// Within a type, the ordering is as follows:
-/// - For a **boolean**, false is less than true.
-/// - For a **double float**, the ordering must follow -NaN < -Infinity < -100.0 < -1.0 < -0.5 < -0.0 < 0.0 < 0.5 < 1.0 < 100.0 < Infinity < NaN.
-/// - For a **variable precision signed integer**, the ordering must follow mathematical ordering.
-/// - For a **UTF-8 string**, the ordering must follow the UTF-8 byte ordering.
-/// - For an **arbitrary byte array**, the ordering must follow the byte ordering.
-///
-/// This means that the key part `1.0` is less than the key part `2.0`, but is
-/// greater than the key part `0n`, because `1.0` is a double float and `0n`
-/// is a variable precision signed integer, and the ordering types obviously has
-/// precedence over the ordering within a type.
-#[derive(Clone, Debug)]
-pub enum KeyPart {
- Bytes(Vec<u8>),
- String(String),
- Int(BigInt),
- Float(f64),
- False,
- True,
-}
-
-impl KeyPart {
- fn tag_ordering(&self) -> u8 {
- match self {
- KeyPart::Bytes(_) => 0,
- KeyPart::String(_) => 1,
- KeyPart::Int(_) => 2,
- KeyPart::Float(_) => 3,
- KeyPart::False => 4,
- KeyPart::True => 5,
- }
- }
-}
-
-impl Eq for KeyPart {}
-
-impl PartialEq for KeyPart {
- fn eq(&self, other: &Self) -> bool {
- self.cmp(other) == Ordering::Equal
- }
-}
-
-impl Ord for KeyPart {
- fn cmp(&self, other: &Self) -> Ordering {
- match (self, other) {
- (KeyPart::Bytes(b1), KeyPart::Bytes(b2)) => b1.cmp(b2),
- (KeyPart::String(s1), KeyPart::String(s2)) => {
- s1.as_bytes().cmp(s2.as_bytes())
- }
- (KeyPart::Int(i1), KeyPart::Int(i2)) => i1.cmp(i2),
- (KeyPart::Float(f1), KeyPart::Float(f2)) => {
- canonicalize_f64(*f1).total_cmp(&canonicalize_f64(*f2))
- }
- _ => self.tag_ordering().cmp(&other.tag_ordering()),
- }
- }
-}
-
-impl PartialOrd for KeyPart {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-
-/// A request to read a range of keys from the database. If `end` is `None`,
-/// then the range is from `start` shall also be used as the end of the range.
-///
-/// The range is inclusive of the start and exclusive of the end. The start may
-/// not be greater than the end.
-///
-/// The range is limited to `limit` number of entries.
-pub struct ReadRange {
- pub start: Vec<u8>,
- pub end: Vec<u8>,
- pub limit: NonZeroU32,
- pub reverse: bool,
-}
-
-/// A response to a `ReadRange` request.
-pub struct ReadRangeOutput {
- pub entries: Vec<KvEntry>,
-}
-
-/// A versionstamp is a 10 byte array that is used to represent the version of
-/// a key in the database.
-type Versionstamp = [u8; 10];
-
-/// A key-value entry with a versionstamp.
-pub struct KvEntry {
- pub key: Vec<u8>,
- pub value: Value,
- pub versionstamp: Versionstamp,
-}
-
-/// A serialized value for a KV pair as stored in the database. All values
-/// **can** be serialized into the V8 representation, but not all values are.
-///
-/// The V8 representation is an opaque byte array that is only meaningful to
-/// the V8 engine. It is guaranteed to be backwards compatible. Because this
-/// representation is opaque, it is not possible to inspect or modify the value
-/// without deserializing it.
-///
-/// The inability to inspect or modify the value without deserializing it means
-/// that these values can not be quickly modified when performing atomic
-/// read-modify-write operations on the database (because the database may not
-/// have the ability to deserialize the V8 value into a modifiable value).
-///
-/// Because of this constraint, there are more specialized representations for
-/// certain types of values that can be used in atomic read-modify-write
-/// operations. These specialized representations are:
-///
-/// - **Bytes**: an arbitrary byte array.
-/// - **U64**: a 64-bit unsigned integer.
-pub enum Value {
- V8(Vec<u8>),
- Bytes(Vec<u8>),
- U64(u64),
-}
-
-/// A request to perform an atomic check-modify-write operation on the database.
-///
-/// The operation is performed atomically, meaning that the operation will
-/// either succeed or fail. If the operation fails, then the database will be
-/// left in the same state as before the operation was attempted. If the
-/// operation succeeds, then the database will be left in a new state.
-///
-/// The operation is performed by first checking the database for the current
-/// state of the keys, defined by the `checks` field. If the current state of
-/// the keys does not match the expected state, then the operation fails. If
-/// the current state of the keys matches the expected state, then the
-/// mutations are applied to the database.
-///
-/// All checks and mutations are performed atomically.
-///
-/// The mutations are performed in the order that they are specified in the
-/// `mutations` field. The order of checks is not specified, and is also not
-/// important because this ordering is un-observable.
-pub struct AtomicWrite {
- pub checks: Vec<KvCheck>,
- pub mutations: Vec<KvMutation>,
- pub enqueues: Vec<Enqueue>,
-}
-
-/// A request to perform a check on a key in the database. The check is not
-/// performed on the value of the key, but rather on the versionstamp of the
-/// key.
-pub struct KvCheck {
- pub key: Vec<u8>,
- pub versionstamp: Option<Versionstamp>,
-}
-
-/// A request to perform a mutation on a key in the database. The mutation is
-/// performed on the value of the key.
-///
-/// The type of mutation is specified by the `kind` field. The action performed
-/// by each mutation kind is specified in the docs for [MutationKind].
-pub struct KvMutation {
- pub key: Vec<u8>,
- pub kind: MutationKind,
- pub expire_at: Option<u64>,
-}
-
-/// A request to enqueue a message to the database. This message is delivered
-/// to a listener of the queue at least once.
-///
-/// ## Retry
-///
-/// When the delivery of a message fails, it is retried for a finite number
-/// of times. Each retry happens after a backoff period. The backoff periods
-/// are specified by the `backoff_schedule` field in milliseconds. If
-/// unspecified, the default backoff schedule of the platform (CLI or Deploy)
-/// is used.
-///
-/// If all retry attempts failed, the message is written to the KV under all
-/// keys specified in `keys_if_undelivered`.
-pub struct Enqueue {
- pub payload: Vec<u8>,
- pub delay_ms: u64,
- pub keys_if_undelivered: Vec<Vec<u8>>,
- pub backoff_schedule: Option<Vec<u32>>,
-}
-
-/// The type of mutation to perform on a key in the database.
-///
-/// ## Set
-///
-/// The set mutation sets the value of the key to the specified value. It
-/// discards the previous value of the key, if any.
-///
-/// This operand supports all [Value] types.
-///
-/// ## Delete
-///
-/// The delete mutation deletes the value of the key.
-///
-/// ## Sum
-///
-/// The sum mutation adds the specified value to the existing value of the key.
-///
-/// This operand supports only value types [Value::U64]. The existing value in
-/// the database must match the type of the value specified in the mutation. If
-/// the key does not exist in the database, then the value specified in the
-/// mutation is used as the new value of the key.
-///
-/// ## Min
-///
-/// The min mutation sets the value of the key to the minimum of the existing
-/// value of the key and the specified value.
-///
-/// This operand supports only value types [Value::U64]. The existing value in
-/// the database must match the type of the value specified in the mutation. If
-/// the key does not exist in the database, then the value specified in the
-/// mutation is used as the new value of the key.
-///
-/// ## Max
-///
-/// The max mutation sets the value of the key to the maximum of the existing
-/// value of the key and the specified value.
-///
-/// This operand supports only value types [Value::U64]. The existing value in
-/// the database must match the type of the value specified in the mutation. If
-/// the key does not exist in the database, then the value specified in the
-/// mutation is used as the new value of the key.
-pub enum MutationKind {
- Set(Value),
- Delete,
- Sum(Value),
- Min(Value),
- Max(Value),
-}
-
-impl MutationKind {
- pub fn value(&self) -> Option<&Value> {
- match self {
- MutationKind::Set(value) => Some(value),
- MutationKind::Sum(value) => Some(value),
- MutationKind::Min(value) => Some(value),
- MutationKind::Max(value) => Some(value),
- MutationKind::Delete => None,
- }
- }
-}
-
-/// The result of a successful commit of an atomic write operation.
-pub struct CommitResult {
- /// The new versionstamp of the data that was committed.
- pub versionstamp: Versionstamp,
-}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index fb68596fa..9e2273108 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -1,9 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-pub mod codec;
pub mod dynamic;
mod interface;
-mod proto;
pub mod remote;
pub mod sqlite;
mod time;
@@ -12,11 +10,12 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU32;
use std::rc::Rc;
+use std::time::Duration;
use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
-use codec::decode_key;
-use codec::encode_key;
+use chrono::DateTime;
+use chrono::Utc;
use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
@@ -30,8 +29,26 @@ use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
+use denokv_proto::decode_key;
+use denokv_proto::encode_key;
+use denokv_proto::AtomicWrite;
+use denokv_proto::Check;
+use denokv_proto::Consistency;
+use denokv_proto::Database;
+use denokv_proto::Enqueue;
+use denokv_proto::Key;
+use denokv_proto::KeyPart;
+use denokv_proto::KvEntry;
+use denokv_proto::KvValue;
+use denokv_proto::Mutation;
+use denokv_proto::MutationKind;
+use denokv_proto::QueueMessageHandle;
+use denokv_proto::ReadRange;
+use denokv_proto::SnapshotReadOptions;
+use log::debug;
use serde::Deserialize;
use serde::Serialize;
+use time::utc_now;
pub use crate::interface::*;
@@ -110,30 +127,26 @@ where
type KvKey = Vec<AnyValue>;
-impl From<AnyValue> for KeyPart {
- fn from(value: AnyValue) -> Self {
- match value {
- AnyValue::Bool(false) => KeyPart::False,
- AnyValue::Bool(true) => KeyPart::True,
- AnyValue::Number(n) => KeyPart::Float(n),
- AnyValue::BigInt(n) => KeyPart::Int(n),
- AnyValue::String(s) => KeyPart::String(s),
- AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
- AnyValue::RustBuffer(_) => unreachable!(),
- }
+fn key_part_from_v8(value: AnyValue) -> KeyPart {
+ match value {
+ AnyValue::Bool(false) => KeyPart::False,
+ AnyValue::Bool(true) => KeyPart::True,
+ AnyValue::Number(n) => KeyPart::Float(n),
+ AnyValue::BigInt(n) => KeyPart::Int(n),
+ AnyValue::String(s) => KeyPart::String(s),
+ AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
+ AnyValue::RustBuffer(_) => unreachable!(),
}
}
-impl From<KeyPart> for AnyValue {
- fn from(value: KeyPart) -> Self {
- match value {
- KeyPart::False => AnyValue::Bool(false),
- KeyPart::True => AnyValue::Bool(true),
- KeyPart::Float(n) => AnyValue::Number(n),
- KeyPart::Int(n) => AnyValue::BigInt(n),
- KeyPart::String(s) => AnyValue::String(s),
- KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
- }
+fn key_part_to_v8(value: KeyPart) -> AnyValue {
+ match value {
+ KeyPart::False => AnyValue::Bool(false),
+ KeyPart::True => AnyValue::Bool(true),
+ KeyPart::Float(n) => AnyValue::Number(n),
+ KeyPart::Int(n) => AnyValue::BigInt(n),
+ KeyPart::String(s) => AnyValue::String(s),
+ KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
}
}
@@ -153,25 +166,25 @@ enum ToV8Value {
U64(BigInt),
}
-impl TryFrom<FromV8Value> for Value {
+impl TryFrom<FromV8Value> for KvValue {
type Error = AnyError;
fn try_from(value: FromV8Value) -> Result<Self, AnyError> {
Ok(match value {
- FromV8Value::V8(buf) => Value::V8(buf.to_vec()),
- FromV8Value::Bytes(buf) => Value::Bytes(buf.to_vec()),
+ FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
+ FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
FromV8Value::U64(n) => {
- Value::U64(num_bigint::BigInt::from(n).try_into()?)
+ KvValue::U64(num_bigint::BigInt::from(n).try_into()?)
}
})
}
}
-impl From<Value> for ToV8Value {
- fn from(value: Value) -> Self {
+impl From<KvValue> for ToV8Value {
+ fn from(value: KvValue) -> Self {
match value {
- Value::V8(buf) => ToV8Value::V8(buf.into()),
- Value::Bytes(buf) => ToV8Value::Bytes(buf.into()),
- Value::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
+ KvValue::V8(buf) => ToV8Value::V8(buf.into()),
+ KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()),
+ KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
}
}
}
@@ -190,7 +203,7 @@ impl TryFrom<KvEntry> for ToV8KvEntry {
key: decode_key(&entry.key)?
.0
.into_iter()
- .map(Into::into)
+ .map(key_part_to_v8)
.collect(),
value: entry.value.into(),
versionstamp: hex::encode(entry.versionstamp).into(),
@@ -282,8 +295,7 @@ where
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
- let output_ranges =
- db.snapshot_read(state.clone(), read_ranges, opts).await?;
+ let output_ranges = db.snapshot_read(read_ranges, opts).await?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
@@ -302,7 +314,7 @@ struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
fn name(&self) -> Cow<str> {
- "queue_message".into()
+ "queueMessage".into()
}
}
@@ -331,7 +343,7 @@ where
resource.db.clone()
};
- let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else {
+ let Some(mut handle) = db.dequeue_next_message().await? else {
return Ok(None);
};
let payload = handle.take_payload().await?.into();
@@ -361,81 +373,81 @@ where
.map_err(|_| type_error("Queue message not found"))?
.handle
};
- handle.finish(success).await
+ // if we fail to finish the message, there is not much we can do and the
+ // message will be retried anyway, so we just ignore the error
+ if let Err(err) = handle.finish(success).await {
+ debug!("Failed to finish dequeued message: {}", err);
+ };
+ Ok(())
}
type V8KvCheck = (KvKey, Option<ByteString>);
-impl TryFrom<V8KvCheck> for KvCheck {
- type Error = AnyError;
- fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
- let versionstamp = match value.1 {
- Some(data) => {
- let mut out = [0u8; 10];
- hex::decode_to_slice(data, &mut out)
- .map_err(|_| type_error("invalid versionstamp"))?;
- Some(out)
- }
- None => None,
- };
- Ok(KvCheck {
- key: encode_v8_key(value.0)?,
- versionstamp,
- })
- }
+fn check_from_v8(value: V8KvCheck) -> Result<Check, AnyError> {
+ let versionstamp = match value.1 {
+ Some(data) => {
+ let mut out = [0u8; 10];
+ hex::decode_to_slice(data, &mut out)
+ .map_err(|_| type_error("invalid versionstamp"))?;
+ Some(out)
+ }
+ None => None,
+ };
+ Ok(Check {
+ key: encode_v8_key(value.0)?,
+ versionstamp,
+ })
}
type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
-impl TryFrom<(V8KvMutation, u64)> for KvMutation {
- type Error = AnyError;
- fn try_from(
- (value, current_timstamp): (V8KvMutation, u64),
- ) -> Result<Self, AnyError> {
- let key = encode_v8_key(value.0)?;
- let kind = match (value.1.as_str(), value.2) {
- ("set", Some(value)) => MutationKind::Set(value.try_into()?),
- ("delete", None) => MutationKind::Delete,
- ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
- ("min", Some(value)) => MutationKind::Min(value.try_into()?),
- ("max", Some(value)) => MutationKind::Max(value.try_into()?),
- (op, Some(_)) => {
- return Err(type_error(format!("invalid mutation '{op}' with value")))
- }
- (op, None) => {
- return Err(type_error(format!(
- "invalid mutation '{op}' without value"
- )))
- }
- };
- Ok(KvMutation {
- key,
- kind,
- expire_at: value.3.map(|expire_in| current_timstamp + expire_in),
- })
- }
+fn mutation_from_v8(
+ (value, current_timstamp): (V8KvMutation, DateTime<Utc>),
+) -> Result<Mutation, AnyError> {
+ let key = encode_v8_key(value.0)?;
+ let kind = match (value.1.as_str(), value.2) {
+ ("set", Some(value)) => MutationKind::Set(value.try_into()?),
+ ("delete", None) => MutationKind::Delete,
+ ("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
+ ("min", Some(value)) => MutationKind::Min(value.try_into()?),
+ ("max", Some(value)) => MutationKind::Max(value.try_into()?),
+ (op, Some(_)) => {
+ return Err(type_error(format!("invalid mutation '{op}' with value")))
+ }
+ (op, None) => {
+ return Err(type_error(format!("invalid mutation '{op}' without value")))
+ }
+ };
+ Ok(Mutation {
+ key,
+ kind,
+ expire_at: value
+ .3
+ .map(|expire_in| current_timstamp + Duration::from_millis(expire_in)),
+ })
}
type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
-impl TryFrom<V8Enqueue> for Enqueue {
- type Error = AnyError;
- fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
- Ok(Enqueue {
- payload: value.0.to_vec(),
- delay_ms: value.1,
- keys_if_undelivered: value
- .2
- .into_iter()
- .map(encode_v8_key)
- .collect::<std::io::Result<_>>()?,
- backoff_schedule: value.3,
- })
- }
+fn enqueue_from_v8(
+ value: V8Enqueue,
+ current_timestamp: DateTime<Utc>,
+) -> Result<Enqueue, AnyError> {
+ Ok(Enqueue {
+ payload: value.0.to_vec(),
+ deadline: current_timestamp
+ + chrono::Duration::milliseconds(value.1 as i64),
+ keys_if_undelivered: value
+ .2
+ .into_iter()
+ .map(encode_v8_key)
+ .collect::<std::io::Result<_>>()?,
+ backoff_schedule: value.3,
+ })
}
fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
- encode_key(&Key(key.into_iter().map(From::from).collect()))
+ encode_key(&Key(key.into_iter().map(key_part_from_v8).collect()))
}
enum RawSelector {
@@ -610,7 +622,7 @@ async fn op_kv_atomic_write<DBH>(
where
DBH: DatabaseHandler + 'static,
{
- let current_timestamp = time::utc_now().timestamp_millis() as u64;
+ let current_timestamp = utc_now();
let db = {
let state = state.borrow();
let resource =
@@ -631,17 +643,17 @@ where
let checks = checks
.into_iter()
- .map(TryInto::try_into)
- .collect::<Result<Vec<KvCheck>, AnyError>>()
+ .map(check_from_v8)
+ .collect::<Result<Vec<Check>, AnyError>>()
.with_context(|| "invalid check")?;
let mutations = mutations
.into_iter()
- .map(|mutation| TryFrom::try_from((mutation, current_timestamp)))
- .collect::<Result<Vec<KvMutation>, AnyError>>()
+ .map(|mutation| mutation_from_v8((mutation, current_timestamp)))
+ .collect::<Result<Vec<Mutation>, AnyError>>()
.with_context(|| "invalid mutation")?;
let enqueues = enqueues
.into_iter()
- .map(TryInto::try_into)
+ .map(|e| enqueue_from_v8(e, current_timestamp))
.collect::<Result<Vec<Enqueue>, AnyError>>()
.with_context(|| "invalid enqueue")?;
@@ -690,7 +702,7 @@ where
enqueues,
};
- let result = db.atomic_write(state.clone(), atomic_write).await?;
+ let result = db.atomic_write(atomic_write).await?;
Ok(result.map(|res| hex::encode(res.versionstamp)))
}
@@ -732,11 +744,11 @@ fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> {
}
}
-fn check_value_size(value: &Value) -> Result<usize, AnyError> {
+fn check_value_size(value: &KvValue) -> Result<usize, AnyError> {
let payload = match value {
- Value::Bytes(x) => x,
- Value::V8(x) => x,
- Value::U64(_) => return Ok(8),
+ KvValue::Bytes(x) => x,
+ KvValue::V8(x) => x,
+ KvValue::U64(_) => return Ok(8),
};
if payload.len() > MAX_VALUE_SIZE_BYTES {
diff --git a/ext/kv/proto/datapath.proto b/ext/kv/proto/datapath.proto
deleted file mode 100644
index 59793000b..000000000
--- a/ext/kv/proto/datapath.proto
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-syntax = "proto3";
-
-package datapath;
-
-message SnapshotRead {
- repeated ReadRange ranges = 1;
-}
-
-message SnapshotReadOutput {
- repeated ReadRangeOutput ranges = 1;
- bool read_disabled = 2;
- repeated string regions_if_read_disabled = 3;
- bool read_is_strongly_consistent = 4;
- string primary_if_not_strongly_consistent = 5;
-}
-
-message ReadRange {
- bytes start = 1;
- bytes end = 2;
- int32 limit = 3;
- bool reverse = 4;
-}
-
-message ReadRangeOutput {
- repeated KvEntry values = 1;
-}
-
-message AtomicWrite {
- repeated KvCheck kv_checks = 1;
- repeated KvMutation kv_mutations = 2;
- repeated Enqueue enqueues = 3;
-}
-
-message AtomicWriteOutput {
- AtomicWriteStatus status = 1;
- bytes versionstamp = 2;
- string primary_if_write_disabled = 3;
-}
-
-message KvCheck {
- bytes key = 1;
- bytes versionstamp = 2; // 10-byte raw versionstamp
-}
-
-message KvMutation {
- bytes key = 1;
- KvValue value = 2;
- KvMutationType mutation_type = 3;
- int64 expire_at_ms = 4;
-}
-
-message KvValue {
- bytes data = 1;
- KvValueEncoding encoding = 2;
-}
-
-message KvEntry {
- bytes key = 1;
- bytes value = 2;
- KvValueEncoding encoding = 3;
- bytes versionstamp = 4;
-}
-
-enum KvMutationType {
- M_UNSPECIFIED = 0;
- M_SET = 1;
- M_CLEAR = 2;
- M_SUM = 3;
- M_MAX = 4;
- M_MIN = 5;
-}
-
-enum KvValueEncoding {
- VE_UNSPECIFIED = 0;
- VE_V8 = 1;
- VE_LE64 = 2;
- VE_BYTES = 3;
-}
-
-enum AtomicWriteStatus {
- AW_UNSPECIFIED = 0;
- AW_SUCCESS = 1;
- AW_CHECK_FAILURE = 2;
- AW_UNSUPPORTED_WRITE = 3;
- AW_USAGE_LIMIT_EXCEEDED = 4;
- AW_WRITE_DISABLED = 5;
- AW_QUEUE_BACKLOG_LIMIT_EXCEEDED = 6;
-}
-
-message Enqueue {
- bytes payload = 1;
- int64 deadline_ms = 2;
- repeated bytes kv_keys_if_undelivered = 3;
- repeated uint32 backoff_schedule = 4;
-}
diff --git a/ext/kv/proto/mod.rs b/ext/kv/proto/mod.rs
deleted file mode 100644
index d258a0551..000000000
--- a/ext/kv/proto/mod.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-// Generated code, disable lints
-#[allow(clippy::all, non_snake_case)]
-pub mod datapath {
- include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
-}
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs
index 0a061b35b..7cac6b9c3 100644
--- a/ext/kv/remote.rs
+++ b/ext/kv/remote.rs
@@ -1,43 +1,42 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
-use std::fmt;
-use std::io::Write;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
-use std::time::Duration;
-use crate::proto::datapath as pb;
-use crate::AtomicWrite;
-use crate::CommitResult;
-use crate::Database;
use crate::DatabaseHandler;
-use crate::KvEntry;
-use crate::MutationKind;
-use crate::QueueMessageHandle;
-use crate::ReadRange;
-use crate::ReadRangeOutput;
-use crate::SnapshotReadOptions;
use anyhow::Context;
use async_trait::async_trait;
-use chrono::DateTime;
-use chrono::Utc;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::TryFutureExt;
-use deno_core::unsync::JoinHandle;
use deno_core::OpState;
-use prost::Message;
-use rand::Rng;
-use serde::Deserialize;
-use termcolor::Ansi;
-use termcolor::Color;
-use termcolor::ColorSpec;
-use termcolor::WriteColor;
-use tokio::sync::watch;
+use deno_fetch::create_http_client;
+use deno_fetch::CreateHttpClientOptions;
+use deno_tls::rustls::RootCertStore;
+use deno_tls::Proxy;
+use deno_tls::RootCertStoreProvider;
+use denokv_remote::MetadataEndpoint;
+use denokv_remote::Remote;
use url::Url;
-use uuid::Uuid;
+
+#[derive(Clone)]
+pub struct HttpOptions {
+ pub user_agent: String,
+ pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
+ pub proxy: Option<Proxy>,
+ pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
+ pub client_cert_chain_and_key: Option<(String, String)>,
+}
+
+impl HttpOptions {
+ pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, AnyError> {
+ Ok(match &self.root_cert_store_provider {
+ Some(provider) => Some(provider.get_or_try_init()?.clone()),
+ None => None,
+ })
+ }
+}
pub trait RemoteDbHandlerPermissions {
fn check_env(&mut self, var: &str) -> Result<(), AnyError>;
@@ -49,50 +48,39 @@ pub trait RemoteDbHandlerPermissions {
}
pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
+ http_options: HttpOptions,
_p: std::marker::PhantomData<P>,
}
impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
- pub fn new() -> Self {
- Self { _p: PhantomData }
- }
-}
-
-impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> {
- fn default() -> Self {
- Self::new()
+ pub fn new(http_options: HttpOptions) -> Self {
+ Self {
+ http_options,
+ _p: PhantomData,
+ }
}
}
-#[derive(Deserialize)]
-struct VersionInfo {
- version: u64,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-#[allow(dead_code)]
-struct DatabaseMetadata {
- version: u64,
- database_id: Uuid,
- endpoints: Vec<EndpointInfo>,
- token: String,
- expires_at: DateTime<Utc>,
+pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
+ state: Rc<RefCell<OpState>>,
+ _permissions: PhantomData<P>,
}
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct EndpointInfo {
- pub url: String,
-
- // Using `String` instead of an enum, so that parsing doesn't
- // break if more consistency levels are added.
- pub consistency: String,
+impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
+ for PermissionChecker<P>
+{
+ fn check_net_url(&self, url: &Url) -> Result<(), anyhow::Error> {
+ let mut state = self.state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_net_url(url, "Deno.openKv")
+ }
}
#[async_trait(?Send)]
-impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
- type DB = RemoteDb<P>;
+impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler
+ for RemoteDbHandler<P>
+{
+ type DB = Remote<PermissionChecker<P>>;
async fn open(
&self,
@@ -122,470 +110,36 @@ impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
"Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
})?;
- let refresher = MetadataRefresher::new(url, access_token);
-
- let db = RemoteDb {
- client: reqwest::Client::new(),
- refresher,
- _p: PhantomData,
- };
- Ok(db)
- }
-}
-
-pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> {
- client: reqwest::Client,
- refresher: MetadataRefresher,
- _p: std::marker::PhantomData<P>,
-}
-
-pub struct DummyQueueMessageHandle {}
-
-#[async_trait(?Send)]
-impl QueueMessageHandle for DummyQueueMessageHandle {
- async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
- unimplemented!()
- }
-
- async fn finish(&self, _success: bool) -> Result<(), AnyError> {
- unimplemented!()
- }
-}
-
-#[async_trait(?Send)]
-impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
- type QMH = DummyQueueMessageHandle;
-
- async fn snapshot_read(
- &self,
- state: Rc<RefCell<OpState>>,
- requests: Vec<ReadRange>,
- _options: SnapshotReadOptions,
- ) -> Result<Vec<ReadRangeOutput>, AnyError> {
- let req = pb::SnapshotRead {
- ranges: requests
- .into_iter()
- .map(|r| pb::ReadRange {
- start: r.start,
- end: r.end,
- limit: r.limit.get() as _,
- reverse: r.reverse,
- })
- .collect(),
- };
-
- let res: pb::SnapshotReadOutput = call_remote::<P, _, _>(
- &state,
- &self.refresher,
- &self.client,
- "snapshot_read",
- &req,
- )
- .await?;
-
- if res.read_disabled {
- return Err(type_error("Reads are disabled for this database."));
- }
-
- let out = res
- .ranges
- .into_iter()
- .map(|r| {
- Ok(ReadRangeOutput {
- entries: r
- .values
- .into_iter()
- .map(|e| {
- let encoding = e.encoding();
- Ok(KvEntry {
- key: e.key,
- value: decode_value(e.value, encoding)?,
- versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?,
- })
- })
- .collect::<Result<_, AnyError>>()?,
- })
- })
- .collect::<Result<Vec<_>, AnyError>>()?;
- Ok(out)
- }
-
- async fn atomic_write(
- &self,
- state: Rc<RefCell<OpState>>,
- write: AtomicWrite,
- ) -> Result<Option<CommitResult>, AnyError> {
- if !write.enqueues.is_empty() {
- return Err(type_error("Enqueue operations are not supported yet."));
- }
-
- let req = pb::AtomicWrite {
- kv_checks: write
- .checks
- .into_iter()
- .map(|x| {
- Ok(pb::KvCheck {
- key: x.key,
- versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(),
- })
- })
- .collect::<anyhow::Result<_>>()?,
- kv_mutations: write.mutations.into_iter().map(encode_mutation).collect(),
- enqueues: vec![],
- };
-
- let res: pb::AtomicWriteOutput = call_remote::<P, _, _>(
- &state,
- &self.refresher,
- &self.client,
- "atomic_write",
- &req,
- )
- .await?;
- match res.status() {
- pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult {
- versionstamp: if res.versionstamp.is_empty() {
- Default::default()
- } else {
- res.versionstamp[..].try_into()?
- },
- })),
- pb::AtomicWriteStatus::AwCheckFailure => Ok(None),
- pb::AtomicWriteStatus::AwUnsupportedWrite => {
- Err(type_error("Unsupported write"))
- }
- pb::AtomicWriteStatus::AwUsageLimitExceeded => {
- Err(type_error("The database usage limit has been exceeded."))
- }
- pb::AtomicWriteStatus::AwWriteDisabled => {
- // TODO: Auto retry
- Err(type_error("Writes are disabled for this database."))
- }
- pb::AtomicWriteStatus::AwUnspecified => {
- Err(type_error("Unspecified error"))
- }
- pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => {
- Err(type_error("Queue backlog limit exceeded"))
- }
- }
- }
-
- async fn dequeue_next_message(
- &self,
- _state: Rc<RefCell<OpState>>,
- ) -> Result<Option<Self::QMH>, AnyError> {
- let msg = "Deno.Kv.listenQueue is not supported for remote KV databases";
- eprintln!("{}", yellow(msg));
- deno_core::futures::future::pending().await
- }
-
- fn close(&self) {}
-}
-
-fn yellow<S: AsRef<str>>(s: S) -> impl fmt::Display {
- if std::env::var_os("NO_COLOR").is_some() {
- return String::from(s.as_ref());
- }
- let mut style_spec = ColorSpec::new();
- style_spec.set_fg(Some(Color::Yellow));
- let mut v = Vec::new();
- let mut ansi_writer = Ansi::new(&mut v);
- ansi_writer.set_color(&style_spec).unwrap();
- ansi_writer.write_all(s.as_ref().as_bytes()).unwrap();
- ansi_writer.reset().unwrap();
- String::from_utf8_lossy(&v).into_owned()
-}
-
-fn decode_value(
- value: Vec<u8>,
- encoding: pb::KvValueEncoding,
-) -> anyhow::Result<crate::Value> {
- match encoding {
- pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)),
- pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)),
- pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes(
- <[u8; 8]>::try_from(&value[..])?,
- ))),
- pb::KvValueEncoding::VeUnspecified => {
- Err(anyhow::anyhow!("Unspecified value encoding, cannot decode"))
- }
- }
-}
-
-fn encode_value(value: crate::Value) -> pb::KvValue {
- match value {
- crate::Value::V8(data) => pb::KvValue {
- data,
- encoding: pb::KvValueEncoding::VeV8 as _,
- },
- crate::Value::Bytes(data) => pb::KvValue {
- data,
- encoding: pb::KvValueEncoding::VeBytes as _,
- },
- crate::Value::U64(x) => pb::KvValue {
- data: x.to_le_bytes().to_vec(),
- encoding: pb::KvValueEncoding::VeLe64 as _,
- },
- }
-}
-
-fn encode_mutation(m: crate::KvMutation) -> pb::KvMutation {
- let key = m.key;
- let expire_at_ms =
- m.expire_at.and_then(|x| i64::try_from(x).ok()).unwrap_or(0);
-
- match m.kind {
- MutationKind::Set(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MSet as _,
- expire_at_ms,
- },
- MutationKind::Delete => pb::KvMutation {
- key,
- value: Some(encode_value(crate::Value::Bytes(vec![]))),
- mutation_type: pb::KvMutationType::MClear as _,
- expire_at_ms,
- },
- MutationKind::Max(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MMax as _,
- expire_at_ms,
- },
- MutationKind::Min(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MMin as _,
- expire_at_ms,
- },
- MutationKind::Sum(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MSum as _,
- expire_at_ms,
- },
- }
-}
-
-#[derive(Clone)]
-enum MetadataState {
- Ready(Arc<DatabaseMetadata>),
- Invalid(String),
- Pending,
-}
-
-struct MetadataRefresher {
- metadata_rx: watch::Receiver<MetadataState>,
- handle: JoinHandle<()>,
-}
-
-impl MetadataRefresher {
- pub fn new(url: String, access_token: String) -> Self {
- let (tx, rx) = watch::channel(MetadataState::Pending);
- let handle =
- deno_core::unsync::spawn(metadata_refresh_task(url, access_token, tx));
- Self {
- handle,
- metadata_rx: rx,
- }
- }
-}
-
-impl Drop for MetadataRefresher {
- fn drop(&mut self) {
- self.handle.abort();
- }
-}
-
-async fn metadata_refresh_task(
- metadata_url: String,
- access_token: String,
- tx: watch::Sender<MetadataState>,
-) {
- let client = reqwest::Client::new();
- loop {
- let mut attempt = 0u64;
- let metadata = loop {
- match fetch_metadata(&client, &metadata_url, &access_token).await {
- Ok(Ok(x)) => break x,
- Ok(Err(e)) => {
- if tx.send(MetadataState::Invalid(e)).is_err() {
- return;
- }
- }
- Err(e) => {
- log::error!("Failed to fetch database metadata: {}", e);
- }
- }
- randomized_exponential_backoff(Duration::from_secs(5), attempt).await;
- attempt += 1;
+ let metadata_endpoint = MetadataEndpoint {
+ url: parsed_url.clone(),
+ access_token: access_token.clone(),
};
- let ms_until_expire = u64::try_from(
- metadata
- .expires_at
- .timestamp_millis()
- .saturating_sub(crate::time::utc_now().timestamp_millis()),
- )
- .unwrap_or_default();
-
- // Refresh 10 minutes before expiry
- // In case of buggy clocks, don't refresh more than once per minute
- let interval = Duration::from_millis(ms_until_expire)
- .saturating_sub(Duration::from_secs(600))
- .max(Duration::from_secs(60));
-
- if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() {
- return;
- }
-
- tokio::time::sleep(interval).await;
- }
-}
-
-async fn fetch_metadata(
- client: &reqwest::Client,
- metadata_url: &str,
- access_token: &str,
-) -> anyhow::Result<Result<DatabaseMetadata, String>> {
- let res = client
- .post(metadata_url)
- .header("authorization", format!("Bearer {}", access_token))
- .send()
- .await?;
-
- if !res.status().is_success() {
- if res.status().is_client_error() {
- return Ok(Err(format!(
- "Client error while fetching metadata: {:?} {}",
- res.status(),
- res.text().await?
- )));
- } else {
- anyhow::bail!(
- "remote returned error: {:?} {}",
- res.status(),
- res.text().await?
- );
- }
- }
-
- let res = res.bytes().await?;
- let version_info: VersionInfo = match serde_json::from_slice(&res) {
- Ok(x) => x,
- Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))),
- };
- if version_info.version > 1 {
- return Ok(Err(format!(
- "Unsupported metadata version: {}",
- version_info.version
- )));
- }
-
- Ok(
- serde_json::from_slice(&res)
- .map_err(|e| format!("Failed to decode metadata: {}", e)),
- )
-}
-
-async fn randomized_exponential_backoff(base: Duration, attempt: u64) {
- let attempt = attempt.min(12);
- let delay = base.as_millis() as u64 + (2 << attempt);
- let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1);
- tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
-}
-
-async fn call_remote<
- P: RemoteDbHandlerPermissions + 'static,
- T: Message,
- R: Message + Default,
->(
- state: &RefCell<OpState>,
- refresher: &MetadataRefresher,
- client: &reqwest::Client,
- method: &str,
- req: &T,
-) -> anyhow::Result<R> {
- let mut attempt = 0u64;
- let res = loop {
- let mut metadata_rx = refresher.metadata_rx.clone();
- let metadata = loop {
- match &*metadata_rx.borrow() {
- MetadataState::Pending => {}
- MetadataState::Ready(x) => break x.clone(),
- MetadataState::Invalid(e) => {
- return Err(type_error(format!("Metadata error: {}", e)))
- }
- }
- // `unwrap()` never fails because `tx` is owned by the task held by `refresher`.
- metadata_rx.changed().await.unwrap();
- };
- let Some(sc_endpoint) = metadata
- .endpoints
- .iter()
- .find(|x| x.consistency == "strong")
- else {
- return Err(type_error(
- "No strong consistency endpoint is available for this database",
- ));
+ let options = &self.http_options;
+ let client = create_http_client(
+ &options.user_agent,
+ CreateHttpClientOptions {
+ root_cert_store: options.root_cert_store()?,
+ ca_certs: vec![],
+ proxy: options.proxy.clone(),
+ unsafely_ignore_certificate_errors: options
+ .unsafely_ignore_certificate_errors
+ .clone(),
+ client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
+ pool_max_idle_per_host: None,
+ pool_idle_timeout: None,
+ http1: true,
+ http2: true,
+ },
+ )?;
+
+ let permissions = PermissionChecker {
+ state: state.clone(),
+ _permissions: PhantomData,
};
- let full_url = format!("{}/{}", sc_endpoint.url, method);
- {
- let parsed_url = Url::parse(&full_url)?;
- let mut state = state.borrow_mut();
- let permissions = state.borrow_mut::<P>();
- permissions.check_net_url(&parsed_url, "Deno.Kv")?;
- }
-
- let res = client
- .post(&full_url)
- .header("x-transaction-domain-id", metadata.database_id.to_string())
- .header("authorization", format!("Bearer {}", metadata.token))
- .body(req.encode_to_vec())
- .send()
- .map_err(anyhow::Error::from)
- .and_then(|x| async move {
- if x.status().is_success() {
- Ok(Ok(x.bytes().await?))
- } else if x.status().is_client_error() {
- Ok(Err((x.status(), x.text().await?)))
- } else {
- Err(anyhow::anyhow!(
- "server error ({:?}): {}",
- x.status(),
- x.text().await?
- ))
- }
- })
- .await;
-
- match res {
- Ok(x) => break x,
- Err(e) => {
- log::error!("retryable error in {}: {}", method, e);
- randomized_exponential_backoff(Duration::from_millis(0), attempt).await;
- attempt += 1;
- }
- }
- };
-
- let res = match res {
- Ok(x) => x,
- Err((status, message)) => {
- return Err(type_error(format!(
- "client error in {} (status {:?}): {}",
- method, status, message
- )))
- }
- };
+ let remote = Remote::new(client, permissions, metadata_endpoint);
- match R::decode(&*res) {
- Ok(x) => Ok(x),
- Err(e) => Err(type_error(format!(
- "failed to decode response from {}: {}",
- method, e
- ))),
+ Ok(remote)
}
}
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 327091f05..b4e251f96 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -1,176 +1,37 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::HashMap;
use std::env::current_dir;
-use std::future::Future;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
-use std::rc::Weak;
use std::sync::Arc;
use std::sync::Mutex;
-use std::time::Duration;
-use std::time::SystemTime;
+use std::sync::OnceLock;
use async_trait::async_trait;
-use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures;
-use deno_core::futures::FutureExt;
-use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
-use deno_core::AsyncRefCell;
use deno_core::OpState;
use deno_node::PathClean;
-use rand::Rng;
-use rusqlite::params;
+pub use denokv_sqlite::TypeError;
+use rand::RngCore;
+use rand::SeedableRng;
use rusqlite::OpenFlags;
-use rusqlite::OptionalExtension;
-use rusqlite::Transaction;
-use tokio::sync::broadcast;
-use tokio::sync::broadcast::error::RecvError;
-use tokio::sync::mpsc;
-use tokio::sync::watch;
-use tokio::sync::OnceCell;
-use tokio::sync::OwnedSemaphorePermit;
-use tokio::sync::Semaphore;
-use uuid::Uuid;
+use tokio::sync::Notify;
-use crate::AtomicWrite;
-use crate::CommitResult;
-use crate::Database;
use crate::DatabaseHandler;
-use crate::KvEntry;
-use crate::MutationKind;
-use crate::QueueMessageHandle;
-use crate::ReadRange;
-use crate::ReadRangeOutput;
-use crate::SnapshotReadOptions;
-use crate::Value;
-const STATEMENT_INC_AND_GET_DATA_VERSION: &str =
- "update data_version set version = version + 1 where k = 0 returning version";
-const STATEMENT_KV_RANGE_SCAN: &str =
- "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k asc limit ?";
-const STATEMENT_KV_RANGE_SCAN_REVERSE: &str =
- "select k, v, v_encoding, version from kv where k >= ? and k < ? order by k desc limit ?";
-const STATEMENT_KV_POINT_GET_VALUE_ONLY: &str =
- "select v, v_encoding from kv where k = ?";
-const STATEMENT_KV_POINT_GET_VERSION_ONLY: &str =
- "select version from kv where k = ?";
-const STATEMENT_KV_POINT_SET: &str =
- "insert into kv (k, v, v_encoding, version, expiration_ms) values (:k, :v, :v_encoding, :version, :expiration_ms) on conflict(k) do update set v = :v, v_encoding = :v_encoding, version = :version, expiration_ms = :expiration_ms";
-const STATEMENT_KV_POINT_DELETE: &str = "delete from kv where k = ?";
-
-const STATEMENT_QUEUE_ADD_READY: &str = "insert into queue (ts, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)";
-const STATEMENT_QUEUE_GET_NEXT_READY: &str = "select ts, id, data, backoff_schedule, keys_if_undelivered from queue where ts <= ? order by ts limit 100";
-const STATEMENT_QUEUE_GET_EARLIEST_READY: &str =
- "select ts from queue order by ts limit 1";
-const STATEMENT_QUEUE_REMOVE_READY: &str = "delete from queue where id = ?";
-const STATEMENT_QUEUE_ADD_RUNNING: &str = "insert into queue_running (deadline, id, data, backoff_schedule, keys_if_undelivered) values(?, ?, ?, ?, ?)";
-const STATEMENT_QUEUE_REMOVE_RUNNING: &str =
- "delete from queue_running where id = ?";
-const STATEMENT_QUEUE_GET_RUNNING_BY_ID: &str = "select deadline, id, data, backoff_schedule, keys_if_undelivered from queue_running where id = ?";
-const STATEMENT_QUEUE_GET_RUNNING: &str =
- "select id from queue_running order by deadline limit 100";
-
-const STATEMENT_CREATE_MIGRATION_TABLE: &str = "
-create table if not exists migration_state(
- k integer not null primary key,
- version integer not null
-)
-";
-
-const MIGRATIONS: [&str; 3] = [
- "
-create table data_version (
- k integer primary key,
- version integer not null
-);
-insert into data_version (k, version) values (0, 0);
-create table kv (
- k blob primary key,
- v blob not null,
- v_encoding integer not null,
- version integer not null
-) without rowid;
-",
- "
-create table queue (
- ts integer not null,
- id text not null,
- data blob not null,
- backoff_schedule text not null,
- keys_if_undelivered blob not null,
-
- primary key (ts, id)
-);
-create table queue_running(
- deadline integer not null,
- id text not null,
- data blob not null,
- backoff_schedule text not null,
- keys_if_undelivered blob not null,
-
- primary key (deadline, id)
-);
-",
- "
-alter table kv add column seq integer not null default 0;
-alter table data_version add column seq integer not null default 0;
-alter table kv add column expiration_ms integer not null default -1;
-create index kv_expiration_ms_idx on kv (expiration_ms);
-",
-];
-
-const DISPATCH_CONCURRENCY_LIMIT: usize = 100;
-const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1000, 5000, 30000, 60000];
-
-const ERROR_USING_CLOSED_DATABASE: &str = "Attempted to use a closed database";
-
-#[derive(Clone)]
-struct ProtectedConn {
- guard: Rc<AsyncRefCell<()>>,
- conn: Arc<Mutex<Option<rusqlite::Connection>>>,
-}
-
-#[derive(Clone)]
-struct WeakProtectedConn {
- guard: Weak<AsyncRefCell<()>>,
- conn: std::sync::Weak<Mutex<Option<rusqlite::Connection>>>,
-}
-
-impl ProtectedConn {
- fn new(conn: rusqlite::Connection) -> Self {
- Self {
- guard: Rc::new(AsyncRefCell::new(())),
- conn: Arc::new(Mutex::new(Some(conn))),
- }
- }
-
- fn downgrade(&self) -> WeakProtectedConn {
- WeakProtectedConn {
- guard: Rc::downgrade(&self.guard),
- conn: Arc::downgrade(&self.conn),
- }
- }
-}
-
-impl WeakProtectedConn {
- fn upgrade(&self) -> Option<ProtectedConn> {
- let guard = self.guard.upgrade()?;
- let conn = self.conn.upgrade()?;
- Some(ProtectedConn { guard, conn })
- }
-}
+static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> =
+ OnceLock::new();
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
pub default_storage_dir: Option<PathBuf>,
+ versionstamp_rng_seed: Option<u64>,
_permissions: PhantomData<P>,
}
@@ -180,9 +41,13 @@ pub trait SqliteDbHandlerPermissions {
}
impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> {
- pub fn new(default_storage_dir: Option<PathBuf>) -> Self {
+ pub fn new(
+ default_storage_dir: Option<PathBuf>,
+ versionstamp_rng_seed: Option<u64>,
+ ) -> Self {
Self {
default_storage_dir,
+ versionstamp_rng_seed,
_permissions: PhantomData,
}
}
@@ -190,7 +55,7 @@ impl<P: SqliteDbHandlerPermissions> SqliteDbHandler<P> {
#[async_trait(?Send)]
impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
- type DB = SqliteDb;
+ type DB = denokv_sqlite::Sqlite;
async fn open(
&self,
@@ -218,866 +83,61 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
}
}
- let (conn, queue_waker_key) = sqlite_retry_loop(|| {
- let path = path.clone();
- let default_storage_dir = self.default_storage_dir.clone();
- async move {
- spawn_blocking(move || {
- let (conn, queue_waker_key) =
- match (path.as_deref(), &default_storage_dir) {
- (Some(":memory:"), _) | (None, None) => {
- (rusqlite::Connection::open_in_memory()?, None)
- }
- (Some(path), _) => {
- let flags =
- OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
- let resolved_path = canonicalize_path(&PathBuf::from(path))?;
- (
- rusqlite::Connection::open_with_flags(path, flags)?,
- Some(resolved_path),
- )
- }
- (None, Some(path)) => {
- std::fs::create_dir_all(path)?;
- let path = path.join("kv.sqlite3");
- (rusqlite::Connection::open(path.clone())?, Some(path))
- }
- };
-
- conn.pragma_update(None, "journal_mode", "wal")?;
-
- Ok::<_, AnyError>((conn, queue_waker_key))
- })
- .await
- .unwrap()
- }
- })
- .await?;
- let conn = ProtectedConn::new(conn);
- SqliteDb::run_tx(conn.clone(), |tx| {
- tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?;
-
- let current_version: usize = tx
- .query_row(
- "select version from migration_state where k = 0",
- [],
- |row| row.get(0),
- )
- .optional()?
- .unwrap_or(0);
-
- for (i, migration) in MIGRATIONS.iter().enumerate() {
- let version = i + 1;
- if version > current_version {
- tx.execute_batch(migration)?;
- tx.execute(
- "replace into migration_state (k, version) values(?, ?)",
- [&0, &version],
- )?;
- }
- }
-
- tx.commit()?;
-
- Ok(())
- })
- .await?;
-
- let expiration_watcher = spawn(watch_expiration(conn.clone()));
-
- Ok(SqliteDb {
- conn,
- queue: OnceCell::new(),
- queue_waker_key,
- expiration_watcher,
- })
- }
-}
-
-pub struct SqliteDb {
- conn: ProtectedConn,
- queue: OnceCell<SqliteQueue>,
- queue_waker_key: Option<PathBuf>,
- expiration_watcher: deno_core::unsync::JoinHandle<()>,
-}
-
-impl Drop for SqliteDb {
- fn drop(&mut self) {
- self.close();
- }
-}
-
-async fn sqlite_retry_loop<R, Fut: Future<Output = Result<R, AnyError>>>(
- mut f: impl FnMut() -> Fut,
-) -> Result<R, AnyError> {
- loop {
- match f().await {
- Ok(x) => return Ok(x),
- Err(e) => {
- if let Some(x) = e.downcast_ref::<rusqlite::Error>() {
- if x.sqlite_error_code() == Some(rusqlite::ErrorCode::DatabaseBusy) {
- log::debug!("kv: Database is busy, retrying");
- tokio::time::sleep(Duration::from_millis(
- rand::thread_rng().gen_range(5..20),
- ))
- .await;
- continue;
- }
- }
- return Err(e);
- }
- }
- }
-}
-
-impl SqliteDb {
- async fn run_tx<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError>
- where
- F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
- + Clone
- + Send
- + 'static,
- R: Send + 'static,
- {
- sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await
- }
-
- async fn run_tx_inner<F, R>(conn: ProtectedConn, f: F) -> Result<R, AnyError>
- where
- F: (FnOnce(rusqlite::Transaction<'_>) -> Result<R, AnyError>)
- + Send
- + 'static,
- R: Send + 'static,
- {
- // `run_tx` runs in an asynchronous context. First acquire the async lock to
- // coordinate with other async invocations.
- let _guard_holder = conn.guard.borrow_mut().await;
-
- // Then, take the synchronous lock. This operation is guaranteed to success without waiting,
- // unless the database is being closed.
- let db = conn.conn.clone();
- spawn_blocking(move || {
- let mut db = db.try_lock().ok();
- let Some(db) = db.as_mut().and_then(|x| x.as_mut()) else {
- return Err(type_error(ERROR_USING_CLOSED_DATABASE));
- };
- let result = match db.transaction() {
- Ok(tx) => f(tx),
- Err(e) => Err(e.into()),
- };
- result
- })
- .await
- .unwrap()
- }
-}
-
-pub struct DequeuedMessage {
- conn: WeakProtectedConn,
- id: String,
- payload: Option<Vec<u8>>,
- waker_tx: broadcast::Sender<()>,
- _permit: OwnedSemaphorePermit,
-}
-
-#[async_trait(?Send)]
-impl QueueMessageHandle for DequeuedMessage {
- async fn finish(&self, success: bool) -> Result<(), AnyError> {
- let Some(conn) = self.conn.upgrade() else {
- return Ok(());
- };
- let id = self.id.clone();
- let requeued = SqliteDb::run_tx(conn, move |tx| {
- let requeued = {
- if success {
- let changed = tx
- .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)?
- .execute([&id])?;
- assert!(changed <= 1);
- false
- } else {
- SqliteQueue::requeue_message(&id, &tx)?
- }
- };
- tx.commit()?;
- Ok(requeued)
- })
- .await;
- let requeued = match requeued {
- Ok(x) => x,
- Err(e) => {
- // Silently ignore the error if the database has been closed
- // This message will be delivered on the next run
- if is_conn_closed_error(&e) {
- return Ok(());
- }
- return Err(e);
- }
- };
- if requeued {
- // If the message was requeued, wake up the dequeue loop.
- let _ = self.waker_tx.send(());
- }
- Ok(())
- }
-
- async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
- self
- .payload
- .take()
- .ok_or_else(|| type_error("Payload already consumed"))
- }
-}
-
-type DequeueReceiver = mpsc::Receiver<(Vec<u8>, String)>;
-
-struct SqliteQueue {
- conn: ProtectedConn,
- dequeue_rx: Rc<AsyncRefCell<DequeueReceiver>>,
- concurrency_limiter: Arc<Semaphore>,
- waker_tx: broadcast::Sender<()>,
- shutdown_tx: watch::Sender<()>,
-}
-
-impl SqliteQueue {
- fn new(
- conn: ProtectedConn,
- waker_tx: broadcast::Sender<()>,
- waker_rx: broadcast::Receiver<()>,
- ) -> Self {
- let conn_clone = conn.clone();
- let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
- let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64);
-
- spawn(async move {
- // Oneshot requeue of all inflight messages.
- if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await {
- // Exit the dequeue loop cleanly if the database has been closed.
- if is_conn_closed_error(&e) {
- return;
- }
- panic!("kv: Error in requeue_inflight_messages: {}", e);
- }
-
- // Continuous dequeue loop.
- if let Err(e) =
- Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
- .await
- {
- // Exit the dequeue loop cleanly if the database has been closed.
- if is_conn_closed_error(&e) {
- return;
- }
- panic!("kv: Error in dequeue_loop: {}", e);
- }
- });
-
- Self {
- conn: conn_clone,
- dequeue_rx: Rc::new(AsyncRefCell::new(dequeue_rx)),
- waker_tx,
- shutdown_tx,
- concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)),
- }
- }
-
- async fn dequeue(&self) -> Result<Option<DequeuedMessage>, AnyError> {
- // Wait for the next message to be available from dequeue_rx.
- let (payload, id) = {
- let mut queue_rx = self.dequeue_rx.borrow_mut().await;
- let Some(msg) = queue_rx.recv().await else {
- return Ok(None);
- };
- msg
- };
-
- let permit = self.concurrency_limiter.clone().acquire_owned().await?;
-
- Ok(Some(DequeuedMessage {
- conn: self.conn.downgrade(),
- id,
- payload: Some(payload),
- waker_tx: self.waker_tx.clone(),
- _permit: permit,
- }))
- }
-
- fn shutdown(&self) {
- let _ = self.shutdown_tx.send(());
- }
-
- async fn dequeue_loop(
- conn: ProtectedConn,
- dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
- mut shutdown_rx: watch::Receiver<()>,
- mut waker_rx: broadcast::Receiver<()>,
- ) -> Result<(), AnyError> {
- loop {
- let messages = SqliteDb::run_tx(conn.clone(), move |tx| {
- let now = SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
-
- let messages = tx
- .prepare_cached(STATEMENT_QUEUE_GET_NEXT_READY)?
- .query_map([now], |row| {
- let ts: u64 = row.get(0)?;
- let id: String = row.get(1)?;
- let data: Vec<u8> = row.get(2)?;
- let backoff_schedule: String = row.get(3)?;
- let keys_if_undelivered: String = row.get(4)?;
- Ok((ts, id, data, backoff_schedule, keys_if_undelivered))
- })?
- .collect::<Result<Vec<_>, rusqlite::Error>>()?;
-
- for (ts, id, data, backoff_schedule, keys_if_undelivered) in &messages {
- let changed = tx
- .prepare_cached(STATEMENT_QUEUE_REMOVE_READY)?
- .execute(params![id])?;
- assert_eq!(changed, 1);
-
- let changed =
- tx.prepare_cached(STATEMENT_QUEUE_ADD_RUNNING)?.execute(
- params![ts, id, &data, &backoff_schedule, &keys_if_undelivered],
- )?;
- assert_eq!(changed, 1);
- }
- tx.commit()?;
-
- Ok(
- messages
- .into_iter()
- .map(|(_, id, data, _, _)| (id, data))
- .collect::<Vec<_>>(),
- )
- })
- .await?;
-
- let busy = !messages.is_empty();
-
- for (id, data) in messages {
- if dequeue_tx.send((data, id)).await.is_err() {
- // Queue receiver was dropped. Stop the dequeue loop.
- return Ok(());
- }
- }
-
- if !busy {
- // There's nothing to dequeue right now; sleep until one of the
- // following happens:
- // - It's time to dequeue the next message based on its timestamp
- // - A new message is added to the queue
- // - The database is closed
- let sleep_fut = {
- match Self::get_earliest_ready_ts(conn.clone()).await? {
- Some(ts) => {
- let now = SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
- if ts <= now {
- continue;
- }
- tokio::time::sleep(Duration::from_millis(ts - now)).boxed()
- }
- None => futures::future::pending().boxed(),
- }
- };
- tokio::select! {
- _ = sleep_fut => {}
- x = waker_rx.recv() => {
- if let Err(RecvError::Closed) = x {return Ok(());}
- },
- _ = shutdown_rx.changed() => return Ok(())
- }
- }
- }
- }
-
- async fn get_earliest_ready_ts(
- conn: ProtectedConn,
- ) -> Result<Option<u64>, AnyError> {
- SqliteDb::run_tx(conn.clone(), move |tx| {
- let ts = tx
- .prepare_cached(STATEMENT_QUEUE_GET_EARLIEST_READY)?
- .query_row([], |row| {
- let ts: u64 = row.get(0)?;
- Ok(ts)
- })
- .optional()?;
- Ok(ts)
- })
- .await
- }
-
- async fn requeue_inflight_messages(
- conn: ProtectedConn,
- ) -> Result<(), AnyError> {
- loop {
- let done = SqliteDb::run_tx(conn.clone(), move |tx| {
- let entries = tx
- .prepare_cached(STATEMENT_QUEUE_GET_RUNNING)?
- .query_map([], |row| {
- let id: String = row.get(0)?;
- Ok(id)
- })?
- .collect::<Result<Vec<_>, rusqlite::Error>>()?;
- for id in &entries {
- Self::requeue_message(id, &tx)?;
- }
- tx.commit()?;
- Ok(entries.is_empty())
- })
- .await?;
- if done {
- return Ok(());
- }
- }
- }
-
- fn requeue_message(
- id: &str,
- tx: &rusqlite::Transaction<'_>,
- ) -> Result<bool, AnyError> {
- let Some((_, id, data, backoff_schedule, keys_if_undelivered)) = tx
- .prepare_cached(STATEMENT_QUEUE_GET_RUNNING_BY_ID)?
- .query_row([id], |row| {
- let deadline: u64 = row.get(0)?;
- let id: String = row.get(1)?;
- let data: Vec<u8> = row.get(2)?;
- let backoff_schedule: String = row.get(3)?;
- let keys_if_undelivered: String = row.get(4)?;
- Ok((deadline, id, data, backoff_schedule, keys_if_undelivered))
- })
- .optional()?
- else {
- return Ok(false);
- };
-
- let backoff_schedule = {
- let backoff_schedule =
- serde_json::from_str::<Option<Vec<u64>>>(&backoff_schedule)?;
- backoff_schedule.unwrap_or_default()
- };
-
- let mut requeued = false;
- if !backoff_schedule.is_empty() {
- // Requeue based on backoff schedule
- let now = SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
- let new_ts = now + backoff_schedule[0];
- let new_backoff_schedule = serde_json::to_string(&backoff_schedule[1..])?;
- let changed = tx
- .prepare_cached(STATEMENT_QUEUE_ADD_READY)?
- .execute(params![
- new_ts,
- id,
- &data,
- &new_backoff_schedule,
- &keys_if_undelivered
- ])
- .unwrap();
- assert_eq!(changed, 1);
- requeued = true;
- } else if !keys_if_undelivered.is_empty() {
- // No more requeues. Insert the message into the undelivered queue.
- let keys_if_undelivered =
- serde_json::from_str::<Vec<Vec<u8>>>(&keys_if_undelivered)?;
-
- let version: i64 = tx
- .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
- .query_row([], |row| row.get(0))?;
-
- for key in keys_if_undelivered {
- let changed = tx
- .prepare_cached(STATEMENT_KV_POINT_SET)?
- .execute(params![key, &data, &VALUE_ENCODING_V8, &version, -1i64])?;
- assert_eq!(changed, 1);
- }
- }
-
- // Remove from running
- let changed = tx
- .prepare_cached(STATEMENT_QUEUE_REMOVE_RUNNING)?
- .execute(params![id])?;
- assert_eq!(changed, 1);
-
- Ok(requeued)
- }
-}
-
-async fn watch_expiration(db: ProtectedConn) {
- loop {
- // Scan for expired keys
- let res = SqliteDb::run_tx(db.clone(), move |tx| {
- let now = SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
- tx.prepare_cached(
- "delete from kv where expiration_ms >= 0 and expiration_ms <= ?",
- )?
- .execute(params![now])?;
- tx.commit()?;
- Ok(())
- })
- .await;
- if let Err(e) = res {
- eprintln!("kv: Error in expiration watcher: {}", e);
- }
- let sleep_duration =
- Duration::from_secs_f64(60.0 + rand::thread_rng().gen_range(0.0..30.0));
- tokio::time::sleep(sleep_duration).await;
- }
-}
-
-#[async_trait(?Send)]
-impl Database for SqliteDb {
- type QMH = DequeuedMessage;
-
- async fn snapshot_read(
- &self,
- _state: Rc<RefCell<OpState>>,
- requests: Vec<ReadRange>,
- _options: SnapshotReadOptions,
- ) -> Result<Vec<ReadRangeOutput>, AnyError> {
- let requests = Arc::new(requests);
- Self::run_tx(self.conn.clone(), move |tx| {
- let mut responses = Vec::with_capacity(requests.len());
- for request in &*requests {
- let mut stmt = tx.prepare_cached(if request.reverse {
- STATEMENT_KV_RANGE_SCAN_REVERSE
- } else {
- STATEMENT_KV_RANGE_SCAN
- })?;
- let entries = stmt
- .query_map(
- (
- request.start.as_slice(),
- request.end.as_slice(),
- request.limit.get(),
- ),
- |row| {
- let key: Vec<u8> = row.get(0)?;
- let value: Vec<u8> = row.get(1)?;
- let encoding: i64 = row.get(2)?;
-
- let value = decode_value(value, encoding);
-
- let version: i64 = row.get(3)?;
- Ok(KvEntry {
- key,
- value,
- versionstamp: version_to_versionstamp(version),
- })
- },
- )?
- .collect::<Result<Vec<_>, rusqlite::Error>>()?;
- responses.push(ReadRangeOutput { entries });
- }
-
- Ok(responses)
- })
- .await
- }
-
- async fn atomic_write(
- &self,
- state: Rc<RefCell<OpState>>,
- write: AtomicWrite,
- ) -> Result<Option<CommitResult>, AnyError> {
- let write = Arc::new(write);
- let (has_enqueues, commit_result) =
- Self::run_tx(self.conn.clone(), move |tx| {
- for check in &write.checks {
- let real_versionstamp = tx
- .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)?
- .query_row([check.key.as_slice()], |row| row.get(0))
- .optional()?
- .map(version_to_versionstamp);
- if real_versionstamp != check.versionstamp {
- return Ok((false, None));
- }
- }
-
- let version: i64 = tx
- .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)?
- .query_row([], |row| row.get(0))?;
-
- for mutation in &write.mutations {
- match &mutation.kind {
- MutationKind::Set(value) => {
- let (value, encoding) = encode_value(value);
- let changed =
- tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
- mutation.key,
- value,
- &encoding,
- &version,
- mutation
- .expire_at
- .and_then(|x| i64::try_from(x).ok())
- .unwrap_or(-1i64)
- ])?;
- assert_eq!(changed, 1)
- }
- MutationKind::Delete => {
- let changed = tx
- .prepare_cached(STATEMENT_KV_POINT_DELETE)?
- .execute(params![mutation.key])?;
- assert!(changed == 0 || changed == 1)
- }
- MutationKind::Sum(operand) => {
- mutate_le64(
- &tx,
- &mutation.key,
- "sum",
- operand,
- version,
- |a, b| a.wrapping_add(b),
- )?;
+ let path = path.clone();
+ let default_storage_dir = self.default_storage_dir.clone();
+ let (conn, queue_waker_key) = spawn_blocking(move || {
+ denokv_sqlite::sqlite_retry_loop(|| {
+ let (conn, queue_waker_key) =
+ match (path.as_deref(), &default_storage_dir) {
+ (Some(":memory:"), _) | (None, None) => {
+ (rusqlite::Connection::open_in_memory()?, None)
}
- MutationKind::Min(operand) => {
- mutate_le64(
- &tx,
- &mutation.key,
- "min",
- operand,
- version,
- |a, b| a.min(b),
- )?;
+ (Some(path), _) => {
+ let flags =
+ OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+ let resolved_path = canonicalize_path(&PathBuf::from(path))?;
+ (
+ rusqlite::Connection::open_with_flags(path, flags)?,
+ Some(resolved_path),
+ )
}
- MutationKind::Max(operand) => {
- mutate_le64(
- &tx,
- &mutation.key,
- "max",
- operand,
- version,
- |a, b| a.max(b),
- )?;
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path)?;
+ let path = path.join("kv.sqlite3");
+ (rusqlite::Connection::open(path.clone())?, Some(path))
}
- }
- }
-
- let now = SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as u64;
-
- let has_enqueues = !write.enqueues.is_empty();
- for enqueue in &write.enqueues {
- let id = Uuid::new_v4().to_string();
- let backoff_schedule = serde_json::to_string(
- &enqueue
- .backoff_schedule
- .as_deref()
- .or_else(|| Some(&DEFAULT_BACKOFF_SCHEDULE[..])),
- )?;
- let keys_if_undelivered =
- serde_json::to_string(&enqueue.keys_if_undelivered)?;
+ };
- let changed =
- tx.prepare_cached(STATEMENT_QUEUE_ADD_READY)?
- .execute(params![
- now + enqueue.delay_ms,
- id,
- &enqueue.payload,
- &backoff_schedule,
- &keys_if_undelivered
- ])?;
- assert_eq!(changed, 1)
- }
-
- tx.commit()?;
- let new_versionstamp = version_to_versionstamp(version);
-
- Ok((
- has_enqueues,
- Some(CommitResult {
- versionstamp: new_versionstamp,
- }),
- ))
- })
- .await?;
-
- if has_enqueues {
- match self.queue.get() {
- Some(queue) => {
- let _ = queue.waker_tx.send(());
- }
- None => {
- if let Some(waker_key) = &self.queue_waker_key {
- let (waker_tx, _) =
- shared_queue_waker_channel(waker_key, state.clone());
- let _ = waker_tx.send(());
- }
- }
- }
- }
- Ok(commit_result)
- }
+ conn.pragma_update(None, "journal_mode", "wal")?;
- async fn dequeue_next_message(
- &self,
- state: Rc<RefCell<OpState>>,
- ) -> Result<Option<Self::QMH>, AnyError> {
- let queue = self
- .queue
- .get_or_init(|| async move {
- let (waker_tx, waker_rx) = {
- match &self.queue_waker_key {
- Some(waker_key) => {
- shared_queue_waker_channel(waker_key, state.clone())
- }
- None => broadcast::channel(1),
- }
- };
- SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx)
+ Ok::<_, AnyError>((conn, queue_waker_key))
})
- .await;
- let handle = queue.dequeue().await?;
- Ok(handle)
- }
-
- fn close(&self) {
- if let Some(queue) = self.queue.get() {
- queue.shutdown();
- }
-
- self.expiration_watcher.abort();
-
- // The above `abort()` operation is asynchronous. It's not
- // guaranteed that the sqlite connection will be closed immediately.
- // So here we synchronously take the conn mutex and drop the connection.
- //
- // This blocks the event loop if the connection is still being used,
- // but ensures correctness - deleting the database file after calling
- // the `close` method will always work.
- self.conn.conn.lock().unwrap().take();
- }
-}
-
-/// Mutates a LE64 value in the database, defaulting to setting it to the
-/// operand if it doesn't exist.
-fn mutate_le64(
- tx: &Transaction,
- key: &[u8],
- op_name: &str,
- operand: &Value,
- new_version: i64,
- mutate: impl FnOnce(u64, u64) -> u64,
-) -> Result<(), AnyError> {
- let Value::U64(operand) = *operand else {
- return Err(type_error(format!(
- "Failed to perform '{op_name}' mutation on a non-U64 operand"
- )));
- };
-
- let old_value = tx
- .prepare_cached(STATEMENT_KV_POINT_GET_VALUE_ONLY)?
- .query_row([key], |row| {
- let value: Vec<u8> = row.get(0)?;
- let encoding: i64 = row.get(1)?;
-
- let value = decode_value(value, encoding);
- Ok(value)
})
- .optional()?;
-
- let new_value = match old_value {
- Some(Value::U64(old_value) ) => mutate(old_value, operand),
- Some(_) => return Err(type_error(format!("Failed to perform '{op_name}' mutation on a non-U64 value in the database"))),
- None => operand,
- };
-
- let new_value = Value::U64(new_value);
- let (new_value, encoding) = encode_value(&new_value);
-
- let changed = tx.prepare_cached(STATEMENT_KV_POINT_SET)?.execute(params![
- key,
- &new_value[..],
- encoding,
- new_version,
- -1i64,
- ])?;
- assert_eq!(changed, 1);
-
- Ok(())
-}
-
-fn version_to_versionstamp(version: i64) -> [u8; 10] {
- let mut versionstamp = [0; 10];
- versionstamp[..8].copy_from_slice(&version.to_be_bytes());
- versionstamp
-}
+ .await
+ .unwrap()?;
-const VALUE_ENCODING_V8: i64 = 1;
-const VALUE_ENCODING_LE64: i64 = 2;
-const VALUE_ENCODING_BYTES: i64 = 3;
+ let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key {
+ QUEUE_WAKER_MAP
+ .get_or_init(Default::default)
+ .lock()
+ .unwrap()
+ .entry(queue_waker_key)
+ .or_default()
+ .clone()
+ } else {
+ Arc::new(Notify::new())
+ };
-fn decode_value(value: Vec<u8>, encoding: i64) -> crate::Value {
- match encoding {
- VALUE_ENCODING_V8 => crate::Value::V8(value),
- VALUE_ENCODING_BYTES => crate::Value::Bytes(value),
- VALUE_ENCODING_LE64 => {
- let mut buf = [0; 8];
- buf.copy_from_slice(&value);
- crate::Value::U64(u64::from_le_bytes(buf))
- }
- _ => todo!(),
- }
-}
+ let versionstamp_rng: Box<dyn RngCore + Send> =
+ match &self.versionstamp_rng_seed {
+ Some(seed) => Box::new(rand::rngs::StdRng::seed_from_u64(*seed)),
+ None => Box::new(rand::rngs::StdRng::from_entropy()),
+ };
-fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
- match value {
- crate::Value::V8(value) => (Cow::Borrowed(value), VALUE_ENCODING_V8),
- crate::Value::Bytes(value) => (Cow::Borrowed(value), VALUE_ENCODING_BYTES),
- crate::Value::U64(value) => {
- let mut buf = [0; 8];
- buf.copy_from_slice(&value.to_le_bytes());
- (Cow::Owned(buf.to_vec()), VALUE_ENCODING_LE64)
- }
+ denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng)
}
}
-pub struct QueueWaker {
- wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>,
-}
-
-fn shared_queue_waker_channel(
- waker_key: &Path,
- state: Rc<RefCell<OpState>>,
-) -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
- let mut state = state.borrow_mut();
- let waker = {
- let waker = state.try_borrow_mut::<QueueWaker>();
- match waker {
- Some(waker) => waker,
- None => {
- let waker = QueueWaker {
- wakers_tx: HashMap::new(),
- };
- state.put::<QueueWaker>(waker);
- state.borrow_mut::<QueueWaker>()
- }
- }
- };
-
- let waker_tx = waker
- .wakers_tx
- .entry(waker_key.to_path_buf())
- .or_insert_with(|| {
- let (waker_tx, _) = broadcast::channel(1);
- waker_tx
- });
-
- (waker_tx.clone(), waker_tx.subscribe())
-}
-
/// Same as Path::canonicalize, but also handles non-existing paths.
fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> {
let path = path.to_path_buf().clean();
@@ -1106,8 +166,3 @@ fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> {
}
}
}
-
-fn is_conn_closed_error(e: &AnyError) -> bool {
- get_custom_error_class(e) == Some("TypeError")
- && e.to_string() == ERROR_USING_CLOSED_DATABASE
-}
diff --git a/runtime/build.rs b/runtime/build.rs
index dec687b6e..ce1896e6f 100644
--- a/runtime/build.rs
+++ b/runtime/build.rs
@@ -222,7 +222,7 @@ mod startup_snapshot {
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(deno_kv::sqlite::SqliteDbHandler::<
Permissions,
- >::new(None)),
+ >::new(None, None)),
deno_napi::deno_napi::init_ops_and_esm::<Permissions>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Default::default()),
diff --git a/runtime/errors.rs b/runtime/errors.rs
index e6ae14abb..47925fe5c 100644
--- a/runtime/errors.rs
+++ b/runtime/errors.rs
@@ -212,6 +212,10 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.map(get_url_parse_error_class)
})
.or_else(|| {
+ e.downcast_ref::<deno_kv::sqlite::TypeError>()
+ .map(|_| "TypeError")
+ })
+ .or_else(|| {
#[cfg(unix)]
let maybe_get_nix_error_class =
|| e.downcast_ref::<nix::Error>().map(get_nix_error_class);
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index c1fe5a619..de69ce43b 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -436,7 +436,19 @@ impl WebWorker {
),
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(
- MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(None),
+ MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
+ None,
+ options.seed,
+ deno_kv::remote::HttpOptions {
+ user_agent: options.bootstrap.user_agent.clone(),
+ root_cert_store_provider: options.root_cert_store_provider.clone(),
+ unsafely_ignore_certificate_errors: options
+ .unsafely_ignore_certificate_errors
+ .clone(),
+ client_cert_chain_and_key: None,
+ proxy: None,
+ },
+ ),
),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
diff --git a/runtime/worker.rs b/runtime/worker.rs
index e8d9ca6bc..f0fc25aa2 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -261,6 +261,16 @@ impl MainWorker {
deno_kv::deno_kv::init_ops_and_esm(
MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
options.origin_storage_dir.clone(),
+ options.seed,
+ deno_kv::remote::HttpOptions {
+ user_agent: options.bootstrap.user_agent.clone(),
+ root_cert_store_provider: options.root_cert_store_provider.clone(),
+ unsafely_ignore_certificate_errors: options
+ .unsafely_ignore_certificate_errors
+ .clone(),
+ client_cert_chain_and_key: None,
+ proxy: None,
+ },
),
),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml
index f48fce0b3..72126ae66 100644
--- a/test_util/Cargo.toml
+++ b/test_util/Cargo.toml
@@ -19,6 +19,7 @@ async-stream = "0.3.3"
base64.workspace = true
bytes.workspace = true
console_static_text.workspace = true
+denokv_proto.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] }
flate2.workspace = true
futures.workspace = true
diff --git a/test_util/build.rs b/test_util/build.rs
deleted file mode 100644
index 420abd0a1..000000000
--- a/test_util/build.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-use std::env;
-use std::io;
-use std::path::PathBuf;
-
-fn main() -> io::Result<()> {
- println!("cargo:rerun-if-changed=../ext/kv/proto");
-
- let descriptor_path =
- PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
-
- prost_build::Config::new()
- .file_descriptor_set_path(&descriptor_path)
- .compile_well_known_types()
- .compile_protos(
- &["../ext/kv/proto/datapath.proto"],
- &["../ext/kv/proto/"],
- )?;
-
- Ok(())
-}
diff --git a/test_util/src/kv_remote.rs b/test_util/src/kv_remote.rs
deleted file mode 100644
index d258a0551..000000000
--- a/test_util/src/kv_remote.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-
-// Generated code, disable lints
-#[allow(clippy::all, non_snake_case)]
-pub mod datapath {
- include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
-}
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs
index b7106a2b3..692a6a08c 100644
--- a/test_util/src/lib.rs
+++ b/test_util/src/lib.rs
@@ -4,6 +4,12 @@
use anyhow::anyhow;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
+use denokv_proto::datapath::AtomicWrite;
+use denokv_proto::datapath::AtomicWriteOutput;
+use denokv_proto::datapath::AtomicWriteStatus;
+use denokv_proto::datapath::ReadRangeOutput;
+use denokv_proto::datapath::SnapshotRead;
+use denokv_proto::datapath::SnapshotReadOutput;
use futures::Future;
use futures::FutureExt;
use futures::Stream;
@@ -18,12 +24,6 @@ use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
-use kv_remote::datapath::AtomicWrite;
-use kv_remote::datapath::AtomicWriteOutput;
-use kv_remote::datapath::AtomicWriteStatus;
-use kv_remote::datapath::ReadRangeOutput;
-use kv_remote::datapath::SnapshotRead;
-use kv_remote::datapath::SnapshotReadOutput;
use npm::CUSTOM_NPM_PACKAGE_CACHE;
use once_cell::sync::Lazy;
use pretty_assertions::assert_eq;
@@ -70,7 +70,6 @@ pub mod assertions;
mod builders;
pub mod factory;
mod fs;
-mod kv_remote;
pub mod lsp;
mod npm;
pub mod pty;
@@ -1206,7 +1205,7 @@ async fn main_server(
.header("content-type", "application/json")
.body(Body::from(
serde_json::json!({
- "version": 2,
+ "version": 1000,
"databaseId": KV_DATABASE_ID,
"endpoints": [
{
@@ -1268,9 +1267,7 @@ async fn main_server(
.map(|_| ReadRangeOutput { values: vec![] })
.collect(),
read_disabled: false,
- regions_if_read_disabled: vec![],
read_is_strongly_consistent: true,
- primary_if_not_strongly_consistent: "".into(),
}
.encode_to_vec(),
))
@@ -1311,7 +1308,7 @@ async fn main_server(
AtomicWriteOutput {
status: AtomicWriteStatus::AwSuccess.into(),
versionstamp: vec![0u8; 10],
- primary_if_write_disabled: "".into(),
+ failed_checks: vec![],
}
.encode_to_vec(),
))