summaryrefslogtreecommitdiff
path: root/ext/kv/dynamic.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2023-10-31 12:13:57 +0100
committerGitHub <noreply@github.com>2023-10-31 11:13:57 +0000
commit2d9298f5f5550c21ba218ff7095aa9afe80c7e02 (patch)
tree6daee1cc6007745097e176c7367b697e58632baf /ext/kv/dynamic.rs
parent092555c611ebab87ad570b4dcb73d54288dccdd9 (diff)
chore: update ext/kv to use denokv_* crates (#20986)
This commit updates the ext/kv module to use the denokv_* crates for the protocol and the sqlite backend. This also fixes a couple of bugs in the sqlite backend, and updates versionstamps to be updated less linearly.
Diffstat (limited to 'ext/kv/dynamic.rs')
-rw-r--r--ext/kv/dynamic.rs44
1 files changed, 15 insertions, 29 deletions
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
index 9084cc1bf..b772d26b8 100644
--- a/ext/kv/dynamic.rs
+++ b/ext/kv/dynamic.rs
@@ -7,17 +7,17 @@ use crate::remote::RemoteDbHandlerPermissions;
use crate::sqlite::SqliteDbHandler;
use crate::sqlite::SqliteDbHandlerPermissions;
use crate::AtomicWrite;
-use crate::CommitResult;
use crate::Database;
use crate::DatabaseHandler;
use crate::QueueMessageHandle;
use crate::ReadRange;
-use crate::ReadRangeOutput;
use crate::SnapshotReadOptions;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::OpState;
+use denokv_proto::CommitResult;
+use denokv_proto::ReadRangeOutput;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
@@ -34,15 +34,20 @@ impl MultiBackendDbHandler {
P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static,
>(
default_storage_dir: Option<std::path::PathBuf>,
+ versionstamp_rng_seed: Option<u64>,
+ http_options: crate::remote::HttpOptions,
) -> Self {
Self::new(vec![
(
&["https://", "http://"],
- Box::new(crate::remote::RemoteDbHandler::<P>::new()),
+ Box::new(crate::remote::RemoteDbHandler::<P>::new(http_options)),
),
(
&[""],
- Box::new(SqliteDbHandler::<P>::new(default_storage_dir)),
+ Box::new(SqliteDbHandler::<P>::new(
+ default_storage_dir,
+ versionstamp_rng_seed,
+ )),
),
])
}
@@ -118,20 +123,17 @@ where
pub trait DynamicDb {
async fn dyn_snapshot_read(
&self,
- state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError>;
async fn dyn_atomic_write(
&self,
- state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError>;
async fn dyn_dequeue_next_message(
&self,
- state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
fn dyn_close(&self);
@@ -143,26 +145,23 @@ impl Database for Box<dyn DynamicDb> {
async fn snapshot_read(
&self,
- state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- (**self).dyn_snapshot_read(state, requests, options).await
+ (**self).dyn_snapshot_read(requests, options).await
}
async fn atomic_write(
&self,
- state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- (**self).dyn_atomic_write(state, write).await
+ (**self).dyn_atomic_write(write).await
}
async fn dequeue_next_message(
&self,
- state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
- (**self).dyn_dequeue_next_message(state).await
+ (**self).dyn_dequeue_next_message().await
}
fn close(&self) {
@@ -178,28 +177,25 @@ where
{
async fn dyn_snapshot_read(
&self,
- state: Rc<RefCell<OpState>>,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- Ok(self.snapshot_read(state, requests, options).await?)
+ Ok(self.snapshot_read(requests, options).await?)
}
async fn dyn_atomic_write(
&self,
- state: Rc<RefCell<OpState>>,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- Ok(self.atomic_write(state, write).await?)
+ Ok(self.atomic_write(write).await?)
}
async fn dyn_dequeue_next_message(
&self,
- state: Rc<RefCell<OpState>>,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
Ok(
self
- .dequeue_next_message(state)
+ .dequeue_next_message()
.await?
.map(|x| Box::new(x) as Box<dyn QueueMessageHandle>),
)
@@ -209,13 +205,3 @@ where
self.close()
}
}
-
-#[async_trait(?Send)]
-impl QueueMessageHandle for Box<dyn QueueMessageHandle> {
- async fn take_payload(&mut self) -> Result<Vec<u8>, AnyError> {
- (**self).take_payload().await
- }
- async fn finish(&self, success: bool) -> Result<(), AnyError> {
- (**self).finish(success).await
- }
-}