summaryrefslogtreecommitdiff
path: root/ext/kv/lib.rs
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-09-26 20:06:57 -0700
committerGitHub <noreply@github.com>2023-09-26 20:06:57 -0700
commitf0a022bed4c8ca46c472c6361f1793cb29f73480 (patch)
tree4f88ffcd77e94292a8696111ba6828a197eb153e /ext/kv/lib.rs
parentb433133a1ff1c329b05ce024d8db5fefb8f7b431 (diff)
fix(kv_queues): graceful shutdown (#20627)
This fixes the `TypeError: Database closed` error during shutdown.
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r--ext/kv/lib.rs20
1 files changed, 16 insertions, 4 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index 72d5e862b..762009d2a 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -16,6 +16,7 @@ use chrono::Utc;
use codec::decode_key;
use codec::encode_key;
use deno_core::anyhow::Context;
+use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
@@ -322,24 +323,35 @@ impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
async fn op_kv_dequeue_next_message<DBH>(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<(ToJsBuffer, ResourceId), AnyError>
+) -> Result<Option<(ToJsBuffer, ResourceId)>, AnyError>
where
DBH: DatabaseHandler + 'static,
{
let db = {
let state = state.borrow();
let resource =
- state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
+ match state.resource_table.get::<DatabaseResource<DBH::DB>>(rid) {
+ Ok(resource) => resource,
+ Err(err) => {
+ if get_custom_error_class(&err) == Some("BadResource") {
+ return Ok(None);
+ } else {
+ return Err(err);
+ }
+ }
+ };
resource.db.clone()
};
- let mut handle = db.dequeue_next_message(state.clone()).await?;
+ let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else {
+ return Ok(None);
+ };
let payload = handle.take_payload().await?.into();
let handle_rid = {
let mut state = state.borrow_mut();
state.resource_table.add(QueueMessageResource { handle })
};
- Ok((payload, handle_rid))
+ Ok(Some((payload, handle_rid)))
}
#[op2(async)]