diff options
author | Luca Casonato <hello@lcas.dev> | 2023-10-31 12:13:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-31 11:13:57 +0000 |
commit | 2d9298f5f5550c21ba218ff7095aa9afe80c7e02 (patch) | |
tree | 6daee1cc6007745097e176c7367b697e58632baf /ext/kv/dynamic.rs | |
parent | 092555c611ebab87ad570b4dcb73d54288dccdd9 (diff) |
chore: update ext/kv to use denokv_* crates (#20986)
This commit updates the ext/kv module to use the denokv_* crates for
the protocol and the sqlite backend. This also fixes a couple of bugs in
the sqlite backend, and updates versionstamps to be updated less
linearly.
Diffstat (limited to 'ext/kv/dynamic.rs')
-rw-r--r-- | ext/kv/dynamic.rs | 44 |
1 files changed, 15 insertions, 29 deletions
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs index 9084cc1bf..b772d26b8 100644 --- a/ext/kv/dynamic.rs +++ b/ext/kv/dynamic.rs @@ -7,17 +7,17 @@ use crate::remote::RemoteDbHandlerPermissions; use crate::sqlite::SqliteDbHandler; use crate::sqlite::SqliteDbHandlerPermissions; use crate::AtomicWrite; -use crate::CommitResult; use crate::Database; use crate::DatabaseHandler; use crate::QueueMessageHandle; use crate::ReadRange; -use crate::ReadRangeOutput; use crate::SnapshotReadOptions; use async_trait::async_trait; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::OpState; +use denokv_proto::CommitResult; +use denokv_proto::ReadRangeOutput; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>, @@ -34,15 +34,20 @@ impl MultiBackendDbHandler { P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static, >( default_storage_dir: Option<std::path::PathBuf>, + versionstamp_rng_seed: Option<u64>, + http_options: crate::remote::HttpOptions, ) -> Self { Self::new(vec![ ( &["https://", "http://"], - Box::new(crate::remote::RemoteDbHandler::<P>::new()), + Box::new(crate::remote::RemoteDbHandler::<P>::new(http_options)), ), ( &[""], - Box::new(SqliteDbHandler::<P>::new(default_storage_dir)), + Box::new(SqliteDbHandler::<P>::new( + default_storage_dir, + versionstamp_rng_seed, + )), ), ]) } @@ -118,20 +123,17 @@ where pub trait DynamicDb { async fn dyn_snapshot_read( &self, - state: Rc<RefCell<OpState>>, requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError>; async fn dyn_atomic_write( &self, - state: Rc<RefCell<OpState>>, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError>; async fn dyn_dequeue_next_message( &self, - state: Rc<RefCell<OpState>>, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>; fn dyn_close(&self); @@ -143,26 +145,23 @@ impl Database for Box<dyn DynamicDb> { async fn snapshot_read( &self, - state: Rc<RefCell<OpState>>, requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - (**self).dyn_snapshot_read(state, requests, options).await + (**self).dyn_snapshot_read(requests, options).await } async fn atomic_write( &self, - state: Rc<RefCell<OpState>>, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - (**self).dyn_atomic_write(state, write).await + (**self).dyn_atomic_write(write).await } async fn dequeue_next_message( &self, - state: Rc<RefCell<OpState>>, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> { - (**self).dyn_dequeue_next_message(state).await + (**self).dyn_dequeue_next_message().await } fn close(&self) { @@ -178,28 +177,25 @@ where { async fn dyn_snapshot_read( &self, - state: Rc<RefCell<OpState>>, requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - Ok(self.snapshot_read(state, requests, options).await?) + Ok(self.snapshot_read(requests, options).await?) } async fn dyn_atomic_write( &self, - state: Rc<RefCell<OpState>>, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - Ok(self.atomic_write(state, write).await?) + Ok(self.atomic_write(write).await?) } async fn dyn_dequeue_next_message( &self, - state: Rc<RefCell<OpState>>, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> { Ok( self - .dequeue_next_message(state) + .dequeue_next_message() .await? .map(|x| Box::new(x) as Box<dyn QueueMessageHandle>), ) @@ -209,13 +205,3 @@ where self.close() } } - -#[async_trait(?Send)] -impl QueueMessageHandle for Box<dyn QueueMessageHandle> { - async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> { - (**self).take_payload().await - } - async fn finish(&self, success: bool) -> Result<(), AnyError> { - (**self).finish(success).await - } -} |