summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHeyang Zhou <zhy20000919@hotmail.com>2023-08-22 13:56:00 +0800
committerGitHub <noreply@github.com>2023-08-22 13:56:00 +0800
commit6d4a005e4108a5dd762b339a02bc4d802755ba0d (patch)
tree69679038bfbd3127f6c1e1b85dbc347c8c52e36e
parent5834d282d4de5d0b5cacb9bf068f3896bef0a48a (diff)
feat(ext/kv): connect to remote database (#20178)
This patch adds a `remote` backend for `ext/kv`. This supports connection to Deno Deploy and potentially other services compatible with the KV Connect protocol.
-rw-r--r--.github/workflows/bench_cron.yml6
-rwxr-xr-x.github/workflows/ci.generate.ts6
-rw-r--r--.github/workflows/ci.yml6
-rw-r--r--Cargo.lock78
-rw-r--r--Cargo.toml5
-rw-r--r--README.md5
-rw-r--r--cli/Cargo.toml2
-rw-r--r--cli/schemas/kv-metadata-exchange-response.v1.json54
-rw-r--r--cli/tests/unit/kv_test.ts74
-rw-r--r--ext/kv/Cargo.toml7
-rw-r--r--ext/kv/README.md80
-rw-r--r--ext/kv/build.rs19
-rw-r--r--ext/kv/dynamic.rs216
-rw-r--r--ext/kv/interface.rs7
-rw-r--r--ext/kv/lib.rs10
-rw-r--r--ext/kv/proto/datapath.proto96
-rw-r--r--ext/kv/proto/mod.rs7
-rw-r--r--ext/kv/remote.rs558
-rw-r--r--ext/kv/sqlite.rs7
-rw-r--r--runtime/permissions/mod.rs16
-rw-r--r--runtime/web_worker.rs4
-rw-r--r--runtime/worker.rs4
-rw-r--r--test_util/Cargo.toml4
-rw-r--r--test_util/build.rs22
-rw-r--r--test_util/src/kv_remote.rs7
-rw-r--r--test_util/src/lib.rs204
26 files changed, 1492 insertions, 12 deletions
diff --git a/.github/workflows/bench_cron.yml b/.github/workflows/bench_cron.yml
index 3725f80a8..c62a0f267 100644
--- a/.github/workflows/bench_cron.yml
+++ b/.github/workflows/bench_cron.yml
@@ -31,6 +31,12 @@ jobs:
- uses: dsherret/rust-toolchain-file@v1
+ - name: Install protoc
+ uses: arduino/setup-protoc@v2
+ with:
+ version: "21.12"
+ repo-token: ${{ secrets.GITHUB_TOKEN }}
+
- name: Build release
run: cargo build --release --locked --all-targets
diff --git a/.github/workflows/ci.generate.ts b/.github/workflows/ci.generate.ts
index 728a9d768..aaf96c807 100755
--- a/.github/workflows/ci.generate.ts
+++ b/.github/workflows/ci.generate.ts
@@ -168,6 +168,11 @@ const installNodeStep = {
uses: "actions/setup-node@v3",
with: { "node-version": 18 },
};
+const installProtocStep = {
+ name: "Install protoc",
+ uses: "arduino/setup-protoc@v2",
+ with: { "version": "21.12", "repo-token": "${{ secrets.GITHUB_TOKEN }}" },
+};
const installDenoStep = {
name: "Install Deno",
uses: "denoland/setup-deno@v1",
@@ -434,6 +439,7 @@ const ci = {
if: "matrix.job == 'bench'",
...installNodeStep,
},
+ installProtocStep,
{
if: [
"matrix.profile == 'release' &&",
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index bfbdb5c9c..45607d82e 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -170,6 +170,12 @@ jobs:
uses: actions/setup-node@v3
with:
node-version: 18
+ - name: Install protoc
+ uses: arduino/setup-protoc@v2
+ with:
+ version: '21.12'
+ repo-token: '${{ secrets.GITHUB_TOKEN }}'
+ if: '!(github.event_name == ''pull_request'' && matrix.skip_pr)'
- if: |-
!(github.event_name == 'pull_request' && matrix.skip_pr) && (matrix.profile == 'release' &&
matrix.job == 'test' &&
diff --git a/Cargo.lock b/Cargo.lock
index 9f7febad2..1ba74428f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -507,6 +507,7 @@ dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
+ "serde",
"time 0.1.45",
"wasm-bindgen",
"winapi",
@@ -1227,15 +1228,20 @@ dependencies = [
"anyhow",
"async-trait",
"base64 0.13.1",
+ "chrono",
"deno_core",
"hex",
"log",
"num-bigint",
+ "prost",
+ "prost-build",
"rand",
+ "reqwest",
"rusqlite",
"serde",
"serde_json",
"tokio",
+ "url",
"uuid",
]
@@ -3114,6 +3120,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4519a88847ba2d5ead3dc53f1060ec6a571de93f325d9c5c4968147382b1cbc3"
[[package]]
+name = "multimap"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
+
+[[package]]
name = "napi-build"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3696,6 +3708,16 @@ dependencies = [
]
[[package]]
+name = "prettyplease"
+version = "0.1.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86"
+dependencies = [
+ "proc-macro2 1.0.66",
+ "syn 1.0.109",
+]
+
+[[package]]
name = "primeorder"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3763,6 +3785,60 @@ dependencies = [
]
[[package]]
+name = "prost"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
+dependencies = [
+ "bytes",
+ "prost-derive",
+]
+
+[[package]]
+name = "prost-build"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
+dependencies = [
+ "bytes",
+ "heck",
+ "itertools",
+ "lazy_static",
+ "log",
+ "multimap",
+ "petgraph",
+ "prettyplease",
+ "prost",
+ "prost-types",
+ "regex",
+ "syn 1.0.109",
+ "tempfile",
+ "which",
+]
+
+[[package]]
+name = "prost-derive"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
+dependencies = [
+ "anyhow",
+ "itertools",
+ "proc-macro2 1.0.66",
+ "quote 1.0.32",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "prost-types"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
+dependencies = [
+ "prost",
+]
+
+[[package]]
name = "psm"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5232,6 +5308,8 @@ dependencies = [
"os_pipe",
"parking_lot 0.12.1",
"pretty_assertions",
+ "prost",
+ "prost-build",
"regex",
"reqwest",
"ring",
diff --git a/Cargo.toml b/Cargo.toml
index 3e1927466..761794d65 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -82,6 +82,7 @@ brotli = "3.3.4"
bytes = "1.4.0"
cache_control = "=0.2.0"
cbc = { version = "=0.1.2", features = ["alloc"] }
+chrono = { version = "=0.4.26", default-features = false, features = ["std", "serde", "clock"] }
console_static_text = "=0.8.1"
data-url = "=0.2.0"
dlopen = "0.1.8"
@@ -110,10 +111,12 @@ parking_lot = "0.12.0"
percent-encoding = "=2.3.0"
pin-project = "1.0.11" # don't pin because they yank crates from cargo
pretty_assertions = "=1.3.0"
+prost = "0.11"
+prost-build = "0.11"
rand = "=0.8.5"
regex = "^1.7.0"
lazy-regex = "2.5.0"
-reqwest = { version = "0.11.18", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks"] }
+reqwest = { version = "0.11.18", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks", "json"] }
ring = "=0.16.20"
rusqlite = { version = "=0.29.0", features = ["unlock_notify", "bundled"] }
rustls = "0.21.0"
diff --git a/README.md b/README.md
index 47d811da7..d1da25b7f 100644
--- a/README.md
+++ b/README.md
@@ -60,6 +60,11 @@ scoop install deno
Build and install from source using [Cargo](https://crates.io/crates/deno):
```sh
+# Install the Protobuf compiler
+apt install -y protobuf-compiler # Linux
+brew install protobuf # macOS
+
+# Build and install Deno
cargo install deno --locked
```
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index dec23e6a5..b8a22b293 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -66,7 +66,7 @@ base32 = "=0.4.0"
base64.workspace = true
bincode = "=1.3.3"
cache_control.workspace = true
-chrono = { version = "=0.4.26", default-features = false, features = ["std"] }
+chrono.workspace = true
clap = { version = "=4.3.3", features = ["string"] }
clap_complete = "=4.3.1"
clap_complete_fig = "=4.3.1"
diff --git a/cli/schemas/kv-metadata-exchange-response.v1.json b/cli/schemas/kv-metadata-exchange-response.v1.json
new file mode 100644
index 000000000..aa29242fb
--- /dev/null
+++ b/cli/schemas/kv-metadata-exchange-response.v1.json
@@ -0,0 +1,54 @@
+{
+ "$id": "https://deno.land/x/deno/cli/schemas/kv-metadata-exchange-response.v1.json",
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "definitions": {
+ "Uuid": {
+ "type": "string",
+ "pattern": "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
+ },
+ "DateTime": {
+ "type": "string",
+ "format": "date-time"
+ },
+ "EndpointInfo": {
+ "type": "object",
+ "properties": {
+ "url": {
+ "type": "string"
+ },
+ "consistency": {
+ "type": "string"
+ }
+ },
+ "required": ["url", "consistency"],
+ "additionalProperties": false
+ },
+ "DatabaseMetadata": {
+ "type": "object",
+ "properties": {
+ "version": {
+ "type": "integer",
+ "minimum": 0
+ },
+ "databaseId": {
+ "$ref": "#/definitions/Uuid"
+ },
+ "endpoints": {
+ "type": "array",
+ "items": {
+ "$ref": "#/definitions/EndpointInfo"
+ }
+ },
+ "token": {
+ "type": "string"
+ },
+ "expiresAt": {
+ "$ref": "#/definitions/DateTime"
+ }
+ },
+ "required": ["version", "databaseId", "endpoints", "token", "expiresAt"],
+ "additionalProperties": false
+ }
+ },
+ "$ref": "#/definitions/DatabaseMetadata"
+}
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index 438ebd7ee..acda9a0e2 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -20,6 +20,9 @@ try {
isCI = true;
}
+// Defined in test_util/src/lib.rs
+Deno.env.set("DENO_KV_ACCESS_TOKEN", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
+
Deno.test({
name: "openKv :memory: no permissions",
permissions: {},
@@ -1932,3 +1935,74 @@ Deno.test({
}
},
});
+
+Deno.test({
+ name: "remote backend",
+ async fn() {
+ const db = await Deno.openKv("http://localhost:4545/kv_remote_authorize");
+ try {
+ await db.set(["some-key"], 1);
+ const entry = await db.get(["some-key"]);
+ assertEquals(entry.value, null);
+ assertEquals(entry.versionstamp, null);
+ } finally {
+ db.close();
+ }
+ },
+});
+
+Deno.test({
+ name: "remote backend invalid format",
+ async fn() {
+ const db = await Deno.openKv(
+ "http://localhost:4545/kv_remote_authorize_invalid_format",
+ );
+ let ok = false;
+ try {
+ await db.set(["some-key"], 1);
+ } catch (e) {
+ if (
+ e.name === "TypeError" &&
+ e.message.startsWith("Metadata error: Failed to decode metadata: ")
+ ) {
+ ok = true;
+ } else {
+ throw e;
+ }
+ } finally {
+ db.close();
+ }
+
+ if (!ok) {
+ throw new Error("did not get expected error");
+ }
+ },
+});
+
+Deno.test({
+ name: "remote backend invalid version",
+ async fn() {
+ const db = await Deno.openKv(
+ "http://localhost:4545/kv_remote_authorize_invalid_version",
+ );
+ let ok = false;
+ try {
+ await db.set(["some-key"], 1);
+ } catch (e) {
+ if (
+ e.name === "TypeError" &&
+ e.message === "Metadata error: Unsupported metadata version: 2"
+ ) {
+ ok = true;
+ } else {
+ throw e;
+ }
+ } finally {
+ db.close();
+ }
+
+ if (!ok) {
+ throw new Error("did not get expected error");
+ }
+ },
+});
diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml
index 645689b74..bb2503e86 100644
--- a/ext/kv/Cargo.toml
+++ b/ext/kv/Cargo.toml
@@ -17,13 +17,20 @@ path = "lib.rs"
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
+chrono.workspace = true
deno_core.workspace = true
hex.workspace = true
log.workspace = true
num-bigint.workspace = true
+prost.workspace = true
rand.workspace = true
+reqwest.workspace = true
rusqlite.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
+url.workspace = true
uuid.workspace = true
+
+[build-dependencies]
+prost-build.workspace = true
diff --git a/ext/kv/README.md b/ext/kv/README.md
index 32896da62..f5c2de9ed 100644
--- a/ext/kv/README.md
+++ b/ext/kv/README.md
@@ -1,3 +1,81 @@
# deno_kv
-This crate provides a key/value store for Deno.
+This crate provides a key/value store for Deno. For an overview of Deno KV,
+please read the [manual](https://deno.land/manual/runtime/kv).
+
+## Storage Backends
+
+Deno KV has a pluggable storage interface that supports multiple backends:
+
+- SQLite - backed by a local SQLite database. This backend is suitable for
+ development and is the default when running locally.
+- Remote - backed by a remote service that implements the
+ [KV Connect](#kv-connect) protocol, for example
+ [Deno Deploy](https://deno.com/deploy).
+
+Additional backends can be added by implementing the `DatabaseHandler` trait.
+
+## KV Connect
+
+The KV Connect protocol has separate control and data planes to maximize
+throughput and minimize latency. _Metadata Exchange_ and _Data Path_ are the two
+sub-protocols that are used when talking to a KV Connect-compatible service.
+
+### Metadata Exchange
+
+To connect to a KV Connect service, the user provides an HTTP or HTTPS URL to
+`Deno.openKv`. A background task is then spawned to periodically make HTTP POST
+requests to the provided URL to refresh database metadata.
+
+The HTTP `Authorization` header is included and have the format
+`Bearer <access-token>`. The `<access-token>` is a static token issued by the
+service provider. For Deno Deploy, this is the personal access token generated
+from the dashboard. You can specify the access token with the environment
+variable `DENO_KV_ACCESS_TOKEN`.
+
+Request body is currently unused. The response is a JSON message that satisfies
+the [JSON Schema](https://json-schema.org/) definition in
+`cli/schemas/kv-metadata-exchange-response.v1.json`.
+
+Semantics of the response fields:
+
+- `version`: Protocol version. The only supported value is `1`.
+- `databaseId`: UUID of the database.
+- `endpoints`: Data plane endpoints that can serve requests to the database,
+ along with their consistency levels.
+- `token`: An ephemeral authentication token that must be included in all
+ requests to the data plane. This value is an opaque string and the client
+ should not depend on its format.
+- `expiresAt`: The time at which the token expires. Encoded as an ISO 8601
+ string.
+
+### Data Path
+
+After the first metadata exchange has completed, the client can talk to the data
+plane endpoints listed in the `endpoints` field using a Protobuf-over-HTTP
+protocol called the _Data Path_. The Protobuf messages are defined in
+`proto/datapath.proto`.
+
+Two sub-endpoints are available under a data plane endpoint URL:
+
+- `POST /snapshot_read`: Used for read operations: `kv.get()` and
+ `kv.getMany()`.
+ - **Request type**: `SnapshotRead`
+ - **Response type**: `SnapshotReadOutput`
+- `POST /atomic_write`: Used for write operations: `kv.set()` and
+ `kv.atomic().commit()`.
+ - **Request type**: `AtomicWrite`
+ - **Response type**: `AtomicWriteOutput`
+
+An HTTP `Authorization` header in the format `Bearer <ephemeral-token>` must be
+included in all requests to the data plane. The value of `<ephemeral-token>` is
+the `token` field from the metadata exchange response.
+
+### Error handling
+
+All non-client errors (i.e. network errors and HTTP 5xx status codes) are
+handled by retrying the request. Randomized exponential backoff is applied to
+each retry.
+
+Client errors cannot be recovered by retrying. A JavaScript exception is
+generated for each of those errors.
diff --git a/ext/kv/build.rs b/ext/kv/build.rs
new file mode 100644
index 000000000..eba8a20f7
--- /dev/null
+++ b/ext/kv/build.rs
@@ -0,0 +1,19 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::env;
+use std::io;
+use std::path::PathBuf;
+
+fn main() -> io::Result<()> {
+ println!("cargo:rerun-if-changed=./proto");
+
+ let descriptor_path =
+ PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
+
+ prost_build::Config::new()
+ .file_descriptor_set_path(&descriptor_path)
+ .compile_well_known_types()
+ .compile_protos(&["proto/datapath.proto"], &["proto/"])?;
+
+ Ok(())
+}
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
new file mode 100644
index 000000000..f79c10f55
--- /dev/null
+++ b/ext/kv/dynamic.rs
@@ -0,0 +1,216 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use crate::remote::RemoteDbHandlerPermissions;
+use crate::sqlite::SqliteDbHandler;
+use crate::sqlite::SqliteDbHandlerPermissions;
+use crate::AtomicWrite;
+use crate::CommitResult;
+use crate::Database;
+use crate::DatabaseHandler;
+use crate::QueueMessageHandle;
+use crate::ReadRange;
+use crate::ReadRangeOutput;
+use crate::SnapshotReadOptions;
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+
+pub struct MultiBackendDbHandler {
+ backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
+}
+
+impl MultiBackendDbHandler {
+ pub fn new(
+ backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
+ ) -> Self {
+ Self { backends }
+ }
+
+ pub fn remote_or_sqlite<
+ P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
+ >(
+ default_storage_dir: Option<std::path::PathBuf>,
+ ) -> Self {
+ Self::new(vec![
+ (
+ &["https://", "http://"],
+ Box::new(crate::remote::RemoteDbHandler::<P>::new()),
+ ),
+ (
+ &[""],
+ Box::new(SqliteDbHandler::<P>::new(default_storage_dir)),
+ ),
+ ])
+ }
+}
+
+#[async_trait(?Send)]
+impl DatabaseHandler for MultiBackendDbHandler {
+ type DB = Box<dyn DynamicDb>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ for (prefixes, handler) in &self.backends {
+ for &prefix in *prefixes {
+ if prefix.is_empty() {
+ return handler.dyn_open(state.clone(), path.clone()).await;
+ }
+ let Some(path) = &path else {
+ continue;
+ };
+ if path.starts_with(prefix) {
+ return handler.dyn_open(state.clone(), Some(path.clone())).await;
+ }
+ }
+ }
+ Err(type_error(format!(
+ "No backend supports the given path: {:?}",
+ path
+ )))
+ }
+}
+
+#[async_trait(?Send)]
+pub trait DynamicDbHandler {
+ async fn dyn_open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Box<dyn DynamicDb>, AnyError>;
+}
+
+#[async_trait(?Send)]
+impl DatabaseHandler for Box<dyn DynamicDbHandler> {
+ type DB = Box<dyn DynamicDb>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ (**self).dyn_open(state, path).await
+ }
+}
+
+#[async_trait(?Send)]
+impl<T, DB> DynamicDbHandler for T
+where
+ T: DatabaseHandler<DB = DB>,
+ DB: Database + 'static,
+{
+ async fn dyn_open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Box<dyn DynamicDb>, AnyError> {
+ Ok(Box::new(self.open(state, path).await?))
+ }
+}
+
+#[async_trait(?Send)]
+pub trait DynamicDb {
+ async fn dyn_snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError>;
+
+ async fn dyn_atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError>;
+
+ async fn dyn_dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError>;
+
+ fn dyn_close(&self);
+}
+
+#[async_trait(?Send)]
+impl Database for Box<dyn DynamicDb> {
+ type QMH = Box<dyn QueueMessageHandle>;
+
+ async fn snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ (**self).dyn_snapshot_read(state, requests, options).await
+ }
+
+ async fn atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ (**self).dyn_atomic_write(state, write).await
+ }
+
+ async fn dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
+ (**self).dyn_dequeue_next_message(state).await
+ }
+
+ fn close(&self) {
+ (**self).dyn_close()
+ }
+}
+
+#[async_trait(?Send)]
+impl<T, QMH> DynamicDb for T
+where
+ T: Database<QMH = QMH>,
+ QMH: QueueMessageHandle + 'static,
+{
+ async fn dyn_snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ Ok(self.snapshot_read(state, requests, options).await?)
+ }
+
+ async fn dyn_atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ Ok(self.atomic_write(state, write).await?)
+ }
+
+ async fn dyn_dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
+ Ok(Box::new(self.dequeue_next_message(state).await?))
+ }
+
+ fn dyn_close(&self) {
+ self.close()
+ }
+}
+
+#[async_trait(?Send)]
+impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
+ async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
+ (**self).take_payload().await
+ }
+ async fn finish(&self, success: bool) -> Result<(), AnyError> {
+ (**self).finish(success).await
+ }
+}
diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs
index 28b43f8d7..abeaf8dd5 100644
--- a/ext/kv/interface.rs
+++ b/ext/kv/interface.rs
@@ -29,16 +29,21 @@ pub trait Database {
async fn snapshot_read(
&self,
+ state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn atomic_write(
&self,
+ state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
- async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError>;
+ async fn dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Self::QMH, AnyError>;
fn close(&self);
}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index 7164a700b..f226b11ae 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -1,7 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub mod codec;
+pub mod dynamic;
mod interface;
+mod proto;
+pub mod remote;
pub mod sqlite;
use std::borrow::Cow;
@@ -285,7 +288,8 @@ where
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
- let output_ranges = db.snapshot_read(read_ranges, opts).await?;
+ let output_ranges =
+ db.snapshot_read(state.clone(), read_ranges, opts).await?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
@@ -323,7 +327,7 @@ where
resource.db.clone()
};
- let mut handle = db.dequeue_next_message().await?;
+ let mut handle = db.dequeue_next_message(state.clone()).await?;
let payload = handle.take_payload().await?.into();
let handle_rid = {
let mut state = state.borrow_mut();
@@ -660,7 +664,7 @@ where
enqueues,
};
- let result = db.atomic_write(atomic_write).await?;
+ let result = db.atomic_write(state.clone(), atomic_write).await?;
Ok(result.map(|res| hex::encode(res.versionstamp)))
}
diff --git a/ext/kv/proto/datapath.proto b/ext/kv/proto/datapath.proto
new file mode 100644
index 000000000..ea48f2385
--- /dev/null
+++ b/ext/kv/proto/datapath.proto
@@ -0,0 +1,96 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+syntax = "proto3";
+
+package datapath;
+
+message SnapshotRead {
+ repeated ReadRange ranges = 1;
+}
+
+message SnapshotReadOutput {
+ repeated ReadRangeOutput ranges = 1;
+ bool read_disabled = 2;
+ repeated string regions_if_read_disabled = 3;
+ bool read_is_strongly_consistent = 4;
+ string primary_if_not_strongly_consistent = 5;
+}
+
+message ReadRange {
+ bytes start = 1;
+ bytes end = 2;
+ int32 limit = 3;
+ bool reverse = 4;
+}
+
+message ReadRangeOutput {
+ repeated KvEntry values = 1;
+}
+
+message AtomicWrite {
+ repeated KvCheck kv_checks = 1;
+ repeated KvMutation kv_mutations = 2;
+ repeated Enqueue enqueues = 3;
+}
+
+message AtomicWriteOutput {
+ AtomicWriteStatus status = 1;
+ bytes versionstamp = 2;
+ string primary_if_write_disabled = 3;
+}
+
+message KvCheck {
+ bytes key = 1;
+ bytes versionstamp = 2; // 10-byte raw versionstamp
+}
+
+message KvMutation {
+ bytes key = 1;
+ KvValue value = 2;
+ KvMutationType mutation_type = 3;
+}
+
+message KvValue {
+ bytes data = 1;
+ KvValueEncoding encoding = 2;
+}
+
+message KvEntry {
+ bytes key = 1;
+ bytes value = 2;
+ KvValueEncoding encoding = 3;
+ bytes versionstamp = 4;
+}
+
+enum KvMutationType {
+ M_UNSPECIFIED = 0;
+ M_SET = 1;
+ M_CLEAR = 2;
+ M_SUM = 3;
+ M_MAX = 4;
+ M_MIN = 5;
+}
+
+enum KvValueEncoding {
+ VE_UNSPECIFIED = 0;
+ VE_V8 = 1;
+ VE_LE64 = 2;
+ VE_BYTES = 3;
+}
+
+enum AtomicWriteStatus {
+ AW_UNSPECIFIED = 0;
+ AW_SUCCESS = 1;
+ AW_CHECK_FAILURE = 2;
+ AW_UNSUPPORTED_WRITE = 3;
+ AW_USAGE_LIMIT_EXCEEDED = 4;
+ AW_WRITE_DISABLED = 5;
+ AW_QUEUE_BACKLOG_LIMIT_EXCEEDED = 6;
+}
+
+message Enqueue {
+ bytes payload = 1;
+ int64 deadline_ms = 2;
+ repeated bytes kv_keys_if_undelivered = 3;
+ repeated uint32 backoff_schedule = 4;
+}
diff --git a/ext/kv/proto/mod.rs b/ext/kv/proto/mod.rs
new file mode 100644
index 000000000..d258a0551
--- /dev/null
+++ b/ext/kv/proto/mod.rs
@@ -0,0 +1,7 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// Generated code, disable lints
+#[allow(clippy::all, non_snake_case)]
+pub mod datapath {
+ include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
+}
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs
new file mode 100644
index 000000000..47528d15f
--- /dev/null
+++ b/ext/kv/remote.rs
@@ -0,0 +1,558 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::RefCell;
+use std::marker::PhantomData;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::proto::datapath as pb;
+use crate::AtomicWrite;
+use crate::CommitResult;
+use crate::Database;
+use crate::DatabaseHandler;
+use crate::KvEntry;
+use crate::MutationKind;
+use crate::QueueMessageHandle;
+use crate::ReadRange;
+use crate::ReadRangeOutput;
+use crate::SnapshotReadOptions;
+use anyhow::Context;
+use async_trait::async_trait;
+use chrono::DateTime;
+use chrono::Utc;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::TryFutureExt;
+use deno_core::task::JoinHandle;
+use deno_core::OpState;
+use prost::Message;
+use rand::Rng;
+use serde::Deserialize;
+use tokio::sync::watch;
+use url::Url;
+use uuid::Uuid;
+
+pub trait RemoteDbHandlerPermissions {
+ fn check_env(&mut self, var: &str) -> Result<(), AnyError>;
+ fn check_net_url(
+ &mut self,
+ url: &Url,
+ api_name: &str,
+ ) -> Result<(), AnyError>;
+}
+
+pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
+ _p: std::marker::PhantomData<P>,
+}
+
+impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
+ pub fn new() -> Self {
+ Self { _p: PhantomData }
+ }
+}
+
+impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[derive(Deserialize)]
+struct VersionInfo {
+ version: u64,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[allow(dead_code)]
+struct DatabaseMetadata {
+ version: u64,
+ database_id: Uuid,
+ endpoints: Vec<EndpointInfo>,
+ token: String,
+ expires_at: DateTime<Utc>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct EndpointInfo {
+ pub url: String,
+
+ // Using `String` instead of an enum, so that parsing doesn't
+ // break if more consistency levels are added.
+ pub consistency: String,
+}
+
+#[async_trait(?Send)]
+impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
+ type DB = RemoteDb<P>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ const ENV_VAR_NAME: &str = "DENO_KV_ACCESS_TOKEN";
+
+ let Some(url) = path else {
+ return Err(type_error("Missing database url"));
+ };
+
+ let Ok(parsed_url) = Url::parse(&url) else {
+ return Err(type_error(format!("Invalid database url: {}", url)));
+ };
+
+ {
+ let mut state = state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_env(ENV_VAR_NAME)?;
+ permissions.check_net_url(&parsed_url, "Deno.openKv")?;
+ }
+
+ let access_token = std::env::var(ENV_VAR_NAME)
+ .map_err(anyhow::Error::from)
+ .with_context(|| {
+ "Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
+ })?;
+
+ let refresher = MetadataRefresher::new(url, access_token);
+
+ let db = RemoteDb {
+ client: reqwest::Client::new(),
+ refresher,
+ _p: PhantomData,
+ };
+ Ok(db)
+ }
+}
+
+pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> {
+ client: reqwest::Client,
+ refresher: MetadataRefresher,
+ _p: std::marker::PhantomData<P>,
+}
+
+pub struct DummyQueueMessageHandle {}
+
+#[async_trait(?Send)]
+impl QueueMessageHandle for DummyQueueMessageHandle {
+ async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
+ unimplemented!()
+ }
+
+ async fn finish(&self, _success: bool) -> Result<(), AnyError> {
+ unimplemented!()
+ }
+}
+
+#[async_trait(?Send)]
+impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
+ type QMH = DummyQueueMessageHandle;
+
+ async fn snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ _options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ let req = pb::SnapshotRead {
+ ranges: requests
+ .into_iter()
+ .map(|r| pb::ReadRange {
+ start: r.start,
+ end: r.end,
+ limit: r.limit.get() as _,
+ reverse: r.reverse,
+ })
+ .collect(),
+ };
+
+ let res: pb::SnapshotReadOutput = call_remote::<P, _, _>(
+ &state,
+ &self.refresher,
+ &self.client,
+ "snapshot_read",
+ &req,
+ )
+ .await?;
+
+ if res.read_disabled {
+ return Err(type_error("Reads are disabled for this database."));
+ }
+
+ let out = res
+ .ranges
+ .into_iter()
+ .map(|r| {
+ Ok(ReadRangeOutput {
+ entries: r
+ .values
+ .into_iter()
+ .map(|e| {
+ let encoding = e.encoding();
+ Ok(KvEntry {
+ key: e.key,
+ value: decode_value(e.value, encoding)?,
+ versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?,
+ })
+ })
+ .collect::<Result<_, AnyError>>()?,
+ })
+ })
+ .collect::<Result<Vec<_>, AnyError>>()?;
+ Ok(out)
+ }
+
+ async fn atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ if !write.enqueues.is_empty() {
+ return Err(type_error("Enqueue operations are not supported yet."));
+ }
+
+ let req = pb::AtomicWrite {
+ kv_checks: write
+ .checks
+ .into_iter()
+ .map(|x| {
+ Ok(pb::KvCheck {
+ key: x.key,
+ versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(),
+ })
+ })
+ .collect::<anyhow::Result<_>>()?,
+ kv_mutations: write
+ .mutations
+ .into_iter()
+ .map(|x| encode_mutation(x.key, x.kind))
+ .collect(),
+ enqueues: vec![],
+ };
+
+ let res: pb::AtomicWriteOutput = call_remote::<P, _, _>(
+ &state,
+ &self.refresher,
+ &self.client,
+ "atomic_write",
+ &req,
+ )
+ .await?;
+ match res.status() {
+ pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult {
+ versionstamp: if res.versionstamp.is_empty() {
+ Default::default()
+ } else {
+ res.versionstamp[..].try_into()?
+ },
+ })),
+ pb::AtomicWriteStatus::AwCheckFailure => Ok(None),
+ pb::AtomicWriteStatus::AwUnsupportedWrite => {
+ Err(type_error("Unsupported write"))
+ }
+ pb::AtomicWriteStatus::AwUsageLimitExceeded => {
+ Err(type_error("The database usage limit has been exceeded."))
+ }
+ pb::AtomicWriteStatus::AwWriteDisabled => {
+ // TODO: Auto retry
+ Err(type_error("Writes are disabled for this database."))
+ }
+ pb::AtomicWriteStatus::AwUnspecified => {
+ Err(type_error("Unspecified error"))
+ }
+ pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => {
+ Err(type_error("Queue backlog limit exceeded"))
+ }
+ }
+ }
+
+ async fn dequeue_next_message(
+ &self,
+ _state: Rc<RefCell<OpState>>,
+ ) -> Result<Self::QMH, AnyError> {
+ deno_core::futures::future::pending().await
+ }
+
+ fn close(&self) {}
+}
+
+fn decode_value(
+ value: Vec<u8>,
+ encoding: pb::KvValueEncoding,
+) -> anyhow::Result<crate::Value> {
+ match encoding {
+ pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)),
+ pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)),
+ pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes(
+ <[u8; 8]>::try_from(&value[..])?,
+ ))),
+ pb::KvValueEncoding::VeUnspecified => {
+ Err(anyhow::anyhow!("Unspecified value encoding, cannot decode"))
+ }
+ }
+}
+
+fn encode_value(value: crate::Value) -> pb::KvValue {
+ match value {
+ crate::Value::V8(data) => pb::KvValue {
+ data,
+ encoding: pb::KvValueEncoding::VeV8 as _,
+ },
+ crate::Value::Bytes(data) => pb::KvValue {
+ data,
+ encoding: pb::KvValueEncoding::VeBytes as _,
+ },
+ crate::Value::U64(x) => pb::KvValue {
+ data: x.to_le_bytes().to_vec(),
+ encoding: pb::KvValueEncoding::VeLe64 as _,
+ },
+ }
+}
+
+fn encode_mutation(key: Vec<u8>, mutation: MutationKind) -> pb::KvMutation {
+ match mutation {
+ MutationKind::Set(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MSet as _,
+ },
+ MutationKind::Delete => pb::KvMutation {
+ key,
+ value: Some(encode_value(crate::Value::Bytes(vec![]))),
+ mutation_type: pb::KvMutationType::MClear as _,
+ },
+ MutationKind::Max(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MMax as _,
+ },
+ MutationKind::Min(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MMin as _,
+ },
+ MutationKind::Sum(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MSum as _,
+ },
+ }
+}
+
+#[derive(Clone)]
+enum MetadataState {
+ Ready(Arc<DatabaseMetadata>),
+ Invalid(String),
+ Pending,
+}
+
+struct MetadataRefresher {
+ metadata_rx: watch::Receiver<MetadataState>,
+ handle: JoinHandle<()>,
+}
+
+impl MetadataRefresher {
+ pub fn new(url: String, access_token: String) -> Self {
+ let (tx, rx) = watch::channel(MetadataState::Pending);
+ let handle =
+ deno_core::task::spawn(metadata_refresh_task(url, access_token, tx));
+ Self {
+ handle,
+ metadata_rx: rx,
+ }
+ }
+}
+
+impl Drop for MetadataRefresher {
+ fn drop(&mut self) {
+ self.handle.abort();
+ }
+}
+
+async fn metadata_refresh_task(
+ metadata_url: String,
+ access_token: String,
+ tx: watch::Sender<MetadataState>,
+) {
+ let client = reqwest::Client::new();
+ loop {
+ let mut attempt = 0u64;
+ let metadata = loop {
+ match fetch_metadata(&client, &metadata_url, &access_token).await {
+ Ok(Ok(x)) => break x,
+ Ok(Err(e)) => {
+ if tx.send(MetadataState::Invalid(e)).is_err() {
+ return;
+ }
+ }
+ Err(e) => {
+ log::error!("Failed to fetch database metadata: {}", e);
+ }
+ }
+ randomized_exponential_backoff(Duration::from_secs(5), attempt).await;
+ attempt += 1;
+ };
+
+ let ms_until_expire = u64::try_from(
+ metadata
+ .expires_at
+ .timestamp_millis()
+ .saturating_sub(Utc::now().timestamp_millis()),
+ )
+ .unwrap_or_default();
+
+ // Refresh 10 minutes before expiry
+ // In case of buggy clocks, don't refresh more than once per minute
+ let interval = Duration::from_millis(ms_until_expire)
+ .saturating_sub(Duration::from_secs(600))
+ .max(Duration::from_secs(60));
+
+ if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() {
+ return;
+ }
+
+ tokio::time::sleep(interval).await;
+ }
+}
+
+async fn fetch_metadata(
+ client: &reqwest::Client,
+ metadata_url: &str,
+ access_token: &str,
+) -> anyhow::Result<Result<DatabaseMetadata, String>> {
+ let res = client
+ .post(metadata_url)
+ .header("authorization", format!("Bearer {}", access_token))
+ .send()
+ .await?;
+
+ if !res.status().is_success() {
+ if res.status().is_client_error() {
+ return Ok(Err(format!(
+ "Client error while fetching metadata: {:?} {}",
+ res.status(),
+ res.text().await?
+ )));
+ } else {
+ anyhow::bail!(
+ "remote returned error: {:?} {}",
+ res.status(),
+ res.text().await?
+ );
+ }
+ }
+
+ let res = res.bytes().await?;
+ let version_info: VersionInfo = match serde_json::from_slice(&res) {
+ Ok(x) => x,
+ Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))),
+ };
+ if version_info.version > 1 {
+ return Ok(Err(format!(
+ "Unsupported metadata version: {}",
+ version_info.version
+ )));
+ }
+
+ Ok(
+ serde_json::from_slice(&res)
+ .map_err(|e| format!("Failed to decode metadata: {}", e)),
+ )
+}
+
+async fn randomized_exponential_backoff(base: Duration, attempt: u64) {
+ let attempt = attempt.min(12);
+ let delay = base.as_millis() as u64 + (2 << attempt);
+ let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1);
+ tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
+}
+
+async fn call_remote<
+ P: RemoteDbHandlerPermissions + 'static,
+ T: Message,
+ R: Message + Default,
+>(
+ state: &RefCell<OpState>,
+ refresher: &MetadataRefresher,
+ client: &reqwest::Client,
+ method: &str,
+ req: &T,
+) -> anyhow::Result<R> {
+ let mut attempt = 0u64;
+ let res = loop {
+ let mut metadata_rx = refresher.metadata_rx.clone();
+ let metadata = loop {
+ match &*metadata_rx.borrow() {
+ MetadataState::Pending => {}
+ MetadataState::Ready(x) => break x.clone(),
+ MetadataState::Invalid(e) => {
+ return Err(type_error(format!("Metadata error: {}", e)))
+ }
+ }
+ // `unwrap()` never fails because `tx` is owned by the task held by `refresher`.
+ metadata_rx.changed().await.unwrap();
+ };
+ let Some(sc_endpoint) = metadata.endpoints.iter().find(|x| x.consistency == "strong") else {
+ return Err(type_error("No strong consistency endpoint is available for this database"));
+ };
+
+ let full_url = format!("{}/{}", sc_endpoint.url, method);
+ {
+ let parsed_url = Url::parse(&full_url)?;
+ let mut state = state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_net_url(&parsed_url, "Deno.Kv")?;
+ }
+
+ let res = client
+ .post(&full_url)
+ .header("x-transaction-domain-id", metadata.database_id.to_string())
+ .header("authorization", format!("Bearer {}", metadata.token))
+ .body(req.encode_to_vec())
+ .send()
+ .map_err(anyhow::Error::from)
+ .and_then(|x| async move {
+ if x.status().is_success() {
+ Ok(Ok(x.bytes().await?))
+ } else if x.status().is_client_error() {
+ Ok(Err((x.status(), x.text().await?)))
+ } else {
+ Err(anyhow::anyhow!(
+ "server error ({:?}): {}",
+ x.status(),
+ x.text().await?
+ ))
+ }
+ })
+ .await;
+
+ match res {
+ Ok(x) => break x,
+ Err(e) => {
+ log::error!("retryable error in {}: {}", method, e);
+ randomized_exponential_backoff(Duration::from_millis(0), attempt).await;
+ attempt += 1;
+ }
+ }
+ };
+
+ let res = match res {
+ Ok(x) => x,
+ Err((status, message)) => {
+ return Err(type_error(format!(
+ "client error in {} (status {:?}): {}",
+ method, status, message
+ )))
+ }
+ };
+
+ match R::decode(&*res) {
+ Ok(x) => Ok(x),
+ Err(e) => Err(type_error(format!(
+ "failed to decode response from {}: {}",
+ method, e
+ ))),
+ }
+}
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 8e37d2c87..bf2688920 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -724,6 +724,7 @@ impl Database for SqliteDb {
async fn snapshot_read(
&self,
+ _state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
@@ -769,6 +770,7 @@ impl Database for SqliteDb {
async fn atomic_write(
&self,
+ _state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let write = Arc::new(write);
@@ -894,7 +896,10 @@ impl Database for SqliteDb {
Ok(commit_result)
}
- async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError> {
+ async fn dequeue_next_message(
+ &self,
+ _state: Rc<RefCell<OpState>>,
+ ) -> Result<Self::QMH, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
diff --git a/runtime/permissions/mod.rs b/runtime/permissions/mod.rs
index 93294fc92..a87ca309f 100644
--- a/runtime/permissions/mod.rs
+++ b/runtime/permissions/mod.rs
@@ -1483,6 +1483,22 @@ impl deno_kv::sqlite::SqliteDbHandlerPermissions for PermissionsContainer {
}
}
+impl deno_kv::remote::RemoteDbHandlerPermissions for PermissionsContainer {
+ #[inline(always)]
+ fn check_env(&mut self, var: &str) -> Result<(), AnyError> {
+ self.0.lock().env.check(var)
+ }
+
+ #[inline(always)]
+ fn check_net_url(
+ &mut self,
+ url: &url::Url,
+ api_name: &str,
+ ) -> Result<(), AnyError> {
+ self.0.lock().net.check_url(url, Some(api_name))
+ }
+}
+
fn unit_permission_from_flag_bools(
allow_flag: bool,
deny_flag: bool,
diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs
index 0c4e95140..8a88dfa40 100644
--- a/runtime/web_worker.rs
+++ b/runtime/web_worker.rs
@@ -38,7 +38,7 @@ use deno_core::SourceMapGetter;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
-use deno_kv::sqlite::SqliteDbHandler;
+use deno_kv::dynamic::MultiBackendDbHandler;
use deno_node::SUPPORTED_BUILTIN_NODE_MODULES_WITH_PREFIX;
use deno_tls::RootCertStoreProvider;
use deno_web::create_entangled_message_port;
@@ -439,7 +439,7 @@ impl WebWorker {
),
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(
- SqliteDbHandler::<PermissionsContainer>::new(None),
+ MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(None),
unstable,
),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
diff --git a/runtime/worker.rs b/runtime/worker.rs
index 5eefd5fa8..a31bd2ae1 100644
--- a/runtime/worker.rs
+++ b/runtime/worker.rs
@@ -35,7 +35,7 @@ use deno_core::SourceMapGetter;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
-use deno_kv::sqlite::SqliteDbHandler;
+use deno_kv::dynamic::MultiBackendDbHandler;
use deno_node::SUPPORTED_BUILTIN_NODE_MODULES_WITH_PREFIX;
use deno_tls::RootCertStoreProvider;
use deno_web::BlobStore;
@@ -334,7 +334,7 @@ impl MainWorker {
),
deno_tls::deno_tls::init_ops_and_esm(),
deno_kv::deno_kv::init_ops_and_esm(
- SqliteDbHandler::<PermissionsContainer>::new(
+ MultiBackendDbHandler::remote_or_sqlite::<PermissionsContainer>(
options.origin_storage_dir.clone(),
),
unstable,
diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml
index 2f35473e8..6ea9d870d 100644
--- a/test_util/Cargo.toml
+++ b/test_util/Cargo.toml
@@ -31,6 +31,7 @@ once_cell.workspace = true
os_pipe.workspace = true
parking_lot.workspace = true
pretty_assertions.workspace = true
+prost.workspace = true
regex.workspace = true
reqwest.workspace = true
ring.workspace = true
@@ -46,3 +47,6 @@ url.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["consoleapi", "synchapi", "handleapi", "namedpipeapi", "winbase", "winerror"] }
+
+[build-dependencies]
+prost-build.workspace = true
diff --git a/test_util/build.rs b/test_util/build.rs
new file mode 100644
index 000000000..420abd0a1
--- /dev/null
+++ b/test_util/build.rs
@@ -0,0 +1,22 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::env;
+use std::io;
+use std::path::PathBuf;
+
+fn main() -> io::Result<()> {
+ println!("cargo:rerun-if-changed=../ext/kv/proto");
+
+ let descriptor_path =
+ PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
+
+ prost_build::Config::new()
+ .file_descriptor_set_path(&descriptor_path)
+ .compile_well_known_types()
+ .compile_protos(
+ &["../ext/kv/proto/datapath.proto"],
+ &["../ext/kv/proto/"],
+ )?;
+
+ Ok(())
+}
diff --git a/test_util/src/kv_remote.rs b/test_util/src/kv_remote.rs
new file mode 100644
index 000000000..d258a0551
--- /dev/null
+++ b/test_util/src/kv_remote.rs
@@ -0,0 +1,7 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// Generated code, disable lints
+#[allow(clippy::all, non_snake_case)]
+pub mod datapath {
+ include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
+}
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs
index 136576890..41fb96a1b 100644
--- a/test_util/src/lib.rs
+++ b/test_util/src/lib.rs
@@ -15,9 +15,16 @@ use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
+use kv_remote::datapath::AtomicWrite;
+use kv_remote::datapath::AtomicWriteOutput;
+use kv_remote::datapath::AtomicWriteStatus;
+use kv_remote::datapath::ReadRangeOutput;
+use kv_remote::datapath::SnapshotRead;
+use kv_remote::datapath::SnapshotReadOutput;
use npm::CUSTOM_NPM_PACKAGE_CACHE;
use once_cell::sync::Lazy;
use pretty_assertions::assert_eq;
+use prost::Message;
use pty::Pty;
use regex::Regex;
use rustls::Certificate;
@@ -57,6 +64,7 @@ pub mod assertions;
mod builders;
pub mod factory;
mod fs;
+mod kv_remote;
pub mod lsp;
mod npm;
pub mod pty;
@@ -72,6 +80,9 @@ const PORT: u16 = 4545;
const TEST_AUTH_TOKEN: &str = "abcdef123456789";
const TEST_BASIC_AUTH_USERNAME: &str = "testuser123";
const TEST_BASIC_AUTH_PASSWORD: &str = "testpassabc";
+const KV_DATABASE_ID: &str = "11111111-1111-1111-1111-111111111111";
+const KV_ACCESS_TOKEN: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
+const KV_DATABASE_TOKEN: &str = "MOCKMOCKMOCKMOCKMOCKMOCKMOCK";
const REDIRECT_PORT: u16 = 4546;
const ANOTHER_REDIRECT_PORT: u16 = 4547;
const DOUBLE_REDIRECTS_PORT: u16 = 4548;
@@ -1095,6 +1106,199 @@ async fn main_server(
let res = Response::new(Body::from(query.unwrap_or_default()));
Ok(res)
}
+ (&hyper::Method::POST, "/kv_remote_authorize") => {
+ if req
+ .headers()
+ .get("authorization")
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or_default()
+ != format!("Bearer {}", KV_ACCESS_TOKEN)
+ {
+ return Ok(
+ Response::builder()
+ .status(StatusCode::UNAUTHORIZED)
+ .body(Body::empty())
+ .unwrap(),
+ );
+ }
+
+ Ok(
+ Response::builder()
+ .header("content-type", "application/json")
+ .body(Body::from(
+ serde_json::json!({
+ "version": 1,
+ "databaseId": KV_DATABASE_ID,
+ "endpoints": [
+ {
+ "url": format!("http://localhost:{}/kv_blackhole", PORT),
+ "consistency": "strong",
+ }
+ ],
+ "token": KV_DATABASE_TOKEN,
+ "expiresAt": "2099-01-01T00:00:00Z",
+ })
+ .to_string(),
+ ))
+ .unwrap(),
+ )
+ }
+ (&hyper::Method::POST, "/kv_remote_authorize_invalid_format") => {
+ if req
+ .headers()
+ .get("authorization")
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or_default()
+ != format!("Bearer {}", KV_ACCESS_TOKEN)
+ {
+ return Ok(
+ Response::builder()
+ .status(StatusCode::UNAUTHORIZED)
+ .body(Body::empty())
+ .unwrap(),
+ );
+ }
+
+ Ok(
+ Response::builder()
+ .header("content-type", "application/json")
+ .body(Body::from(
+ serde_json::json!({
+ "version": 1,
+ "databaseId": KV_DATABASE_ID,
+ })
+ .to_string(),
+ ))
+ .unwrap(),
+ )
+ }
+ (&hyper::Method::POST, "/kv_remote_authorize_invalid_version") => {
+ if req
+ .headers()
+ .get("authorization")
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or_default()
+ != format!("Bearer {}", KV_ACCESS_TOKEN)
+ {
+ return Ok(
+ Response::builder()
+ .status(StatusCode::UNAUTHORIZED)
+ .body(Body::empty())
+ .unwrap(),
+ );
+ }
+
+ Ok(
+ Response::builder()
+ .header("content-type", "application/json")
+ .body(Body::from(
+ serde_json::json!({
+ "version": 2,
+ "databaseId": KV_DATABASE_ID,
+ "endpoints": [
+ {
+ "url": format!("http://localhost:{}/kv_blackhole", PORT),
+ "consistency": "strong",
+ }
+ ],
+ "token": KV_DATABASE_TOKEN,
+ "expiresAt": "2099-01-01T00:00:00Z",
+ })
+ .to_string(),
+ ))
+ .unwrap(),
+ )
+ }
+ (&hyper::Method::POST, "/kv_blackhole/snapshot_read") => {
+ if req
+ .headers()
+ .get("authorization")
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or_default()
+ != format!("Bearer {}", KV_DATABASE_TOKEN)
+ {
+ return Ok(
+ Response::builder()
+ .status(StatusCode::UNAUTHORIZED)
+ .body(Body::empty())
+ .unwrap(),
+ );
+ }
+
+ let body = hyper::body::to_bytes(req.into_body())
+ .await
+ .unwrap_or_default();
+ let Ok(body): Result<SnapshotRead, _> = prost::Message::decode(&body[..]) else {
+ return Ok(Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::empty())
+ .unwrap());
+ };
+ if body.ranges.is_empty() {
+ return Ok(
+ Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::empty())
+ .unwrap(),
+ );
+ }
+ Ok(
+ Response::builder()
+ .body(Body::from(
+ SnapshotReadOutput {
+ ranges: body
+ .ranges
+ .iter()
+ .map(|_| ReadRangeOutput { values: vec![] })
+ .collect(),
+ read_disabled: false,
+ regions_if_read_disabled: vec![],
+ read_is_strongly_consistent: true,
+ primary_if_not_strongly_consistent: "".into(),
+ }
+ .encode_to_vec(),
+ ))
+ .unwrap(),
+ )
+ }
+ (&hyper::Method::POST, "/kv_blackhole/atomic_write") => {
+ if req
+ .headers()
+ .get("authorization")
+ .and_then(|x| x.to_str().ok())
+ .unwrap_or_default()
+ != format!("Bearer {}", KV_DATABASE_TOKEN)
+ {
+ return Ok(
+ Response::builder()
+ .status(StatusCode::UNAUTHORIZED)
+ .body(Body::empty())
+ .unwrap(),
+ );
+ }
+
+ let body = hyper::body::to_bytes(req.into_body())
+ .await
+ .unwrap_or_default();
+ let Ok(_body): Result<AtomicWrite, _> = prost::Message::decode(&body[..]) else {
+ return Ok(Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::empty())
+ .unwrap());
+ };
+ Ok(
+ Response::builder()
+ .body(Body::from(
+ AtomicWriteOutput {
+ status: AtomicWriteStatus::AwSuccess.into(),
+ versionstamp: vec![0u8; 10],
+ primary_if_write_disabled: "".into(),
+ }
+ .encode_to_vec(),
+ ))
+ .unwrap(),
+ )
+ }
_ => {
let mut file_path = testdata_path().to_path_buf();
file_path.push(&req.uri().path()[1..]);