diff options
author | Luca Casonato <hello@lcas.dev> | 2023-10-31 12:13:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-31 11:13:57 +0000 |
commit | 2d9298f5f5550c21ba218ff7095aa9afe80c7e02 (patch) | |
tree | 6daee1cc6007745097e176c7367b697e58632baf /ext/kv/remote.rs | |
parent | 092555c611ebab87ad570b4dcb73d54288dccdd9 (diff) |
chore: update ext/kv to use denokv_* crates (#20986)
This commit updates the ext/kv module to use the denokv_* crates for
the protocol and the SQLite backend. It also fixes a couple of bugs in
the SQLite backend, and changes versionstamps so that they increase less
linearly.
Diffstat (limited to 'ext/kv/remote.rs')
-rw-r--r-- | ext/kv/remote.rs | 590 |
1 file changed, 72 insertions, 518 deletions
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 0a061b35b..7cac6b9c3 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -1,43 +1,42 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::cell::RefCell; -use std::fmt; -use std::io::Write; use std::marker::PhantomData; use std::rc::Rc; use std::sync::Arc; -use std::time::Duration; -use crate::proto::datapath as pb; -use crate::AtomicWrite; -use crate::CommitResult; -use crate::Database; use crate::DatabaseHandler; -use crate::KvEntry; -use crate::MutationKind; -use crate::QueueMessageHandle; -use crate::ReadRange; -use crate::ReadRangeOutput; -use crate::SnapshotReadOptions; use anyhow::Context; use async_trait::async_trait; -use chrono::DateTime; -use chrono::Utc; use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::TryFutureExt; -use deno_core::unsync::JoinHandle; use deno_core::OpState; -use prost::Message; -use rand::Rng; -use serde::Deserialize; -use termcolor::Ansi; -use termcolor::Color; -use termcolor::ColorSpec; -use termcolor::WriteColor; -use tokio::sync::watch; +use deno_fetch::create_http_client; +use deno_fetch::CreateHttpClientOptions; +use deno_tls::rustls::RootCertStore; +use deno_tls::Proxy; +use deno_tls::RootCertStoreProvider; +use denokv_remote::MetadataEndpoint; +use denokv_remote::Remote; use url::Url; -use uuid::Uuid; + +#[derive(Clone)] +pub struct HttpOptions { + pub user_agent: String, + pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>, + pub proxy: Option<Proxy>, + pub unsafely_ignore_certificate_errors: Option<Vec<String>>, + pub client_cert_chain_and_key: Option<(String, String)>, +} + +impl HttpOptions { + pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, AnyError> { + Ok(match &self.root_cert_store_provider { + Some(provider) => Some(provider.get_or_try_init()?.clone()), + None => None, + }) + } +} pub trait RemoteDbHandlerPermissions { fn check_env(&mut self, var: &str) 
-> Result<(), AnyError>; @@ -49,50 +48,39 @@ pub trait RemoteDbHandlerPermissions { } pub struct RemoteDbHandler<P: RemoteDbHandlerPermissions + 'static> { + http_options: HttpOptions, _p: std::marker::PhantomData<P>, } impl<P: RemoteDbHandlerPermissions> RemoteDbHandler<P> { - pub fn new() -> Self { - Self { _p: PhantomData } - } -} - -impl<P: RemoteDbHandlerPermissions> Default for RemoteDbHandler<P> { - fn default() -> Self { - Self::new() + pub fn new(http_options: HttpOptions) -> Self { + Self { + http_options, + _p: PhantomData, + } } } -#[derive(Deserialize)] -struct VersionInfo { - version: u64, -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -#[allow(dead_code)] -struct DatabaseMetadata { - version: u64, - database_id: Uuid, - endpoints: Vec<EndpointInfo>, - token: String, - expires_at: DateTime<Utc>, +pub struct PermissionChecker<P: RemoteDbHandlerPermissions> { + state: Rc<RefCell<OpState>>, + _permissions: PhantomData<P>, } -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct EndpointInfo { - pub url: String, - - // Using `String` instead of an enum, so that parsing doesn't - // break if more consistency levels are added. - pub consistency: String, +impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions + for PermissionChecker<P> +{ + fn check_net_url(&self, url: &Url) -> Result<(), anyhow::Error> { + let mut state = self.state.borrow_mut(); + let permissions = state.borrow_mut::<P>(); + permissions.check_net_url(url, "Deno.openKv") + } } #[async_trait(?Send)] -impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> { - type DB = RemoteDb<P>; +impl<P: RemoteDbHandlerPermissions + 'static> DatabaseHandler + for RemoteDbHandler<P> +{ + type DB = Remote<PermissionChecker<P>>; async fn open( &self, @@ -122,470 +110,36 @@ impl<P: RemoteDbHandlerPermissions> DatabaseHandler for RemoteDbHandler<P> { "Missing DENO_KV_ACCESS_TOKEN environment variable. 
Please set it to your access token from https://dash.deno.com/account." })?; - let refresher = MetadataRefresher::new(url, access_token); - - let db = RemoteDb { - client: reqwest::Client::new(), - refresher, - _p: PhantomData, - }; - Ok(db) - } -} - -pub struct RemoteDb<P: RemoteDbHandlerPermissions + 'static> { - client: reqwest::Client, - refresher: MetadataRefresher, - _p: std::marker::PhantomData<P>, -} - -pub struct DummyQueueMessageHandle {} - -#[async_trait(?Send)] -impl QueueMessageHandle for DummyQueueMessageHandle { - async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> { - unimplemented!() - } - - async fn finish(&self, _success: bool) -> Result<(), AnyError> { - unimplemented!() - } -} - -#[async_trait(?Send)] -impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> { - type QMH = DummyQueueMessageHandle; - - async fn snapshot_read( - &self, - state: Rc<RefCell<OpState>>, - requests: Vec<ReadRange>, - _options: SnapshotReadOptions, - ) -> Result<Vec<ReadRangeOutput>, AnyError> { - let req = pb::SnapshotRead { - ranges: requests - .into_iter() - .map(|r| pb::ReadRange { - start: r.start, - end: r.end, - limit: r.limit.get() as _, - reverse: r.reverse, - }) - .collect(), - }; - - let res: pb::SnapshotReadOutput = call_remote::<P, _, _>( - &state, - &self.refresher, - &self.client, - "snapshot_read", - &req, - ) - .await?; - - if res.read_disabled { - return Err(type_error("Reads are disabled for this database.")); - } - - let out = res - .ranges - .into_iter() - .map(|r| { - Ok(ReadRangeOutput { - entries: r - .values - .into_iter() - .map(|e| { - let encoding = e.encoding(); - Ok(KvEntry { - key: e.key, - value: decode_value(e.value, encoding)?, - versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?, - }) - }) - .collect::<Result<_, AnyError>>()?, - }) - }) - .collect::<Result<Vec<_>, AnyError>>()?; - Ok(out) - } - - async fn atomic_write( - &self, - state: Rc<RefCell<OpState>>, - write: AtomicWrite, - ) -> Result<Option<CommitResult>, 
AnyError> { - if !write.enqueues.is_empty() { - return Err(type_error("Enqueue operations are not supported yet.")); - } - - let req = pb::AtomicWrite { - kv_checks: write - .checks - .into_iter() - .map(|x| { - Ok(pb::KvCheck { - key: x.key, - versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(), - }) - }) - .collect::<anyhow::Result<_>>()?, - kv_mutations: write.mutations.into_iter().map(encode_mutation).collect(), - enqueues: vec![], - }; - - let res: pb::AtomicWriteOutput = call_remote::<P, _, _>( - &state, - &self.refresher, - &self.client, - "atomic_write", - &req, - ) - .await?; - match res.status() { - pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult { - versionstamp: if res.versionstamp.is_empty() { - Default::default() - } else { - res.versionstamp[..].try_into()? - }, - })), - pb::AtomicWriteStatus::AwCheckFailure => Ok(None), - pb::AtomicWriteStatus::AwUnsupportedWrite => { - Err(type_error("Unsupported write")) - } - pb::AtomicWriteStatus::AwUsageLimitExceeded => { - Err(type_error("The database usage limit has been exceeded.")) - } - pb::AtomicWriteStatus::AwWriteDisabled => { - // TODO: Auto retry - Err(type_error("Writes are disabled for this database.")) - } - pb::AtomicWriteStatus::AwUnspecified => { - Err(type_error("Unspecified error")) - } - pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => { - Err(type_error("Queue backlog limit exceeded")) - } - } - } - - async fn dequeue_next_message( - &self, - _state: Rc<RefCell<OpState>>, - ) -> Result<Option<Self::QMH>, AnyError> { - let msg = "Deno.Kv.listenQueue is not supported for remote KV databases"; - eprintln!("{}", yellow(msg)); - deno_core::futures::future::pending().await - } - - fn close(&self) {} -} - -fn yellow<S: AsRef<str>>(s: S) -> impl fmt::Display { - if std::env::var_os("NO_COLOR").is_some() { - return String::from(s.as_ref()); - } - let mut style_spec = ColorSpec::new(); - style_spec.set_fg(Some(Color::Yellow)); - let mut v = Vec::new(); - let mut ansi_writer = 
Ansi::new(&mut v); - ansi_writer.set_color(&style_spec).unwrap(); - ansi_writer.write_all(s.as_ref().as_bytes()).unwrap(); - ansi_writer.reset().unwrap(); - String::from_utf8_lossy(&v).into_owned() -} - -fn decode_value( - value: Vec<u8>, - encoding: pb::KvValueEncoding, -) -> anyhow::Result<crate::Value> { - match encoding { - pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)), - pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)), - pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes( - <[u8; 8]>::try_from(&value[..])?, - ))), - pb::KvValueEncoding::VeUnspecified => { - Err(anyhow::anyhow!("Unspecified value encoding, cannot decode")) - } - } -} - -fn encode_value(value: crate::Value) -> pb::KvValue { - match value { - crate::Value::V8(data) => pb::KvValue { - data, - encoding: pb::KvValueEncoding::VeV8 as _, - }, - crate::Value::Bytes(data) => pb::KvValue { - data, - encoding: pb::KvValueEncoding::VeBytes as _, - }, - crate::Value::U64(x) => pb::KvValue { - data: x.to_le_bytes().to_vec(), - encoding: pb::KvValueEncoding::VeLe64 as _, - }, - } -} - -fn encode_mutation(m: crate::KvMutation) -> pb::KvMutation { - let key = m.key; - let expire_at_ms = - m.expire_at.and_then(|x| i64::try_from(x).ok()).unwrap_or(0); - - match m.kind { - MutationKind::Set(x) => pb::KvMutation { - key, - value: Some(encode_value(x)), - mutation_type: pb::KvMutationType::MSet as _, - expire_at_ms, - }, - MutationKind::Delete => pb::KvMutation { - key, - value: Some(encode_value(crate::Value::Bytes(vec![]))), - mutation_type: pb::KvMutationType::MClear as _, - expire_at_ms, - }, - MutationKind::Max(x) => pb::KvMutation { - key, - value: Some(encode_value(x)), - mutation_type: pb::KvMutationType::MMax as _, - expire_at_ms, - }, - MutationKind::Min(x) => pb::KvMutation { - key, - value: Some(encode_value(x)), - mutation_type: pb::KvMutationType::MMin as _, - expire_at_ms, - }, - MutationKind::Sum(x) => pb::KvMutation { - key, - value: 
Some(encode_value(x)), - mutation_type: pb::KvMutationType::MSum as _, - expire_at_ms, - }, - } -} - -#[derive(Clone)] -enum MetadataState { - Ready(Arc<DatabaseMetadata>), - Invalid(String), - Pending, -} - -struct MetadataRefresher { - metadata_rx: watch::Receiver<MetadataState>, - handle: JoinHandle<()>, -} - -impl MetadataRefresher { - pub fn new(url: String, access_token: String) -> Self { - let (tx, rx) = watch::channel(MetadataState::Pending); - let handle = - deno_core::unsync::spawn(metadata_refresh_task(url, access_token, tx)); - Self { - handle, - metadata_rx: rx, - } - } -} - -impl Drop for MetadataRefresher { - fn drop(&mut self) { - self.handle.abort(); - } -} - -async fn metadata_refresh_task( - metadata_url: String, - access_token: String, - tx: watch::Sender<MetadataState>, -) { - let client = reqwest::Client::new(); - loop { - let mut attempt = 0u64; - let metadata = loop { - match fetch_metadata(&client, &metadata_url, &access_token).await { - Ok(Ok(x)) => break x, - Ok(Err(e)) => { - if tx.send(MetadataState::Invalid(e)).is_err() { - return; - } - } - Err(e) => { - log::error!("Failed to fetch database metadata: {}", e); - } - } - randomized_exponential_backoff(Duration::from_secs(5), attempt).await; - attempt += 1; + let metadata_endpoint = MetadataEndpoint { + url: parsed_url.clone(), + access_token: access_token.clone(), }; - let ms_until_expire = u64::try_from( - metadata - .expires_at - .timestamp_millis() - .saturating_sub(crate::time::utc_now().timestamp_millis()), - ) - .unwrap_or_default(); - - // Refresh 10 minutes before expiry - // In case of buggy clocks, don't refresh more than once per minute - let interval = Duration::from_millis(ms_until_expire) - .saturating_sub(Duration::from_secs(600)) - .max(Duration::from_secs(60)); - - if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() { - return; - } - - tokio::time::sleep(interval).await; - } -} - -async fn fetch_metadata( - client: &reqwest::Client, - metadata_url: &str, - 
access_token: &str, -) -> anyhow::Result<Result<DatabaseMetadata, String>> { - let res = client - .post(metadata_url) - .header("authorization", format!("Bearer {}", access_token)) - .send() - .await?; - - if !res.status().is_success() { - if res.status().is_client_error() { - return Ok(Err(format!( - "Client error while fetching metadata: {:?} {}", - res.status(), - res.text().await? - ))); - } else { - anyhow::bail!( - "remote returned error: {:?} {}", - res.status(), - res.text().await? - ); - } - } - - let res = res.bytes().await?; - let version_info: VersionInfo = match serde_json::from_slice(&res) { - Ok(x) => x, - Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))), - }; - if version_info.version > 1 { - return Ok(Err(format!( - "Unsupported metadata version: {}", - version_info.version - ))); - } - - Ok( - serde_json::from_slice(&res) - .map_err(|e| format!("Failed to decode metadata: {}", e)), - ) -} - -async fn randomized_exponential_backoff(base: Duration, attempt: u64) { - let attempt = attempt.min(12); - let delay = base.as_millis() as u64 + (2 << attempt); - let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1); - tokio::time::sleep(std::time::Duration::from_millis(delay)).await; -} - -async fn call_remote< - P: RemoteDbHandlerPermissions + 'static, - T: Message, - R: Message + Default, ->( - state: &RefCell<OpState>, - refresher: &MetadataRefresher, - client: &reqwest::Client, - method: &str, - req: &T, -) -> anyhow::Result<R> { - let mut attempt = 0u64; - let res = loop { - let mut metadata_rx = refresher.metadata_rx.clone(); - let metadata = loop { - match &*metadata_rx.borrow() { - MetadataState::Pending => {} - MetadataState::Ready(x) => break x.clone(), - MetadataState::Invalid(e) => { - return Err(type_error(format!("Metadata error: {}", e))) - } - } - // `unwrap()` never fails because `tx` is owned by the task held by `refresher`. 
- metadata_rx.changed().await.unwrap(); - }; - let Some(sc_endpoint) = metadata - .endpoints - .iter() - .find(|x| x.consistency == "strong") - else { - return Err(type_error( - "No strong consistency endpoint is available for this database", - )); + let options = &self.http_options; + let client = create_http_client( + &options.user_agent, + CreateHttpClientOptions { + root_cert_store: options.root_cert_store()?, + ca_certs: vec![], + proxy: options.proxy.clone(), + unsafely_ignore_certificate_errors: options + .unsafely_ignore_certificate_errors + .clone(), + client_cert_chain_and_key: options.client_cert_chain_and_key.clone(), + pool_max_idle_per_host: None, + pool_idle_timeout: None, + http1: true, + http2: true, + }, + )?; + + let permissions = PermissionChecker { + state: state.clone(), + _permissions: PhantomData, }; - let full_url = format!("{}/{}", sc_endpoint.url, method); - { - let parsed_url = Url::parse(&full_url)?; - let mut state = state.borrow_mut(); - let permissions = state.borrow_mut::<P>(); - permissions.check_net_url(&parsed_url, "Deno.Kv")?; - } - - let res = client - .post(&full_url) - .header("x-transaction-domain-id", metadata.database_id.to_string()) - .header("authorization", format!("Bearer {}", metadata.token)) - .body(req.encode_to_vec()) - .send() - .map_err(anyhow::Error::from) - .and_then(|x| async move { - if x.status().is_success() { - Ok(Ok(x.bytes().await?)) - } else if x.status().is_client_error() { - Ok(Err((x.status(), x.text().await?))) - } else { - Err(anyhow::anyhow!( - "server error ({:?}): {}", - x.status(), - x.text().await? 
- )) - } - }) - .await; - - match res { - Ok(x) => break x, - Err(e) => { - log::error!("retryable error in {}: {}", method, e); - randomized_exponential_backoff(Duration::from_millis(0), attempt).await; - attempt += 1; - } - } - }; - - let res = match res { - Ok(x) => x, - Err((status, message)) => { - return Err(type_error(format!( - "client error in {} (status {:?}): {}", - method, status, message - ))) - } - }; + let remote = Remote::new(client, permissions, metadata_endpoint); - match R::decode(&*res) { - Ok(x) => Ok(x), - Err(e) => Err(type_error(format!( - "failed to decode response from {}: {}", - method, e - ))), + Ok(remote) } } |