summaryrefslogtreecommitdiff
path: root/ext/kv/sqlite.rs
diff options
context:
space:
mode:
authorLuca Casonato <hello@lcas.dev>2023-12-05 14:21:46 +0100
committerGitHub <noreply@github.com>2023-12-05 21:21:46 +0800
commit74e39a927c63e789fec1c8f1817812920079229d (patch)
treefa38e32c700865b25710f491d551086733d58d5f /ext/kv/sqlite.rs
parenta24d3e8763bc48b69936db9231efb76766914303 (diff)
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows watching for changes to a key-value pair. This is useful for cases where you want to be notified when a key-value pair changes, but don't want to have to poll for changes. --------- Co-authored-by: losfair <zhy20000919@hotmail.com>
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r--ext/kv/sqlite.rs62
1 files changed, 30 insertions, 32 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 2e7b97126..e0facace4 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -8,7 +8,6 @@ use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
-use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
@@ -19,14 +18,14 @@ use deno_core::unsync::spawn_blocking;
use deno_core::OpState;
use deno_node::PathClean;
pub use denokv_sqlite::SqliteBackendError;
+use denokv_sqlite::SqliteNotifier;
use rand::RngCore;
use rand::SeedableRng;
use rusqlite::OpenFlags;
-use tokio::sync::Notify;
use crate::DatabaseHandler;
-static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> =
+static SQLITE_NOTIFIERS_MAP: OnceLock<Mutex<HashMap<PathBuf, SqliteNotifier>>> =
OnceLock::new();
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
@@ -85,49 +84,48 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
- let (conn, queue_waker_key) = spawn_blocking(move || {
+ let (conn, notifier_key) = spawn_blocking(move || {
denokv_sqlite::sqlite_retry_loop(|| {
- let (conn, queue_waker_key) =
- match (path.as_deref(), &default_storage_dir) {
- (Some(":memory:"), _) | (None, None) => {
- (rusqlite::Connection::open_in_memory()?, None)
- }
- (Some(path), _) => {
- let flags =
- OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
- let resolved_path = canonicalize_path(&PathBuf::from(path))
- .map_err(|_| SqliteBackendError::DatabaseClosed)?;
- (
- rusqlite::Connection::open_with_flags(path, flags)?,
- Some(resolved_path),
- )
- }
- (None, Some(path)) => {
- std::fs::create_dir_all(path)
- .map_err(|_| SqliteBackendError::DatabaseClosed)?;
- let path = path.join("kv.sqlite3");
- (rusqlite::Connection::open(path.clone())?, Some(path))
- }
- };
+ let (conn, notifier_key) = match (path.as_deref(), &default_storage_dir)
+ {
+ (Some(":memory:"), _) | (None, None) => {
+ (rusqlite::Connection::open_in_memory()?, None)
+ }
+ (Some(path), _) => {
+ let flags =
+ OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+ let resolved_path = canonicalize_path(&PathBuf::from(path))
+ .map_err(anyhow::Error::from)?;
+ (
+ rusqlite::Connection::open_with_flags(path, flags)?,
+ Some(resolved_path),
+ )
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path).map_err(anyhow::Error::from)?;
+ let path = path.join("kv.sqlite3");
+ (rusqlite::Connection::open(path.clone())?, Some(path))
+ }
+ };
conn.pragma_update(None, "journal_mode", "wal")?;
- Ok::<_, SqliteBackendError>((conn, queue_waker_key))
+ Ok::<_, SqliteBackendError>((conn, notifier_key))
})
})
.await
.unwrap()?;
- let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key {
- QUEUE_WAKER_MAP
+ let notifier = if let Some(notifier_key) = notifier_key {
+ SQLITE_NOTIFIERS_MAP
.get_or_init(Default::default)
.lock()
.unwrap()
- .entry(queue_waker_key)
+ .entry(notifier_key)
.or_default()
.clone()
} else {
- Arc::new(Notify::new())
+ SqliteNotifier::default()
};
let versionstamp_rng: Box<dyn RngCore + Send> =
@@ -136,7 +134,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
None => Box::new(rand::rngs::StdRng::from_entropy()),
};
- denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng)
+ denokv_sqlite::Sqlite::new(conn, notifier, versionstamp_rng)
}
}