diff options
author | Heyang Zhou <zhy20000919@hotmail.com> | 2023-12-22 05:04:17 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-22 05:04:17 +0800 |
commit | 3fb4f3fe5a18916aa95f8b035ca994c290c173dc (patch) | |
tree | 0b7640ab4939a794bdf7ae66d6e8b4fb74943c9a | |
parent | 760af934d9b4bf8d0ab7f47263dd7cb9675db7a5 (diff) |
fix(unstable): kv watch should stop when db is closed (#21665)
Fixes #21634.
-rw-r--r-- | cli/tests/unit/kv_test.ts | 20 | ||||
-rw-r--r-- | ext/kv/lib.rs | 15 |
2 files changed, 30 insertions, 5 deletions
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 28c913f21..4963882e1 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -2248,3 +2248,23 @@ dbTest("set with key versionstamp suffix", async (db) => { "expected string, number, bigint, ArrayBufferView, boolean", ); }); + +Deno.test({ + name: "watch should stop when db closed", + async fn() { + const db = await Deno.openKv(":memory:"); + + const watch = db.watch([["a"]]); + const completion = (async () => { + for await (const _item of watch) { + // pass + } + })(); + + setTimeout(() => { + db.close(); + }, 100); + + await completion; + }, +}); diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index e78526856..032a16863 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -444,16 +444,21 @@ async fn op_kv_watch_next( let cancel_handle = resource.cancel_handle.clone(); let stream = RcRef::map(resource, |r| &r.stream) .borrow_mut() - .or_cancel(db_cancel_handle) - .or_cancel(cancel_handle) + .or_cancel(db_cancel_handle.clone()) + .or_cancel(cancel_handle.clone()) .await; let Ok(Ok(mut stream)) = stream else { return Ok(None); }; - // doesn't need a cancel handle because the stream ends when the database - // connection is closed - let Some(res) = stream.next().await else { + // We hold a strong reference to `resource`, so we can't rely on the stream + // being dropped when the db connection is closed + let Ok(Ok(Some(res))) = stream + .next() + .or_cancel(db_cancel_handle) + .or_cancel(cancel_handle) + .await + else { return Ok(None); }; |