diff options
author | Bert Belder <bertbelder@gmail.com> | 2021-10-10 09:51:35 +0200 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2021-10-17 15:16:10 +0200 |
commit | ff95fc167d7124f3c7f2c6951070e2c40701cf32 (patch) | |
tree | 2b55915c4185d36880c4fcc14bf57d29f214a36f /core/async_cancel.rs | |
parent | 62f43030b455508d4cd57603f9ed641fe609d0cd (diff) |
fix(core): avoid polling future after cancellation (#12385)
Diffstat (limited to 'core/async_cancel.rs')
-rw-r--r-- | core/async_cancel.rs | 84 |
1 files changed, 64 insertions, 20 deletions
diff --git a/core/async_cancel.rs b/core/async_cancel.rs index 5e52d9cd1..e3110bf13 100644 --- a/core/async_cancel.rs +++ b/core/async_cancel.rs @@ -204,16 +204,14 @@ mod internal { mut registration: Pin<&mut Registration>, cx: &mut Context, ) -> Poll<Result<F::Output, Canceled>> { - // If this future is being polled for the first time, perform an extra - // cancellation check _before_ polling the inner future. The reason to do - // this is that polling the inner future for the first time might start - // some activity that cannot actually be canceled (e.g. running a compute - // job in a thread pool), so we should try to never start it at all. - match &*registration { - Registration::WillRegister { head_node } if head_node.is_canceled() => { - return Poll::Ready(Err(Canceled)); - } - _ => {} + // Do a cancellation check _before_ polling the inner future. If it has + // already been canceled the inner future will not be polled. + let node = match &*registration { + Registration::WillRegister { head_node } => &*head_node, + Registration::Registered { node } => node, + }; + if node.is_canceled() { + return Poll::Ready(Err(Canceled)); } match future.poll(cx) { @@ -491,17 +489,13 @@ mod internal { } } - /// Returns true if this node has been marked for cancellation. Note that - /// `self` must refer to a head (`CancelHandle`) node. + /// Returns true if this node has been marked for cancellation. This method + /// may be used with both head (`CancelHandle`) and item (`Cancelable`) + /// nodes. fn is_canceled(&self) -> bool { match self { - NodeInner::Unlinked => false, - NodeInner::Linked { - kind: NodeKind::Head { .. }, - .. - } => false, + NodeInner::Unlinked | NodeInner::Linked { .. } => false, NodeInner::Canceled => true, - _ => unreachable!(), } } } @@ -549,6 +543,7 @@ mod tests { use futures::future::ready; use futures::future::FutureExt; use futures::future::TryFutureExt; + use futures::pending; use futures::select; use futures::task::noop_waker_ref; use futures::task::Context; @@ -557,6 +552,7 @@ mod tests { use std::io; use tokio::net::TcpStream; use tokio::spawn; + use tokio::task::yield_now; fn box_fused<'a, F: FusedFuture + 'a>( future: F, @@ -635,10 +631,10 @@ mod tests { results, [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)] ), - 1 => assert_eq!(results, [Ok("E"), Err(Canceled)]), + 1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]), 2 => assert_eq!(results, []), 3 => assert_eq!(results, [Ok("G")]), - 4 => assert_eq!(results, [Ok("H"), Err(Canceled)]), + 4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]), 5 => assert_eq!(results, [Ok("J"), Ok("K")]), _ => unreachable!(), } @@ -680,6 +676,54 @@ mod tests { } } + #[tokio::test] + async fn future_cancels_itself_before_completion() { + // A future cancels itself before it reaches completion. This future should + // indeed get canceled and should not be polled again. + let cancel_handle = CancelHandle::new_rc(); + let result = async { + cancel_handle.cancel(); + yield_now().await; + unreachable!(); + } + .or_cancel(&cancel_handle) + .await; + assert_eq!(result.unwrap_err(), Canceled); + } + + #[tokio::test] + async fn future_cancels_itself_and_hangs() { + // A future cancels itself, after which it returns `Poll::Pending` without + // setting up a waker that would allow it to make progress towards + // completion. Nevertheless, the `Cancelable` wrapper future must finish. + let cancel_handle = CancelHandle::new_rc(); + let result = async { + yield_now().await; + cancel_handle.cancel(); + pending!(); + unreachable!(); + } + .or_cancel(&cancel_handle) + .await; + assert_eq!(result.unwrap_err(), Canceled); + } + + #[tokio::test] + async fn future_cancels_itself_and_completes() { + // A TryFuture attempts to cancel itself while it is getting polled, and + // yields a result from the very same `poll()` call. Because this future + // actually reaches completion, the attempted cancellation has no effect. + let cancel_handle = CancelHandle::new_rc(); + let result = async { + yield_now().await; + cancel_handle.cancel(); + Ok::<_, io::Error>("done") + } + .try_or_cancel(&cancel_handle) + .await; + assert_eq!(result.unwrap(), "done"); + } + #[test] fn cancel_handle_pinning() { let mut cancel_handle = CancelHandle::new_rc(); |