summaryrefslogtreecommitdiff
path: root/ext/kv/lib.rs
diff options
context:
space:
mode:
authorHeyang Zhou <zhy20000919@hotmail.com>2023-12-22 05:04:17 +0800
committerGitHub <noreply@github.com>2023-12-22 05:04:17 +0800
commit3fb4f3fe5a18916aa95f8b035ca994c290c173dc (patch)
tree0b7640ab4939a794bdf7ae66d6e8b4fb74943c9a /ext/kv/lib.rs
parent760af934d9b4bf8d0ab7f47263dd7cb9675db7a5 (diff)
fix(unstable): kv watch should stop when db is closed (#21665)
Fixes #21634.
Diffstat (limited to 'ext/kv/lib.rs')
-rw-r--r--ext/kv/lib.rs15
1 files changed, 10 insertions, 5 deletions
diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs
index e78526856..032a16863 100644
--- a/ext/kv/lib.rs
+++ b/ext/kv/lib.rs
@@ -444,16 +444,21 @@ async fn op_kv_watch_next(
let cancel_handle = resource.cancel_handle.clone();
let stream = RcRef::map(resource, |r| &r.stream)
.borrow_mut()
- .or_cancel(db_cancel_handle)
- .or_cancel(cancel_handle)
+ .or_cancel(db_cancel_handle.clone())
+ .or_cancel(cancel_handle.clone())
.await;
let Ok(Ok(mut stream)) = stream else {
return Ok(None);
};
- // doesn't need a cancel handle because the stream ends when the database
- // connection is closed
- let Some(res) = stream.next().await else {
+ // We hold a strong reference to `resource`, so we can't rely on the stream
+ // being dropped when the db connection is closed
+ let Ok(Ok(Some(res))) = stream
+ .next()
+ .or_cancel(db_cancel_handle)
+ .or_cancel(cancel_handle)
+ .await
+ else {
return Ok(None);
};