diff options
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r-- | ext/kv/lib.rs | 65 |
1 files changed, 64 insertions, 1 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index dbc626225..2763fcf50 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -8,6 +8,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::num::NonZeroU32; use std::rc::Rc; +use std::vec; use codec::decode_key; use codec::encode_key; @@ -60,6 +61,8 @@ deno_core::extension!(deno_kv, op_kv_snapshot_read<DBH>, op_kv_atomic_write<DBH>, op_kv_encode_cursor, + op_kv_dequeue_next_message<DBH>, + op_kv_finish_dequeued_message<DBH>, ], esm = [ "01_db.ts" ], options = { @@ -80,6 +83,10 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> { fn name(&self) -> Cow<str> { "database".into() } + + fn close(self: Rc<Self>) { + self.db.close(); + } } #[op] @@ -280,6 +287,62 @@ where Ok(output_ranges) } +struct QueueMessageResource<QPH: QueueMessageHandle + 'static> { + handle: QPH, +} + +impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> { + fn name(&self) -> Cow<str> { + "queue_message".into() + } +} + +#[op] +async fn op_kv_dequeue_next_message<DBH>( + state: Rc<RefCell<OpState>>, + rid: ResourceId, +) -> Result<(ZeroCopyBuf, ResourceId), AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let db = { + let state = state.borrow(); + let resource = + state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?; + resource.db.clone() + }; + + let mut handle = db.dequeue_next_message().await?; + let payload = handle.take_payload().await?.into(); + let handle_rid = { + let mut state = state.borrow_mut(); + state.resource_table.add(QueueMessageResource { handle }) + }; + Ok((payload, handle_rid)) +} + +#[op] +async fn op_kv_finish_dequeued_message<DBH>( + state: Rc<RefCell<OpState>>, + handle_rid: ResourceId, + success: bool, +) -> Result<(), AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let handle = { + let mut state = state.borrow_mut(); + let handle = state + .resource_table + .take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid) + .map_err(|_| type_error("Queue message not found"))?; + Rc::try_unwrap(handle) + .map_err(|_| type_error("Queue message not found"))? + .handle + }; + handle.finish(success).await +} + type V8KvCheck = (KvKey, Option<ByteString>); impl TryFrom<V8KvCheck> for KvCheck { @@ -333,7 +396,7 @@ impl TryFrom<V8Enqueue> for Enqueue { fn try_from(value: V8Enqueue) -> Result<Self, AnyError> { Ok(Enqueue { payload: value.0.to_vec(), - deadline_ms: value.1, + delay_ms: value.1, keys_if_undelivered: value .2 .into_iter() |