diff options
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r-- | ext/kv/lib.rs | 129 |
1 files changed, 124 insertions, 5 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index c0091d75d..456a1ebf7 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -20,12 +20,17 @@ use deno_core::anyhow::Context; use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::serde_v8::AnyValue; use deno_core::serde_v8::BigInt; +use deno_core::AsyncRefCell; use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::JsBuffer; use deno_core::OpState; +use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ToJsBuffer; @@ -45,6 +50,8 @@ use denokv_proto::MutationKind; use denokv_proto::QueueMessageHandle; use denokv_proto::ReadRange; use denokv_proto::SnapshotReadOptions; +use denokv_proto::WatchKeyOutput; +use denokv_proto::WatchStream; use log::debug; use serde::Deserialize; use serde::Serialize; @@ -62,6 +69,7 @@ const MAX_READ_RANGES: usize = 10; const MAX_READ_ENTRIES: usize = 1000; const MAX_CHECKS: usize = 100; const MAX_MUTATIONS: usize = 1000; +const MAX_WATCHED_KEYS: usize = 10; const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024; const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024; @@ -75,6 +83,8 @@ deno_core::extension!(deno_kv, op_kv_encode_cursor, op_kv_dequeue_next_message<DBH>, op_kv_finish_dequeued_message<DBH>, + op_kv_watch<DBH>, + op_kv_watch_next, ], esm = [ "01_db.ts" ], options = { @@ -86,7 +96,8 @@ deno_core::extension!(deno_kv, ); struct DatabaseResource<DB: Database + 'static> { - db: Rc<DB>, + db: DB, + cancel_handle: Rc<CancelHandle>, } impl<DB: Database + 'static> Resource for DatabaseResource<DB> { @@ -96,6 +107,23 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> { fn close(self: Rc<Self>) { self.db.close(); + self.cancel_handle.cancel(); + } +} + +struct DatabaseWatcherResource { + stream: AsyncRefCell<WatchStream>, + db_cancel_handle: Rc<CancelHandle>, + cancel_handle: Rc<CancelHandle>, +} + +impl Resource for DatabaseWatcherResource { + fn name(&self) -> Cow<str> { + "databaseWatcher".into() + } + + fn close(self: Rc<Self>) { + self.cancel_handle.cancel() } } @@ -118,10 +146,10 @@ where state.borrow::<Rc<DBH>>().clone() }; let db = handler.open(state.clone(), path).await?; - let rid = state - .borrow_mut() - .resource_table - .add(DatabaseResource { db: Rc::new(db) }); + let rid = state.borrow_mut().resource_table.add(DatabaseResource { + db, + cancel_handle: CancelHandle::new_rc(), + }); Ok(rid) } @@ -354,6 +382,97 @@ where Ok(Some((payload, handle_rid))) } +#[op2] +#[smi] +fn op_kv_watch<DBH>( + state: &mut OpState, + #[smi] rid: ResourceId, + #[serde] keys: Vec<KvKey>, +) -> Result<ResourceId, AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?; + + if keys.len() > MAX_WATCHED_KEYS { + return Err(type_error(format!( + "too many keys (max {})", + MAX_WATCHED_KEYS + ))); + } + + let keys: Vec<Vec<u8>> = keys + .into_iter() + .map(encode_v8_key) + .collect::<std::io::Result<_>>()?; + + for k in &keys { + check_read_key_size(k)?; + } + + let stream = resource.db.watch(keys); + + let rid = state.resource_table.add(DatabaseWatcherResource { + stream: AsyncRefCell::new(stream), + db_cancel_handle: resource.cancel_handle.clone(), + cancel_handle: CancelHandle::new_rc(), + }); + + Ok(rid) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase", untagged)] +enum WatchEntry { + Changed(Option<ToV8KvEntry>), + Unchanged, +} + +#[op2(async)] +#[serde] +async fn op_kv_watch_next( + state: Rc<RefCell<OpState>>, + #[smi] rid: ResourceId, +) -> Result<Option<Vec<WatchEntry>>, AnyError> { + let resource = { + let state = state.borrow(); + let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?; + resource.clone() + }; + + let db_cancel_handle = resource.db_cancel_handle.clone(); + let cancel_handle = resource.cancel_handle.clone(); + let stream = RcRef::map(resource, |r| &r.stream) + .borrow_mut() + .or_cancel(db_cancel_handle) + .or_cancel(cancel_handle) + .await; + let Ok(Ok(mut stream)) = stream else { + return Ok(None); + }; + + // doesn't need a cancel handle because the stream ends when the database + // connection is closed + let Some(res) = stream.next().await else { + return Ok(None); + }; + + let entries = res?; + let entries = entries + .into_iter() + .map(|entry| { + Ok(match entry { + WatchKeyOutput::Changed { entry } => { + WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?) + } + WatchKeyOutput::Unchanged => WatchEntry::Unchanged, + }) + }) + .collect::<Result<_, anyhow::Error>>()?; + + Ok(Some(entries)) +} + #[op2(async)] async fn op_kv_finish_dequeued_message<DBH>( state: Rc<RefCell<OpState>>, |