summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/tests/unit/kv_test.ts20
-rw-r--r--ext/kv/lib.rs15
2 files changed, 30 insertions, 5 deletions
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index 28c913f21..4963882e1 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -2248,3 +2248,23 @@ dbTest("set with key versionstamp suffix", async (db) => {
"expected string, number, bigint, ArrayBufferView, boolean",
);
});
+
+Deno.test({
+ name: "watch should stop when db closed",
+ async fn() {
+ const db = await Deno.openKv(":memory:");
+
+ const watch = db.watch([["a"]]);
+ const completion = (async () => {
+ for await (const _item of watch) {
+ // pass
+ }
+ })();
+
+ setTimeout(() => {
+ db.close();
+ }, 100);
+
+ await completion;
+ },
+});
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index e78526856..032a16863 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -444,16 +444,21 @@ async fn op_kv_watch_next(
let cancel_handle = resource.cancel_handle.clone();
let stream = RcRef::map(resource, |r| &r.stream)
.borrow_mut()
- .or_cancel(db_cancel_handle)
- .or_cancel(cancel_handle)
+ .or_cancel(db_cancel_handle.clone())
+ .or_cancel(cancel_handle.clone())
.await;
let Ok(Ok(mut stream)) = stream else {
return Ok(None);
};
- // doesn't need a cancel handle because the stream ends when the database
- // connection is closed
- let Some(res) = stream.next().await else {
+ // We hold a strong reference to `resource`, so we can't rely on the stream
+ // being dropped when the db connection is closed
+ let Ok(Ok(Some(res))) = stream
+ .next()
+ .or_cancel(db_cancel_handle)
+ .or_cancel(cancel_handle)
+ .await
+ else {
return Ok(None);
};