author    Luca Casonato <hello@lcas.dev>  2023-12-05 14:21:46 +0100
committer GitHub <noreply@github.com>     2023-12-05 21:21:46 +0800
commit    74e39a927c63e789fec1c8f1817812920079229d
tree      fa38e32c700865b25710f491d551086733d58d5f
parent    a24d3e8763bc48b69936db9231efb76766914303
feat(unstable): kv.watch() (#21147)
This commit adds support for a new `kv.watch()` method that allows watching
for changes to a key-value pair. This is useful when you want to be notified
that a key-value pair has changed but don't want to poll for changes.

Co-authored-by: losfair <zhy20000919@hotmail.com>
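As a rough usage sketch (assuming the unstable KV API is enabled, e.g. via `--unstable-kv`, and an illustrative `["counter"]` key), the new method takes a list of keys and returns a `ReadableStream` that yields a snapshot of the watched entries whenever one of them changes:

```ts
const kv = await Deno.openKv();

const stream = kv.watch([["counter"]]);
for await (const entries of stream) {
  // `entries` has one Deno.KvEntryMaybe per watched key, in the same order
  // as the `keys` argument passed to watch().
  const [counter] = entries;
  if (counter.versionstamp === null) {
    console.log("counter was deleted (or never existed)");
  } else {
    console.log("counter is now", counter.value);
  }
}
```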
Diffstat (limited to 'ext')
-rw-r--r--  ext/kv/01_db.ts      67
-rw-r--r--  ext/kv/dynamic.rs    34
-rw-r--r--  ext/kv/lib.rs       129
-rw-r--r--  ext/kv/remote.rs      9
-rw-r--r--  ext/kv/sqlite.rs     62
5 files changed, 254 insertions(+), 47 deletions(-)
diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts
index 34678261a..73deee27f 100644
--- a/ext/kv/01_db.ts
+++ b/ext/kv/01_db.ts
@@ -11,8 +11,10 @@ const {
SymbolFor,
SymbolToStringTag,
Uint8ArrayPrototype,
+ Error,
} = globalThis.__bootstrap.primordials;
import { SymbolDispose } from "ext:deno_web/00_infra.js";
+import { ReadableStream } from "ext:deno_web/06_streams.js";
const core = Deno.core;
const ops = core.ops;
@@ -297,6 +299,71 @@ class Kv {
finishMessageOps.clear();
}
+ watch(keys: Deno.KvKey[], options = {}) {
+ const raw = options.raw ?? false;
+ const rid = ops.op_kv_watch(this.#rid, keys);
+ const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from(
+ { length: keys.length },
+ () => undefined,
+ );
+ return new ReadableStream({
+ async pull(controller) {
+ while (true) {
+ let updates;
+ try {
+ updates = await core.opAsync("op_kv_watch_next", rid);
+ } catch (err) {
+ core.tryClose(rid);
+ controller.error(err);
+ return;
+ }
+ if (updates === null) {
+ core.tryClose(rid);
+ controller.close();
+ return;
+ }
+ let changed = false;
+ for (let i = 0; i < keys.length; i++) {
+ if (updates[i] === "unchanged") {
+ if (lastEntries[i] === undefined) {
+ throw new Error(
+ "watch: invalid unchanged update (internal error)",
+ );
+ }
+ continue;
+ }
+ if (
+ lastEntries[i] !== undefined &&
+ (updates[i]?.versionstamp ?? null) ===
+ lastEntries[i]?.versionstamp
+ ) {
+ continue;
+ }
+ changed = true;
+ if (updates[i] === null) {
+ lastEntries[i] = {
+ key: [...keys[i]],
+ value: null,
+ versionstamp: null,
+ };
+ } else {
+ lastEntries[i] = updates[i];
+ }
+ }
+ if (!changed && !raw) continue; // no change
+ const entries = lastEntries.map((entry) =>
+ entry.versionstamp === null ? { ...entry } : deserializeValue(entry)
+ );
+ controller.enqueue(entries);
+ return;
+ }
+ },
+ cancel() {
+ core.tryClose(rid);
+ },
+ });
+ }
+
close() {
core.close(this.#rid);
}
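A short sketch of what the `pull()` logic above implies for consumers (the key name and reader-based consumption are illustrative, not from the diff): unchanged keys are coalesced away unless `raw` is set, a missing key is surfaced with a `null` value and versionstamp, and cancelling the stream or closing the database tears down the watcher resource.

```ts
const kv = await Deno.openKv();
const stream = kv.watch([["jobs", "latest"]]);
const reader = stream.getReader();

// Each read resolves only after at least one watched key actually changed
// (unless `raw: true` was passed), because pull() skips updates whose
// versionstamp matches the last entry it saw.
const { value: entries, done } = await reader.read();
if (!done && entries[0].versionstamp === null) {
  // A deleted or never-written key comes back as
  // { key, value: null, versionstamp: null }.
  console.log("key is absent");
}

// cancel() maps to core.tryClose(rid) on the watcher resource; closing the
// Kv itself also ends the stream via the database cancel handle.
await reader.cancel();
kv.close();
```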
diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs
index b772d26b8..c8dd6640c 100644
--- a/ext/kv/dynamic.rs
+++ b/ext/kv/dynamic.rs
@@ -18,6 +18,7 @@ use deno_core::error::AnyError;
use deno_core::OpState;
use denokv_proto::CommitResult;
use denokv_proto::ReadRangeOutput;
+use denokv_proto::WatchStream;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
@@ -55,7 +56,7 @@ impl MultiBackendDbHandler {
#[async_trait(?Send)]
impl DatabaseHandler for MultiBackendDbHandler {
- type DB = Box<dyn DynamicDb>;
+ type DB = RcDynamicDb;
async fn open(
&self,
@@ -88,12 +89,12 @@ pub trait DynamicDbHandler {
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
- ) -> Result<Box<dyn DynamicDb>, AnyError>;
+ ) -> Result<RcDynamicDb, AnyError>;
}
#[async_trait(?Send)]
impl DatabaseHandler for Box<dyn DynamicDbHandler> {
- type DB = Box<dyn DynamicDb>;
+ type DB = RcDynamicDb;
async fn open(
&self,
@@ -114,8 +115,8 @@ where
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
- ) -> Result<Box<dyn DynamicDb>, AnyError> {
- Ok(Box::new(self.open(state, path).await?))
+ ) -> Result<RcDynamicDb, AnyError> {
+ Ok(RcDynamicDb(Rc::new(self.open(state, path).await?)))
}
}
@@ -136,11 +137,16 @@ pub trait DynamicDb {
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
+ fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream;
+
fn dyn_close(&self);
}
+#[derive(Clone)]
+pub struct RcDynamicDb(Rc<dyn DynamicDb>);
+
#[async_trait(?Send)]
-impl Database for Box<dyn DynamicDb> {
+impl Database for RcDynamicDb {
type QMH = Box<dyn QueueMessageHandle>;
async fn snapshot_read(
@@ -148,24 +154,28 @@ impl Database for Box<dyn DynamicDb> {
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
- (**self).dyn_snapshot_read(requests, options).await
+ (*self.0).dyn_snapshot_read(requests, options).await
}
async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
- (**self).dyn_atomic_write(write).await
+ (*self.0).dyn_atomic_write(write).await
}
async fn dequeue_next_message(
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
- (**self).dyn_dequeue_next_message().await
+ (*self.0).dyn_dequeue_next_message().await
+ }
+
+ fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
+ (*self.0).dyn_watch(keys)
}
fn close(&self) {
- (**self).dyn_close()
+ (*self.0).dyn_close()
}
}
@@ -201,6 +211,10 @@ where
)
}
+ fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
+ self.watch(keys)
+ }
+
fn dyn_close(&self) {
self.close()
}
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index c0091d75d..456a1ebf7 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -20,12 +20,17 @@ use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::serde_v8::AnyValue;
use deno_core::serde_v8::BigInt;
+use deno_core::AsyncRefCell;
use deno_core::ByteString;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
+use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
@@ -45,6 +50,8 @@ use denokv_proto::MutationKind;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::SnapshotReadOptions;
+use denokv_proto::WatchKeyOutput;
+use denokv_proto::WatchStream;
use log::debug;
use serde::Deserialize;
use serde::Serialize;
@@ -62,6 +69,7 @@ const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 100;
const MAX_MUTATIONS: usize = 1000;
+const MAX_WATCHED_KEYS: usize = 10;
const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024;
const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024;
@@ -75,6 +83,8 @@ deno_core::extension!(deno_kv,
op_kv_encode_cursor,
op_kv_dequeue_next_message<DBH>,
op_kv_finish_dequeued_message<DBH>,
+ op_kv_watch<DBH>,
+ op_kv_watch_next,
],
esm = [ "01_db.ts" ],
options = {
@@ -86,7 +96,8 @@ deno_core::extension!(deno_kv,
);
struct DatabaseResource<DB: Database + 'static> {
- db: Rc<DB>,
+ db: DB,
+ cancel_handle: Rc<CancelHandle>,
}
impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
@@ -96,6 +107,23 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
fn close(self: Rc<Self>) {
self.db.close();
+ self.cancel_handle.cancel();
+ }
+}
+
+struct DatabaseWatcherResource {
+ stream: AsyncRefCell<WatchStream>,
+ db_cancel_handle: Rc<CancelHandle>,
+ cancel_handle: Rc<CancelHandle>,
+}
+
+impl Resource for DatabaseWatcherResource {
+ fn name(&self) -> Cow<str> {
+ "databaseWatcher".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel()
}
}
@@ -118,10 +146,10 @@ where
state.borrow::<Rc<DBH>>().clone()
};
let db = handler.open(state.clone(), path).await?;
- let rid = state
- .borrow_mut()
- .resource_table
- .add(DatabaseResource { db: Rc::new(db) });
+ let rid = state.borrow_mut().resource_table.add(DatabaseResource {
+ db,
+ cancel_handle: CancelHandle::new_rc(),
+ });
Ok(rid)
}
@@ -354,6 +382,97 @@ where
Ok(Some((payload, handle_rid)))
}
+#[op2]
+#[smi]
+fn op_kv_watch<DBH>(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+ #[serde] keys: Vec<KvKey>,
+) -> Result<ResourceId, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+
+ if keys.len() > MAX_WATCHED_KEYS {
+ return Err(type_error(format!(
+ "too many keys (max {})",
+ MAX_WATCHED_KEYS
+ )));
+ }
+
+ let keys: Vec<Vec<u8>> = keys
+ .into_iter()
+ .map(encode_v8_key)
+ .collect::<std::io::Result<_>>()?;
+
+ for k in &keys {
+ check_read_key_size(k)?;
+ }
+
+ let stream = resource.db.watch(keys);
+
+ let rid = state.resource_table.add(DatabaseWatcherResource {
+ stream: AsyncRefCell::new(stream),
+ db_cancel_handle: resource.cancel_handle.clone(),
+ cancel_handle: CancelHandle::new_rc(),
+ });
+
+ Ok(rid)
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase", untagged)]
+enum WatchEntry {
+ Changed(Option<ToV8KvEntry>),
+ Unchanged,
+}
+
+#[op2(async)]
+#[serde]
+async fn op_kv_watch_next(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+) -> Result<Option<Vec<WatchEntry>>, AnyError> {
+ let resource = {
+ let state = state.borrow();
+ let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?;
+ resource.clone()
+ };
+
+ let db_cancel_handle = resource.db_cancel_handle.clone();
+ let cancel_handle = resource.cancel_handle.clone();
+ let stream = RcRef::map(resource, |r| &r.stream)
+ .borrow_mut()
+ .or_cancel(db_cancel_handle)
+ .or_cancel(cancel_handle)
+ .await;
+ let Ok(Ok(mut stream)) = stream else {
+ return Ok(None);
+ };
+
+ // doesn't need a cancel handle because the stream ends when the database
+ // connection is closed
+ let Some(res) = stream.next().await else {
+ return Ok(None);
+ };
+
+ let entries = res?;
+ let entries = entries
+ .into_iter()
+ .map(|entry| {
+ Ok(match entry {
+ WatchKeyOutput::Changed { entry } => {
+ WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?)
+ }
+ WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
+ })
+ })
+ .collect::<Result<_, anyhow::Error>>()?;
+
+ Ok(Some(entries))
+}
+
#[op2(async)]
async fn op_kv_finish_dequeued_message<DBH>(
state: Rc<RefCell<OpState>>,
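The `MAX_WATCHED_KEYS` check in `op_kv_watch` caps a single watch call at 10 keys; something like the following sketch (hypothetical key names) would be rejected with a `TypeError` before any watcher resource is created:

```ts
const kv = await Deno.openKv();

// 11 keys exceeds MAX_WATCHED_KEYS (10), so op_kv_watch rejects the call
// synchronously, before the ReadableStream is constructed.
const keys = Array.from({ length: 11 }, (_, i) => ["bucket", i]);
try {
  kv.watch(keys);
} catch (err) {
  console.error(err); // TypeError: too many keys (max 10)
}
```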
diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs
index 7cac6b9c3..855b091fa 100644
--- a/ext/kv/remote.rs
+++ b/ext/kv/remote.rs
@@ -66,6 +66,15 @@ pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
_permissions: PhantomData<P>,
}
+impl<P: RemoteDbHandlerPermissions> Clone for PermissionChecker<P> {
+ fn clone(&self) -> Self {
+ Self {
+ state: self.state.clone(),
+ _permissions: PhantomData,
+ }
+ }
+}
+
impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
for PermissionChecker<P>
{
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 2e7b97126..e0facace4 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -8,7 +8,6 @@ use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
-use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
@@ -19,14 +18,14 @@ use deno_core::unsync::spawn_blocking;
use deno_core::OpState;
use deno_node::PathClean;
pub use denokv_sqlite::SqliteBackendError;
+use denokv_sqlite::SqliteNotifier;
use rand::RngCore;
use rand::SeedableRng;
use rusqlite::OpenFlags;
-use tokio::sync::Notify;
use crate::DatabaseHandler;
-static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> =
+static SQLITE_NOTIFIERS_MAP: OnceLock<Mutex<HashMap<PathBuf, SqliteNotifier>>> =
OnceLock::new();
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
@@ -85,49 +84,48 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
- let (conn, queue_waker_key) = spawn_blocking(move || {
+ let (conn, notifier_key) = spawn_blocking(move || {
denokv_sqlite::sqlite_retry_loop(|| {
- let (conn, queue_waker_key) =
- match (path.as_deref(), &default_storage_dir) {
- (Some(":memory:"), _) | (None, None) => {
- (rusqlite::Connection::open_in_memory()?, None)
- }
- (Some(path), _) => {
- let flags =
- OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
- let resolved_path = canonicalize_path(&PathBuf::from(path))
- .map_err(|_| SqliteBackendError::DatabaseClosed)?;
- (
- rusqlite::Connection::open_with_flags(path, flags)?,
- Some(resolved_path),
- )
- }
- (None, Some(path)) => {
- std::fs::create_dir_all(path)
- .map_err(|_| SqliteBackendError::DatabaseClosed)?;
- let path = path.join("kv.sqlite3");
- (rusqlite::Connection::open(path.clone())?, Some(path))
- }
- };
+ let (conn, notifier_key) = match (path.as_deref(), &default_storage_dir)
+ {
+ (Some(":memory:"), _) | (None, None) => {
+ (rusqlite::Connection::open_in_memory()?, None)
+ }
+ (Some(path), _) => {
+ let flags =
+ OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+ let resolved_path = canonicalize_path(&PathBuf::from(path))
+ .map_err(anyhow::Error::from)?;
+ (
+ rusqlite::Connection::open_with_flags(path, flags)?,
+ Some(resolved_path),
+ )
+ }
+ (None, Some(path)) => {
+ std::fs::create_dir_all(path).map_err(anyhow::Error::from)?;
+ let path = path.join("kv.sqlite3");
+ (rusqlite::Connection::open(path.clone())?, Some(path))
+ }
+ };
conn.pragma_update(None, "journal_mode", "wal")?;
- Ok::<_, SqliteBackendError>((conn, queue_waker_key))
+ Ok::<_, SqliteBackendError>((conn, notifier_key))
})
})
.await
.unwrap()?;
- let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key {
- QUEUE_WAKER_MAP
+ let notifier = if let Some(notifier_key) = notifier_key {
+ SQLITE_NOTIFIERS_MAP
.get_or_init(Default::default)
.lock()
.unwrap()
- .entry(queue_waker_key)
+ .entry(notifier_key)
.or_default()
.clone()
} else {
- Arc::new(Notify::new())
+ SqliteNotifier::default()
};
let versionstamp_rng: Box<dyn RngCore + Send> =
@@ -136,7 +134,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
None => Box::new(rand::rngs::StdRng::from_entropy()),
};
- denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng)
+ denokv_sqlite::Sqlite::new(conn, notifier, versionstamp_rng)
}
}
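Since `SQLITE_NOTIFIERS_MAP` keys each `SqliteNotifier` by the canonicalized database path, two `Deno.openKv()` handles opened on the same file within one process should share a notifier, so a write through one handle wakes watchers on the other. A rough sketch of that behavior (the `./data.sqlite3` path and `["config"]` key are just examples):

```ts
// Two handles to the same SQLite file resolve to the same notifier entry in
// SQLITE_NOTIFIERS_MAP, so watch streams on kvA see writes made through kvB.
const kvA = await Deno.openKv("./data.sqlite3");
const kvB = await Deno.openKv("./data.sqlite3");

const watching = (async () => {
  for await (const [entry] of kvA.watch([["config"]])) {
    console.log("config changed:", entry.versionstamp, entry.value);
  }
})();

await kvB.set(["config"], { theme: "dark" });
```

In-memory databases (`":memory:"`, or no path and no default storage dir) instead get a fresh `SqliteNotifier::default()`, so nothing is shared there.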