summaryrefslogtreecommitdiff
path: root/ext/kv/remote.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2023-10-31 12:13:57 +0100
committerGitHub <noreply@github.com>2023-10-31 11:13:57 +0000
commit2d9298f5f5550c21ba218ff7095aa9afe80c7e02 (patch)
tree6daee1cc6007745097e176c7367b697e58632baf /ext/kv/remote.rs
parent092555c611ebab87ad570b4dcb73d54288dccdd9 (diff)
chore: update ext/kv to use denokv_* crates (#20986)
This commit updates the ext/kv module to use the denokv_* crates for the protocol and the sqlite backend. This also fixes a couple of bugs in the sqlite backend, and updates versionstamps to be updated less linearly.
Diffstat (limited to 'ext/kv/remote.rs')
-rw-r--r--ext/kv/remote.rs590
1 files changed, 72 insertions, 518 deletions
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs
index 0a061b35b..7cac6b9c3 100644
--- a/ext/kv/remote.rs
+++ b/ext/kv/remote.rs
@@ -1,43 +1,42 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
-use std::fmt;
-use std::io::Write;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::Arc;
-use std::time::Duration;
-use crate::proto::datapath as pb;
-use crate::AtomicWrite;
-use crate::CommitResult;
-use crate::Database;
use crate::DatabaseHandler;
-use crate::KvEntry;
-use crate::MutationKind;
-use crate::QueueMessageHandle;
-use crate::ReadRange;
-use crate::ReadRangeOutput;
-use crate::SnapshotReadOptions;
use anyhow::Context;
use async_trait::async_trait;
-use chrono::DateTime;
-use chrono::Utc;
use deno_core::error::type_error;
use deno_core::error::AnyError;
-use deno_core::futures::TryFutureExt;
-use deno_core::unsync::JoinHandle;
use deno_core::OpState;
-use prost::Message;
-use rand::Rng;
-use serde::Deserialize;
-use termcolor::Ansi;
-use termcolor::Color;
-use termcolor::ColorSpec;
-use termcolor::WriteColor;
-use tokio::sync::watch;
+use deno_fetch::create_http_client;
+use deno_fetch::CreateHttpClientOptions;
+use deno_tls::rustls::RootCertStore;
+use deno_tls::Proxy;
+use deno_tls::RootCertStoreProvider;
+use denokv_remote::MetadataEndpoint;
+use denokv_remote::Remote;
use url::Url;
-use uuid::Uuid;
+
+#[derive(Clone)]
+pub struct HttpOptions {
+ pub user_agent: String,
+ pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
+ pub proxy: Option<Proxy>,
+ pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
+ pub client_cert_chain_and_key: Option<(String, String)>,
+}
+
+impl HttpOptions {
+ pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, AnyError> {
+ Ok(match &self.root_cert_store_provider {
+ Some(provider) => Some(provider.get_or_try_init()?.clone()),
+ None => None,
+ })
+ }
+}
pub trait RemoteDbHandlerPermissions {
fn check_env(&mut self, var: &str) -> Result<(), AnyError>;
@@ -49,50 +48,39 @@ pub trait RemoteDbHandlerPermissions {
}
pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> {
+ http_options: HttpOptions,
_p: std::marker::PhantomData<P>,
}
impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> {
- pub fn new() -> Self {
- Self { _p: PhantomData }
- }
-}
-
-impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> {
- fn default() -> Self {
- Self::new()
+ pub fn new(http_options: HttpOptions) -> Self {
+ Self {
+ http_options,
+ _p: PhantomData,
+ }
}
}
-#[derive(Deserialize)]
-struct VersionInfo {
- version: u64,
-}
-
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-#[allow(dead_code)]
-struct DatabaseMetadata {
- version: u64,
- database_id: Uuid,
- endpoints: Vec<EndpointInfo>,
- token: String,
- expires_at: DateTime<Utc>,
+pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
+ state: Rc<RefCell<OpState>>,
+ _permissions: PhantomData<P>,
}
-#[derive(Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct EndpointInfo {
- pub url: String,
-
- // Using `String` instead of an enum, so that parsing doesn't
- // break if more consistency levels are added.
- pub consistency: String,
+impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
+ for PermissionChecker<P>
+{
+ fn check_net_url(&self, url: &Url) -> Result<(), anyhow::Error> {
+ let mut state = self.state.borrow_mut();
+ let permissions = state.borrow_mut::<P>();
+ permissions.check_net_url(url, "Deno.openKv")
+ }
}
#[async_trait(?Send)]
-impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
- type DB = RemoteDb<P>;
+impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler
+ for RemoteDbHandler<P>
+{
+ type DB = Remote<PermissionChecker<P>>;
async fn open(
&self,
@@ -122,470 +110,36 @@ impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> {
"Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account."
})?;
- let refresher = MetadataRefresher::new(url, access_token);
-
- let db = RemoteDb {
- client: reqwest::Client::new(),
- refresher,
- _p: PhantomData,
- };
- Ok(db)
- }
-}
-
-pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> {
- client: reqwest::Client,
- refresher: MetadataRefresher,
- _p: std::marker::PhantomData<P>,
-}
-
-pub struct DummyQueueMessageHandle {}
-
-#[async_trait(?Send)]
-impl QueueMessageHandle for DummyQueueMessageHandle {
- async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
- unimplemented!()
- }
-
- async fn finish(&self, _success: bool) -> Result<(), AnyError> {
- unimplemented!()
- }
-}
-
-#[async_trait(?Send)]
-impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
- type QMH = DummyQueueMessageHandle;
-
- async fn snapshot_read(
- &self,
- state: Rc<RefCell<OpState>>,
- requests: Vec<ReadRange>,
- _options: SnapshotReadOptions,
- ) -> Result<Vec<ReadRangeOutput>, AnyError> {
- let req = pb::SnapshotRead {
- ranges: requests
- .into_iter()
- .map(|r| pb::ReadRange {
- start: r.start,
- end: r.end,
- limit: r.limit.get() as _,
- reverse: r.reverse,
- })
- .collect(),
- };
-
- let res: pb::SnapshotReadOutput = call_remote::<P, _, _>(
- &state,
- &self.refresher,
- &self.client,
- "snapshot_read",
- &req,
- )
- .await?;
-
- if res.read_disabled {
- return Err(type_error("Reads are disabled for this database."));
- }
-
- let out = res
- .ranges
- .into_iter()
- .map(|r| {
- Ok(ReadRangeOutput {
- entries: r
- .values
- .into_iter()
- .map(|e| {
- let encoding = e.encoding();
- Ok(KvEntry {
- key: e.key,
- value: decode_value(e.value, encoding)?,
- versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?,
- })
- })
- .collect::<Result<_, AnyError>>()?,
- })
- })
- .collect::<Result<Vec<_>, AnyError>>()?;
- Ok(out)
- }
-
- async fn atomic_write(
- &self,
- state: Rc<RefCell<OpState>>,
- write: AtomicWrite,
- ) -> Result<Option<CommitResult>, AnyError> {
- if !write.enqueues.is_empty() {
- return Err(type_error("Enqueue operations are not supported yet."));
- }
-
- let req = pb::AtomicWrite {
- kv_checks: write
- .checks
- .into_iter()
- .map(|x| {
- Ok(pb::KvCheck {
- key: x.key,
- versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(),
- })
- })
- .collect::<anyhow::Result<_>>()?,
- kv_mutations: write.mutations.into_iter().map(encode_mutation).collect(),
- enqueues: vec![],
- };
-
- let res: pb::AtomicWriteOutput = call_remote::<P, _, _>(
- &state,
- &self.refresher,
- &self.client,
- "atomic_write",
- &req,
- )
- .await?;
- match res.status() {
- pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult {
- versionstamp: if res.versionstamp.is_empty() {
- Default::default()
- } else {
- res.versionstamp[..].try_into()?
- },
- })),
- pb::AtomicWriteStatus::AwCheckFailure => Ok(None),
- pb::AtomicWriteStatus::AwUnsupportedWrite => {
- Err(type_error("Unsupported write"))
- }
- pb::AtomicWriteStatus::AwUsageLimitExceeded => {
- Err(type_error("The database usage limit has been exceeded."))
- }
- pb::AtomicWriteStatus::AwWriteDisabled => {
- // TODO: Auto retry
- Err(type_error("Writes are disabled for this database."))
- }
- pb::AtomicWriteStatus::AwUnspecified => {
- Err(type_error("Unspecified error"))
- }
- pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => {
- Err(type_error("Queue backlog limit exceeded"))
- }
- }
- }
-
- async fn dequeue_next_message(
- &self,
- _state: Rc<RefCell<OpState>>,
- ) -> Result<Option<Self::QMH>, AnyError> {
- let msg = "Deno.Kv.listenQueue is not supported for remote KV databases";
- eprintln!("{}", yellow(msg));
- deno_core::futures::future::pending().await
- }
-
- fn close(&self) {}
-}
-
-fn yellow<S: AsRef<str>>(s: S) -> impl fmt::Display {
- if std::env::var_os("NO_COLOR").is_some() {
- return String::from(s.as_ref());
- }
- let mut style_spec = ColorSpec::new();
- style_spec.set_fg(Some(Color::Yellow));
- let mut v = Vec::new();
- let mut ansi_writer = Ansi::new(&mut v);
- ansi_writer.set_color(&style_spec).unwrap();
- ansi_writer.write_all(s.as_ref().as_bytes()).unwrap();
- ansi_writer.reset().unwrap();
- String::from_utf8_lossy(&v).into_owned()
-}
-
-fn decode_value(
- value: Vec<u8>,
- encoding: pb::KvValueEncoding,
-) -> anyhow::Result<crate::Value> {
- match encoding {
- pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)),
- pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)),
- pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes(
- <[u8; 8]>::try_from(&value[..])?,
- ))),
- pb::KvValueEncoding::VeUnspecified => {
- Err(anyhow::anyhow!("Unspecified value encoding, cannot decode"))
- }
- }
-}
-
-fn encode_value(value: crate::Value) -> pb::KvValue {
- match value {
- crate::Value::V8(data) => pb::KvValue {
- data,
- encoding: pb::KvValueEncoding::VeV8 as _,
- },
- crate::Value::Bytes(data) => pb::KvValue {
- data,
- encoding: pb::KvValueEncoding::VeBytes as _,
- },
- crate::Value::U64(x) => pb::KvValue {
- data: x.to_le_bytes().to_vec(),
- encoding: pb::KvValueEncoding::VeLe64 as _,
- },
- }
-}
-
-fn encode_mutation(m: crate::KvMutation) -> pb::KvMutation {
- let key = m.key;
- let expire_at_ms =
- m.expire_at.and_then(|x| i64::try_from(x).ok()).unwrap_or(0);
-
- match m.kind {
- MutationKind::Set(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MSet as _,
- expire_at_ms,
- },
- MutationKind::Delete => pb::KvMutation {
- key,
- value: Some(encode_value(crate::Value::Bytes(vec![]))),
- mutation_type: pb::KvMutationType::MClear as _,
- expire_at_ms,
- },
- MutationKind::Max(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MMax as _,
- expire_at_ms,
- },
- MutationKind::Min(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MMin as _,
- expire_at_ms,
- },
- MutationKind::Sum(x) => pb::KvMutation {
- key,
- value: Some(encode_value(x)),
- mutation_type: pb::KvMutationType::MSum as _,
- expire_at_ms,
- },
- }
-}
-
-#[derive(Clone)]
-enum MetadataState {
- Ready(Arc<DatabaseMetadata>),
- Invalid(String),
- Pending,
-}
-
-struct MetadataRefresher {
- metadata_rx: watch::Receiver<MetadataState>,
- handle: JoinHandle<()>,
-}
-
-impl MetadataRefresher {
- pub fn new(url: String, access_token: String) -> Self {
- let (tx, rx) = watch::channel(MetadataState::Pending);
- let handle =
- deno_core::unsync::spawn(metadata_refresh_task(url, access_token, tx));
- Self {
- handle,
- metadata_rx: rx,
- }
- }
-}
-
-impl Drop for MetadataRefresher {
- fn drop(&mut self) {
- self.handle.abort();
- }
-}
-
-async fn metadata_refresh_task(
- metadata_url: String,
- access_token: String,
- tx: watch::Sender<MetadataState>,
-) {
- let client = reqwest::Client::new();
- loop {
- let mut attempt = 0u64;
- let metadata = loop {
- match fetch_metadata(&client, &metadata_url, &access_token).await {
- Ok(Ok(x)) => break x,
- Ok(Err(e)) => {
- if tx.send(MetadataState::Invalid(e)).is_err() {
- return;
- }
- }
- Err(e) => {
- log::error!("Failed to fetch database metadata: {}", e);
- }
- }
- randomized_exponential_backoff(Duration::from_secs(5), attempt).await;
- attempt += 1;
+ let metadata_endpoint = MetadataEndpoint {
+ url: parsed_url.clone(),
+ access_token: access_token.clone(),
};
- let ms_until_expire = u64::try_from(
- metadata
- .expires_at
- .timestamp_millis()
- .saturating_sub(crate::time::utc_now().timestamp_millis()),
- )
- .unwrap_or_default();
-
- // Refresh 10 minutes before expiry
- // In case of buggy clocks, don't refresh more than once per minute
- let interval = Duration::from_millis(ms_until_expire)
- .saturating_sub(Duration::from_secs(600))
- .max(Duration::from_secs(60));
-
- if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() {
- return;
- }
-
- tokio::time::sleep(interval).await;
- }
-}
-
-async fn fetch_metadata(
- client: &reqwest::Client,
- metadata_url: &str,
- access_token: &str,
-) -> anyhow::Result<Result<DatabaseMetadata, String>> {
- let res = client
- .post(metadata_url)
- .header("authorization", format!("Bearer {}", access_token))
- .send()
- .await?;
-
- if !res.status().is_success() {
- if res.status().is_client_error() {
- return Ok(Err(format!(
- "Client error while fetching metadata: {:?} {}",
- res.status(),
- res.text().await?
- )));
- } else {
- anyhow::bail!(
- "remote returned error: {:?} {}",
- res.status(),
- res.text().await?
- );
- }
- }
-
- let res = res.bytes().await?;
- let version_info: VersionInfo = match serde_json::from_slice(&res) {
- Ok(x) => x,
- Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))),
- };
- if version_info.version > 1 {
- return Ok(Err(format!(
- "Unsupported metadata version: {}",
- version_info.version
- )));
- }
-
- Ok(
- serde_json::from_slice(&res)
- .map_err(|e| format!("Failed to decode metadata: {}", e)),
- )
-}
-
-async fn randomized_exponential_backoff(base: Duration, attempt: u64) {
- let attempt = attempt.min(12);
- let delay = base.as_millis() as u64 + (2 << attempt);
- let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1);
- tokio::time::sleep(std::time::Duration::from_millis(delay)).await;
-}
-
-async fn call_remote<
- P: RemoteDbHandlerPermissions + 'static,
- T: Message,
- R: Message + Default,
->(
- state: &RefCell<OpState>,
- refresher: &MetadataRefresher,
- client: &reqwest::Client,
- method: &str,
- req: &T,
-) -> anyhow::Result<R> {
- let mut attempt = 0u64;
- let res = loop {
- let mut metadata_rx = refresher.metadata_rx.clone();
- let metadata = loop {
- match &*metadata_rx.borrow() {
- MetadataState::Pending => {}
- MetadataState::Ready(x) => break x.clone(),
- MetadataState::Invalid(e) => {
- return Err(type_error(format!("Metadata error: {}", e)))
- }
- }
- // `unwrap()` never fails because `tx` is owned by the task held by `refresher`.
- metadata_rx.changed().await.unwrap();
- };
- let Some(sc_endpoint) = metadata
- .endpoints
- .iter()
- .find(|x| x.consistency == "strong")
- else {
- return Err(type_error(
- "No strong consistency endpoint is available for this database",
- ));
+ let options = &self.http_options;
+ let client = create_http_client(
+ &options.user_agent,
+ CreateHttpClientOptions {
+ root_cert_store: options.root_cert_store()?,
+ ca_certs: vec![],
+ proxy: options.proxy.clone(),
+ unsafely_ignore_certificate_errors: options
+ .unsafely_ignore_certificate_errors
+ .clone(),
+ client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
+ pool_max_idle_per_host: None,
+ pool_idle_timeout: None,
+ http1: true,
+ http2: true,
+ },
+ )?;
+
+ let permissions = PermissionChecker {
+ state: state.clone(),
+ _permissions: PhantomData,
};
- let full_url = format!("{}/{}", sc_endpoint.url, method);
- {
- let parsed_url = Url::parse(&full_url)?;
- let mut state = state.borrow_mut();
- let permissions = state.borrow_mut::<P>();
- permissions.check_net_url(&parsed_url, "Deno.Kv")?;
- }
-
- let res = client
- .post(&full_url)
- .header("x-transaction-domain-id", metadata.database_id.to_string())
- .header("authorization", format!("Bearer {}", metadata.token))
- .body(req.encode_to_vec())
- .send()
- .map_err(anyhow::Error::from)
- .and_then(|x| async move {
- if x.status().is_success() {
- Ok(Ok(x.bytes().await?))
- } else if x.status().is_client_error() {
- Ok(Err((x.status(), x.text().await?)))
- } else {
- Err(anyhow::anyhow!(
- "server error ({:?}): {}",
- x.status(),
- x.text().await?
- ))
- }
- })
- .await;
-
- match res {
- Ok(x) => break x,
- Err(e) => {
- log::error!("retryable error in {}: {}", method, e);
- randomized_exponential_backoff(Duration::from_millis(0), attempt).await;
- attempt += 1;
- }
- }
- };
-
- let res = match res {
- Ok(x) => x,
- Err((status, message)) => {
- return Err(type_error(format!(
- "client error in {} (status {:?}): {}",
- method, status, message
- )))
- }
- };
+ let remote = Remote::new(client, permissions, metadata_endpoint);
- match R::decode(&*res) {
- Ok(x) => Ok(x),
- Err(e) => Err(type_error(format!(
- "failed to decode response from {}: {}",
- method, e
- ))),
+ Ok(remote)
}
}