summaryrefslogtreecommitdiff
path: root/ext/kv/dynamic.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2023-12-05 14:21:46 +0100
committerGitHub <noreply@github.com>2023-12-05 21:21:46 +0800
commit74e39a927c63e789fec1c8f1817812920079229d (patch)
treefa38e32c700865b25710f491d551086733d58d5f /ext/kv/dynamic.rs
parenta24d3e8763bc48b69936db9231efb76766914303 (diff)
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows watching for changes to a key-value pair. This is useful for cases where you want to be notified when a key-value pair changes, but don't want to have to poll for changes. --------- Co-authored-by: losfair <zhy20000919@hotmail.com>
Diffstat (limited to 'ext/kv/dynamic.rs')
-rw-r--r--ext/kv/dynamic.rs34
1 files changed, 24 insertions, 10 deletions
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
index b772d26b8..c8dd6640c 100644
--- a/ext/kv/dynamic.rs
+++ b/ext/kv/dynamic.rs
@@ -18,6 +18,7 @@ use deno_core::error::AnyError;
use deno_core::OpState;
use denokv_proto::CommitResult;
use denokv_proto::ReadRangeOutput;
+use denokv_proto::WatchStream;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
@@ -55,7 +56,7 @@ impl MultiBackendDbHandler {
#[async_trait(?Send)]
impl DatabaseHandler for MultiBackendDbHandler {
- type DB = Box<dyn DynamicDb>;
+ type DB = RcDynamicDb;
async fn open(
&self,
@@ -88,12 +89,12 @@ pub trait DynamicDbHandler {
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
- ) -> Result<Box<dyn DynamicDb>, AnyError>;
+ ) -> Result<RcDynamicDb, AnyError>;
}
#[async_trait(?Send)]
impl DatabaseHandler for Box<dyn DynamicDbHandler> {
- type DB = Box<dyn DynamicDb>;
+ type DB = RcDynamicDb;
async fn open(
&self,
@@ -114,8 +115,8 @@ where
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
- ) -> Result<Box<dyn DynamicDb>, AnyError> {
- Ok(Box::new(self.open(state, path).await?))
+ ) -> Result<RcDynamicDb, AnyError> {
+ Ok(RcDynamicDb(Rc::new(self.open(state, path).await?)))
}
}
@@ -136,11 +137,16 @@ pub trait DynamicDb {
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
+ fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream;
+
fn dyn_close(&self);
}
+#[derive(Clone)]
+pub struct RcDynamicDb(Rc<dyn DynamicDb>);
+
#[async_trait(?Send)]
-impl Database for Box<dyn DynamicDb> {
+impl Database for RcDynamicDb {
type QMH = Box<dyn QueueMessageHandle>;
async fn snapshot_read(
@@ -148,24 +154,28 @@ impl Database for Box<dyn DynamicDb> {
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- (**self).dyn_snapshot_read(requests, options).await
+ (*self.0).dyn_snapshot_read(requests, options).await
}
async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- (**self).dyn_atomic_write(write).await
+ (*self.0).dyn_atomic_write(write).await
}
async fn dequeue_next_message(
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
- (**self).dyn_dequeue_next_message().await
+ (*self.0).dyn_dequeue_next_message().await
+ }
+
+ fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
+ (*self.0).dyn_watch(keys)
}
fn close(&self) {
- (**self).dyn_close()
+ (*self.0).dyn_close()
}
}
@@ -201,6 +211,10 @@ where
)
}
+ fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
+ self.watch(keys)
+ }
+
fn dyn_close(&self) {
self.close()
}