summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIgor Zinkovsky <igor@deno.com>2023-08-29 11:24:44 -0700
committerGitHub <noreply@github.com>2023-08-29 11:24:44 -0700
commit441b860978afd0160a459aee7e42ccc97263dc23 (patch)
tree34be6a90ed1a14849d76d87c1f07a443ba3a8039
parentc4451d307697ec05029f2645ab787e9d1981145e (diff)
fix(ext/kv): don't panic if listening on queues and KV is not closed (#20317)
fixes #20312
-rw-r--r--cli/tests/integration/js_unit_tests.rs1
-rw-r--r--cli/tests/unit/kv_queue_test_no_db_close.ts25
-rw-r--r--ext/kv/sqlite.rs43
3 files changed, 55 insertions, 14 deletions
diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs
index f54280b23..4bb2ef73b 100644
--- a/cli/tests/integration/js_unit_tests.rs
+++ b/cli/tests/integration/js_unit_tests.rs
@@ -46,6 +46,7 @@ util::unit_test_factory!(
intl_test,
io_test,
kv_test,
+ kv_queue_test_no_db_close,
kv_queue_undelivered_test,
link_test,
make_temp_test,
diff --git a/cli/tests/unit/kv_queue_test_no_db_close.ts b/cli/tests/unit/kv_queue_test_no_db_close.ts
new file mode 100644
index 000000000..e639574a3
--- /dev/null
+++ b/cli/tests/unit/kv_queue_test_no_db_close.ts
@@ -0,0 +1,25 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+import {
+ assert,
+ assertEquals,
+ assertNotEquals,
+ deferred,
+} from "./test_util.ts";
+
+Deno.test({
+ sanitizeOps: false,
+ sanitizeResources: false,
+}, async function queueTestNoDbClose() {
+ const db: Deno.Kv = await Deno.openKv(":memory:");
+ const promise = deferred();
+ let dequeuedMessage: unknown = null;
+ db.listenQueue((msg) => {
+ dequeuedMessage = msg;
+ promise.resolve();
+ });
+ const res = await db.enqueue("test");
+ assert(res.ok);
+ assertNotEquals(res.versionstamp, null);
+ await promise;
+ assertEquals(dequeuedMessage, "test");
+});
diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index ea46aae32..ceeb98c25 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -290,16 +290,7 @@ pub struct SqliteDb {
impl Drop for SqliteDb {
fn drop(&mut self) {
- self.expiration_watcher.abort();
-
- // The above `abort()` operation is asynchronous. It's not
- // guaranteed that the sqlite connection will be closed immediately.
- // So here we synchronously take the conn mutex and drop the connection.
- //
- // This blocks the event loop if the connection is still being used,
- // but ensures correctness - deleting the database file after calling
- // the `close` method will always work.
- self.conn.conn.lock().unwrap().take();
+ self.close();
}
}
@@ -449,9 +440,22 @@ impl SqliteQueue {
Self::requeue_inflight_messages(conn.clone()).await.unwrap();
// Continuous dequeue loop.
- Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
+ match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
.await
- .unwrap();
+ {
+ Ok(_) => Ok(()),
+ Err(e) => {
+ // Exit the dequeue loop cleanly if the database has been closed.
+ if get_custom_error_class(&e) == Some("TypeError")
+ && e.to_string() == ERROR_USING_CLOSED_DATABASE
+ {
+ Ok(())
+ } else {
+ Err(e)
+ }
+ }
+ }
+ .unwrap();
});
Self {
@@ -490,7 +494,7 @@ impl SqliteQueue {
}
fn shutdown(&self) {
- self.shutdown_tx.send(()).unwrap();
+ let _ = self.shutdown_tx.send(());
}
async fn dequeue_loop(
@@ -573,7 +577,7 @@ impl SqliteQueue {
};
tokio::select! {
_ = sleep_fut => {}
- _ = waker_rx.recv() => {}
+ x = waker_rx.recv() => if x.is_none() {return Ok(());},
_ = shutdown_rx.changed() => return Ok(())
}
}
@@ -913,6 +917,17 @@ impl Database for SqliteDb {
if let Some(queue) = self.queue.get() {
queue.shutdown();
}
+
+ self.expiration_watcher.abort();
+
+ // The above `abort()` operation is asynchronous. It's not
+ // guaranteed that the sqlite connection will be closed immediately.
+ // So here we synchronously take the conn mutex and drop the connection.
+ //
+ // This blocks the event loop if the connection is still being used,
+ // but ensures correctness - deleting the database file after calling
+ // the `close` method will always work.
+ self.conn.conn.lock().unwrap().take();
}
}