diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-07-01 18:00:14 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-02 00:00:14 +0000 |
commit | e746b6d80654ba4e4e26370fe6e4f784ce841d92 (patch) | |
tree | 153ffad92a96126b9ab8e906dcdabf7648755931 /core/async_cancel.rs | |
parent | b9c0e7cd550ab14fa7da7e33ed87cbeeeb9785a0 (diff) |
refactor(core): Extract deno_core (#19658)
`deno_core` is moving out! You'll find it at
https://github.com/denoland/deno_core/ once this PR lands.
Diffstat (limited to 'core/async_cancel.rs')
-rw-r--r-- | core/async_cancel.rs | 793 |
1 files changed, 0 insertions, 793 deletions
diff --git a/core/async_cancel.rs b/core/async_cancel.rs deleted file mode 100644 index 5573e543d..000000000 --- a/core/async_cancel.rs +++ /dev/null @@ -1,793 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use std::any::type_name; -use std::borrow::Cow; -use std::error::Error; -use std::fmt; -use std::fmt::Display; -use std::fmt::Formatter; -use std::io; -use std::pin::Pin; -use std::rc::Rc; - -use futures::future::FusedFuture; -use futures::future::Future; -use futures::future::TryFuture; -use futures::task::Context; -use futures::task::Poll; -use pin_project::pin_project; - -use crate::RcLike; -use crate::Resource; - -use self::internal as i; - -#[derive(Debug, Default)] -pub struct CancelHandle { - node: i::Node, -} - -impl CancelHandle { - pub fn new() -> Self { - Default::default() - } - - pub fn new_rc() -> Rc<Self> { - Rc::new(Self::new()) - } - - /// Cancel all cancelable futures that are bound to this handle. Note that - /// this method does not require a mutable reference to the `CancelHandle`. - pub fn cancel(&self) { - self.node.cancel(); - } - - pub fn is_canceled(&self) -> bool { - self.node.is_canceled() - } -} - -#[pin_project(project = CancelableProjection)] -#[derive(Debug)] -pub enum Cancelable<F> { - Pending { - #[pin] - future: F, - #[pin] - registration: i::Registration, - }, - Terminated, -} - -impl<F: Future> Future for Cancelable<F> { - type Output = Result<F::Output, Canceled>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let poll_result = match self.as_mut().project() { - CancelableProjection::Pending { - future, - registration, - } => Self::poll_pending(future, registration, cx), - CancelableProjection::Terminated => { - panic!("{}::poll() called after completion", type_name::<Self>()) - } - }; - // Fuse: if this Future is completed or canceled, make sure the inner - // `future` and `registration` fields are dropped in order to unlink it from - // its cancel handle. - if matches!(poll_result, Poll::Ready(_)) { - self.set(Cancelable::Terminated) - } - poll_result - } -} - -impl<F: Future> FusedFuture for Cancelable<F> { - fn is_terminated(&self) -> bool { - matches!(self, Self::Terminated) - } -} - -impl Resource for CancelHandle { - fn name(&self) -> Cow<str> { - "cancellation".into() - } - - fn close(self: Rc<Self>) { - self.cancel(); - } -} - -#[pin_project(project = TryCancelableProjection)] -#[derive(Debug)] -pub struct TryCancelable<F> { - #[pin] - inner: Cancelable<F>, -} - -impl<F, T, E> Future for TryCancelable<F> -where - F: Future<Output = Result<T, E>>, - Canceled: Into<E>, -{ - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let TryCancelableProjection { inner } = self.project(); - match inner.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(result)) => Poll::Ready(result), - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), - } - } -} - -impl<F, T, E> FusedFuture for TryCancelable<F> -where - F: Future<Output = Result<T, E>>, - Canceled: Into<E>, -{ - fn is_terminated(&self) -> bool { - self.inner.is_terminated() - } -} - -pub trait CancelFuture -where - Self: Future + Sized, -{ - fn or_cancel<H: RcLike<CancelHandle>>( - self, - cancel_handle: H, - ) -> Cancelable<Self> { - Cancelable::new(self, cancel_handle.into()) - } -} - -impl<F> CancelFuture for F where F: Future {} - -pub trait CancelTryFuture -where - Self: TryFuture + Sized, - Canceled: Into<Self::Error>, -{ - fn try_or_cancel<H: RcLike<CancelHandle>>( - self, - cancel_handle: H, - ) -> TryCancelable<Self> { - TryCancelable::new(self, cancel_handle.into()) - } -} - -impl<F> CancelTryFuture for F -where - F: TryFuture, - Canceled: Into<F::Error>, -{ -} - -#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)] -pub struct Canceled; - -impl Display for Canceled { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - write!(f, "operation canceled") - } -} - -impl Error for Canceled {} - -impl From<Canceled> for io::Error { - fn from(_: Canceled) -> Self { - io::Error::new(io::ErrorKind::Interrupted, Canceled) - } -} - -mod internal { - use super::CancelHandle; - use super::Cancelable; - use super::Canceled; - use super::TryCancelable; - use crate::RcRef; - use futures::future::Future; - use futures::task::Context; - use futures::task::Poll; - use futures::task::Waker; - use pin_project::pin_project; - use std::any::Any; - use std::cell::UnsafeCell; - use std::marker::PhantomPinned; - use std::mem::replace; - use std::pin::Pin; - use std::ptr::NonNull; - use std::rc::Rc; - use std::rc::Weak; - - impl<F: Future> Cancelable<F> { - pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self { - let head_node = RcRef::map(cancel_handle, |r| &r.node); - let registration = Registration::WillRegister { head_node }; - Self::Pending { - future, - registration, - } - } - - pub(super) fn poll_pending( - future: Pin<&mut F>, - mut registration: Pin<&mut Registration>, - cx: &mut Context, - ) -> Poll<Result<F::Output, Canceled>> { - // Do a cancellation check _before_ polling the inner future. If it has - // already been canceled the inner future will not be polled. - let node = match &*registration { - Registration::WillRegister { head_node } => head_node, - Registration::Registered { node } => node, - }; - if node.is_canceled() { - return Poll::Ready(Err(Canceled)); - } - - match future.poll(cx) { - Poll::Ready(res) => return Poll::Ready(Ok(res)), - Poll::Pending => {} - } - - // Register this future with its `CancelHandle`, saving the `Waker` that - // can be used to make the runtime poll this future when it is canceled. - // When already registered, update the stored `Waker` if necessary. - let head_node = match &*registration { - Registration::WillRegister { .. } => { - match registration.as_mut().project_replace(Default::default()) { - RegistrationProjectionOwned::WillRegister { head_node } => { - Some(head_node) - } - _ => unreachable!(), - } - } - _ => None, - }; - let node = match registration.project() { - RegistrationProjection::Registered { node } => node, - _ => unreachable!(), - }; - node.register(cx.waker(), head_node)?; - - Poll::Pending - } - } - - impl<F: Future> TryCancelable<F> { - pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self { - Self { - inner: Cancelable::new(future, cancel_handle), - } - } - } - - #[pin_project(project = RegistrationProjection, - project_replace = RegistrationProjectionOwned)] - #[derive(Debug)] - pub enum Registration { - WillRegister { - head_node: RcRef<Node>, - }, - Registered { - #[pin] - node: Node, - }, - } - - impl Default for Registration { - fn default() -> Self { - Self::Registered { - node: Default::default(), - } - } - } - - #[derive(Debug)] - pub struct Node { - inner: UnsafeCell<NodeInner>, - _pin: PhantomPinned, - } - - impl Node { - /// If necessary, register a `Cancelable` node with a `CancelHandle`, and - /// save or update the `Waker` that can wake with this cancelable future. - pub fn register( - &self, - waker: &Waker, - head_rc: Option<RcRef<Node>>, - ) -> Result<(), Canceled> { - match head_rc.as_ref().map(RcRef::split) { - Some((head, rc)) => { - // Register this `Cancelable` node with a `CancelHandle` head node. - assert_ne!(self, head); - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let self_inner = unsafe { &mut *self.inner.get() }; - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let head_inner = unsafe { &mut *head.inner.get() }; - self_inner.link(waker, head_inner, rc) - } - None => { - // This `Cancelable` has already been linked to a `CancelHandle` head - // node; just update our stored `Waker` if necessary. - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let inner = unsafe { &mut *self.inner.get() }; - inner.update_waker(waker) - } - } - } - - pub fn cancel(&self) { - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let inner = unsafe { &mut *self.inner.get() }; - inner.cancel(); - } - - pub fn is_canceled(&self) -> bool { - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let inner = unsafe { &mut *self.inner.get() }; - inner.is_canceled() - } - } - - impl Default for Node { - fn default() -> Self { - Self { - inner: UnsafeCell::new(NodeInner::Unlinked), - _pin: PhantomPinned, - } - } - } - - impl Drop for Node { - fn drop(&mut self) { - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let inner = unsafe { &mut *self.inner.get() }; - inner.unlink(); - } - } - - impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - std::ptr::eq(self, other) - } - } - - #[derive(Debug)] - enum NodeInner { - Unlinked, - Linked { - kind: NodeKind, - prev: NonNull<NodeInner>, - next: NonNull<NodeInner>, - }, - Canceled, - } - - impl NodeInner { - fn as_non_null(&mut self) -> NonNull<Self> { - NonNull::from(self) - } - - fn link( - &mut self, - waker: &Waker, - head: &mut Self, - rc_pin: &Rc<dyn Any>, - ) -> Result<(), Canceled> { - // The future should not have been linked to a cancel handle before. - assert!(matches!(self, NodeInner::Unlinked)); - - match head { - NodeInner::Unlinked => { - *head = NodeInner::Linked { - kind: NodeKind::head(rc_pin), - prev: self.as_non_null(), - next: self.as_non_null(), - }; - *self = NodeInner::Linked { - kind: NodeKind::item(waker), - prev: head.as_non_null(), - next: head.as_non_null(), - }; - Ok(()) - } - NodeInner::Linked { - kind: NodeKind::Head { .. }, - prev: next_prev_nn, - .. - } => { - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let prev = unsafe { &mut *next_prev_nn.as_ptr() }; - match prev { - NodeInner::Linked { - kind: NodeKind::Item { .. }, - next: prev_next_nn, - .. - } => { - *self = NodeInner::Linked { - kind: NodeKind::item(waker), - prev: replace(next_prev_nn, self.as_non_null()), - next: replace(prev_next_nn, self.as_non_null()), - }; - Ok(()) - } - _ => unreachable!(), - } - } - NodeInner::Canceled => Err(Canceled), - _ => unreachable!(), - } - } - - fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> { - match self { - NodeInner::Unlinked => Ok(()), - NodeInner::Linked { - kind: NodeKind::Item { waker }, - .. - } => { - if !waker.will_wake(new_waker) { - *waker = new_waker.clone(); - } - Ok(()) - } - NodeInner::Canceled => Err(Canceled), - _ => unreachable!(), - } - } - - /// If this node is linked to other nodes, remove it from the chain. This - /// method is called (only) by the drop handler for `Node`. It is suitable - /// for both 'head' and 'item' nodes. - fn unlink(&mut self) { - if let NodeInner::Linked { - prev: mut prev_nn, - next: mut next_nn, - .. - } = replace(self, NodeInner::Unlinked) - { - if prev_nn == next_nn { - // There were only two nodes in this chain; after unlinking ourselves - // the other node is no longer linked. - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - let other = unsafe { prev_nn.as_mut() }; - *other = NodeInner::Unlinked; - } else { - // The chain had more than two nodes. - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - match unsafe { prev_nn.as_mut() } { - NodeInner::Linked { - next: prev_next_nn, .. - } => { - *prev_next_nn = next_nn; - } - _ => unreachable!(), - } - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - match unsafe { next_nn.as_mut() } { - NodeInner::Linked { - prev: next_prev_nn, .. - } => { - *next_prev_nn = prev_nn; - } - _ => unreachable!(), - } - } - } - } - - /// Mark this node and all linked nodes for cancellation. Note that `self` - /// must refer to a head (`CancelHandle`) node. - fn cancel(&mut self) { - let mut head_nn = NonNull::from(self); - - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - // Mark the head node as canceled. - let mut item_nn = - match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) { - NodeInner::Linked { - kind: NodeKind::Head { .. }, - next: next_nn, - .. - } => next_nn, - NodeInner::Unlinked | NodeInner::Canceled => return, - _ => unreachable!(), - }; - - // Cancel all item nodes in the chain, waking each stored `Waker`. - while item_nn != head_nn { - // TODO(piscisaureus): safety comment - #[allow(clippy::undocumented_unsafe_blocks)] - match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) { - NodeInner::Linked { - kind: NodeKind::Item { waker }, - next: next_nn, - .. - } => { - waker.wake(); - item_nn = next_nn; - } - _ => unreachable!(), - } - } - } - - /// Returns true if this node has been marked for cancellation. This method - /// may be used with both head (`CancelHandle`) and item (`Cancelable`) - /// nodes. - fn is_canceled(&self) -> bool { - match self { - NodeInner::Unlinked | NodeInner::Linked { .. } => false, - NodeInner::Canceled => true, - } - } - } - - #[derive(Debug)] - enum NodeKind { - /// In a chain of linked nodes, the "head" node is owned by the - /// `CancelHandle`. A chain usually contains at most one head node; however - /// when a `CancelHandle` is dropped before the futures associated with it - /// are dropped, a chain may temporarily contain no head node at all. - Head { - /// The `weak_pin` field adds adds a weak reference to the `Rc` guarding - /// the heap allocation that contains the `CancelHandle`. Without this - /// extra weak reference, `Rc::get_mut()` might succeed and allow the - /// `CancelHandle` to be moved when it isn't safe to do so. - _weak_pin: Weak<dyn Any>, - }, - /// All item nodes in a chain are associated with a `Cancelable` head node. - Item { - /// If this future indeed does get canceled, the waker is needed to make - /// sure that the canceled future gets polled as soon as possible. - waker: Waker, - }, - } - - impl NodeKind { - fn head(rc_pin: &Rc<dyn Any>) -> Self { - let _weak_pin = Rc::downgrade(rc_pin); - Self::Head { _weak_pin } - } - - fn item(waker: &Waker) -> Self { - let waker = waker.clone(); - Self::Item { waker } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use anyhow::Error; - use futures::future::pending; - use futures::future::poll_fn; - use futures::future::ready; - use futures::future::FutureExt; - use futures::future::TryFutureExt; - use futures::pending; - use futures::select; - use futures::task::noop_waker_ref; - use futures::task::Context; - use futures::task::Poll; - use std::convert::Infallible as Never; - use std::io; - use tokio::net::TcpStream; - use tokio::spawn; - use tokio::task::yield_now; - - fn box_fused<'a, F: FusedFuture + 'a>( - future: F, - ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> { - Box::pin(future) - } - - async fn ready_in_n(name: &str, count: usize) -> &str { - let mut remaining = count as isize; - poll_fn(|_| { - assert!(remaining >= 0); - if remaining == 0 { - Poll::Ready(name) - } else { - remaining -= 1; - Poll::Pending - } - }) - .await - } - - #[test] - fn cancel_future() { - let cancel_now = CancelHandle::new_rc(); - let cancel_at_0 = CancelHandle::new_rc(); - let cancel_at_1 = CancelHandle::new_rc(); - let cancel_at_4 = CancelHandle::new_rc(); - let cancel_never = CancelHandle::new_rc(); - - cancel_now.cancel(); - - let mut futures = vec![ - box_fused(ready("A").or_cancel(&cancel_now)), - box_fused(ready("B").or_cancel(&cancel_at_0)), - box_fused(ready("C").or_cancel(&cancel_at_1)), - box_fused( - ready_in_n("D", 0) - .or_cancel(&cancel_never) - .try_or_cancel(&cancel_now), - ), - box_fused( - ready_in_n("E", 1) - .or_cancel(&cancel_at_1) - .try_or_cancel(&cancel_at_1), - ), - box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)), - box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)), - box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)), - box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)), - box_fused(ready_in_n("J", 5).map(Ok)), - box_fused(ready_in_n("K", 5).or_cancel(cancel_never)), - ]; - - let mut cx = Context::from_waker(noop_waker_ref()); - - for i in 0..=5 { - match i { - 0 => cancel_at_0.cancel(), - 1 => cancel_at_1.cancel(), - 4 => cancel_at_4.cancel(), - 2 | 3 | 5 => {} - _ => unreachable!(), - } - - let results = futures - .iter_mut() - .filter(|fut| !fut.is_terminated()) - .filter_map(|fut| match fut.poll_unpin(&mut cx) { - Poll::Pending => None, - Poll::Ready(res) => Some(res), - }) - .collect::<Vec<_>>(); - - match i { - 0 => assert_eq!( - results, - [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)] - ), - 1 => assert_eq!(results, [Err(Canceled), Err(Canceled)]), - 2 => assert_eq!(results, []), - 3 => assert_eq!(results, [Ok("G")]), - 4 => assert_eq!(results, [Err(Canceled), Err(Canceled)]), - 5 => assert_eq!(results, [Ok("J"), Ok("K")]), - _ => unreachable!(), - } - } - - assert!(!futures.into_iter().any(|fut| !fut.is_terminated())); - - let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4]; - assert!(!cancel_handles.iter().any(|c| !c.is_canceled())); - } - - #[tokio::test] - async fn cancel_try_future() { - { - // Cancel a spawned task before it actually runs. - let cancel_handle = Rc::new(CancelHandle::new()); - let future = spawn(async { panic!("the task should not be spawned") }) - .map_err(Error::from) - .try_or_cancel(&cancel_handle); - cancel_handle.cancel(); - let error = future.await.unwrap_err(); - assert!(error.downcast_ref::<Canceled>().is_some()); - assert_eq!(error.to_string().as_str(), "operation canceled"); - } - - { - // Cancel a network I/O future right after polling it. - let cancel_handle = Rc::new(CancelHandle::new()); - let result = loop { - select! { - r = TcpStream::connect("1.2.3.4:12345") - .try_or_cancel(&cancel_handle) => break r, - default => cancel_handle.cancel(), - }; - }; - let error = result.unwrap_err(); - assert_eq!(error.kind(), io::ErrorKind::Interrupted); - assert_eq!(error.to_string().as_str(), "operation canceled"); - } - } - - #[tokio::test] - async fn future_cancels_itself_before_completion() { - // A future cancels itself before it reaches completion. This future should - // indeed get canceled and should not be polled again. - let cancel_handle = CancelHandle::new_rc(); - let result = async { - cancel_handle.cancel(); - yield_now().await; - unreachable!(); - } - .or_cancel(&cancel_handle) - .await; - assert_eq!(result.unwrap_err(), Canceled); - } - - #[tokio::test] - async fn future_cancels_itself_and_hangs() { - // A future cancels itself, after which it returns `Poll::Pending` without - // setting up a waker that would allow it to make progress towards - // completion. Nevertheless, the `Cancelable` wrapper future must finish. - let cancel_handle = CancelHandle::new_rc(); - let result = async { - yield_now().await; - cancel_handle.cancel(); - pending!(); - unreachable!(); - } - .or_cancel(&cancel_handle) - .await; - assert_eq!(result.unwrap_err(), Canceled); - } - - #[tokio::test] - async fn future_cancels_itself_and_completes() { - // A TryFuture attempts to cancel itself while it is getting polled, and - // yields a result from the very same `poll()` call. Because this future - // actually reaches completion, the attempted cancellation has no effect. - let cancel_handle = CancelHandle::new_rc(); - let result = async { - yield_now().await; - cancel_handle.cancel(); - Ok::<_, io::Error>("done") - } - .try_or_cancel(&cancel_handle) - .await; - assert_eq!(result.unwrap(), "done"); - } - - #[test] - fn cancel_handle_pinning() { - let mut cancel_handle = CancelHandle::new_rc(); - - // There is only one reference to `cancel_handle`, so `Rc::get_mut()` should - // succeed. - assert!(Rc::get_mut(&mut cancel_handle).is_some()); - - let mut future = pending::<Never>().or_cancel(&cancel_handle); - // SAFETY: `Cancelable` pins the future - let future = unsafe { Pin::new_unchecked(&mut future) }; - - // There are two `Rc<CancelHandle>` references now, so this fails. - assert!(Rc::get_mut(&mut cancel_handle).is_none()); - - let mut cx = Context::from_waker(noop_waker_ref()); - assert!(future.poll(&mut cx).is_pending()); - - // Polling `future` has established a link between the future and - // `cancel_handle`, so both values should be pinned at this point. - assert!(Rc::get_mut(&mut cancel_handle).is_none()); - - cancel_handle.cancel(); - - // Canceling or dropping the associated future(s) unlinks them from the - // cancel handle, therefore `cancel_handle` can now safely be moved again. - assert!(Rc::get_mut(&mut cancel_handle).is_some()); - } -} |