summaryrefslogtreecommitdiff
path: root/ext/kv/sqlite.rs
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-09-26 20:06:57 -0700
committerGitHub <noreply@github.com>2023-09-26 20:06:57 -0700
commitf0a022bed4c8ca46c472c6361f1793cb29f73480 (patch)
tree4f88ffcd77e94292a8696111ba6828a197eb153e /ext/kv/sqlite.rs
parentb433133a1ff1c329b05ce024d8db5fefb8f7b431 (diff)
fix(kv_queues): graceful shutdown (#20627)
This fixes the `TypeError: Database closed` error during shutdown.
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r--ext/kv/sqlite.rs47
1 files changed, 25 insertions, 22 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index ceeb98c25..192141e27 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -395,9 +395,7 @@ impl QueueMessageHandle for DequeuedMessage {
Err(e) => {
// Silently ignore the error if the database has been closed
// This message will be delivered on the next run
- if get_custom_error_class(&e) == Some("TypeError")
- && e.to_string() == ERROR_USING_CLOSED_DATABASE
- {
+ if is_conn_closed_error(&e) {
return Ok(());
}
return Err(e);
@@ -437,25 +435,25 @@ impl SqliteQueue {
spawn(async move {
// Oneshot requeue of all inflight messages.
- Self::requeue_inflight_messages(conn.clone()).await.unwrap();
+ if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await {
+ // Exit the dequeue loop cleanly if the database has been closed.
+ if is_conn_closed_error(&e) {
+ return;
+ }
+ panic!("kv: Error in requeue_inflight_messages: {}", e);
+ }
// Continuous dequeue loop.
- match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
- .await
+ if let Err(e) =
+ Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
+ .await
{
- Ok(_) => Ok(()),
- Err(e) => {
- // Exit the dequeue loop cleanly if the database has been closed.
- if get_custom_error_class(&e) == Some("TypeError")
- && e.to_string() == ERROR_USING_CLOSED_DATABASE
- {
- Ok(())
- } else {
- Err(e)
- }
+ // Exit the dequeue loop cleanly if the database has been closed.
+ if is_conn_closed_error(&e) {
+ return;
}
+ panic!("kv: Error in dequeue_loop: {}", e);
}
- .unwrap();
});
Self {
@@ -467,25 +465,25 @@ impl SqliteQueue {
}
}
- async fn dequeue(&self) -> Result<DequeuedMessage, AnyError> {
+ async fn dequeue(&self) -> Result<Option<DequeuedMessage>, AnyError> {
// Wait for the next message to be available from dequeue_rx.
let (payload, id) = {
let mut queue_rx = self.dequeue_rx.borrow_mut().await;
let Some(msg) = queue_rx.recv().await else {
- return Err(type_error("Database closed"));
+ return Ok(None);
};
msg
};
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
- Ok(DequeuedMessage {
+ Ok(Some(DequeuedMessage {
conn: self.conn.downgrade(),
id,
payload: Some(payload),
waker_tx: self.waker_tx.clone(),
_permit: permit,
- })
+ }))
}
async fn wake(&self) -> Result<(), AnyError> {
@@ -904,7 +902,7 @@ impl Database for SqliteDb {
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
- ) -> Result<Self::QMH, AnyError> {
+ ) -> Result<Option<Self::QMH>, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
@@ -1013,3 +1011,8 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
}
}
}
+
+fn is_conn_closed_error(e: &AnyError) -> bool {
+ get_custom_error_class(e) == Some("TypeError")
+ && e.to_string() == ERROR_USING_CLOSED_DATABASE
+}