summaryrefslogtreecommitdiff
path: root/ext/kv/sqlite.rs
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-08-29 11:24:44 -0700
committerGitHub <noreply@github.com>2023-08-29 11:24:44 -0700
commit441b860978afd0160a459aee7e42ccc97263dc23 (patch)
tree34be6a90ed1a14849d76d87c1f07a443ba3a8039 /ext/kv/sqlite.rs
parentc4451d307697ec05029f2645ab787e9d1981145e (diff)
fix(ext/kv): don't panic if listening on queues and KV is not closed (#20317)
fixes #20312
Diffstat (limited to 'ext/kv/sqlite.rs')
-rw-r--r--ext/kv/sqlite.rs43
1 files changed, 29 insertions, 14 deletions
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index ea46aae32..ceeb98c25 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -290,16 +290,7 @@ pub struct SqliteDb {
impl Drop for SqliteDb {
fn drop(&mut self) {
- self.expiration_watcher.abort();
-
- // The above `abort()` operation is asynchronous. It's not
- // guaranteed that the sqlite connection will be closed immediately.
- // So here we synchronously take the conn mutex and drop the connection.
- //
- // This blocks the event loop if the connection is still being used,
- // but ensures correctness - deleting the database file after calling
- // the `close` method will always work.
- self.conn.conn.lock().unwrap().take();
+ self.close();
}
}
@@ -449,9 +440,22 @@ impl SqliteQueue {
Self::requeue_inflight_messages(conn.clone()).await.unwrap();
// Continuous dequeue loop.
- Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
+ match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
.await
- .unwrap();
+ {
+ Ok(_) => Ok(()),
+ Err(e) => {
+ // Exit the dequeue loop cleanly if the database has been closed.
+ if get_custom_error_class(&e) == Some("TypeError")
+ && e.to_string() == ERROR_USING_CLOSED_DATABASE
+ {
+ Ok(())
+ } else {
+ Err(e)
+ }
+ }
+ }
+ .unwrap();
});
Self {
@@ -490,7 +494,7 @@ impl SqliteQueue {
}
fn shutdown(&self) {
- self.shutdown_tx.send(()).unwrap();
+ let _ = self.shutdown_tx.send(());
}
async fn dequeue_loop(
@@ -573,7 +577,7 @@ impl SqliteQueue {
};
tokio::select! {
_ = sleep_fut => {}
- _ = waker_rx.recv() => {}
+ x = waker_rx.recv() => if x.is_none() {return Ok(());},
_ = shutdown_rx.changed() => return Ok(())
}
}
@@ -913,6 +917,17 @@ impl Database for SqliteDb {
if let Some(queue) = self.queue.get() {
queue.shutdown();
}
+
+ self.expiration_watcher.abort();
+
+ // The above `abort()` operation is asynchronous. It's not
+ // guaranteed that the sqlite connection will be closed immediately.
+ // So here we synchronously take the conn mutex and drop the connection.
+ //
+ // This blocks the event loop if the connection is still being used,
+ // but ensures correctness - deleting the database file after calling
+ // the `close` method will always work.
+ self.conn.conn.lock().unwrap().take();
}
}