diff options
author | Luca Casonato <hello@lcas.dev> | 2023-12-05 14:21:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-05 21:21:46 +0800 |
commit | 74e39a927c63e789fec1c8f1817812920079229d (patch) | |
tree | fa38e32c700865b25710f491d551086733d58d5f /ext | |
parent | a24d3e8763bc48b69936db9231efb76766914303 (diff) |
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows
watching for changes to a key-value pair. This is useful for cases
where you want to be notified when a key-value pair changes, but
don't want to have to poll for changes.
---------
Co-authored-by: losfair <zhy20000919@hotmail.com>
Diffstat (limited to 'ext')
-rw-r--r-- | ext/kv/01_db.ts | 67 | ||||
-rw-r--r-- | ext/kv/dynamic.rs | 34 | ||||
-rw-r--r-- | ext/kv/lib.rs | 129 | ||||
-rw-r--r-- | ext/kv/remote.rs | 9 | ||||
-rw-r--r-- | ext/kv/sqlite.rs | 62 |
5 files changed, 254 insertions, 47 deletions
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 34678261a..73deee27f 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -11,8 +11,10 @@ const { SymbolFor, SymbolToStringTag, Uint8ArrayPrototype, + Error, } = globalThis.__bootstrap.primordials; import { SymbolDispose } from "ext:deno_web/00_infra.js"; +import { ReadableStream } from "ext:deno_web/06_streams.js"; const core = Deno.core; const ops = core.ops; @@ -297,6 +299,71 @@ class Kv { finishMessageOps.clear(); } + watch(keys: Deno.KvKey[], options = {}) { + const raw = options.raw ?? false; + const rid = ops.op_kv_watch(this.#rid, keys); + const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from( + { length: keys.length }, + () => undefined, + ); + return new ReadableStream({ + async pull(controller) { + while (true) { + let updates; + try { + updates = await core.opAsync("op_kv_watch_next", rid); + } catch (err) { + core.tryClose(rid); + controller.error(err); + return; + } + if (updates === null) { + core.tryClose(rid); + controller.close(); + return; + } + let changed = false; + for (let i = 0; i < keys.length; i++) { + if (updates[i] === "unchanged") { + if (lastEntries[i] === undefined) { + throw new Error( + "watch: invalid unchanged update (internal error)", + ); + } + continue; + } + if ( + lastEntries[i] !== undefined && + (updates[i]?.versionstamp ?? null) === + lastEntries[i]?.versionstamp + ) { + continue; + } + changed = true; + if (updates[i] === null) { + lastEntries[i] = { + key: [...keys[i]], + value: null, + versionstamp: null, + }; + } else { + lastEntries[i] = updates[i]; + } + } + if (!changed && !raw) continue; // no change + const entries = lastEntries.map((entry) => + entry.versionstamp === null ? { ...entry } : deserializeValue(entry) + ); + controller.enqueue(entries); + return; + } + }, + cancel() { + core.tryClose(rid); + }, + }); + } + close() { core.close(this.#rid); } diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs index b772d26b8..c8dd6640c 100644 --- a/ext/kv/dynamic.rs +++ b/ext/kv/dynamic.rs @@ -18,6 +18,7 @@ use deno_core::error::AnyError; use deno_core::OpState; use denokv_proto::CommitResult; use denokv_proto::ReadRangeOutput; +use denokv_proto::WatchStream; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>, @@ -55,7 +56,7 @@ impl MultiBackendDbHandler { #[async_trait(?Send)] impl DatabaseHandler for MultiBackendDbHandler { - type DB = Box<dyn DynamicDb>; + type DB = RcDynamicDb; async fn open( &self, @@ -88,12 +89,12 @@ pub trait DynamicDbHandler { &self, state: Rc<RefCell<OpState>>, path: Option<String>, - ) -> Result<Box<dyn DynamicDb>, AnyError>; + ) -> Result<RcDynamicDb, AnyError>; } #[async_trait(?Send)] impl DatabaseHandler for Box<dyn DynamicDbHandler> { - type DB = Box<dyn DynamicDb>; + type DB = RcDynamicDb; async fn open( &self, @@ -114,8 +115,8 @@ where &self, state: Rc<RefCell<OpState>>, path: Option<String>, - ) -> Result<Box<dyn DynamicDb>, AnyError> { - Ok(Box::new(self.open(state, path).await?)) + ) -> Result<RcDynamicDb, AnyError> { + Ok(RcDynamicDb(Rc::new(self.open(state, path).await?))) } } @@ -136,11 +137,16 @@ pub trait DynamicDb { &self, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>; + fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream; + fn dyn_close(&self); } +#[derive(Clone)] +pub struct RcDynamicDb(Rc<dyn DynamicDb>); + #[async_trait(?Send)] -impl Database for Box<dyn DynamicDb> { +impl Database for RcDynamicDb { type QMH = Box<dyn QueueMessageHandle>; async fn snapshot_read( @@ -148,24 +154,28 @@ impl Database for Box<dyn DynamicDb> { requests: Vec<ReadRange>, options: SnapshotReadOptions, ) -> Result<Vec<ReadRangeOutput>, AnyError> { - (**self).dyn_snapshot_read(requests, options).await + (*self.0).dyn_snapshot_read(requests, options).await } async fn atomic_write( &self, write: AtomicWrite, ) -> Result<Option<CommitResult>, AnyError> { - (**self).dyn_atomic_write(write).await + (*self.0).dyn_atomic_write(write).await } async fn dequeue_next_message( &self, ) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> { - (**self).dyn_dequeue_next_message().await + (*self.0).dyn_dequeue_next_message().await + } + + fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream { + (*self.0).dyn_watch(keys) } fn close(&self) { - (**self).dyn_close() + (*self.0).dyn_close() } } @@ -201,6 +211,10 @@ where ) } + fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream { + self.watch(keys) + } + fn dyn_close(&self) { self.close() } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index c0091d75d..456a1ebf7 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -20,12 +20,17 @@ use deno_core::anyhow::Context; use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::serde_v8::AnyValue; use deno_core::serde_v8::BigInt; +use deno_core::AsyncRefCell; use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::JsBuffer; use deno_core::OpState; +use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ToJsBuffer; @@ -45,6 +50,8 @@ use denokv_proto::MutationKind; use denokv_proto::QueueMessageHandle; use denokv_proto::ReadRange; use denokv_proto::SnapshotReadOptions; +use denokv_proto::WatchKeyOutput; +use denokv_proto::WatchStream; use log::debug; use serde::Deserialize; use serde::Serialize; @@ -62,6 +69,7 @@ const MAX_READ_RANGES: usize = 10; const MAX_READ_ENTRIES: usize = 1000; const MAX_CHECKS: usize = 100; const MAX_MUTATIONS: usize = 1000; +const MAX_WATCHED_KEYS: usize = 10; const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024; const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024; @@ -75,6 +83,8 @@ deno_core::extension!(deno_kv, op_kv_encode_cursor, op_kv_dequeue_next_message<DBH>, op_kv_finish_dequeued_message<DBH>, + op_kv_watch<DBH>, + op_kv_watch_next, ], esm = [ "01_db.ts" ], options = { @@ -86,7 +96,8 @@ deno_core::extension!(deno_kv, ); struct DatabaseResource<DB: Database + 'static> { - db: Rc<DB>, + db: DB, + cancel_handle: Rc<CancelHandle>, } impl<DB: Database + 'static> Resource for DatabaseResource<DB> { @@ -96,6 +107,23 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> { fn close(self: Rc<Self>) { self.db.close(); + self.cancel_handle.cancel(); + } +} + +struct DatabaseWatcherResource { + stream: AsyncRefCell<WatchStream>, + db_cancel_handle: Rc<CancelHandle>, + cancel_handle: Rc<CancelHandle>, +} + +impl Resource for DatabaseWatcherResource { + fn name(&self) -> Cow<str> { + "databaseWatcher".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel() } } @@ -118,10 +146,10 @@ where state.borrow::<Rc<DBH>>().clone() }; let db = handler.open(state.clone(), path).await?; - let rid = state - .borrow_mut() - .resource_table - .add(DatabaseResource { db: Rc::new(db) }); + let rid = state.borrow_mut().resource_table.add(DatabaseResource { + db, + cancel_handle: CancelHandle::new_rc(), + }); Ok(rid) } @@ -354,6 +382,97 @@ where Ok(Some((payload, handle_rid))) } +#[op2] +#[smi] +fn op_kv_watch<DBH>( + state: &mut OpState, + #[smi] rid: ResourceId, + #[serde] keys: Vec<KvKey>, +) -> Result<ResourceId, AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?; + + if keys.len() > MAX_WATCHED_KEYS { + return Err(type_error(format!( + "too many keys (max {})", + MAX_WATCHED_KEYS + ))); + } + + let keys: Vec<Vec<u8>> = keys + .into_iter() + .map(encode_v8_key) + .collect::<std::io::Result<_>>()?; + + for k in &keys { + check_read_key_size(k)?; + } + + let stream = resource.db.watch(keys); + + let rid = state.resource_table.add(DatabaseWatcherResource { + stream: AsyncRefCell::new(stream), + db_cancel_handle: resource.cancel_handle.clone(), + cancel_handle: CancelHandle::new_rc(), + }); + + Ok(rid) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase", untagged)] +enum WatchEntry { + Changed(Option<ToV8KvEntry>), + Unchanged, +} + +#[op2(async)] +#[serde] +async fn op_kv_watch_next( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, +) -> Result<Option<Vec<WatchEntry>>, AnyError> { + let resource = { + let state = state.borrow(); + let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?; + resource.clone() + }; + + let db_cancel_handle = resource.db_cancel_handle.clone(); + let cancel_handle = resource.cancel_handle.clone(); + let stream = RcRef::map(resource, |r| &r.stream) + .borrow_mut() + .or_cancel(db_cancel_handle) + .or_cancel(cancel_handle) + .await; + let Ok(Ok(mut stream)) = stream else { + return Ok(None); + }; + + // doesn't need a cancel handle because the stream ends when the database + // connection is closed + let Some(res) = stream.next().await else { + return Ok(None); + }; + + let entries = res?; + let entries = entries + .into_iter() + .map(|entry| { + Ok(match entry { + WatchKeyOutput::Changed { entry } => { + WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?) + } + WatchKeyOutput::Unchanged => WatchEntry::Unchanged, + }) + }) + .collect::<Result<_, anyhow::Error>>()?; + + Ok(Some(entries)) +} + #[op2(async)] async fn op_kv_finish_dequeued_message<DBH>( state: Rc<RefCell<OpState>>, diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 7cac6b9c3..855b091fa 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -66,6 +66,15 @@ pub struct PermissionChecker<P: RemoteDbHandlerPermissions> { _permissions: PhantomData<P>, } +impl<P: RemoteDbHandlerPermissions> Clone for PermissionChecker<P> { + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + _permissions: PhantomData, + } + } +} + impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions for PermissionChecker<P> { diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 2e7b97126..e0facace4 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -8,7 +8,6 @@ use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; -use std::sync::Arc; use std::sync::Mutex; use std::sync::OnceLock; @@ -19,14 +18,14 @@ use deno_core::unsync::spawn_blocking; use deno_core::OpState; use deno_node::PathClean; pub use denokv_sqlite::SqliteBackendError; +use denokv_sqlite::SqliteNotifier; use rand::RngCore; use rand::SeedableRng; use rusqlite::OpenFlags; -use tokio::sync::Notify; use crate::DatabaseHandler; -static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> = +static SQLITE_NOTIFIERS_MAP: OnceLock<Mutex<HashMap<PathBuf, SqliteNotifier>>> = OnceLock::new(); pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> { @@ -85,49 +84,48 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { let path = path.clone(); let default_storage_dir = self.default_storage_dir.clone(); - let (conn, queue_waker_key) = spawn_blocking(move || { + let (conn, notifier_key) = spawn_blocking(move || { denokv_sqlite::sqlite_retry_loop(|| { - let (conn, queue_waker_key) = - match (path.as_deref(), &default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - (rusqlite::Connection::open_in_memory()?, None) - } - (Some(path), _) => { - let flags = - OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - let resolved_path = canonicalize_path(&PathBuf::from(path)) - .map_err(|_| SqliteBackendError::DatabaseClosed)?; - ( - rusqlite::Connection::open_with_flags(path, flags)?, - Some(resolved_path), - ) - } - (None, Some(path)) => { - std::fs::create_dir_all(path) - .map_err(|_| SqliteBackendError::DatabaseClosed)?; - let path = path.join("kv.sqlite3"); - (rusqlite::Connection::open(path.clone())?, Some(path)) - } - }; + let (conn, notifier_key) = match (path.as_deref(), &default_storage_dir) + { + (Some(":memory:"), _) | (None, None) => { + (rusqlite::Connection::open_in_memory()?, None) + } + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + let resolved_path = canonicalize_path(&PathBuf::from(path)) + .map_err(anyhow::Error::from)?; + ( + rusqlite::Connection::open_with_flags(path, flags)?, + Some(resolved_path), + ) + } + (None, Some(path)) => { + std::fs::create_dir_all(path).map_err(anyhow::Error::from)?; + let path = path.join("kv.sqlite3"); + (rusqlite::Connection::open(path.clone())?, Some(path)) + } + }; conn.pragma_update(None, "journal_mode", "wal")?; - Ok::<_, SqliteBackendError>((conn, queue_waker_key)) + Ok::<_, SqliteBackendError>((conn, notifier_key)) }) }) .await .unwrap()?; - let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key { - QUEUE_WAKER_MAP + let notifier = if let Some(notifier_key) = notifier_key { + SQLITE_NOTIFIERS_MAP .get_or_init(Default::default) .lock() .unwrap() - .entry(queue_waker_key) + .entry(notifier_key) .or_default() .clone() } else { - Arc::new(Notify::new()) + SqliteNotifier::default() }; let versionstamp_rng: Box<dyn RngCore + Send> = @@ -136,7 +134,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> { None => Box::new(rand::rngs::StdRng::from_entropy()), }; - denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng) + denokv_sqlite::Sqlite::new(conn, notifier, versionstamp_rng) } } |