summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock7
-rw-r--r--core/Cargo.toml3
-rw-r--r--core/async_cancel.rs710
-rw-r--r--core/async_cell.rs74
-rw-r--r--core/examples/http_bench_bin_ops.rs91
-rw-r--r--core/examples/http_bench_json_ops.rs91
-rw-r--r--core/lib.rs8
-rw-r--r--core/resources2.rs7
8 files changed, 876 insertions, 115 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 21bf5f146..3d885d361 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -522,11 +522,12 @@ dependencies = [
"lazy_static",
"libc",
"log",
+ "pin-project 1.0.2",
"rusty_v8",
"serde",
"serde_json",
"smallvec",
- "tokio 0.3.4",
+ "tokio 0.3.5",
"url",
]
@@ -2944,9 +2945,9 @@ dependencies = [
[[package]]
name = "tokio"
-version = "0.3.4"
+version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9dfe2523e6fa84ddf5e688151d4e5fddc51678de9752c6512a24714c23818d61"
+checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252"
dependencies = [
"autocfg 1.0.1",
"bytes 0.6.0",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 367def1f2..21cc5f1f7 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -24,6 +24,7 @@ serde_json = { version = "1.0", features = ["preserve_order"] }
serde = { version = "1.0", features = ["derive"] }
smallvec = "1.4.2"
url = { version = "2.1.1", features = ["serde"] }
+pin-project = "1.0.2"
[[example]]
name = "http_bench_bin_ops"
@@ -35,4 +36,4 @@ path = "examples/http_bench_json_ops.rs"
# These dependendencies are only used for the 'http_bench_*_ops' examples.
[dev-dependencies]
-tokio = { version = "0.3.4", features = ["full"] }
+tokio = { version = "0.3.5", features = ["full"] }
diff --git a/core/async_cancel.rs b/core/async_cancel.rs
new file mode 100644
index 000000000..90cb0c41f
--- /dev/null
+++ b/core/async_cancel.rs
@@ -0,0 +1,710 @@
+// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
+
+use crate::RcLike;
+use futures::future::FusedFuture;
+use futures::future::Future;
+use futures::future::TryFuture;
+use futures::task::Context;
+use futures::task::Poll;
+use pin_project::pin_project;
+use std::any::type_name;
+use std::error::Error;
+use std::fmt;
+use std::fmt::Display;
+use std::fmt::Formatter;
+use std::io;
+use std::pin::Pin;
+use std::rc::Rc;
+
+use self::internal as i;
+
+#[derive(Debug, Default)]
+pub struct CancelHandle {
+ node: i::Node,
+}
+
+impl CancelHandle {
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ pub fn new_rc() -> Rc<Self> {
+ Rc::new(Self::new())
+ }
+
+ /// Cancel all cancelable futures that are bound to this handle. Note that
+ /// this method does not require a mutable reference to the `CancelHandle`.
+ pub fn cancel(&self) {
+ self.node.cancel();
+ }
+
+ pub fn is_canceled(&self) -> bool {
+ self.node.is_canceled()
+ }
+}
+
+#[pin_project(project = CancelableProjection)]
+#[derive(Debug)]
+pub enum Cancelable<F> {
+ Pending {
+ #[pin]
+ future: F,
+ #[pin]
+ registration: i::Registration,
+ },
+ Terminated,
+}
+
+impl<F: Future> Future for Cancelable<F> {
+ type Output = Result<F::Output, Canceled>;
+
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let poll_result = match self.as_mut().project() {
+ CancelableProjection::Pending {
+ future,
+ registration,
+ } => Self::poll_pending(future, registration, cx),
+ CancelableProjection::Terminated => {
+ panic!("{}::poll() called after completion", type_name::<Self>())
+ }
+ };
+ // Fuse: if this Future is completed or canceled, make sure the inner
+ // `future` and `registration` fields are dropped in order to unlink it from
+ // its cancel handle.
+ if matches!(poll_result, Poll::Ready(_)) {
+ self.set(Cancelable::Terminated)
+ }
+ poll_result
+ }
+}
+
+impl<F: Future> FusedFuture for Cancelable<F> {
+ fn is_terminated(&self) -> bool {
+ matches!(self, Self::Terminated)
+ }
+}
+
+#[pin_project(project = TryCancelableProjection)]
+#[derive(Debug)]
+pub struct TryCancelable<F> {
+ #[pin]
+ inner: Cancelable<F>,
+}
+
+impl<F, T, E> Future for TryCancelable<F>
+where
+ F: Future<Output = Result<T, E>>,
+ Canceled: Into<E>,
+{
+ type Output = F::Output;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let TryCancelableProjection { inner } = self.project();
+ match inner.poll(cx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(Ok(result)) => Poll::Ready(result),
+ Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
+ }
+ }
+}
+
+impl<F, T, E> FusedFuture for TryCancelable<F>
+where
+ F: Future<Output = Result<T, E>>,
+ Canceled: Into<E>,
+{
+ fn is_terminated(&self) -> bool {
+ self.inner.is_terminated()
+ }
+}
+
+pub trait CancelFuture
+where
+ Self: Future + Sized,
+{
+ fn or_cancel<H: RcLike<CancelHandle>>(
+ self,
+ cancel_handle: H,
+ ) -> Cancelable<Self> {
+ Cancelable::new(self, cancel_handle.into())
+ }
+}
+
+impl<F> CancelFuture for F where F: Future {}
+
+pub trait CancelTryFuture
+where
+ Self: TryFuture + Sized,
+ Canceled: Into<Self::Error>,
+{
+ fn try_or_cancel<H: RcLike<CancelHandle>>(
+ self,
+ cancel_handle: H,
+ ) -> TryCancelable<Self> {
+ TryCancelable::new(self, cancel_handle.into())
+ }
+}
+
+impl<F> CancelTryFuture for F
+where
+ F: TryFuture,
+ Canceled: Into<F::Error>,
+{
+}
+
+#[derive(Copy, Clone, Default, Debug, Eq, Hash, PartialEq)]
+pub struct Canceled;
+
+impl Display for Canceled {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ write!(f, "operation canceled")
+ }
+}
+
+impl Error for Canceled {}
+
+impl From<Canceled> for io::Error {
+ fn from(_: Canceled) -> Self {
+ io::Error::new(io::ErrorKind::Interrupted, Canceled)
+ }
+}
+
+mod internal {
+ use super::CancelHandle;
+ use super::Cancelable;
+ use super::Canceled;
+ use super::TryCancelable;
+ use crate::RcRef;
+ use futures::future::Future;
+ use futures::task::Context;
+ use futures::task::Poll;
+ use futures::task::Waker;
+ use pin_project::pin_project;
+ use std::any::Any;
+ use std::cell::UnsafeCell;
+ use std::marker::PhantomPinned;
+ use std::mem::replace;
+ use std::pin::Pin;
+ use std::ptr::NonNull;
+ use std::rc::Rc;
+ use std::rc::Weak;
+
+ impl<F: Future> Cancelable<F> {
+ pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
+ let head_node = RcRef::map(cancel_handle, |r| &r.node);
+ let registration = Registration::WillRegister { head_node };
+ Self::Pending {
+ future,
+ registration,
+ }
+ }
+
+ pub(super) fn poll_pending(
+ future: Pin<&mut F>,
+ mut registration: Pin<&mut Registration>,
+ cx: &mut Context,
+ ) -> Poll<Result<F::Output, Canceled>> {
+ // If this future is being polled for the first time, perform an extra
+ // cancellation check _before_ polling the inner future. The reason to do
+ // this is that polling the inner future for the first time might start
+ // some activity that cannot actually be canceled (e.g. running a compute
+ // job in a thread pool), so we should try to never start it at all.
+ match &*registration {
+ Registration::WillRegister { head_node } if head_node.is_canceled() => {
+ return Poll::Ready(Err(Canceled));
+ }
+ _ => {}
+ }
+
+ match future.poll(cx) {
+ Poll::Ready(res) => return Poll::Ready(Ok(res)),
+ Poll::Pending => {}
+ }
+
+ // Register this future with its `CancelHandle`, saving the `Waker` that
+ // can be used to make the runtime poll this future when it is canceled.
+ // When already registered, update the stored `Waker` if necessary.
+ let head_node = match &*registration {
+ Registration::WillRegister { .. } => {
+ match registration.as_mut().project_replace(Default::default()) {
+ RegistrationProjectionOwned::WillRegister { head_node } => {
+ Some(head_node)
+ }
+ _ => unreachable!(),
+ }
+ }
+ _ => None,
+ };
+ let node = match registration.project() {
+ RegistrationProjection::Registered { node } => node,
+ _ => unreachable!(),
+ };
+ node.register(cx.waker(), head_node)?;
+
+ Poll::Pending
+ }
+ }
+
+ impl<F: Future> TryCancelable<F> {
+ pub(super) fn new(future: F, cancel_handle: RcRef<CancelHandle>) -> Self {
+ Self {
+ inner: Cancelable::new(future, cancel_handle),
+ }
+ }
+ }
+
+ #[pin_project(project = RegistrationProjection,
+ project_replace = RegistrationProjectionOwned)]
+ #[derive(Debug)]
+ pub enum Registration {
+ WillRegister {
+ head_node: RcRef<Node>,
+ },
+ Registered {
+ #[pin]
+ node: Node,
+ },
+ }
+
+ impl Default for Registration {
+ fn default() -> Self {
+ Self::Registered {
+ node: Default::default(),
+ }
+ }
+ }
+
+ #[derive(Debug)]
+ pub struct Node {
+ inner: UnsafeCell<NodeInner>,
+ _pin: PhantomPinned,
+ }
+
+ impl Node {
+ /// If necessary, register a `Cancelable` node with a `CancelHandle`, and
+ /// save or update the `Waker` that can wake with this cancelable future.
+ pub fn register(
+ &self,
+ waker: &Waker,
+ head_rc: Option<RcRef<Node>>,
+ ) -> Result<(), Canceled> {
+ match head_rc.as_ref().map(RcRef::split) {
+ Some((head, rc)) => {
+ // Register this `Cancelable` node with a `CancelHandle` head node.
+ assert_ne!(self, head);
+ let self_inner = unsafe { &mut *self.inner.get() };
+ let head_inner = unsafe { &mut *head.inner.get() };
+ self_inner.link(waker, head_inner, rc)
+ }
+ None => {
+ // This `Cancelable` has already been linked to a `CancelHandle` head
+ // node; just update our stored `Waker` if necessary.
+ let inner = unsafe { &mut *self.inner.get() };
+ inner.update_waker(waker)
+ }
+ }
+ }
+
+ pub fn cancel(&self) {
+ let inner = unsafe { &mut *self.inner.get() };
+ inner.cancel();
+ }
+
+ pub fn is_canceled(&self) -> bool {
+ let inner = unsafe { &mut *self.inner.get() };
+ inner.is_canceled()
+ }
+ }
+
+ impl Default for Node {
+ fn default() -> Self {
+ Self {
+ inner: UnsafeCell::new(NodeInner::Unlinked),
+ _pin: PhantomPinned,
+ }
+ }
+ }
+
+ impl Drop for Node {
+ fn drop(&mut self) {
+ let inner = unsafe { &mut *self.inner.get() };
+ inner.unlink();
+ }
+ }
+
+ impl PartialEq for Node {
+ fn eq(&self, other: &Self) -> bool {
+ self as *const _ == other as *const _
+ }
+ }
+
+ #[derive(Debug)]
+ enum NodeInner {
+ Unlinked,
+ Linked {
+ kind: NodeKind,
+ prev: NonNull<NodeInner>,
+ next: NonNull<NodeInner>,
+ },
+ Canceled,
+ }
+
+ impl NodeInner {
+ fn as_non_null(&mut self) -> NonNull<Self> {
+ NonNull::from(self)
+ }
+
+ fn link(
+ &mut self,
+ waker: &Waker,
+ head: &mut Self,
+ rc_pin: &Rc<dyn Any>,
+ ) -> Result<(), Canceled> {
+ // The future should not have been linked to a cancel handle before.
+ assert!(matches!(self, NodeInner::Unlinked));
+
+ match head {
+ NodeInner::Unlinked => {
+ *head = NodeInner::Linked {
+ kind: NodeKind::head(rc_pin),
+ prev: self.as_non_null(),
+ next: self.as_non_null(),
+ };
+ *self = NodeInner::Linked {
+ kind: NodeKind::item(waker),
+ prev: head.as_non_null(),
+ next: head.as_non_null(),
+ };
+ Ok(())
+ }
+ NodeInner::Linked {
+ kind: NodeKind::Head { .. },
+ prev: next_prev_nn,
+ ..
+ } => {
+ let prev = unsafe { &mut *next_prev_nn.as_ptr() };
+ match prev {
+ NodeInner::Linked {
+ kind: NodeKind::Item { .. },
+ next: prev_next_nn,
+ ..
+ } => {
+ *self = NodeInner::Linked {
+ kind: NodeKind::item(waker),
+ prev: replace(next_prev_nn, self.as_non_null()),
+ next: replace(prev_next_nn, self.as_non_null()),
+ };
+ Ok(())
+ }
+ _ => unreachable!(),
+ }
+ }
+ NodeInner::Canceled => Err(Canceled),
+ _ => unreachable!(),
+ }
+ }
+
+ fn update_waker(&mut self, new_waker: &Waker) -> Result<(), Canceled> {
+ match self {
+ NodeInner::Unlinked => Ok(()),
+ NodeInner::Linked {
+ kind: NodeKind::Item { waker },
+ ..
+ } => {
+ if !waker.will_wake(new_waker) {
+ *waker = new_waker.clone();
+ }
+ Ok(())
+ }
+ NodeInner::Canceled => Err(Canceled),
+ _ => unreachable!(),
+ }
+ }
+
+ /// If this node is linked to other nodes, remove it from the chain. This
+ /// method is called (only) by the drop handler for `Node`. It is suitable
+ /// for both 'head' and 'item' nodes.
+ fn unlink(&mut self) {
+ if let NodeInner::Linked {
+ prev: mut prev_nn,
+ next: mut next_nn,
+ ..
+ } = replace(self, NodeInner::Unlinked)
+ {
+ if prev_nn == next_nn {
+ // There were only two nodes in this chain; after unlinking ourselves
+ // the other node is no longer linked.
+ let other = unsafe { prev_nn.as_mut() };
+ *other = NodeInner::Unlinked;
+ } else {
+ // The chain had more than two nodes.
+ match unsafe { prev_nn.as_mut() } {
+ NodeInner::Linked {
+ next: prev_next_nn, ..
+ } => {
+ *prev_next_nn = next_nn;
+ }
+ _ => unreachable!(),
+ }
+ match unsafe { next_nn.as_mut() } {
+ NodeInner::Linked {
+ prev: next_prev_nn, ..
+ } => {
+ *next_prev_nn = prev_nn;
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+ }
+
+ /// Mark this node and all linked nodes for cancellation. Note that `self`
+ /// must refer to a head (`CancelHandle`) node.
+ fn cancel(&mut self) {
+ let mut head_nn = NonNull::from(self);
+ let mut item_nn;
+
+ // Mark the head node as canceled.
+ match replace(unsafe { head_nn.as_mut() }, NodeInner::Canceled) {
+ NodeInner::Linked {
+ kind: NodeKind::Head { .. },
+ next: next_nn,
+ ..
+ } => item_nn = next_nn,
+ NodeInner::Unlinked | NodeInner::Canceled => return,
+ _ => unreachable!(),
+ };
+
+ // Cancel all item nodes in the chain, waking each stored `Waker`.
+ while item_nn != head_nn {
+ match replace(unsafe { item_nn.as_mut() }, NodeInner::Canceled) {
+ NodeInner::Linked {
+ kind: NodeKind::Item { waker },
+ next: next_nn,
+ ..
+ } => {
+ waker.wake();
+ item_nn = next_nn;
+ }
+ _ => unreachable!(),
+ }
+ }
+ }
+
+ /// Returns true if this node has been marked for cancellation. Note that
+ /// `self` must refer to a head (`CancelHandle`) node.
+ fn is_canceled(&self) -> bool {
+ match self {
+ NodeInner::Unlinked => false,
+ NodeInner::Linked {
+ kind: NodeKind::Head { .. },
+ ..
+ } => false,
+ NodeInner::Canceled => true,
+ _ => unreachable!(),
+ }
+ }
+ }
+
+ #[derive(Debug)]
+ enum NodeKind {
+ /// In a chain of linked nodes, the "head" node is owned by the
+ /// `CancelHandle`. A chain usually contains at most one head node; however
+ /// when a `CancelHandle` is dropped before the futures associated with it
+ /// are dropped, a chain may temporarily contain no head node at all.
+ Head {
+ /// The `weak_pin` field adds adds a weak reference to the `Rc` guarding
+ /// the heap allocation that contains the `CancelHandle`. Without this
+ /// extra weak reference, `Rc::get_mut()` might succeed and allow the
+ /// `CancelHandle` to be moved when it isn't safe to do so.
+ weak_pin: Weak<dyn Any>,
+ },
+ /// All item nodes in a chain are associated with a `Cancelable` head node.
+ Item {
+ /// If this future indeed does get canceled, the waker is needed to make
+ /// sure that the canceled future gets polled as soon as possible.
+ waker: Waker,
+ },
+ }
+
+ impl NodeKind {
+ fn head(rc_pin: &Rc<dyn Any>) -> Self {
+ let weak_pin = Rc::downgrade(rc_pin);
+ Self::Head { weak_pin }
+ }
+
+ fn item(waker: &Waker) -> Self {
+ let waker = waker.clone();
+ Self::Item { waker }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::error::AnyError;
+ use futures::future::pending;
+ use futures::future::poll_fn;
+ use futures::future::ready;
+ use futures::future::FutureExt;
+ use futures::future::TryFutureExt;
+ use futures::select;
+ use futures::task::noop_waker_ref;
+ use futures::task::Context;
+ use futures::task::Poll;
+ use std::convert::Infallible as Never;
+ use std::io;
+ use tokio::net::TcpStream;
+ use tokio::spawn;
+
+ fn box_fused<'a, F: FusedFuture + 'a>(
+ future: F,
+ ) -> Pin<Box<dyn FusedFuture<Output = F::Output> + 'a>> {
+ Box::pin(future)
+ }
+
+ async fn ready_in_n(name: &str, count: usize) -> &str {
+ let mut remaining = count as isize;
+ poll_fn(|_| {
+ assert!(remaining >= 0);
+ if remaining == 0 {
+ Poll::Ready(name)
+ } else {
+ remaining -= 1;
+ Poll::Pending
+ }
+ })
+ .await
+ }
+
+ #[test]
+ fn cancel_future() {
+ let cancel_now = CancelHandle::new_rc();
+ let cancel_at_0 = CancelHandle::new_rc();
+ let cancel_at_1 = CancelHandle::new_rc();
+ let cancel_at_4 = CancelHandle::new_rc();
+ let cancel_never = CancelHandle::new_rc();
+
+ cancel_now.cancel();
+
+ let mut futures = vec![
+ box_fused(ready("A").or_cancel(&cancel_now)),
+ box_fused(ready("B").or_cancel(&cancel_at_0)),
+ box_fused(ready("C").or_cancel(&cancel_at_1)),
+ box_fused(
+ ready_in_n("D", 0)
+ .or_cancel(&cancel_never)
+ .try_or_cancel(&cancel_now),
+ ),
+ box_fused(
+ ready_in_n("E", 1)
+ .or_cancel(&cancel_at_1)
+ .try_or_cancel(&cancel_at_1),
+ ),
+ box_fused(ready_in_n("F", 2).or_cancel(&cancel_at_1)),
+ box_fused(ready_in_n("G", 3).or_cancel(&cancel_at_4)),
+ box_fused(ready_in_n("H", 4).or_cancel(&cancel_at_4)),
+ box_fused(ready_in_n("I", 5).or_cancel(&cancel_at_4)),
+ box_fused(ready_in_n("J", 5).map(Ok)),
+ box_fused(ready_in_n("K", 5).or_cancel(cancel_never)),
+ ];
+
+ let mut cx = Context::from_waker(noop_waker_ref());
+
+ for i in 0..=5 {
+ match i {
+ 0 => cancel_at_0.cancel(),
+ 1 => cancel_at_1.cancel(),
+ 4 => cancel_at_4.cancel(),
+ 2 | 3 | 5 => {}
+ _ => unreachable!(),
+ }
+
+ let results = futures
+ .iter_mut()
+ .filter(|fut| !fut.is_terminated())
+ .filter_map(|fut| match fut.poll_unpin(&mut cx) {
+ Poll::Pending => None,
+ Poll::Ready(res) => Some(res),
+ })
+ .collect::<Vec<_>>();
+
+ match i {
+ 0 => assert_eq!(
+ results,
+ [Err(Canceled), Err(Canceled), Ok("C"), Err(Canceled)]
+ ),
+ 1 => assert_eq!(results, [Ok("E"), Err(Canceled)]),
+ 2 => assert_eq!(results, []),
+ 3 => assert_eq!(results, [Ok("G")]),
+ 4 => assert_eq!(results, [Ok("H"), Err(Canceled)]),
+ 5 => assert_eq!(results, [Ok("J"), Ok("K")]),
+ _ => unreachable!(),
+ }
+ }
+
+ assert_eq!(futures.into_iter().any(|fut| !fut.is_terminated()), false);
+
+ let cancel_handles = [cancel_now, cancel_at_0, cancel_at_1, cancel_at_4];
+ assert_eq!(cancel_handles.iter().any(|c| !c.is_canceled()), false);
+ }
+
+ #[tokio::test]
+ async fn cancel_try_future() {
+ {
+ // Cancel a spawned task before it actually runs.
+ let cancel_handle = Rc::new(CancelHandle::new());
+ let future = spawn(async { panic!("the task should not be spawned") })
+ .map_err(AnyError::from)
+ .try_or_cancel(&cancel_handle);
+ cancel_handle.cancel();
+ let error = future.await.unwrap_err();
+ assert!(error.downcast_ref::<Canceled>().is_some());
+ assert_eq!(error.to_string().as_str(), "operation canceled");
+ }
+
+ {
+ // Cancel a network I/O future right after polling it.
+ let cancel_handle = Rc::new(CancelHandle::new());
+ let result = loop {
+ select! {
+ r = TcpStream::connect("1.2.3.4:12345")
+ .try_or_cancel(&cancel_handle) => break r,
+ default => cancel_handle.cancel(),
+ };
+ };
+ let error = result.unwrap_err();
+ assert_eq!(error.kind(), io::ErrorKind::Interrupted);
+ assert_eq!(error.to_string().as_str(), "operation canceled");
+ }
+ }
+
+ #[test]
+ fn cancel_handle_pinning() {
+ let mut cancel_handle = CancelHandle::new_rc();
+
+ // There is only one reference to `cancel_handle`, so `Rc::get_mut()` should
+ // succeed.
+ assert!(Rc::get_mut(&mut cancel_handle).is_some());
+
+ let mut future = pending::<Never>().or_cancel(&cancel_handle);
+ let future = unsafe { Pin::new_unchecked(&mut future) };
+
+ // There are two `Rc<CancelHandle>` references now, so this fails.
+ assert!(Rc::get_mut(&mut cancel_handle).is_none());
+
+ let mut cx = Context::from_waker(noop_waker_ref());
+ assert!(future.poll(&mut cx).is_pending());
+
+ // Polling `future` has established a link between the future and
+ // `cancel_handle`, so both values should be pinned at this point.
+ assert!(Rc::get_mut(&mut cancel_handle).is_none());
+
+ cancel_handle.cancel();
+
+ // Canceling or dropping the associated future(s) unlinks them from the
+ // cancel handle, therefore `cancel_handle` can now safely be moved again.
+ assert!(Rc::get_mut(&mut cancel_handle).is_some());
+ }
+}
diff --git a/core/async_cell.rs b/core/async_cell.rs
index a140dceb1..bf62692ed 100644
--- a/core/async_cell.rs
+++ b/core/async_cell.rs
@@ -126,6 +126,7 @@ impl<T> RcRef<AsyncRefCell<T>> {
/// let foo_rc: RcRef<u32> = RcRef::map(stuff_rc.clone(), |v| &v.foo);
/// let bar_rc: RcRef<String> = RcRef::map(stuff_rc, |v| &v.bar);
/// ```
+#[derive(Debug)]
pub struct RcRef<T> {
rc: Rc<dyn Any>,
value: *const T,
@@ -136,7 +137,7 @@ impl<T: 'static> RcRef<T> {
Self::from(Rc::new(value))
}
- pub fn map<S: 'static, R: i::RcLike<S>, F: FnOnce(&S) -> &T>(
+ pub fn map<S: 'static, R: RcLike<S>, F: FnOnce(&S) -> &T>(
source: R,
map_fn: F,
) -> RcRef<T> {
@@ -144,6 +145,11 @@ impl<T: 'static> RcRef<T> {
let value = map_fn(unsafe { &*value });
RcRef { rc, value }
}
+
+ pub(crate) fn split(rc_ref: &Self) -> (&T, &Rc<dyn Any>) {
+ let &Self { ref rc, value } = rc_ref;
+ (unsafe { &*value }, rc)
+ }
}
impl<T: Default + 'static> Default for RcRef<T> {
@@ -152,6 +158,21 @@ impl<T: Default + 'static> Default for RcRef<T> {
}
}
+impl<T> Clone for RcRef<T> {
+ fn clone(&self) -> Self {
+ Self {
+ rc: self.rc.clone(),
+ value: self.value,
+ }
+ }
+}
+
+impl<T: 'static> From<&RcRef<T>> for RcRef<T> {
+ fn from(rc_ref: &RcRef<T>) -> Self {
+ rc_ref.clone()
+ }
+}
+
impl<T: 'static> From<Rc<T>> for RcRef<T> {
fn from(rc: Rc<T>) -> Self {
Self {
@@ -161,12 +182,9 @@ impl<T: 'static> From<Rc<T>> for RcRef<T> {
}
}
-impl<T> Clone for RcRef<T> {
- fn clone(&self) -> Self {
- Self {
- rc: self.rc.clone(),
- value: self.value,
- }
+impl<T: 'static> From<&Rc<T>> for RcRef<T> {
+ fn from(rc: &Rc<T>) -> Self {
+ rc.clone().into()
}
}
@@ -189,8 +207,18 @@ impl<T> AsRef<T> for RcRef<T> {
}
}
+/// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`,
+/// so that applicable methods can operate on either type.
+pub trait RcLike<T>: AsRef<T> + Into<RcRef<T>> {}
+
+impl<T: 'static> RcLike<T> for Rc<T> {}
+impl<T: 'static> RcLike<T> for RcRef<T> {}
+impl<T: 'static> RcLike<T> for &Rc<T> {}
+impl<T: 'static> RcLike<T> for &RcRef<T> {}
+
mod internal {
use super::AsyncRefCell;
+ use super::RcLike;
use super::RcRef;
use futures::future::Future;
use futures::ready;
@@ -204,32 +232,29 @@ mod internal {
use std::ops::Deref;
use std::ops::DerefMut;
use std::pin::Pin;
- use std::rc::Rc;
impl<T> AsyncRefCell<T> {
/// Borrow the cell's contents synchronouslym without creating an
/// intermediate future. If the cell has already been borrowed and either
/// the existing or the requested borrow is exclusive, this function returns
- /// `None`.
- pub(super) fn borrow_sync<
- M: BorrowModeTrait,
- R: RcLike<AsyncRefCell<T>>,
- >(
- cell: &R,
+ /// `None`.
+ pub fn borrow_sync<M: BorrowModeTrait, R: RcLike<AsyncRefCell<T>>>(
+ cell: R,
) -> Option<AsyncBorrowImpl<T, M>> {
+ let cell_ref = cell.as_ref();
// Don't allow synchronous borrows to cut in line; if there are any
// enqueued waiters, return `None`, even if the current borrow is a shared
// one and the requested borrow is too.
- let waiters = unsafe { &mut *cell.waiters.as_ptr() };
+ let waiters = unsafe { &mut *cell_ref.waiters.as_ptr() };
if waiters.is_empty() {
// There are no enqueued waiters, but it is still possible that the cell
// is currently borrowed. If there are no current borrows, or both the
// existing and requested ones are shared, `try_add()` returns the
// adjusted borrow count.
let new_borrow_count =
- cell.borrow_count.get().try_add(M::borrow_mode())?;
- cell.borrow_count.set(new_borrow_count);
- Some(AsyncBorrowImpl::<T, M>::new(cell.clone().into()))
+ cell_ref.borrow_count.get().try_add(M::borrow_mode())?;
+ cell_ref.borrow_count.set(new_borrow_count);
+ Some(AsyncBorrowImpl::<T, M>::new(cell.into()))
} else {
None
}
@@ -359,10 +384,10 @@ mod internal {
}
impl<T, M: BorrowModeTrait> AsyncBorrowFutureImpl<T, M> {
- pub fn new<R: RcLike<AsyncRefCell<T>>>(cell: &R) -> Self {
+ pub fn new<R: RcLike<AsyncRefCell<T>>>(cell: R) -> Self {
Self {
- cell: Some(cell.clone().into()),
- id: cell.create_waiter::<M>(),
+ id: cell.as_ref().create_waiter::<M>(),
+ cell: Some(cell.into()),
_phantom: PhantomData,
}
}
@@ -561,13 +586,6 @@ mod internal {
self.waker.take()
}
}
-
- /// The `RcLike` trait provides an abstraction over `std::rc::Rc` and `RcRef`,
- /// so that applicable methods can operate on either type.
- pub trait RcLike<T>: Clone + Deref<Target = T> + Into<RcRef<T>> {}
-
- impl<T: 'static> RcLike<T> for Rc<T> {}
- impl<T: 'static> RcLike<T> for RcRef<T> {}
}
#[cfg(test)]
diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs
index 9af74d980..1d7a76c3d 100644
--- a/core/examples/http_bench_bin_ops.rs
+++ b/core/examples/http_bench_bin_ops.rs
@@ -3,10 +3,10 @@
#[macro_use]
extern crate log;
-use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
-use deno_core::AsyncRefFuture;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::Op;
use deno_core::OpState;
@@ -46,51 +46,65 @@ impl log::Log for Logger {
fn flush(&self) {}
}
-// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in
-// a cell, because it only supports one op (`accept`) which does not require
-// a mutable reference to the listener.
-struct TcpListener(AsyncRefCell<tokio::net::TcpListener>);
-
-impl Resource for TcpListener {}
+// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell,
+// because it only supports one op (`accept`) which does not require a mutable
+// reference to the listener.
+struct TcpListener {
+ inner: tokio::net::TcpListener,
+ cancel: CancelHandle,
+}
impl TcpListener {
- /// Returns a future that yields a shared borrow of the TCP listener.
- fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> {
- RcRef::map(self, |r| &r.0).borrow()
+ async fn accept(self: Rc<Self>) -> Result<TcpStream, Error> {
+ let cancel = RcRef::map(&self, |r| &r.cancel);
+ let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into();
+ Ok(stream)
+ }
+}
+
+impl Resource for TcpListener {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
}
impl TryFrom<std::net::TcpListener> for TcpListener {
type Error = Error;
- fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> {
- tokio::net::TcpListener::try_from(l)
- .map(AsyncRefCell::new)
- .map(Self)
+ fn try_from(
+ std_listener: std::net::TcpListener,
+ ) -> Result<Self, Self::Error> {
+ tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self {
+ inner: tokio_listener,
+ cancel: Default::default(),
+ })
}
}
struct TcpStream {
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
+ // When a `TcpStream` resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures are attached to this cancel handle.
+ cancel: CancelHandle,
}
-impl Resource for TcpStream {}
-
impl TcpStream {
- /// Returns a future that yields an exclusive borrow of the read end of the
- /// tcp stream.
- fn rd_borrow_mut(
- self: Rc<Self>,
- ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> {
- RcRef::map(self, |r| &r.rd).borrow_mut()
+ async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, Error> {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ rd.read(buf).try_or_cancel(cancel).await
}
- /// Returns a future that yields an exclusive borrow of the write end of the
- /// tcp stream.
- fn wr_borrow_mut(
- self: Rc<Self>,
- ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> {
- RcRef::map(self, |r| &r.wr).borrow_mut()
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, Error> {
+ let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
+ wr.write(buf).await
+ }
+}
+
+impl Resource for TcpStream {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
}
}
@@ -100,6 +114,7 @@ impl From<tokio::net::TcpStream> for TcpStream {
Self {
rd: rd.into(),
wr: wr.into(),
+ cancel: Default::default(),
}
}
}
@@ -179,14 +194,12 @@ async fn op_accept(
) -> Result<u32, Error> {
debug!("accept rid={}", rid);
- let listener_rc = state
+ let listener = state
.borrow()
.resource_table_2
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
- let listener_ref = listener_rc.borrow().await;
-
- let stream: TcpStream = listener_ref.accept().await?.0.into();
+ let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table_2.add(stream);
Ok(rid)
}
@@ -199,14 +212,12 @@ async fn op_read(
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
debug!("read rid={}", rid);
- let stream_rc = state
+ let stream = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- let mut rd_stream_mut = stream_rc.rd_borrow_mut().await;
-
- rd_stream_mut.read(&mut bufs[0]).await
+ stream.read(&mut bufs[0]).await
}
async fn op_write(
@@ -217,14 +228,12 @@ async fn op_write(
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
debug!("write rid={}", rid);
- let stream_rc = state
+ let stream = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- let mut wr_stream_mut = stream_rc.wr_borrow_mut().await;
-
- wr_stream_mut.write(&bufs[0]).await
+ stream.write(&bufs[0]).await
}
fn register_op_bin_sync<F>(
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index 77f5b9dbe..c4fcd6363 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -5,10 +5,10 @@ extern crate log;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
-use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
-use deno_core::AsyncRefFuture;
use deno_core::BufVec;
+use deno_core::CancelHandle;
+use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
@@ -41,51 +41,65 @@ impl log::Log for Logger {
fn flush(&self) {}
}
-// Note: it isn't actually necessary to wrap the `tokio::net::TcpListener` in
-// a cell, because it only supports one op (`accept`) which does not require
-// a mutable reference to the listener.
-struct TcpListener(AsyncRefCell<tokio::net::TcpListener>);
-
-impl Resource for TcpListener {}
+// Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell,
+// because it only supports one op (`accept`) which does not require a mutable
+// reference to the listener.
+struct TcpListener {
+ inner: tokio::net::TcpListener,
+ cancel: CancelHandle,
+}
impl TcpListener {
- /// Returns a future that yields a shared borrow of the TCP listener.
- fn borrow(self: Rc<Self>) -> AsyncRefFuture<tokio::net::TcpListener> {
- RcRef::map(self, |r| &r.0).borrow()
+ async fn accept(self: Rc<Self>) -> Result<TcpStream, Error> {
+ let cancel = RcRef::map(&self, |r| &r.cancel);
+ let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into();
+ Ok(stream)
+ }
+}
+
+impl Resource for TcpListener {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel();
}
}
impl TryFrom<std::net::TcpListener> for TcpListener {
type Error = Error;
- fn try_from(l: std::net::TcpListener) -> Result<Self, Self::Error> {
- tokio::net::TcpListener::try_from(l)
- .map(AsyncRefCell::new)
- .map(Self)
+ fn try_from(
+ std_listener: std::net::TcpListener,
+ ) -> Result<Self, Self::Error> {
+ tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self {
+ inner: tokio_listener,
+ cancel: Default::default(),
+ })
}
}
struct TcpStream {
rd: AsyncRefCell<tokio::net::tcp::OwnedReadHalf>,
wr: AsyncRefCell<tokio::net::tcp::OwnedWriteHalf>,
+ // When a `TcpStream` resource is closed, all pending 'read' ops are
+ // canceled, while 'write' ops are allowed to complete. Therefore only
+ // 'read' futures are attached to this cancel handle.
+ cancel: CancelHandle,
}
-impl Resource for TcpStream {}
-
impl TcpStream {
- /// Returns a future that yields an exclusive borrow of the read end of the
- /// tcp stream.
- fn rd_borrow_mut(
- self: Rc<Self>,
- ) -> AsyncMutFuture<tokio::net::tcp::OwnedReadHalf> {
- RcRef::map(self, |r| &r.rd).borrow_mut()
+ async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, Error> {
+ let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
+ let cancel = RcRef::map(self, |r| &r.cancel);
+ rd.read(buf).try_or_cancel(cancel).await
}
- /// Returns a future that yields an exclusive borrow of the write end of the
- /// tcp stream.
- fn wr_borrow_mut(
- self: Rc<Self>,
- ) -> AsyncMutFuture<tokio::net::tcp::OwnedWriteHalf> {
- RcRef::map(self, |r| &r.wr).borrow_mut()
+ async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, Error> {
+ let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
+ wr.write(buf).await
+ }
+}
+
+impl Resource for TcpStream {
+ fn close(self: Rc<Self>) {
+ self.cancel.cancel()
}
}
@@ -95,6 +109,7 @@ impl From<tokio::net::TcpStream> for TcpStream {
Self {
rd: rd.into(),
wr: wr.into(),
+ cancel: Default::default(),
}
}
}
@@ -157,14 +172,12 @@ async fn op_accept(
.unwrap();
debug!("accept rid={}", rid);
- let listener_rc = state
+ let listener = state
.borrow()
.resource_table_2
.get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
- let listener_ref = listener_rc.borrow().await;
-
- let stream: TcpStream = listener_ref.accept().await?.0.into();
+ let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table_2.add(stream);
Ok(serde_json::json!({ "rid": rid }))
}
@@ -184,14 +197,12 @@ async fn op_read(
.unwrap();
debug!("read rid={}", rid);
- let stream_rc = state
+ let stream = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- let mut rd_stream_mut = stream_rc.rd_borrow_mut().await;
-
- let nread = rd_stream_mut.read(&mut bufs[0]).await?;
+ let nread = stream.read(&mut bufs[0]).await?;
Ok(serde_json::json!({ "nread": nread }))
}
@@ -210,14 +221,12 @@ async fn op_write(
.unwrap();
debug!("write rid={}", rid);
- let stream_rc = state
+ let stream = state
.borrow()
.resource_table_2
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- let mut wr_stream_mut = stream_rc.wr_borrow_mut().await;
-
- let nwritten = wr_stream_mut.write(&bufs[0]).await?;
+ let nwritten = stream.write(&bufs[0]).await?;
Ok(serde_json::json!({ "nwritten": nwritten }))
}
diff --git a/core/lib.rs b/core/lib.rs
index 372cd558e..20ee5a3d5 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -5,6 +5,7 @@ extern crate lazy_static;
#[macro_use]
extern crate log;
+mod async_cancel;
mod async_cell;
mod bindings;
pub mod error;
@@ -28,11 +29,18 @@ pub use serde;
pub use serde_json;
pub use url;
+pub use crate::async_cancel::CancelFuture;
+pub use crate::async_cancel::CancelHandle;
+pub use crate::async_cancel::CancelTryFuture;
+pub use crate::async_cancel::Cancelable;
+pub use crate::async_cancel::Canceled;
+pub use crate::async_cancel::TryCancelable;
pub use crate::async_cell::AsyncMut;
pub use crate::async_cell::AsyncMutFuture;
pub use crate::async_cell::AsyncRef;
pub use crate::async_cell::AsyncRefCell;
pub use crate::async_cell::AsyncRefFuture;
+pub use crate::async_cell::RcLike;
pub use crate::async_cell::RcRef;
pub use crate::flags::v8_set_flags;
pub use crate::module_specifier::ModuleResolutionError;
diff --git a/core/resources2.rs b/core/resources2.rs
index 62cb3f056..92548a556 100644
--- a/core/resources2.rs
+++ b/core/resources2.rs
@@ -24,6 +24,11 @@ pub trait Resource: Any + 'static {
fn name(&self) -> Cow<str> {
type_name::<Self>().into()
}
+
+ /// Resources may implement the `close()` trait method if they need to do
+ /// resource specific clean-ups, such as cancelling pending futures, after a
+ /// resource has been removed from the resource table.
+ fn close(self: Rc<Self>) {}
}
impl dyn Resource {
@@ -117,7 +122,7 @@ impl ResourceTable {
/// cause the resource to be dropped. However, since resources are reference
/// counted, therefore pending ops are not automatically cancelled.
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
- self.index.remove(&rid).map(|_| ())
+ self.index.remove(&rid).map(|resource| resource.close())
}
/// Returns an iterator that yields a `(id, name)` pair for every resource