summaryrefslogtreecommitdiff
path: root/ext/kv/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r--ext/kv/lib.rs65
1 files changed, 64 insertions, 1 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index dbc626225..2763fcf50 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -8,6 +8,7 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU32;
use std::rc::Rc;
+use std::vec;
use codec::decode_key;
use codec::encode_key;
@@ -60,6 +61,8 @@ deno_core::extension!(deno_kv,
op_kv_snapshot_read<DBH>,
op_kv_atomic_write<DBH>,
op_kv_encode_cursor,
+ op_kv_dequeue_next_message<DBH>,
+ op_kv_finish_dequeued_message<DBH>,
],
esm = [ "01_db.ts" ],
options = {
@@ -80,6 +83,10 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
fn name(&self) -> Cow<str> {
"database".into()
}
+
+ fn close(self: Rc<Self>) {
+ self.db.close();
+ }
}
#[op]
@@ -280,6 +287,62 @@ where
Ok(output_ranges)
}
+struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
+ handle: QPH,
+}
+
+impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
+ fn name(&self) -> Cow<str> {
+ "queue_message".into()
+ }
+}
+
+#[op]
+async fn op_kv_dequeue_next_message<DBH>(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+) -> Result<(ZeroCopyBuf, ResourceId), AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let db = {
+ let state = state.borrow();
+ let resource =
+ state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ resource.db.clone()
+ };
+
+ let mut handle = db.dequeue_next_message().await?;
+ let payload = handle.take_payload().await?.into();
+ let handle_rid = {
+ let mut state = state.borrow_mut();
+ state.resource_table.add(QueueMessageResource { handle })
+ };
+ Ok((payload, handle_rid))
+}
+
+#[op]
+async fn op_kv_finish_dequeued_message<DBH>(
+ state: Rc<RefCell<OpState>>,
+ handle_rid: ResourceId,
+ success: bool,
+) -> Result<(), AnyError>
+where
+ DBH: DatabaseHandler + 'static,
+{
+ let handle = {
+ let mut state = state.borrow_mut();
+ let handle = state
+ .resource_table
+ .take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
+ .map_err(|_| type_error("Queue message not found"))?;
+ Rc::try_unwrap(handle)
+ .map_err(|_| type_error("Queue message not found"))?
+ .handle
+ };
+ handle.finish(success).await
+}
+
type V8KvCheck = (KvKey, Option<ByteString>);
impl TryFrom<V8KvCheck> for KvCheck {
@@ -333,7 +396,7 @@ impl TryFrom<V8Enqueue> for Enqueue {
fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
Ok(Enqueue {
payload: value.0.to_vec(),
- deadline_ms: value.1,
+ delay_ms: value.1,
keys_if_undelivered: value
.2
.into_iter()