From fd9d6baea311d9b227b130749647a86838ba2ccc Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Tue, 13 Jun 2023 17:49:57 -0700 Subject: feat(kv) queue implementation (#19459) Extend the unstable `Deno.Kv` API to support queues. --- ext/kv/lib.rs | 65 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) (limited to 'ext/kv/lib.rs') diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index dbc626225..2763fcf50 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -8,6 +8,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::num::NonZeroU32; use std::rc::Rc; +use std::vec; use codec::decode_key; use codec::encode_key; @@ -60,6 +61,8 @@ deno_core::extension!(deno_kv, op_kv_snapshot_read, op_kv_atomic_write, op_kv_encode_cursor, + op_kv_dequeue_next_message, + op_kv_finish_dequeued_message, ], esm = [ "01_db.ts" ], options = { @@ -80,6 +83,10 @@ impl Resource for DatabaseResource { fn name(&self) -> Cow { "database".into() } + + fn close(self: Rc) { + self.db.close(); + } } #[op] @@ -280,6 +287,62 @@ where Ok(output_ranges) } +struct QueueMessageResource { + handle: QPH, +} + +impl Resource for QueueMessageResource { + fn name(&self) -> Cow { + "queue_message".into() + } +} + +#[op] +async fn op_kv_dequeue_next_message( + state: Rc>, + rid: ResourceId, +) -> Result<(ZeroCopyBuf, ResourceId), AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let db = { + let state = state.borrow(); + let resource = + state.resource_table.get::>(rid)?; + resource.db.clone() + }; + + let mut handle = db.dequeue_next_message().await?; + let payload = handle.take_payload().await?.into(); + let handle_rid = { + let mut state = state.borrow_mut(); + state.resource_table.add(QueueMessageResource { handle }) + }; + Ok((payload, handle_rid)) +} + +#[op] +async fn op_kv_finish_dequeued_message( + state: Rc>, + handle_rid: ResourceId, + success: bool, +) -> Result<(), AnyError> +where + DBH: DatabaseHandler + 'static, +{ + let handle = { + let mut state = state.borrow_mut(); + let handle = state + .resource_table + .take::::DB as Database>::QMH>>(handle_rid) + .map_err(|_| type_error("Queue message not found"))?; + Rc::try_unwrap(handle) + .map_err(|_| type_error("Queue message not found"))? + .handle + }; + handle.finish(success).await +} + type V8KvCheck = (KvKey, Option); impl TryFrom for KvCheck { @@ -333,7 +396,7 @@ impl TryFrom for Enqueue { fn try_from(value: V8Enqueue) -> Result { Ok(Enqueue { payload: value.0.to_vec(), - deadline_ms: value.1, + delay_ms: value.1, keys_if_undelivered: value .2 .into_iter() -- cgit v1.2.3