diff options
Diffstat (limited to 'ext/kv/dynamic.rs')
-rw-r--r-- | ext/kv/dynamic.rs | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs index b772d26b8..c8dd6640c 100644 --- a/ext/kv/dynamic.rs +++ b/ext/kv/dynamic.rs @@ -18,6 +18,7 @@ use deno_core::error::AnyError; use deno_core::OpState; use denokv_proto::CommitResult; use denokv_proto::ReadRangeOutput; +use denokv_proto::WatchStream; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>, @@ -55,7 +56,7 @@ impl MultiBackendDbHandler { #[async_trait(?Send)] impl DatabaseHandler for MultiBackendDbHandler { - type DB = Box<dyn DynamicDb>; + type DB = RcDynamicDb; async fn open( &self, @@ -88,12 +89,12 @@ pub trait DynamicDbHandler { &self, state: Rc<RefCell<OpState>>, path: Option<String>, - ) -> Result<Box<dyn DynamicDb>, AnyError>; + ) -> Result<RcDynamicDb, AnyError>; } #[async_trait(?Send)] impl DatabaseHandler for Box<dyn DynamicDbHandler> { - type DB = Box<dyn DynamicDb>; + type DB = RcDynamicDb; async fn open( &self, @@ -114,8 +115,8 @@ where &self, state: Rc<RefCell<OpState>>, path: Option<String>, - ) -> Result<Box<dyn DynamicDb>, AnyError> { - Ok(Box::new(self.open(state, path).await?)) + ) -> Result<RcDynamicDb, AnyError> { + Ok(RcDynamicDb(Rc::new(self.open(state, path).await?))) } } @@ -136,11 +137,16 @@ pub trait DynamicDb { &self, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>; + fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream; + fn dyn_close(&self); } +#[derive(Clone)] +pub struct RcDynamicDb(Rc<dyn DynamicDb>); + #[async_trait(?Send)] -impl Database for Box<dyn DynamicDb> { +impl Database for RcDynamicDb { type QMH = Box<dyn QueueMessageHandle>; async fn snapshot_read( @@ -148,24 +154,28 @@ impl Database for Box<dyn DynamicDb> { requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - (**self).dyn_snapshot_read(requests, options).await + (*self.0).dyn_snapshot_read(requests, options).await } async fn atomic_write( &self, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - (**self).dyn_atomic_write(write).await + (*self.0).dyn_atomic_write(write).await } async fn dequeue_next_message( &self, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> { - (**self).dyn_dequeue_next_message().await + (*self.0).dyn_dequeue_next_message().await + } + + fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream { + (*self.0).dyn_watch(keys) } fn close(&self) { - (**self).dyn_close() + (*self.0).dyn_close() } } @@ -201,6 +211,10 @@ where ) } + fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream { + self.watch(keys) + } + fn dyn_close(&self) { self.close() } |