diff options
author | Igor Zinkovsky <igor@deno.com> | 2023-08-29 11:24:44 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-29 11:24:44 -0700 |
commit | 441b860978afd0160a459aee7e42ccc97263dc23 (patch) | |
tree | 34be6a90ed1a14849d76d87c1f07a443ba3a8039 | |
parent | c4451d307697ec05029f2645ab787e9d1981145e (diff) |
fix(ext/kv): don't panic if listening on queues and KV is not closed (#20317)
fixes #20312
-rw-r--r-- | cli/tests/integration/js_unit_tests.rs | 1 | ||||
-rw-r--r-- | cli/tests/unit/kv_queue_test_no_db_close.ts | 25 | ||||
-rw-r--r-- | ext/kv/sqlite.rs | 43 |
3 files changed, 55 insertions, 14 deletions
diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs index f54280b23..4bb2ef73b 100644 --- a/cli/tests/integration/js_unit_tests.rs +++ b/cli/tests/integration/js_unit_tests.rs @@ -46,6 +46,7 @@ util::unit_test_factory!( intl_test, io_test, kv_test, + kv_queue_test_no_db_close, kv_queue_undelivered_test, link_test, make_temp_test, diff --git a/cli/tests/unit/kv_queue_test_no_db_close.ts b/cli/tests/unit/kv_queue_test_no_db_close.ts new file mode 100644 index 000000000..e639574a3 --- /dev/null +++ b/cli/tests/unit/kv_queue_test_no_db_close.ts @@ -0,0 +1,25 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { + assert, + assertEquals, + assertNotEquals, + deferred, +} from "./test_util.ts"; + +Deno.test({ + sanitizeOps: false, + sanitizeResources: false, +}, async function queueTestNoDbClose() { + const db: Deno.Kv = await Deno.openKv(":memory:"); + const promise = deferred(); + let dequeuedMessage: unknown = null; + db.listenQueue((msg) => { + dequeuedMessage = msg; + promise.resolve(); + }); + const res = await db.enqueue("test"); + assert(res.ok); + assertNotEquals(res.versionstamp, null); + await promise; + assertEquals(dequeuedMessage, "test"); +}); diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index ea46aae32..ceeb98c25 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -290,16 +290,7 @@ pub struct SqliteDb { impl Drop for SqliteDb { fn drop(&mut self) { - self.expiration_watcher.abort(); - - // The above `abort()` operation is asynchronous. It's not - // guaranteed that the sqlite connection will be closed immediately. - // So here we synchronously take the conn mutex and drop the connection. - // - // This blocks the event loop if the connection is still being used, - // but ensures correctness - deleting the database file after calling - // the `close` method will always work. - self.conn.conn.lock().unwrap().take(); + self.close(); } } @@ -449,9 +440,22 @@ impl SqliteQueue { Self::requeue_inflight_messages(conn.clone()).await.unwrap(); // Continuous dequeue loop. - Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) + match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) .await - .unwrap(); + { + Ok(_) => Ok(()), + Err(e) => { + // Exit the dequeue loop cleanly if the database has been closed. + if get_custom_error_class(&e) == Some("TypeError") + && e.to_string() == ERROR_USING_CLOSED_DATABASE + { + Ok(()) + } else { + Err(e) + } + } + } + .unwrap(); }); Self { @@ -490,7 +494,7 @@ impl SqliteQueue { } fn shutdown(&self) { - self.shutdown_tx.send(()).unwrap(); + let _ = self.shutdown_tx.send(()); } async fn dequeue_loop( @@ -573,7 +577,7 @@ impl SqliteQueue { }; tokio::select! { _ = sleep_fut => {} - _ = waker_rx.recv() => {} + x = waker_rx.recv() => if x.is_none() {return Ok(());}, _ = shutdown_rx.changed() => return Ok(()) } } @@ -913,6 +917,17 @@ impl Database for SqliteDb { if let Some(queue) = self.queue.get() { queue.shutdown(); } + + self.expiration_watcher.abort(); + + // The above `abort()` operation is asynchronous. It's not + // guaranteed that the sqlite connection will be closed immediately. + // So here we synchronously take the conn mutex and drop the connection. + // + // This blocks the event loop if the connection is still being used, + // but ensures correctness - deleting the database file after calling + // the `close` method will always work. + self.conn.conn.lock().unwrap().take(); } } |