summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/kv/Cargo.toml7
-rw-r--r--ext/kv/README.md80
-rw-r--r--ext/kv/build.rs19
-rw-r--r--ext/kv/dynamic.rs216
-rw-r--r--ext/kv/interface.rs7
-rw-r--r--ext/kv/lib.rs10
-rw-r--r--ext/kv/proto/datapath.proto96
-rw-r--r--ext/kv/proto/mod.rs7
-rw-r--r--ext/kv/remote.rs558
-rw-r--r--ext/kv/sqlite.rs7
10 files changed, 1001 insertions, 6 deletions
diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml
index 645689b74..bb2503e86 100644
--- a/ext/kv/Cargo.toml
+++ b/ext/kv/Cargo.toml
@@ -17,13 +17,20 @@ path = "lib.rs"
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
+chrono.workspace = true
deno_core.workspace = true
hex.workspace = true
log.workspace = true
num-bigint.workspace = true
+prost.workspace = true
rand.workspace = true
+reqwest.workspace = true
rusqlite.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
+url.workspace = true
uuid.workspace = true
+
+[build-dependencies]
+prost-build.workspace = true
diff --git a/ext/kv/README.md b/ext/kv/README.md
index 32896da62..f5c2de9ed 100644
--- a/ext/kv/README.md
+++ b/ext/kv/README.md
@@ -1,3 +1,81 @@
# deno_kv
-This crate provides a key/value store for Deno.
+This crate provides a key/value store for Deno. For an overview of Deno KV,
+please read the [manual](https://deno.land/manual/runtime/kv).
+
+## Storage Backends
+
+Deno KV has a pluggable storage interface that supports multiple backends:
+
+- SQLite - backed by a local SQLite database. This backend is suitable for
+ development and is the default when running locally.
+- Remote - backed by a remote service that implements the
+ [KV Connect](#kv-connect) protocol, for example
+ [Deno Deploy](https://deno.com/deploy).
+
+Additional backends can be added by implementing the `DatabaseHandler` trait.
+
+## KV Connect
+
+The KV Connect protocol has separate control and data planes to maximize
+throughput and minimize latency. _Metadata Exchange_ and _Data Path_ are the two
+sub-protocols that are used when talking to a KV Connect-compatible service.
+
+### Metadata Exchange
+
+To connect to a KV Connect service, the user provides an HTTP or HTTPS URL to
+`Deno.openKv`. A background task is then spawned to periodically make HTTP POST
+requests to the provided URL to refresh database metadata.
+
+The HTTP `Authorization` header is included and have the format
+`Bearer <access-token>`. The `<access-token>` is a static token issued by the
+service provider. For Deno Deploy, this is the personal access token generated
+from the dashboard. You can specify the access token with the environment
+variable `DENO_KV_ACCESS_TOKEN`.
+
+Request body is currently unused. The response is a JSON message that satisfies
+the [JSON Schema](https://json-schema.org/) definition in
+`cli/schemas/kv-metadata-exchange-response.v1.json`.
+
+Semantics of the response fields:
+
+- `version`: Protocol version. The only supported value is `1`.
+- `databaseId`: UUID of the database.
+- `endpoints`: Data plane endpoints that can serve requests to the database,
+ along with their consistency levels.
+- `token`: An ephemeral authentication token that must be included in all
+ requests to the data plane. This value is an opaque string and the client
+ should not depend on its format.
+- `expiresAt`: The time at which the token expires. Encoded as an ISO 8601
+ string.
+
+### Data Path
+
+After the first metadata exchange has completed, the client can talk to the data
+plane endpoints listed in the `endpoints` field using a Protobuf-over-HTTP
+protocol called the _Data Path_. The Protobuf messages are defined in
+`proto/datapath.proto`.
+
+Two sub-endpoints are available under a data plane endpoint URL:
+
+- `POST /snapshot_read`: Used for read operations: `kv.get()` and
+ `kv.getMany()`.
+ - **Request type**: `SnapshotRead`
+ - **Response type**: `SnapshotReadOutput`
+- `POST /atomic_write`: Used for write operations: `kv.set()` and
+ `kv.atomic().commit()`.
+ - **Request type**: `AtomicWrite`
+ - **Response type**: `AtomicWriteOutput`
+
+An HTTP `Authorization` header in the format `Bearer <ephemeral-token>` must be
+included in all requests to the data plane. The value of `<ephemeral-token>` is
+the `token` field from the metadata exchange response.
+
+### Error handling
+
+All non-client errors (i.e. network errors and HTTP 5xx status codes) are
+handled by retrying the request. Randomized exponential backoff is applied to
+each retry.
+
+Client errors cannot be recovered by retrying. A JavaScript exception is
+generated for each of those errors.
diff --git a/ext/kv/build.rs b/ext/kv/build.rs
new file mode 100644
index 000000000..eba8a20f7
--- /dev/null
+++ b/ext/kv/build.rs
@@ -0,0 +1,19 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::env;
+use std::io;
+use std::path::PathBuf;
+
+fn main() -> io::Result<()> {
+ println!("cargo:rerun-if-changed=./proto");
+
+ let descriptor_path =
+ PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin");
+
+ prost_build::Config::new()
+ .file_descriptor_set_path(&descriptor_path)
+ .compile_well_known_types()
+ .compile_protos(&["proto/datapath.proto"], &["proto/"])?;
+
+ Ok(())
+}
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
new file mode 100644
index 000000000..f79c10f55
--- /dev/null
+++ b/ext/kv/dynamic.rs
@@ -0,0 +1,216 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use crate::remote::RemoteDbHandlerPermissions;
+use crate::sqlite::SqliteDbHandler;
+use crate::sqlite::SqliteDbHandlerPermissions;
+use crate::AtomicWrite;
+use crate::CommitResult;
+use crate::Database;
+use crate::DatabaseHandler;
+use crate::QueueMessageHandle;
+use crate::ReadRange;
+use crate::ReadRangeOutput;
+use crate::SnapshotReadOptions;
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+
+pub struct MultiBackendDbHandler {
+ backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
+}
+
+impl MultiBackendDbHandler {
+ pub fn new(
+ backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
+ ) -> Self {
+ Self { backends }
+ }
+
+ pub fn remote_or_sqlite<
+ P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
+ >(
+ default_storage_dir: Option<std::path::PathBuf>,
+ ) -> Self {
+ Self::new(vec![
+ (
+ &["https://", "http://"],
+ Box::new(crate::remote::RemoteDbHandler::<P>::new()),
+ ),
+ (
+ &[""],
+ Box::new(SqliteDbHandler::<P>::new(default_storage_dir)),
+ ),
+ ])
+ }
+}
+
+#[async_trait(?Send)]
+impl DatabaseHandler for MultiBackendDbHandler {
+ type DB = Box<dyn DynamicDb>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ for (prefixes, handler) in &self.backends {
+ for &prefix in *prefixes {
+ if prefix.is_empty() {
+ return handler.dyn_open(state.clone(), path.clone()).await;
+ }
+ let Some(path) = &path else {
+ continue;
+ };
+ if path.starts_with(prefix) {
+ return handler.dyn_open(state.clone(), Some(path.clone())).await;
+ }
+ }
+ }
+ Err(type_error(format!(
+ "No backend supports the given path: {:?}",
+ path
+ )))
+ }
+}
+
+#[async_trait(?Send)]
+pub trait DynamicDbHandler {
+ async fn dyn_open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Box<dyn DynamicDb>, AnyError>;
+}
+
+#[async_trait(?Send)]
+impl DatabaseHandler for Box<dyn DynamicDbHandler> {
+ type DB = Box<dyn DynamicDb>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ (**self).dyn_open(state, path).await
+ }
+}
+
+#[async_trait(?Send)]
+impl<T, DB> DynamicDbHandler for T
+where
+ T: DatabaseHandler<DB = DB>,
+ DB: Database + 'static,
+{
+ async fn dyn_open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Box<dyn DynamicDb>, AnyError> {
+ Ok(Box::new(self.open(state, path).await?))
+ }
+}
+
+#[async_trait(?Send)]
+pub trait DynamicDb {
+ async fn dyn_snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError>;
+
+ async fn dyn_atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError>;
+
+ async fn dyn_dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError>;
+
+ fn dyn_close(&self);
+}
+
+#[async_trait(?Send)]
+impl Database for Box<dyn DynamicDb> {
+ type QMH = Box<dyn QueueMessageHandle>;
+
+ async fn snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ (**self).dyn_snapshot_read(state, requests, options).await
+ }
+
+ async fn atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ (**self).dyn_atomic_write(state, write).await
+ }
+
+ async fn dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
+ (**self).dyn_dequeue_next_message(state).await
+ }
+
+ fn close(&self) {
+ (**self).dyn_close()
+ }
+}
+
+#[async_trait(?Send)]
+impl<T, QMH> DynamicDb for T
+where
+ T: Database<QMH = QMH>,
+ QMH: QueueMessageHandle + 'static,
+{
+ async fn dyn_snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ Ok(self.snapshot_read(state, requests, options).await?)
+ }
+
+ async fn dyn_atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ Ok(self.atomic_write(state, write).await?)
+ }
+
+ async fn dyn_dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
+ Ok(Box::new(self.dequeue_next_message(state).await?))
+ }
+
+ fn dyn_close(&self) {
+ self.close()
+ }
+}
+
+#[async_trait(?Send)]
+impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
+ async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
+ (**self).take_payload().await
+ }
+ async fn finish(&self, success: bool) -> Result<(), AnyError> {
+ (**self).finish(success).await
+ }
+}
diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs
index 28b43f8d7..abeaf8dd5 100644
--- a/ext/kv/interface.rs
+++ b/ext/kv/interface.rs
@@ -29,16 +29,21 @@ pub trait Database {
async fn snapshot_read(
&self,
+ state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn atomic_write(
&self,
+ state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
- async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError>;
+ async fn dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Self::QMH, AnyError>;
fn close(&self);
}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index 7164a700b..f226b11ae 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -1,7 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub mod codec;
+pub mod dynamic;
mod interface;
+mod proto;
+pub mod remote;
pub mod sqlite;
use std::borrow::Cow;
@@ -285,7 +288,8 @@ where
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
- let output_ranges = db.snapshot_read(read_ranges, opts).await?;
+ let output_ranges =
+ db.snapshot_read(state.clone(), read_ranges, opts).await?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
@@ -323,7 +327,7 @@ where
resource.db.clone()
};
- let mut handle = db.dequeue_next_message().await?;
+ let mut handle = db.dequeue_next_message(state.clone()).await?;
let payload = handle.take_payload().await?.into();
let handle_rid = {
let mut state = state.borrow_mut();
@@ -660,7 +664,7 @@ where
enqueues,
};
- let result = db.atomic_write(atomic_write).await?;
+ let result = db.atomic_write(state.clone(), atomic_write).await?;
Ok(result.map(|res| hex::encode(res.versionstamp)))
}
diff --git a/ext/kv/proto/datapath.proto b/ext/kv/proto/datapath.proto
new file mode 100644
index 000000000..ea48f2385
--- /dev/null
+++ b/ext/kv/proto/datapath.proto
@@ -0,0 +1,96 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+syntax = "proto3";
+
+package datapath;
+
+message SnapshotRead {
+ repeated ReadRange ranges = 1;
+}
+
+message SnapshotReadOutput {
+ repeated ReadRangeOutput ranges = 1;
+ bool read_disabled = 2;
+ repeated string regions_if_read_disabled = 3;
+ bool read_is_strongly_consistent = 4;
+ string primary_if_not_strongly_consistent = 5;
+}
+
+message ReadRange {
+ bytes start = 1;
+ bytes end = 2;
+ int32 limit = 3;
+ bool reverse = 4;
+}
+
+message ReadRangeOutput {
+ repeated KvEntry values = 1;
+}
+
+message AtomicWrite {
+ repeated KvCheck kv_checks = 1;
+ repeated KvMutation kv_mutations = 2;
+ repeated Enqueue enqueues = 3;
+}
+
+message AtomicWriteOutput {
+ AtomicWriteStatus status = 1;
+ bytes versionstamp = 2;
+ string primary_if_write_disabled = 3;
+}
+
+message KvCheck {
+ bytes key = 1;
+ bytes versionstamp = 2; // 10-byte raw versionstamp
+}
+
+message KvMutation {
+ bytes key = 1;
+ KvValue value = 2;
+ KvMutationType mutation_type = 3;
+}
+
+message KvValue {
+ bytes data = 1;
+ KvValueEncoding encoding = 2;
+}
+
+message KvEntry {
+ bytes key = 1;
+ bytes value = 2;
+ KvValueEncoding encoding = 3;
+ bytes versionstamp = 4;
+}
+
+enum KvMutationType {
+ M_UNSPECIFIED = 0;
+ M_SET = 1;
+ M_CLEAR = 2;
+ M_SUM = 3;
+ M_MAX = 4;
+ M_MIN = 5;
+}
+
+enum KvValueEncoding {
+ VE_UNSPECIFIED = 0;
+ VE_V8 = 1;
+ VE_LE64 = 2;
+ VE_BYTES = 3;
+}
+
+enum AtomicWriteStatus {
+ AW_UNSPECIFIED = 0;
+ AW_SUCCESS = 1;
+ AW_CHECK_FAILURE = 2;
+ AW_UNSUPPORTED_WRITE = 3;
+ AW_USAGE_LIMIT_EXCEEDED = 4;
+ AW_WRITE_DISABLED = 5;
+ AW_QUEUE_BACKLOG_LIMIT_EXCEEDED = 6;
+}
+
+message Enqueue {
+ bytes payload = 1;
+ int64 deadline_ms = 2;
+ repeated bytes kv_keys_if_undelivered = 3;
+ repeated uint32 backoff_schedule = 4;
+}
diff --git a/ext/kv/proto/mod.rs b/ext/kv/proto/mod.rs
new file mode 100644
index 000000000..d258a0551
--- /dev/null
+++ b/ext/kv/proto/mod.rs
@@ -0,0 +1,7 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// Generated code, disable lints
+#[allow(clippy::all, non_snake_case)]
+pub mod datapath {
+ include!(concat!(env!("OUT_DIR"), "/datapath.rs"));
+}
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs
new file mode 100644
index 000000000..47528d15f
--- /dev/null
+++ b/ext/kv/remote.rs
@@ -0,0 +1,558 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::RefCell;
+use std::marker::PhantomData;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::proto::datapath as pb;
+use crate::AtomicWrite;
+use crate::CommitResult;
+use crate::Database;
+use crate::DatabaseHandler;
+use crate::KvEntry;
+use crate::MutationKind;
+use crate::QueueMessageHandle;
+use crate::ReadRange;
+use crate::ReadRangeOutput;
+use crate::SnapshotReadOptions;
+use anyhow::Context;
+use async_trait::async_trait;
+use chrono::DateTime;
+use chrono::Utc;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::futures::TryFutureExt;
+use deno_core::task::JoinHandle;
+use deno_core::OpState;
+use prost::Message;
+use rand::Rng;
+use serde::Deserialize;
+use tokio::sync::watch;
+use url::Url;
+use uuid::Uuid;
+
+pub trait RemoteDbHandlerPermissions {
+ fn check_env(&mut self, var: &str) -> Result<(), AnyError>;
+ fn check_net_url(
+ &mut self,
+ url: &Url,
+ api_name: &str,
+ ) -> Result<(), AnyError>;
+}
+
+pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
+ _p: std::marker::PhantomData<P>,
+}
+
+impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
+ pub fn new() -> Self {
+ Self { _p: PhantomData }
+ }
+}
+
+impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+#[derive(Deserialize)]
+struct VersionInfo {
+ version: u64,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+#[allow(dead_code)]
+struct DatabaseMetadata {
+ version: u64,
+ database_id: Uuid,
+ endpoints: Vec<EndpointInfo>,
+ token: String,
+ expires_at: DateTime<Utc>,
+}
+
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct EndpointInfo {
+ pub url: String,
+
+ // Using `String` instead of an enum, so that parsing doesn't
+ // break if more consistency levels are added.
+ pub consistency: String,
+}
+
+#[async_trait(?Send)]
+impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
+ type DB = RemoteDb<P>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ const ENV_VAR_NAME: &str = "DENO_KV_ACCESS_TOKEN";
+
+ let Some(url) = path else {
+ return Err(type_error("Missing database url"));
+ };
+
+ let Ok(parsed_url) = Url::parse(&url) else {
+ return Err(type_error(format!("Invalid database url: {}", url)));
+ };
+
+ {
+ let mut state = state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_env(ENV_VAR_NAME)?;
+ permissions.check_net_url(&parsed_url, "Deno.openKv")?;
+ }
+
+ let access_token = std::env::var(ENV_VAR_NAME)
+ .map_err(anyhow::Error::from)
+ .with_context(|| {
+ "Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
+ })?;
+
+ let refresher = MetadataRefresher::new(url, access_token);
+
+ let db = RemoteDb {
+ client: reqwest::Client::new(),
+ refresher,
+ _p: PhantomData,
+ };
+ Ok(db)
+ }
+}
+
+pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> {
+ client: reqwest::Client,
+ refresher: MetadataRefresher,
+ _p: std::marker::PhantomData<P>,
+}
+
+pub struct DummyQueueMessageHandle {}
+
+#[async_trait(?Send)]
+impl QueueMessageHandle for DummyQueueMessageHandle {
+ async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
+ unimplemented!()
+ }
+
+ async fn finish(&self, _success: bool) -> Result<(), AnyError> {
+ unimplemented!()
+ }
+}
+
+#[async_trait(?Send)]
+impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
+ type QMH = DummyQueueMessageHandle;
+
+ async fn snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ _options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ let req = pb::SnapshotRead {
+ ranges: requests
+ .into_iter()
+ .map(|r| pb::ReadRange {
+ start: r.start,
+ end: r.end,
+ limit: r.limit.get() as _,
+ reverse: r.reverse,
+ })
+ .collect(),
+ };
+
+ let res: pb::SnapshotReadOutput = call_remote::<P, _, _>(
+ &state,
+ &self.refresher,
+ &self.client,
+ "snapshot_read",
+ &req,
+ )
+ .await?;
+
+ if res.read_disabled {
+ return Err(type_error("Reads are disabled for this database."));
+ }
+
+ let out = res
+ .ranges
+ .into_iter()
+ .map(|r| {
+ Ok(ReadRangeOutput {
+ entries: r
+ .values
+ .into_iter()
+ .map(|e| {
+ let encoding = e.encoding();
+ Ok(KvEntry {
+ key: e.key,
+ value: decode_value(e.value, encoding)?,
+ versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?,
+ })
+ })
+ .collect::<Result<_, AnyError>>()?,
+ })
+ })
+ .collect::<Result<Vec<_>, AnyError>>()?;
+ Ok(out)
+ }
+
+ async fn atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ if !write.enqueues.is_empty() {
+ return Err(type_error("Enqueue operations are not supported yet."));
+ }
+
+ let req = pb::AtomicWrite {
+ kv_checks: write
+ .checks
+ .into_iter()
+ .map(|x| {
+ Ok(pb::KvCheck {
+ key: x.key,
+ versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(),
+ })
+ })
+ .collect::<anyhow::Result<_>>()?,
+ kv_mutations: write
+ .mutations
+ .into_iter()
+ .map(|x| encode_mutation(x.key, x.kind))
+ .collect(),
+ enqueues: vec![],
+ };
+
+ let res: pb::AtomicWriteOutput = call_remote::<P, _, _>(
+ &state,
+ &self.refresher,
+ &self.client,
+ "atomic_write",
+ &req,
+ )
+ .await?;
+ match res.status() {
+ pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult {
+ versionstamp: if res.versionstamp.is_empty() {
+ Default::default()
+ } else {
+ res.versionstamp[..].try_into()?
+ },
+ })),
+ pb::AtomicWriteStatus::AwCheckFailure => Ok(None),
+ pb::AtomicWriteStatus::AwUnsupportedWrite => {
+ Err(type_error("Unsupported write"))
+ }
+ pb::AtomicWriteStatus::AwUsageLimitExceeded => {
+ Err(type_error("The database usage limit has been exceeded."))
+ }
+ pb::AtomicWriteStatus::AwWriteDisabled => {
+ // TODO: Auto retry
+ Err(type_error("Writes are disabled for this database."))
+ }
+ pb::AtomicWriteStatus::AwUnspecified => {
+ Err(type_error("Unspecified error"))
+ }
+ pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => {
+ Err(type_error("Queue backlog limit exceeded"))
+ }
+ }
+ }
+
+ async fn dequeue_next_message(
+ &self,
+ _state: Rc<RefCell<OpState>>,
+ ) -> Result<Self::QMH, AnyError> {
+ deno_core::futures::future::pending().await
+ }
+
+ fn close(&self) {}
+}
+
+fn decode_value(
+ value: Vec<u8>,
+ encoding: pb::KvValueEncoding,
+) -> anyhow::Result<crate::Value> {
+ match encoding {
+ pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)),
+ pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)),
+ pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes(
+ <[u8; 8]>::try_from(&value[..])?,
+ ))),
+ pb::KvValueEncoding::VeUnspecified => {
+ Err(anyhow::anyhow!("Unspecified value encoding, cannot decode"))
+ }
+ }
+}
+
+fn encode_value(value: crate::Value) -> pb::KvValue {
+ match value {
+ crate::Value::V8(data) => pb::KvValue {
+ data,
+ encoding: pb::KvValueEncoding::VeV8 as _,
+ },
+ crate::Value::Bytes(data) => pb::KvValue {
+ data,
+ encoding: pb::KvValueEncoding::VeBytes as _,
+ },
+ crate::Value::U64(x) => pb::KvValue {
+ data: x.to_le_bytes().to_vec(),
+ encoding: pb::KvValueEncoding::VeLe64 as _,
+ },
+ }
+}
+
+fn encode_mutation(key: Vec<u8>, mutation: MutationKind) -> pb::KvMutation {
+ match mutation {
+ MutationKind::Set(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MSet as _,
+ },
+ MutationKind::Delete => pb::KvMutation {
+ key,
+ value: Some(encode_value(crate::Value::Bytes(vec![]))),
+ mutation_type: pb::KvMutationType::MClear as _,
+ },
+ MutationKind::Max(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MMax as _,
+ },
+ MutationKind::Min(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MMin as _,
+ },
+ MutationKind::Sum(x) => pb::KvMutation {
+ key,
+ value: Some(encode_value(x)),
+ mutation_type: pb::KvMutationType::MSum as _,
+ },
+ }
+}
+
+#[derive(Clone)]
+enum MetadataState {
+ Ready(Arc<DatabaseMetadata>),
+ Invalid(String),
+ Pending,
+}
+
+struct MetadataRefresher {
+ metadata_rx: watch::Receiver<MetadataState>,
+ handle: JoinHandle<()>,
+}
+
+impl MetadataRefresher {
+ pub fn new(url: String, access_token: String) -> Self {
+ let (tx, rx) = watch::channel(MetadataState::Pending);
+ let handle =
+ deno_core::task::spawn(metadata_refresh_task(url, access_token, tx));
+ Self {
+ handle,
+ metadata_rx: rx,
+ }
+ }
+}
+
+impl Drop for MetadataRefresher {
+ fn drop(&mut self) {
+ self.handle.abort();
+ }
+}
+
+async fn metadata_refresh_task(
+ metadata_url: String,
+ access_token: String,
+ tx: watch::Sender<MetadataState>,
+) {
+ let client = reqwest::Client::new();
+ loop {
+ let mut attempt = 0u64;
+ let metadata = loop {
+ match fetch_metadata(&client, &metadata_url, &access_token).await {
+ Ok(Ok(x)) => break x,
+ Ok(Err(e)) => {
+ if tx.send(MetadataState::Invalid(e)).is_err() {
+ return;
+ }
+ }
+ Err(e) => {
+ log::error!("Failed to fetch database metadata: {}", e);
+ }
+ }
+ randomized_exponential_backoff(Duration::from_secs(5), attempt).await;
+ attempt += 1;
+ };
+
+ let ms_until_expire = u64::try_from(
+ metadata
+ .expires_at
+ .timestamp_millis()
+ .saturating_sub(Utc::now().timestamp_millis()),
+ )
+ .unwrap_or_default();
+
+ // Refresh 10 minutes before expiry
+ // In case of buggy clocks, don't refresh more than once per minute
+ let interval = Duration::from_millis(ms_until_expire)
+ .saturating_sub(Duration::from_secs(600))
+ .max(Duration::from_secs(60));
+
+ if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() {
+ return;
+ }
+
+ tokio::time::sleep(interval).await;
+ }
+}
+
+async fn fetch_metadata(
+ client: &reqwest::Client,
+ metadata_url: &str,
+ access_token: &str,
+) -> anyhow::Result<Result<DatabaseMetadata, String>> {
+ let res = client
+ .post(metadata_url)
+ .header("authorization", format!("Bearer {}", access_token))
+ .send()
+ .await?;
+
+ if !res.status().is_success() {
+ if res.status().is_client_error() {
+ return Ok(Err(format!(
+ "Client error while fetching metadata: {:?} {}",
+ res.status(),
+ res.text().await?
+ )));
+ } else {
+ anyhow::bail!(
+ "remote returned error: {:?} {}",
+ res.status(),
+ res.text().await?
+ );
+ }
+ }
+
+ let res = res.bytes().await?;
+ let version_info: VersionInfo = match serde_json::from_slice(&res) {
+ Ok(x) => x,
+ Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))),
+ };
+ if version_info.version > 1 {
+ return Ok(Err(format!(
+ "Unsupported metadata version: {}",
+ version_info.version
+ )));
+ }
+
+ Ok(
+ serde_json::from_slice(&res)
+ .map_err(|e| format!("Failed to decode metadata: {}", e)),
+ )
+}
+
+async fn randomized_exponential_backoff(base: Duration, attempt: u64) {
+ let attempt = attempt.min(12);
+ let delay = base.as_millis() as u64 + (2 << attempt);
+ let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1);
+ tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
+}
+
+async fn call_remote<
+ P: RemoteDbHandlerPermissions + 'static,
+ T: Message,
+ R: Message + Default,
+>(
+ state: &RefCell<OpState>,
+ refresher: &MetadataRefresher,
+ client: &reqwest::Client,
+ method: &str,
+ req: &T,
+) -> anyhow::Result<R> {
+ let mut attempt = 0u64;
+ let res = loop {
+ let mut metadata_rx = refresher.metadata_rx.clone();
+ let metadata = loop {
+ match &*metadata_rx.borrow() {
+ MetadataState::Pending => {}
+ MetadataState::Ready(x) => break x.clone(),
+ MetadataState::Invalid(e) => {
+ return Err(type_error(format!("Metadata error: {}", e)))
+ }
+ }
+ // `unwrap()` never fails because `tx` is owned by the task held by `refresher`.
+ metadata_rx.changed().await.unwrap();
+ };
+ let Some(sc_endpoint) = metadata.endpoints.iter().find(|x| x.consistency == "strong") else {
+ return Err(type_error("No strong consistency endpoint is available for this database"));
+ };
+
+ let full_url = format!("{}/{}", sc_endpoint.url, method);
+ {
+ let parsed_url = Url::parse(&full_url)?;
+ let mut state = state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_net_url(&parsed_url, "Deno.Kv")?;
+ }
+
+ let res = client
+ .post(&full_url)
+ .header("x-transaction-domain-id", metadata.database_id.to_string())
+ .header("authorization", format!("Bearer {}", metadata.token))
+ .body(req.encode_to_vec())
+ .send()
+ .map_err(anyhow::Error::from)
+ .and_then(|x| async move {
+ if x.status().is_success() {
+ Ok(Ok(x.bytes().await?))
+ } else if x.status().is_client_error() {
+ Ok(Err((x.status(), x.text().await?)))
+ } else {
+ Err(anyhow::anyhow!(
+ "server error ({:?}): {}",
+ x.status(),
+ x.text().await?
+ ))
+ }
+ })
+ .await;
+
+ match res {
+ Ok(x) => break x,
+ Err(e) => {
+ log::error!("retryable error in {}: {}", method, e);
+ randomized_exponential_backoff(Duration::from_millis(0), attempt).await;
+ attempt += 1;
+ }
+ }
+ };
+
+ let res = match res {
+ Ok(x) => x,
+ Err((status, message)) => {
+ return Err(type_error(format!(
+ "client error in {} (status {:?}): {}",
+ method, status, message
+ )))
+ }
+ };
+
+ match R::decode(&*res) {
+ Ok(x) => Ok(x),
+ Err(e) => Err(type_error(format!(
+ "failed to decode response from {}: {}",
+ method, e
+ ))),
+ }
+}
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 8e37d2c87..bf2688920 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -724,6 +724,7 @@ impl Database for SqliteDb {
async fn snapshot_read(
&self,
+ _state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
_options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
@@ -769,6 +770,7 @@ impl Database for SqliteDb {
async fn atomic_write(
&self,
+ _state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
let write = Arc::new(write);
@@ -894,7 +896,10 @@ impl Database for SqliteDb {
Ok(commit_result)
}
- async fn dequeue_next_message(&self) -> Result<Self::QMH, AnyError> {
+ async fn dequeue_next_message(
+ &self,
+ _state: Rc<RefCell<OpState>>,
+ ) -> Result<Self::QMH, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })