path: root/ext/kv/lib.rs
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r--  ext/kv/lib.rs  129
1 file changed, 124 insertions(+), 5 deletions(-)
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index c0091d75d..456a1ebf7 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -20,12 +20,17 @@ use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::serde_v8::AnyValue;
use deno_core::serde_v8::BigInt;
+use deno_core::AsyncRefCell;
use deno_core::ByteString;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
+use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
@@ -45,6 +50,8 @@ use denokv_proto::MutationKind;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::SnapshotReadOptions;
+use denokv_proto::WatchKeyOutput;
+use denokv_proto::WatchStream;
use log::debug;
use serde::Deserialize;
use serde::Serialize;
@@ -62,6 +69,7 @@ const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 100;
const MAX_MUTATIONS: usize = 1000;
+const MAX_WATCHED_KEYS: usize = 10;
const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024;
const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024;
@@ -75,6 +83,8 @@ deno_core::extension!(deno_kv,
op_kv_encode_cursor,
op_kv_dequeue_next_message<DBH>,
op_kv_finish_dequeued_message<DBH>,
+ op_kv_watch<DBH>,
+ op_kv_watch_next,
],
esm = [ "01_db.ts" ],
options = {
@@ -86,7 +96,8 @@ deno_core::extension!(deno_kv,
);
struct DatabaseResource<DB: Database + 'static> {
- db: Rc<DB>,
+ db: DB,
+ cancel_handle: Rc<CancelHandle>,
}
impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
@@ -96,6 +107,23 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
fn close(self: Rc<Self>) {
self.db.close();
+ self.cancel_handle.cancel();
+ }
+}
+
+struct DatabaseWatcherResource {
+ stream: AsyncRefCell<WatchStream>,
+ db_cancel_handle: Rc<CancelHandle>,
+ cancel_handle: Rc<CancelHandle>,
+}
+
+impl Resource for DatabaseWatcherResource {
+ fn name(&self) -> Cow<str> {
+ "databaseWatcher".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel()
}
}
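
Worth noting here: DatabaseResource::close() now fires the shared CancelHandle that every DatabaseWatcherResource also holds as db_cancel_handle, so closing the database should unblock any watcher parked in op_kv_watch_next below. A minimal consumer-side sketch of that behavior, assuming the public Deno.Kv.prototype.watch surface that 01_db.ts builds on these ops (the end-of-stream semantics are inferred from the Rust side of this diff, not verified against the JS glue):

// Sketch: a pending watch read is expected to settle once the Kv handle is
// closed, because DatabaseResource::close() cancels the handle that
// op_kv_watch_next is awaiting on.
const kv = await Deno.openKv(":memory:");
const reader = kv.watch([["config", "flag"]]).getReader();

// The first chunk should reflect the current state of the watched key.
console.log(await reader.read());

// Close the database while a read is outstanding; the cancellation should
// surface as end-of-stream rather than leaving the promise hanging.
const pending = reader.read();
kv.close();
console.log(await pending); // expected: { value: undefined, done: true }
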
@@ -118,10 +146,10 @@ where
state.borrow::<Rc<DBH>>().clone()
};
let db = handler.open(state.clone(), path).await?;
- let rid = state
- .borrow_mut()
- .resource_table
- .add(DatabaseResource { db: Rc::new(db) });
+ let rid = state.borrow_mut().resource_table.add(DatabaseResource {
+ db,
+ cancel_handle: CancelHandle::new_rc(),
+ });
Ok(rid)
}
@@ -354,6 +382,97 @@ where
Ok(Some((payload, handle_rid)))
}
+#[op2]
+#[smi]
+fn op_kv_watch<DBH>(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+ #[serde] keys: Vec<KvKey>,
+) -> Result<ResourceId, AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+
+ if keys.len() > MAX_WATCHED_KEYS {
+ return Err(type_error(format!(
+ "too many keys (max {})",
+ MAX_WATCHED_KEYS
+ )));
+ }
+
+ let keys: Vec<Vec<u8>> = keys
+ .into_iter()
+ .map(encode_v8_key)
+ .collect::<std::io::Result<_>>()?;
+
+ for k in &keys {
+ check_read_key_size(k)?;
+ }
+
+ let stream = resource.db.watch(keys);
+
+ let rid = state.resource_table.add(DatabaseWatcherResource {
+ stream: AsyncRefCell::new(stream),
+ db_cancel_handle: resource.cancel_handle.clone(),
+ cancel_handle: CancelHandle::new_rc(),
+ });
+
+ Ok(rid)
+}
+
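
op_kv_watch is a synchronous op: it validates the key count against MAX_WATCHED_KEYS and each encoded key against the read-key size limit before registering the watcher resource. From JS that should surface as an eager TypeError when watch() is called with too many keys; a hedged sketch (the message text comes from the type_error above, and 01_db.ts may run its own checks first):

const kv = await Deno.openKv(":memory:");

// MAX_WATCHED_KEYS is 10, so an 11-key watch should be rejected up front.
const keys = Array.from({ length: 11 }, (_, i) => ["watched", i] as Deno.KvKey);

try {
  kv.watch(keys);
} catch (err) {
  // deno_core's type_error surfaces as a TypeError, with a message along
  // the lines of "too many keys (max 10)".
  console.log(err instanceof TypeError, (err as Error).message);
}

kv.close();
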
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase", untagged)]
+enum WatchEntry {
+ Changed(Option<ToV8KvEntry>),
+ Unchanged,
+}
+
+#[op2(async)]
+#[serde]
+async fn op_kv_watch_next(
+ state: Rc<RefCell<OpState>>,
+ #[smi] rid: ResourceId,
+) -> Result<Option<Vec<WatchEntry>>, AnyError> {
+ let resource = {
+ let state = state.borrow();
+ let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?;
+ resource.clone()
+ };
+
+ let db_cancel_handle = resource.db_cancel_handle.clone();
+ let cancel_handle = resource.cancel_handle.clone();
+ let stream = RcRef::map(resource, |r| &r.stream)
+ .borrow_mut()
+ .or_cancel(db_cancel_handle)
+ .or_cancel(cancel_handle)
+ .await;
+ let Ok(Ok(mut stream)) = stream else {
+ return Ok(None);
+ };
+
+ // doesn't need a cancel handle because the stream ends when the database
+ // connection is closed
+ let Some(res) = stream.next().await else {
+ return Ok(None);
+ };
+
+ let entries = res?;
+ let entries = entries
+ .into_iter()
+ .map(|entry| {
+ Ok(match entry {
+ WatchKeyOutput::Changed { entry } => {
+ WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?)
+ }
+ WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
+ })
+ })
+ .collect::<Result<_, anyhow::Error>>()?;
+
+ Ok(Some(entries))
+}
+
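
Taken together, op_kv_watch registers the watcher resource and op_kv_watch_next pulls the next batch of WatchEntry values from it; 01_db.ts exposes this as Deno.Kv.prototype.watch, which yields one Deno.KvEntryMaybe per watched key, in key order. An end-to-end usage sketch (API shape as in the public --unstable-kv surface, not a test of this exact commit):

const kv = await Deno.openKv(":memory:");

// Each chunk from the stream carries one KvEntryMaybe per watched key,
// in the same order as the keys passed to watch(). versionstamp is null
// for keys that do not currently exist.
const stream = kv.watch([["counters", "a"], ["counters", "b"]]);

const watcher = (async () => {
  for await (const [a, b] of stream) {
    console.log("a:", a.versionstamp, a.value);
    console.log("b:", b.versionstamp, b.value);
  }
})();

await kv.set(["counters", "a"], 1);
await kv.set(["counters", "b"], 2);

// Give the watcher a moment to observe the writes, then close the database,
// which also ends the watch stream (see DatabaseResource::close above).
setTimeout(() => kv.close(), 100);
await watcher;
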
#[op2(async)]
async fn op_kv_finish_dequeued_message<DBH>(
state: Rc<RefCell<OpState>>,