diff options
author | Luca Casonato <hello@lcas.dev> | 2023-12-05 14:21:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-05 21:21:46 +0800 |
commit | 74e39a927c63e789fec1c8f1817812920079229d (patch) | |
tree | fa38e32c700865b25710f491d551086733d58d5f /ext/kv/sqlite.rs | |
parent | a24d3e8763bc48b69936db9231efb76766914303 (diff) |
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows
watching for changes to a key-value pair. This is useful for cases
where you want to be notified when a key-value pair changes, but
don't want to have to poll for changes.
---------
Co-authored-by: losfair <zhy20000919@hotmail.com>
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r-- | ext/kv/sqlite.rs | 62 |
1 files changed, 30 insertions, 32 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 2e7b97126..e0facace4 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -8,7 +8,6 @@ use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; -use std::sync::Arc; use std::sync::Mutex; use std::sync::OnceLock; @@ -19,14 +18,14 @@ use deno_core::unsync::spawn_blocking; use deno_core::OpState; use deno_node::PathClean; pub use denokv_sqlite::SqliteBackendError; +use denokv_sqlite::SqliteNotifier; use rand::RngCore; use rand::SeedableRng; use rusqlite::OpenFlags; -use tokio::sync::Notify; use crate::DatabaseHandler; -static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> = +static SQLITE_NOTIFIERS_MAP: OnceLock<Mutex<HashMap<PathBuf, SqliteNotifier>>> = OnceLock::new(); pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> { @@ -85,49 +84,48 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { let path = path.clone(); let default_storage_dir = self.default_storage_dir.clone(); - let (conn, queue_waker_key) = spawn_blocking(move || { + let (conn, notifier_key) = spawn_blocking(move || { denokv_sqlite::sqlite_retry_loop(|| { - let (conn, queue_waker_key) = - match (path.as_deref(), &default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - (rusqlite::Connection::open_in_memory()?, None) - } - (Some(path), _) => { - let flags = - OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - let resolved_path = canonicalize_path(&PathBuf::from(path)) - .map_err(|_| SqliteBackendError::DatabaseClosed)?; - ( - rusqlite::Connection::open_with_flags(path, flags)?, - Some(resolved_path), - ) - } - (None, Some(path)) => { - std::fs::create_dir_all(path) - .map_err(|_| SqliteBackendError::DatabaseClosed)?; - let path = path.join("kv.sqlite3"); - (rusqlite::Connection::open(path.clone())?, Some(path)) - } - }; + let (conn, notifier_key) = match (path.as_deref(), &default_storage_dir) + { + (Some(":memory:"), _) | (None, None) => { + (rusqlite::Connection::open_in_memory()?, None) + } + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + let resolved_path = canonicalize_path(&PathBuf::from(path)) + .map_err(anyhow::Error::from)?; + ( + rusqlite::Connection::open_with_flags(path, flags)?, + Some(resolved_path), + ) + } + (None, Some(path)) => { + std::fs::create_dir_all(path).map_err(anyhow::Error::from)?; + let path = path.join("kv.sqlite3"); + (rusqlite::Connection::open(path.clone())?, Some(path)) + } + }; conn.pragma_update(None, "journal_mode", "wal")?; - Ok::<_, SqliteBackendError>((conn, queue_waker_key)) + Ok::<_, SqliteBackendError>((conn, notifier_key)) }) }) .await .unwrap()?; - let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key { - QUEUE_WAKER_MAP + let notifier = if let Some(notifier_key) = notifier_key { + SQLITE_NOTIFIERS_MAP .get_or_init(Default::default) .lock() .unwrap() - .entry(queue_waker_key) + .entry(notifier_key) .or_default() .clone() } else { - Arc::new(Notify::new()) + SqliteNotifier::default() }; let versionstamp_rng: Box<dyn RngCore + Send> = @@ -136,7 +134,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { None => Box::new(rand::rngs::StdRng::from_entropy()), }; - denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng) + denokv_sqlite::Sqlite::new(conn, notifier, versionstamp_rng) } } |