diff options
author | Igor Zinkovsky <igor@deno.com> | 2023-09-26 20:06:57 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-26 20:06:57 -0700 |
commit | f0a022bed4c8ca46c472c6361f1793cb29f73480 (patch) | |
tree | 4f88ffcd77e94292a8696111ba6828a197eb153e /ext/kv/sqlite.rs | |
parent | b433133a1ff1c329b05ce024d8db5fefb8f7b431 (diff) |
fix(kv_queues): graceful shutdown (#20627)
This fixes the `TypeError: Database closed` error during shutdown.
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r-- | ext/kv/sqlite.rs | 47 |
1 files changed, 25 insertions, 22 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index ceeb98c25..192141e27 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -395,9 +395,7 @@ impl QueueMessageHandle for DequeuedMessage { Err(e) => { // Silently ignore the error if the database has been closed // This message will be delivered on the next run - if get_custom_error_class(&e) == Some("TypeError") - && e.to_string() == ERROR_USING_CLOSED_DATABASE - { + if is_conn_closed_error(&e) { return Ok(()); } return Err(e); @@ -437,25 +435,25 @@ impl SqliteQueue { spawn(async move { // Oneshot requeue of all inflight messages. - Self::requeue_inflight_messages(conn.clone()).await.unwrap(); + if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await { + // Exit the dequeue loop cleanly if the database has been closed. + if is_conn_closed_error(&e) { + return; + } + panic!("kv: Error in requeue_inflight_messages: {}", e); + } // Continuous dequeue loop. - match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) - .await + if let Err(e) = + Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) + .await { - Ok(_) => Ok(()), - Err(e) => { - // Exit the dequeue loop cleanly if the database has been closed. - if get_custom_error_class(&e) == Some("TypeError") - && e.to_string() == ERROR_USING_CLOSED_DATABASE - { - Ok(()) - } else { - Err(e) - } + // Exit the dequeue loop cleanly if the database has been closed. + if is_conn_closed_error(&e) { + return; } + panic!("kv: Error in dequeue_loop: {}", e); } - .unwrap(); }); Self { @@ -467,25 +465,25 @@ impl SqliteQueue { } } - async fn dequeue(&self) -> Result<DequeuedMessage, AnyError> { + async fn dequeue(&self) -> Result<Option<DequeuedMessage>, AnyError> { // Wait for the next message to be available from dequeue_rx. let (payload, id) = { let mut queue_rx = self.dequeue_rx.borrow_mut().await; let Some(msg) = queue_rx.recv().await else { - return Err(type_error("Database closed")); + return Ok(None); }; msg }; let permit = self.concurrency_limiter.clone().acquire_owned().await?; - Ok(DequeuedMessage { + Ok(Some(DequeuedMessage { conn: self.conn.downgrade(), id, payload: Some(payload), waker_tx: self.waker_tx.clone(), _permit: permit, - }) + })) } async fn wake(&self) -> Result<(), AnyError> { @@ -904,7 +902,7 @@ impl Database for SqliteDb { async fn dequeue_next_message( &self, _state: Rc<RefCell<OpState>>, - ) -> Result<Self::QMH, AnyError> { + ) -> Result<Option<Self::QMH>, AnyError> { let queue = self .queue .get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) }) @@ -1013,3 +1011,8 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) { } } } + +fn is_conn_closed_error(e: &AnyError) -> bool { + get_custom_error_class(e) == Some("TypeError") + && e.to_string() == ERROR_USING_CLOSED_DATABASE +} |