summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2023-12-05 14:21:46 +0100
committerGitHub <noreply@github.com>2023-12-05 21:21:46 +0800
commit74e39a927c63e789fec1c8f1817812920079229d (patch)
treefa38e32c700865b25710f491d551086733d58d5f
parenta24d3e8763bc48b69936db9231efb76766914303 (diff)
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows watching for changes to a key-value pair. This is useful for cases where you want to be notified when a key-value pair changes, but don't want to have to poll for changes. --------- Co-authored-by: losfair <zhy20000919@hotmail.com>
-rw-r--r--Cargo.lock18
-rw-r--r--Cargo.toml6
-rw-r--r--cli/tests/unit/kv_test.ts44
-rw-r--r--cli/tsc/dts/lib.deno.unstable.d.ts42
-rw-r--r--ext/kv/01_db.ts67
-rw-r--r--ext/kv/dynamic.rs34
-rw-r--r--ext/kv/lib.rs129
-rw-r--r--ext/kv/remote.rs9
-rw-r--r--ext/kv/sqlite.rs62
-rw-r--r--test_util/src/lib.rs2
10 files changed, 357 insertions, 56 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 269bf912d..4a645ac0a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1707,13 +1707,14 @@ dependencies = [
[[package]]
name = "denokv_proto"
-version = "0.4.0"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd501a6b10a8b7fe7e605cafe4aad1d266ed0791b788dab78889df14b1a23e5f"
+checksum = "98a79f7e98bfd3c148ce782c27c1494e77c3c94ab87c9e7e86e901cbc1643449"
dependencies = [
"anyhow",
"async-trait",
"chrono",
+ "futures",
"num-bigint",
"prost",
"prost-build",
@@ -1723,15 +1724,17 @@ dependencies = [
[[package]]
name = "denokv_remote"
-version = "0.4.0"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d94fd3da7d1fa9ef1515bf3bc5b2fe75389edb3f15e9445e345679fda44987c"
+checksum = "518e181eb14f1a3b8fc423e48de431048249780fb0815d81e8139faf347c3269"
dependencies = [
"anyhow",
+ "async-stream",
"async-trait",
"bytes",
"chrono",
"denokv_proto",
+ "futures",
"log",
"prost",
"rand",
@@ -1739,21 +1742,24 @@ dependencies = [
"serde",
"serde_json",
"tokio",
+ "tokio-util",
"url",
"uuid",
]
[[package]]
name = "denokv_sqlite"
-version = "0.4.0"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2d2ab5fe45079dc41ab76549bb9fc3aeb9be4e54fddbdfb16a779db4a0b38df"
+checksum = "90af93f2ab8eec43fea9f8931fa99d38e73fa0af60aba0fae79de3fb87a0ed06"
dependencies = [
"anyhow",
+ "async-stream",
"async-trait",
"chrono",
"denokv_proto",
"futures",
+ "hex",
"log",
"num-bigint",
"rand",
diff --git a/Cargo.toml b/Cargo.toml
index d22cf5919..743cfe78f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,10 +49,10 @@ test_util = { path = "./test_util" }
deno_lockfile = "0.17.2"
deno_media_type = { version = "0.1.1", features = ["module_specifier"] }
-denokv_proto = "0.4.0"
+denokv_proto = "0.5.0"
# denokv_sqlite brings in bundled sqlite if we don't disable the default features
-denokv_sqlite = { default-features = false, version = "0.4.0" }
-denokv_remote = "0.4.0"
+denokv_sqlite = { default-features = false, version = "0.5.0" }
+denokv_remote = "0.5.0"
# exts
deno_broadcast_channel = { version = "0.120.0", path = "./ext/broadcast_channel" }
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index 73c85dd5c..68b3c4013 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -2137,3 +2137,47 @@ Deno.test(
// calling [Symbol.dispose] after manual close is a no-op
},
);
+
+dbTest("key watch", async (db) => {
+ const changeHistory: Deno.KvEntryMaybe<number>[] = [];
+ const watcher: ReadableStream<Deno.KvEntryMaybe<number>[]> = db.watch<
+ number[]
+ >([["key"]]);
+
+ const reader = watcher.getReader();
+ const expectedChanges = 2;
+
+ const work = (async () => {
+ for (let i = 0; i < expectedChanges; i++) {
+ const message = await reader.read();
+ if (message.done) {
+ throw new Error("Unexpected end of stream");
+ }
+ changeHistory.push(message.value[0]);
+ }
+
+ await reader.cancel();
+ })();
+
+ while (changeHistory.length !== 1) {
+ await sleep(100);
+ }
+ assertEquals(changeHistory[0], {
+ key: ["key"],
+ value: null,
+ versionstamp: null,
+ });
+
+ const { versionstamp } = await db.set(["key"], 1);
+ while (changeHistory.length as number !== 2) {
+ await sleep(100);
+ }
+ assertEquals(changeHistory[1], {
+ key: ["key"],
+ value: 1,
+ versionstamp,
+ });
+
+ await work;
+ await reader.cancel();
+});
diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts
index 1bf4d3cfd..2890a50ff 100644
--- a/cli/tsc/dts/lib.deno.unstable.d.ts
+++ b/cli/tsc/dts/lib.deno.unstable.d.ts
@@ -2033,6 +2033,48 @@ declare namespace Deno {
atomic(): AtomicOperation;
/**
+ * Watch for changes to the given keys in the database. The returned stream
+ * is a {@linkcode ReadableStream} that emits a new value whenever any of
+ * the watched keys change their versionstamp. The emitted value is an array
+ * of {@linkcode Deno.KvEntryMaybe} objects, with the same length and order
+ * as the `keys` array. If no value exists for a given key, the returned
+ * entry will have a `null` value and versionstamp.
+ *
+ * The returned stream does not return every single intermediate state of
+ * the watched keys, but rather only keeps you up to date with the latest
+ * state of the keys. This means that if a key is modified multiple times
+ * quickly, you may not receive a notification for every single change, but
+ * rather only the latest state of the key.
+ *
+ * ```ts
+ * const db = await Deno.openKv();
+ *
+ * const stream = db.watch([["foo"], ["bar"]]);
+ * for await (const entries of stream) {
+ * entries[0].key; // ["foo"]
+ * entries[0].value; // "bar"
+ * entries[0].versionstamp; // "00000000000000010000"
+ * entries[1].key; // ["bar"]
+ * entries[1].value; // null
+ * entries[1].versionstamp; // null
+ * }
+ * ```
+ *
+ * The `options` argument can be used to specify additional options for the
+ * watch operation. The `raw` option can be used to specify whether a new
+ * value should be emitted whenever a mutation occurs on any of the watched
+ * keys (even if the value of the key does not change, such as deleting a
+ * deleted key), or only when entries have observably changed in some way.
+ * When `raw: true` is used, it is possible for the stream to occasionally
+ * emit values even if no mutations have occurred on any of the watched
+ * keys. The default value for this option is `false`.
+ */
+ watch<T extends readonly unknown[]>(
+ keys: readonly [...{ [K in keyof T]: KvKey }],
+ options?: { raw?: boolean },
+ ): ReadableStream<{ [K in keyof T]: KvEntryMaybe<T[K]> }>;
+
+ /**
* Close the database connection. This will prevent any further operations
* from being performed on the database, and interrupt any in-flight
* operations immediately.
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index 34678261a..73deee27f 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -11,8 +11,10 @@ const {
SymbolFor,
SymbolToStringTag,
Uint8ArrayPrototype,
+ Error,
} = globalThis.__bootstrap.primordials;
import { SymbolDispose } from "ext:deno_web/00_infra.js";
+import { ReadableStream } from "ext:deno_web/06_streams.js";
const core = Deno.core;
const ops = core.ops;
@@ -297,6 +299,71 @@ class Kv {
finishMessageOps.clear();
}
+ watch(keys: Deno.KvKey[], options = {}) {
+ const raw = options.raw ?? false;
+ const rid = ops.op_kv_watch(this.#rid, keys);
+ const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from(
+ { length: keys.length },
+ () => undefined,
+ );
+ return new ReadableStream({
+ async pull(controller) {
+ while (true) {
+ let updates;
+ try {
+ updates = await core.opAsync("op_kv_watch_next", rid);
+ } catch (err) {
+ core.tryClose(rid);
+ controller.error(err);
+ return;
+ }
+ if (updates === null) {
+ core.tryClose(rid);
+ controller.close();
+ return;
+ }
+ let changed = false;
+ for (let i = 0; i < keys.length; i++) {
+ if (updates[i] === "unchanged") {
+ if (lastEntries[i] === undefined) {
+ throw new Error(
+ "watch: invalid unchanged update (internal error)",
+ );
+ }
+ continue;
+ }
+ if (
+ lastEntries[i] !== undefined &&
+ (updates[i]?.versionstamp ?? null) ===
+ lastEntries[i]?.versionstamp
+ ) {
+ continue;
+ }
+ changed = true;
+ if (updates[i] === null) {
+ lastEntries[i] = {
+ key: [...keys[i]],
+ value: null,
+ versionstamp: null,
+ };
+ } else {
+ lastEntries[i] = updates[i];
+ }
+ }
+ if (!changed && !raw) continue; // no change
+ const entries = lastEntries.map((entry) =>
+ entry.versionstamp === null ? { ...entry } : deserializeValue(entry)
+ );
+ controller.enqueue(entries);
+ return;
+ }
+ },
+ cancel() {
+ core.tryClose(rid);
+ },
+ });
+ }
+
close() {
core.close(this.#rid);
}
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
index b772d26b8..c8dd6640c 100644
--- a/ext/kv/dynamic.rs
+++ b/ext/kv/dynamic.rs
@@ -18,6 +18,7 @@ use deno_core::error::AnyError;
use deno_core::OpState;
use denokv_proto::CommitResult;
use denokv_proto::ReadRangeOutput;
+use denokv_proto::WatchStream;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
@@ -55,7 +56,7 @@ impl MultiBackendDbHandler {
#[async_trait(?Send)]
impl DatabaseHandler for MultiBackendDbHandler {
- type DB = Box<dyn DynamicDb>;
+ type DB = RcDynamicDb;
async fn open(
&self,
@@ -88,12 +89,12 @@ pub trait DynamicDbHandler {
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
- ) -> Result<Box<dyn DynamicDb>, AnyError>;
+ ) -> Result<RcDynamicDb, AnyError>;
}
#[async_trait(?Send)]
impl DatabaseHandler for Box<dyn DynamicDbHandler> {
- type DB = Box<dyn DynamicDb>;
+ type DB = RcDynamicDb;
async fn open(
&self,
@@ -114,8 +115,8 @@ where
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
- ) -> Result<Box<dyn DynamicDb>, AnyError> {
- Ok(Box::new(self.open(state, path).await?))
+ ) -> Result<RcDynamicDb, AnyError> {
+ Ok(RcDynamicDb(Rc::new(self.open(state, path).await?)))
}
}
@@ -136,11 +137,16 @@ pub trait DynamicDb {
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
+ fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream;
+
fn dyn_close(&self);
}
+#[derive(Clone)]
+pub struct RcDynamicDb(Rc<dyn DynamicDb>);
+
#[async_trait(?Send)]
-impl Database for Box<dyn DynamicDb> {
+impl Database for RcDynamicDb {
type QMH = Box<dyn QueueMessageHandle>;
async fn snapshot_read(
@@ -148,24 +154,28 @@ impl Database for Box<dyn DynamicDb> {
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- (**self).dyn_snapshot_read(requests, options).await
+ (*self.0).dyn_snapshot_read(requests, options).await
}
async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- (**self).dyn_atomic_write(write).await
+ (*self.0).dyn_atomic_write(write).await
}
async fn dequeue_next_message(
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
- (**self).dyn_dequeue_next_message().await
+ (*self.0).dyn_dequeue_next_message().await
+ }
+
+ fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
+ (*self.0).dyn_watch(keys)
}
fn close(&self) {
- (**self).dyn_close()
+ (*self.0).dyn_close()
}
}
@@ -201,6 +211,10 @@ where
)
}
+ fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
+ self.watch(keys)
+ }
+
fn dyn_close(&self) {
self.close()
}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index c0091d75d..456a1ebf7 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -20,12 +20,17 @@ use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::serde_v8::AnyValue;
use deno_core::serde_v8::BigInt;
+use deno_core::AsyncRefCell;
use deno_core::ByteString;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
+use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
@@ -45,6 +50,8 @@ use denokv_proto::MutationKind;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::SnapshotReadOptions;
+use denokv_proto::WatchKeyOutput;
+use denokv_proto::WatchStream;
use log::debug;
use serde::Deserialize;
use serde::Serialize;
@@ -62,6 +69,7 @@ const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 100;
const MAX_MUTATIONS: usize = 1000;
+const MAX_WATCHED_KEYS: usize = 10;
const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024;
const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024;
@@ -75,6 +83,8 @@ deno_core::extension!(deno_kv,
op_kv_encode_cursor,
op_kv_dequeue_next_message<DBH>,
op_kv_finish_dequeued_message<DBH>,
+ op_kv_watch<DBH>,
+ op_kv_watch_next,
],
esm = [ "01_db.ts" ],
options = {
@@ -86,7 +96,8 @@ deno_core::extension!(deno_kv,
);
struct DatabaseResource<DB: Database + 'static> {
- db: Rc<DB>,
+ db: DB,
+ cancel_handle: Rc<CancelHandle>,
}
impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
@@ -96,6 +107,23 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
fn close(self: Rc<Self>) {
self.db.close();
+ self.cancel_handle.cancel();
+ }
+}
+
+struct DatabaseWatcherResource {
+ stream: AsyncRefCell<WatchStream>,
+ db_cancel_handle: Rc<CancelHandle>,
+ cancel_handle: Rc<CancelHandle>,
+}
+
+impl Resource for DatabaseWatcherResource {
+ fn name(&self) -> Cow<str> {
+ "databaseWatcher".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel()
}
}
@@ -118,10 +146,10 @@ where
state.borrow::<Rc<DBH>>().clone()
};
let db = handler.open(state.clone(), path).await?;
- let rid = state
- .borrow_mut()
- .resource_table
- .add(DatabaseResource { db: Rc::new(db) });
+ let rid = state.borrow_mut().resource_table.add(DatabaseResource {
+ db,
+ cancel_handle: CancelHandle::new_rc(),
+ });
Ok(rid)
}
@@ -354,6 +382,97 @@ where
Ok(Some((payload, handle_rid)))
}
+#[op2]
+#[smi]
+fn op_kv_watch<DBH>(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+ #[serde] keys: Vec<KvKey>,
+) -> Result<ResourceId, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+
+ if keys.len() > MAX_WATCHED_KEYS {
+ return Err(type_error(format!(
+ "too many keys (max {})",
+ MAX_WATCHED_KEYS
+ )));
+ }
+
+ let keys: Vec<Vec<u8>> = keys
+ .into_iter()
+ .map(encode_v8_key)
+ .collect::<std::io::Result<_>>()?;
+
+ for k in &keys {
+ check_read_key_size(k)?;
+ }
+
+ let stream = resource.db.watch(keys);
+
+ let rid = state.resource_table.add(DatabaseWatcherResource {
+ stream: AsyncRefCell::new(stream),
+ db_cancel_handle: resource.cancel_handle.clone(),
+ cancel_handle: CancelHandle::new_rc(),
+ });
+
+ Ok(rid)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase", untagged)]
+enum WatchEntry {
+ Changed(Option<ToV8KvEntry>),
+ Unchanged,
+}
+
+#[op2(async)]
+#[serde]
+async fn op_kv_watch_next(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+) -> Result<Option<Vec<WatchEntry>>, AnyError> {
+ let resource = {
+ let state = state.borrow();
+ let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?;
+ resource.clone()
+ };
+
+ let db_cancel_handle = resource.db_cancel_handle.clone();
+ let cancel_handle = resource.cancel_handle.clone();
+ let stream = RcRef::map(resource, |r| &r.stream)
+ .borrow_mut()
+ .or_cancel(db_cancel_handle)
+ .or_cancel(cancel_handle)
+ .await;
+ let Ok(Ok(mut stream)) = stream else {
+ return Ok(None);
+ };
+
+ // doesn't need a cancel handle because the stream ends when the database
+ // connection is closed
+ let Some(res) = stream.next().await else {
+ return Ok(None);
+ };
+
+ let entries = res?;
+ let entries = entries
+ .into_iter()
+ .map(|entry| {
+ Ok(match entry {
+ WatchKeyOutput::Changed { entry } => {
+ WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?)
+ }
+ WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
+ })
+ })
+ .collect::<Result<_, anyhow::Error>>()?;
+
+ Ok(Some(entries))
+}
+
#[op2(async)]
async fn op_kv_finish_dequeued_message<DBH>(
state: Rc<RefCell<OpState>>,
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs
index 7cac6b9c3..855b091fa 100644
--- a/ext/kv/remote.rs
+++ b/ext/kv/remote.rs
@@ -66,6 +66,15 @@ pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
_permissions: PhantomData<P>,
}
+impl<P: RemoteDbHandlerPermissions> Clone for PermissionChecker<P> {
+ fn clone(&self) -> Self {
+ Self {
+ state: self.state.clone(),
+ _permissions: PhantomData,
+ }
+ }
+}
+
impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
for PermissionChecker<P>
{
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 2e7b97126..e0facace4 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -8,7 +8,6 @@ use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
-use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
@@ -19,14 +18,14 @@ use deno_core::unsync::spawn_blocking;
use deno_core::OpState;
use deno_node::PathClean;
pub use denokv_sqlite::SqliteBackendError;
+use denokv_sqlite::SqliteNotifier;
use rand::RngCore;
use rand::SeedableRng;
use rusqlite::OpenFlags;
-use tokio::sync::Notify;
use crate::DatabaseHandler;
-static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> =
+static SQLITE_NOTIFIERS_MAP: OnceLock<Mutex<HashMap<PathBuf, SqliteNotifier>>> =
OnceLock::new();
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
@@ -85,49 +84,48 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
- let (conn, queue_waker_key) = spawn_blocking(move || {
+ let (conn, notifier_key) = spawn_blocking(move || {
denokv_sqlite::sqlite_retry_loop(|| {
- let (conn, queue_waker_key) =
- match (path.as_deref(), &default_storage_dir) {
- (Some(":memory:"), _) | (None, None) => {
- (rusqlite::Connection::open_in_memory()?, None)
- }
- (Some(path), _) => {
- let flags =
- OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
- let resolved_path = canonicalize_path(&PathBuf::from(path))
- .map_err(|_| SqliteBackendError::DatabaseClosed)?;
- (
- rusqlite::Connection::open_with_flags(path, flags)?,
- Some(resolved_path),
- )
- }
- (None, Some(path)) => {
- std::fs::create_dir_all(path)
- .map_err(|_| SqliteBackendError::DatabaseClosed)?;
- let path = path.join("kv.sqlite3");
- (rusqlite::Connection::open(path.clone())?, Some(path))
- }
- };
+ let (conn, notifier_key) = match (path.as_deref(), &default_storage_dir)
+ {
+ (Some(":memory:"), _) | (None, None) => {
+ (rusqlite::Connection::open_in_memory()?, None)
+ }
+ (Some(path), _) => {
+ let flags =
+ OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+ let resolved_path = canonicalize_path(&PathBuf::from(path))
+ .map_err(anyhow::Error::from)?;
+ (
+ rusqlite::Connection::open_with_flags(path, flags)?,
+ Some(resolved_path),
+ )
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path).map_err(anyhow::Error::from)?;
+ let path = path.join("kv.sqlite3");
+ (rusqlite::Connection::open(path.clone())?, Some(path))
+ }
+ };
conn.pragma_update(None, "journal_mode", "wal")?;
- Ok::<_, SqliteBackendError>((conn, queue_waker_key))
+ Ok::<_, SqliteBackendError>((conn, notifier_key))
})
})
.await
.unwrap()?;
- let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key {
- QUEUE_WAKER_MAP
+ let notifier = if let Some(notifier_key) = notifier_key {
+ SQLITE_NOTIFIERS_MAP
.get_or_init(Default::default)
.lock()
.unwrap()
- .entry(queue_waker_key)
+ .entry(notifier_key)
.or_default()
.clone()
} else {
- Arc::new(Notify::new())
+ SqliteNotifier::default()
};
let versionstamp_rng: Box<dyn RngCore + Send> =
@@ -136,7 +134,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
None => Box::new(rand::rngs::StdRng::from_entropy()),
};
- denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng)
+ denokv_sqlite::Sqlite::new(conn, notifier, versionstamp_rng)
}
}
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs
index 8e5742636..6700a49f1 100644
--- a/test_util/src/lib.rs
+++ b/test_util/src/lib.rs
@@ -11,6 +11,7 @@ use denokv_proto::datapath::AtomicWriteStatus;
use denokv_proto::datapath::ReadRangeOutput;
use denokv_proto::datapath::SnapshotRead;
use denokv_proto::datapath::SnapshotReadOutput;
+use denokv_proto::datapath::SnapshotReadStatus;
use fastwebsockets::FragmentCollector;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
@@ -1226,6 +1227,7 @@ async fn main_server(
.collect(),
read_disabled: false,
read_is_strongly_consistent: true,
+ status: SnapshotReadStatus::SrSuccess.into(),
}
.encode_to_vec(),
))