diff options
author | Luca Casonato <hello@lcas.dev> | 2023-12-05 14:21:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-05 21:21:46 +0800 |
commit | 74e39a927c63e789fec1c8f1817812920079229d (patch) | |
tree | fa38e32c700865b25710f491d551086733d58d5f /ext/kv/dynamic.rs | |
parent | a24d3e8763bc48b69936db9231efb76766914303 (diff) |
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows
watching for changes to a key-value pair. This is useful for cases
where you want to be notified when a key-value pair changes, but
don't want to have to poll for changes.
---------
Co-authored-by: losfair <zhy20000919@hotmail.com>
Diffstat (limited to 'ext/kv/dynamic.rs')
-rw-r--r-- | ext/kv/dynamic.rs | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs index b772d26b8..c8dd6640c 100644 --- a/ext/kv/dynamic.rs +++ b/ext/kv/dynamic.rs @@ -18,6 +18,7 @@ use deno_core::error::AnyError; use deno_core::OpState; use denokv_proto::CommitResult; use denokv_proto::ReadRangeOutput; +use denokv_proto::WatchStream; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>, @@ -55,7 +56,7 @@ impl MultiBackendDbHandler { #[async_trait(?Send)] impl DatabaseHandler for MultiBackendDbHandler { - type DB = Box<dyn DynamicDb>; + type DB = RcDynamicDb; async fn open( &self, @@ -88,12 +89,12 @@ pub trait DynamicDbHandler { &self, state: Rc<RefCell<OpState>>, path: Option<String>, - ) -> Result<Box<dyn DynamicDb>, AnyError>; + ) -> Result<RcDynamicDb, AnyError>; } #[async_trait(?Send)] impl DatabaseHandler for Box<dyn DynamicDbHandler> { - type DB = Box<dyn DynamicDb>; + type DB = RcDynamicDb; async fn open( &self, @@ -114,8 +115,8 @@ where &self, state: Rc<RefCell<OpState>>, path: Option<String>, - ) -> Result<Box<dyn DynamicDb>, AnyError> { - Ok(Box::new(self.open(state, path).await?)) + ) -> Result<RcDynamicDb, AnyError> { + Ok(RcDynamicDb(Rc::new(self.open(state, path).await?))) } } @@ -136,11 +137,16 @@ pub trait DynamicDb { &self, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>; + fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream; + fn dyn_close(&self); } +#[derive(Clone)] +pub struct RcDynamicDb(Rc<dyn DynamicDb>); + #[async_trait(?Send)] -impl Database for Box<dyn DynamicDb> { +impl Database for RcDynamicDb { type QMH = Box<dyn QueueMessageHandle>; async fn snapshot_read( @@ -148,24 +154,28 @@ impl Database for Box<dyn DynamicDb> { requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - (**self).dyn_snapshot_read(requests, options).await + (*self.0).dyn_snapshot_read(requests, options).await } async fn atomic_write( &self, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - (**self).dyn_atomic_write(write).await + (*self.0).dyn_atomic_write(write).await } async fn dequeue_next_message( &self, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> { - (**self).dyn_dequeue_next_message().await + (*self.0).dyn_dequeue_next_message().await + } + + fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream { + (*self.0).dyn_watch(keys) } fn close(&self) { - (**self).dyn_close() + (*self.0).dyn_close() } } @@ -201,6 +211,10 @@ where ) } + fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream { + self.watch(keys) + } + fn dyn_close(&self) { self.close() } |