summaryrefslogtreecommitdiff
path: root/ext/kv/dynamic.rs
diff options
context:
space:
mode:
authorHeyang Zhou <zhy20000919@hotmail.com>2023-08-22 13:56:00 +0800
committerGitHub <noreply@github.com>2023-08-22 13:56:00 +0800
commit6d4a005e4108a5dd762b339a02bc4d802755ba0d (patch)
tree69679038bfbd3127f6c1e1b85dbc347c8c52e36e /ext/kv/dynamic.rs
parent5834d282d4de5d0b5cacb9bf068f3896bef0a48a (diff)
feat(ext/kv): connect to remote database (#20178)
This patch adds a `remote` backend for `ext/kv`. This supports connection to Deno Deploy and potentially other services compatible with the KV Connect protocol.
Diffstat (limited to 'ext/kv/dynamic.rs')
-rw-r--r--ext/kv/dynamic.rs216
1 files changed, 216 insertions, 0 deletions
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
new file mode 100644
index 000000000..f79c10f55
--- /dev/null
+++ b/ext/kv/dynamic.rs
@@ -0,0 +1,216 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::cell::RefCell;
+use std::rc::Rc;
+
+use crate::remote::RemoteDbHandlerPermissions;
+use crate::sqlite::SqliteDbHandler;
+use crate::sqlite::SqliteDbHandlerPermissions;
+use crate::AtomicWrite;
+use crate::CommitResult;
+use crate::Database;
+use crate::DatabaseHandler;
+use crate::QueueMessageHandle;
+use crate::ReadRange;
+use crate::ReadRangeOutput;
+use crate::SnapshotReadOptions;
+use async_trait::async_trait;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+
+pub struct MultiBackendDbHandler {
+ backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
+}
+
+impl MultiBackendDbHandler {
+ pub fn new(
+ backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
+ ) -> Self {
+ Self { backends }
+ }
+
+ pub fn remote_or_sqlite<
+ P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
+ >(
+ default_storage_dir: Option<std::path::PathBuf>,
+ ) -> Self {
+ Self::new(vec![
+ (
+ &["https://", "http://"],
+ Box::new(crate::remote::RemoteDbHandler::<P>::new()),
+ ),
+ (
+ &[""],
+ Box::new(SqliteDbHandler::<P>::new(default_storage_dir)),
+ ),
+ ])
+ }
+}
+
+#[async_trait(?Send)]
+impl DatabaseHandler for MultiBackendDbHandler {
+ type DB = Box<dyn DynamicDb>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ for (prefixes, handler) in &self.backends {
+ for &prefix in *prefixes {
+ if prefix.is_empty() {
+ return handler.dyn_open(state.clone(), path.clone()).await;
+ }
+ let Some(path) = &path else {
+ continue;
+ };
+ if path.starts_with(prefix) {
+ return handler.dyn_open(state.clone(), Some(path.clone())).await;
+ }
+ }
+ }
+ Err(type_error(format!(
+ "No backend supports the given path: {:?}",
+ path
+ )))
+ }
+}
+
+#[async_trait(?Send)]
+pub trait DynamicDbHandler {
+ async fn dyn_open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Box<dyn DynamicDb>, AnyError>;
+}
+
+#[async_trait(?Send)]
+impl DatabaseHandler for Box<dyn DynamicDbHandler> {
+ type DB = Box<dyn DynamicDb>;
+
+ async fn open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Self::DB, AnyError> {
+ (**self).dyn_open(state, path).await
+ }
+}
+
+#[async_trait(?Send)]
+impl<T, DB> DynamicDbHandler for T
+where
+ T: DatabaseHandler<DB = DB>,
+ DB: Database + 'static,
+{
+ async fn dyn_open(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ path: Option<String>,
+ ) -> Result<Box<dyn DynamicDb>, AnyError> {
+ Ok(Box::new(self.open(state, path).await?))
+ }
+}
+
+#[async_trait(?Send)]
+pub trait DynamicDb {
+ async fn dyn_snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError>;
+
+ async fn dyn_atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError>;
+
+ async fn dyn_dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError>;
+
+ fn dyn_close(&self);
+}
+
+#[async_trait(?Send)]
+impl Database for Box<dyn DynamicDb> {
+ type QMH = Box<dyn QueueMessageHandle>;
+
+ async fn snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ (**self).dyn_snapshot_read(state, requests, options).await
+ }
+
+ async fn atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ (**self).dyn_atomic_write(state, write).await
+ }
+
+ async fn dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
+ (**self).dyn_dequeue_next_message(state).await
+ }
+
+ fn close(&self) {
+ (**self).dyn_close()
+ }
+}
+
+#[async_trait(?Send)]
+impl<T, QMH> DynamicDb for T
+where
+ T: Database<QMH = QMH>,
+ QMH: QueueMessageHandle + 'static,
+{
+ async fn dyn_snapshot_read(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ requests: Vec<ReadRange>,
+ options: SnapshotReadOptions,
+ ) -> Result<Vec<ReadRangeOutput>, AnyError> {
+ Ok(self.snapshot_read(state, requests, options).await?)
+ }
+
+ async fn dyn_atomic_write(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ write: AtomicWrite,
+ ) -> Result<Option<CommitResult>, AnyError> {
+ Ok(self.atomic_write(state, write).await?)
+ }
+
+ async fn dyn_dequeue_next_message(
+ &self,
+ state: Rc<RefCell<OpState>>,
+ ) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
+ Ok(Box::new(self.dequeue_next_message(state).await?))
+ }
+
+ fn dyn_close(&self) {
+ self.close()
+ }
+}
+
+#[async_trait(?Send)]
+impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
+ async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
+ (**self).take_payload().await
+ }
+ async fn finish(&self, success: bool) -> Result<(), AnyError> {
+ (**self).finish(success).await
+ }
+}