diff options
author | Igor Zinkovsky <igor@deno.com> | 2023-09-26 20:06:57 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-26 20:06:57 -0700 |
commit | f0a022bed4c8ca46c472c6361f1793cb29f73480 (patch) | |
tree | 4f88ffcd77e94292a8696111ba6828a197eb153e /ext/kv/lib.rs | |
parent | b433133a1ff1c329b05ce024d8db5fefb8f7b431 (diff) |
fix(kv_queues): graceful shutdown (#20627)
This fixes the `TypeError: Database closed` error during shutdown.
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r-- | ext/kv/lib.rs | 20 |
1 files changed, 16 insertions, 4 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 72d5e862b..762009d2a 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -16,6 +16,7 @@ use chrono::Utc; use codec::decode_key; use codec::encode_key; use deno_core::anyhow::Context; +use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op2; @@ -322,24 +323,35 @@ impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> { async fn op_kv_dequeue_next_message<DBH>( state: Rc<RefCell<OpState>>, #[smi] rid: ResourceId, -) -> Result<(ToJsBuffer, ResourceId), AnyError> +) -> Result<Option<(ToJsBuffer, ResourceId)>, AnyError> where DBH: DatabaseHandler + 'static, { let db = { let state = state.borrow(); let resource = - state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?; + match state.resource_table.get::<DatabaseResource<DBH::DB>>(rid) { + Ok(resource) => resource, + Err(err) => { + if get_custom_error_class(&err) == Some("BadResource") { + return Ok(None); + } else { + return Err(err); + } + } + }; resource.db.clone() }; - let mut handle = db.dequeue_next_message(state.clone()).await?; + let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else { + return Ok(None); + }; let payload = handle.take_payload().await?.into(); let handle_rid = { let mut state = state.borrow_mut(); state.resource_table.add(QueueMessageResource { handle }) }; - Ok((payload, handle_rid)) + Ok(Some((payload, handle_rid))) } #[op2(async)] |