summaryrefslogtreecommitdiff
path: root/core/async_cancel.rs
diff options
context:
space:
mode:
authorBert Belder <bertbelder@gmail.com>2021-10-10 09:51:35 +0200
committerBert Belder <bertbelder@gmail.com>2021-10-17 15:16:10 +0200
commitff95fc167d7124f3c7f2c6951070e2c40701cf32 (patch)
tree2b55915c4185d36880c4fcc14bf57d29f214a36f /core/async_cancel.rs
parent62f43030b455508d4cd57603f9ed641fe609d0cd (diff)
fix(core): avoid polling future after cancellation (#12385)
Diffstat (limited to 'core/async_cancel.rs')
-rw-r--r--core/async_cancel.rs84
1 files changed, 64 insertions, 20 deletions
diff --git a/core/async_cancel.rs b/core/async_cancel.rs
index 5e52d9cd1..e3110bf13 100644
--- a/core/async_cancel.rs
+++ b/core/async_cancel.rs
@@ -204,16 +204,14 @@ mod internal {
mut registration: Pin<&mut Registration>,
cx: &mut Context,
) -> Poll<Result<F::Output, Canceled>> {
- // If this future is being polled for the first time, perform an extra
- // cancellation check _before_ polling the inner future. The reason to do
- // this is that polling the inner future for the first time might start
- // some activity that cannot actually be canceled (e.g. running a compute
- // job in a thread pool), so we should try to never start it at all.
- match &*registration {
- Registration::WillRegister { head_node } if head_node.is_canceled() => {
- return Poll::Ready(Err(Canceled));
- }
- _ => {}
+ // Do a cancellation check _before_ polling the inner future. If it has
+ // already been canceled the inner future will not be polled.
+ let node = match &*registration {
+ Registration::WillRegister { head_node } => &*head_node,
+ Registration::Registered { node } => node,
+ };
+ if node.is_canceled() {
+ return Poll::Ready(Err(Canceled));
}
match future.poll(cx) {
@@ -491,17 +489,13 @@ mod internal {
}
}
- /// Returns true if this node has been marked for cancellation. Note that
- /// `self` must refer to a head (`CancelHandle`) node.
+ /// Returns true if this node has been marked for cancellation. This method
+ /// may be used with both head (`CancelHandle`) and item (`Cancelable`)
+ /// nodes.
fn is_canceled(&self) -> bool {
match self {
- NodeInner::Unlinked => false,
- NodeInner::Linked {
- kind: NodeKind::Head { .. },
- ..
- } => false,
+ NodeInner::Unlinked | NodeInner::Linked { .. } => false,
NodeInner::Canceled => true,
- _ => unreachable!(),
}
}
}
@@ -549,6 +543,7 @@ mod tests {
use futures::future::ready;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
+ use futures::pending;
use futures::select;
use futures::task::noop_waker_ref;
use futures::task::Context;
@@ -557,6 +552,7 @@ mod tests {
use std::io;
use tokio::net::TcpStream;
use tokio::spawn;
+ use tokio::task::yield_now;
fn box_fused<'a, F: FusedFuture + 'a>(
future: F,
@@ -635,10 +631,10 @@ mod tests {
results,
[Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
),
- 1 => assert_eq!(results, [Ok("E"), Err(Canceled)]),
+ 1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
2 => assert_eq!(results, []),
3 => assert_eq!(results, [Ok("G")]),
- 4 => assert_eq!(results, [Ok("H"), Err(Canceled)]),
+ 4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]),
5 => assert_eq!(results, [Ok("J"), Ok("K")]),
_ => unreachable!(),
}
@@ -680,6 +676,54 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn future_cancels_itself_before_completion() {
+ // A future cancels itself before it reaches completion. This future should
+ // indeed get canceled and should not be polled again.
+ let cancel_handle = CancelHandle::new_rc();
+ let result = async {
+ cancel_handle.cancel();
+ yield_now().await;
+ unreachable!();
+ }
+ .or_cancel(&cancel_handle)
+ .await;
+ assert_eq!(result.unwrap_err(), Canceled);
+ }
+
+ #[tokio::test]
+ async fn future_cancels_itself_and_hangs() {
+ // A future cancels itself, after which it returns `Poll::Pending` without
+ // setting up a waker that would allow it to make progress towards
+ // completion. Nevertheless, the `Cancelable` wrapper future must finish.
+ let cancel_handle = CancelHandle::new_rc();
+ let result = async {
+ yield_now().await;
+ cancel_handle.cancel();
+ pending!();
+ unreachable!();
+ }
+ .or_cancel(&cancel_handle)
+ .await;
+ assert_eq!(result.unwrap_err(), Canceled);
+ }
+
+ #[tokio::test]
+ async fn future_cancels_itself_and_completes() {
+ // A TryFuture attempts to cancel itself while it is getting polled, and
+ // yields a result from the very same `poll()` call. Because this future
+ // actually reaches completion, the attempted cancellation has no effect.
+ let cancel_handle = CancelHandle::new_rc();
+ let result = async {
+ yield_now().await;
+ cancel_handle.cancel();
+ Ok::<_, io::Error>("done")
+ }
+ .try_or_cancel(&cancel_handle)
+ .await;
+ assert_eq!(result.unwrap(), "done");
+ }
+
#[test]
fn cancel_handle_pinning() {
let mut cancel_handle = CancelHandle::new_rc();