summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock7
-rw-r--r--core/Cargo.toml1
-rw-r--r--core/joinset.rs92
-rw-r--r--core/lib.rs1
-rw-r--r--core/runtime/jsrealm.rs6
-rw-r--r--core/runtime/jsruntime.rs12
-rw-r--r--core/runtime/ops.rs23
-rw-r--r--core/runtime/tests.rs90
8 files changed, 204 insertions, 28 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ba0e8929b..2a4389b30 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -608,6 +608,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
+name = "cooked-waker"
+version = "5.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "147be55d677052dabc6b22252d5dd0fd4c29c8c27aa4f2fbef0f94aa003b406f"
+
+[[package]]
name = "core-foundation"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -971,6 +977,7 @@ version = "0.191.0"
dependencies = [
"anyhow",
"bytes",
+ "cooked-waker",
"deno_ast",
"deno_ops",
"futures",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 31f11ae3a..e8e659726 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -46,4 +46,5 @@ path = "examples/http_bench_json_ops/main.rs"
# These dependencies are only used for the 'http_bench_*_ops' examples.
[dev-dependencies]
+cooked-waker = "5"
deno_ast.workspace = true
diff --git a/core/joinset.rs b/core/joinset.rs
new file mode 100644
index 000000000..f80c95712
--- /dev/null
+++ b/core/joinset.rs
@@ -0,0 +1,92 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+// Some code and comments under MIT license where adapted from Tokio code
+// Copyright (c) 2023 Tokio Contributors
+
+use std::task::Context;
+use std::task::Poll;
+use std::task::Waker;
+
+use futures::Future;
+use tokio::task::AbortHandle;
+use tokio::task::JoinError;
+
+use crate::task::MaskFutureAsSend;
+use crate::task::MaskResultAsSend;
+
+/// Wraps the tokio [`JoinSet`] to make it !Send-friendly and to make it easier and safer for us to
+/// poll while empty.
+pub(crate) struct JoinSet<T> {
+ joinset: tokio::task::JoinSet<MaskResultAsSend<T>>,
+ /// If join_next returns Ready(None), we stash the waker
+ waker: Option<Waker>,
+}
+
+impl<T> Default for JoinSet<T> {
+ fn default() -> Self {
+ Self {
+ joinset: Default::default(),
+ waker: None,
+ }
+ }
+}
+
+impl<T: 'static> JoinSet<T> {
+ /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
+ /// that can be used to remotely cancel the task.
+ ///
+ /// The provided future will start running in the background immediately
+ /// when this method is called, even if you don't await anything on this
+ /// `JoinSet`.
+ ///
+ /// # Panics
+ ///
+ /// This method panics if called outside of a Tokio runtime.
+ ///
+ /// [`AbortHandle`]: tokio::task::AbortHandle
+ #[track_caller]
+ pub fn spawn<F>(&mut self, task: F) -> AbortHandle
+ where
+ F: Future<Output = T>,
+ F: 'static,
+ T: 'static,
+ {
+ // SAFETY: We only use this with the single-thread executor
+ let handle = self.joinset.spawn(unsafe { MaskFutureAsSend::new(task) });
+
+ // If someone had called poll_join_next while we were empty, ask them to poll again
+ // so we can properly register the waker with the underlying JoinSet.
+ if let Some(waker) = self.waker.take() {
+ waker.wake();
+ }
+ handle
+ }
+
+ /// Returns the number of tasks currently in the `JoinSet`.
+ pub fn len(&self) -> usize {
+ self.joinset.len()
+ }
+
+ /// Waits until one of the tasks in the set completes and returns its output.
+ ///
+ /// # Cancel Safety
+ ///
+ /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
+ /// statement and some other branch completes first, it is guaranteed that no tasks were
+ /// removed from this `JoinSet`.
+ pub fn poll_join_next(
+ &mut self,
+ cx: &mut Context,
+ ) -> Poll<Result<T, JoinError>> {
+ // TODO(mmastrac): Use poll_join_next from Tokio
+ let next = std::pin::pin!(self.joinset.join_next());
+ match next.poll(cx) {
+ Poll::Ready(Some(res)) => Poll::Ready(res.map(|res| res.into_inner())),
+ Poll::Ready(None) => {
+ // Stash waker
+ self.waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
diff --git a/core/lib.rs b/core/lib.rs
index 1042bf55c..250d7dead 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -9,6 +9,7 @@ mod flags;
mod gotham_state;
mod inspector;
mod io;
+mod joinset;
mod module_specifier;
mod modules;
mod normalize_path;
diff --git a/core/runtime/jsrealm.rs b/core/runtime/jsrealm.rs
index f41f2bbb2..72818eebe 100644
--- a/core/runtime/jsrealm.rs
+++ b/core/runtime/jsrealm.rs
@@ -1,10 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use super::bindings;
use crate::error::exception_to_err_result;
+use crate::joinset::JoinSet;
use crate::modules::ModuleCode;
use crate::ops::OpCtx;
use crate::runtime::JsRuntimeState;
-use crate::task::MaskResultAsSend;
use crate::JsRuntime;
use crate::OpId;
use crate::OpResult;
@@ -17,7 +17,6 @@ use std::hash::BuildHasherDefault;
use std::hash::Hasher;
use std::option::Option;
use std::rc::Rc;
-use tokio::task::JoinSet;
use v8::HandleScope;
use v8::Local;
@@ -50,8 +49,7 @@ pub(crate) struct ContextState {
pub(crate) pending_promise_rejections:
VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>,
pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>,
- pub(crate) pending_ops:
- JoinSet<MaskResultAsSend<(PromiseId, OpId, OpResult)>>,
+ pub(crate) pending_ops: JoinSet<(PromiseId, OpId, OpResult)>,
// We don't explicitly re-read this prop but need the slice to live alongside
// the context
pub(crate) op_ctxs: Box<[OpCtx]>,
diff --git a/core/runtime/jsruntime.rs b/core/runtime/jsruntime.rs
index 7f9e2dcd8..3b2ac49aa 100644
--- a/core/runtime/jsruntime.rs
+++ b/core/runtime/jsruntime.rs
@@ -37,7 +37,6 @@ use anyhow::Context as AnyhowContext;
use anyhow::Error;
use futures::channel::oneshot;
use futures::future::poll_fn;
-use futures::future::Future;
use futures::stream::StreamExt;
use smallvec::SmallVec;
use std::any::Any;
@@ -2261,14 +2260,11 @@ impl JsRuntime {
SmallVec::with_capacity(32);
loop {
- let item = {
- let next = std::pin::pin!(context_state.pending_ops.join_next());
- let Poll::Ready(Some(item)) = next.poll(cx) else {
- break;
- };
- item
+ let Poll::Ready(item) = context_state.pending_ops.poll_join_next(cx) else {
+ break;
};
- let (promise_id, op_id, mut resp) = item.unwrap().into_inner();
+ // TODO(mmastrac): If this task is really errored, things could be pretty bad
+ let (promise_id, op_id, mut resp) = item.unwrap();
state
.borrow()
.op_state
diff --git a/core/runtime/ops.rs b/core/runtime/ops.rs
index 5e51414d3..1c871cda0 100644
--- a/core/runtime/ops.rs
+++ b/core/runtime/ops.rs
@@ -25,10 +25,11 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
state.get_error_class_fn
};
let fut = op.map(|result| crate::_ops::to_op_result(get_class, result));
- // SAFETY: this is guaranteed to be running on a current-thread executor
- ctx.context_state.borrow_mut().pending_ops.spawn(unsafe {
- crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut))
- });
+ ctx
+ .context_state
+ .borrow_mut()
+ .pending_ops
+ .spawn(OpCall::new(ctx, promise_id, fut));
}
#[inline]
@@ -123,12 +124,7 @@ pub fn queue_async_op<'s>(
Poll::Pending => {}
Poll::Ready(mut res) => {
if deferred {
- ctx
- .context_state
- .borrow_mut()
- .pending_ops
- // SAFETY: this is guaranteed to be running on a current-thread executor
- .spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) });
+ ctx.context_state.borrow_mut().pending_ops.spawn(ready(res));
return None;
} else {
ctx.state.borrow_mut().tracker.track_async_completed(ctx.id);
@@ -137,12 +133,7 @@ pub fn queue_async_op<'s>(
}
}
- ctx
- .context_state
- .borrow_mut()
- .pending_ops
- // SAFETY: this is guaranteed to be running on a current-thread executor
- .spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) });
+ ctx.context_state.borrow_mut().pending_ops.spawn(pinned);
None
}
diff --git a/core/runtime/tests.rs b/core/runtime/tests.rs
index dbfeecf3c..663645bb1 100644
--- a/core/runtime/tests.rs
+++ b/core/runtime/tests.rs
@@ -21,6 +21,9 @@ use crate::Extension;
use crate::JsBuffer;
use crate::*;
use anyhow::Error;
+use cooked_waker::IntoWaker;
+use cooked_waker::Wake;
+use cooked_waker::WakeRef;
use deno_ops::op;
use futures::future::poll_fn;
use futures::future::Future;
@@ -28,11 +31,14 @@ use futures::FutureExt;
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::AtomicI8;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
+use std::time::Duration;
// deno_ops macros generate code assuming deno_core in scope.
mod deno_core {
@@ -264,6 +270,90 @@ fn test_execute_script_return_value() {
}
}
+#[derive(Default)]
+struct LoggingWaker {
+ woken: AtomicBool,
+}
+
+impl Wake for LoggingWaker {
+ fn wake(self) {
+ self.woken.store(true, Ordering::SeqCst);
+ }
+}
+
+impl WakeRef for LoggingWaker {
+ fn wake_by_ref(&self) {
+ self.woken.store(true, Ordering::SeqCst);
+ }
+}
+
+/// This is a reproduction for a very obscure bug where the Deno runtime locks up we end up polling
+/// an empty JoinSet and attempt to resolve ops after-the-fact. There's a small footgun in the JoinSet
+/// API where polling it while empty returns Ready(None), which means that it never holds on to the
+/// waker. This means that if we aren't testing for this particular return value and don't stash the waker
+/// ourselves for a future async op to eventually queue, we can end up losing the waker entirely and the
+/// op wakes up, notifies tokio, which notifies the JoinSet, which then has nobody to notify )`:.
+#[tokio::test]
+async fn test_wakers_for_async_ops() {
+ static STATE: AtomicI8 = AtomicI8::new(0);
+
+ #[op]
+ async fn op_async_sleep() -> Result<(), Error> {
+ STATE.store(1, Ordering::SeqCst);
+ tokio::time::sleep(std::time::Duration::from_millis(1)).await;
+ STATE.store(2, Ordering::SeqCst);
+ Ok(())
+ }
+
+ STATE.store(0, Ordering::SeqCst);
+
+ let logging_waker = Arc::new(LoggingWaker::default());
+ let waker = logging_waker.clone().into_waker();
+
+ deno_core::extension!(test_ext, ops = [op_async_sleep]);
+ let mut runtime = JsRuntime::new(RuntimeOptions {
+ extensions: vec![test_ext::init_ops()],
+ ..Default::default()
+ });
+
+ // Drain events until we get to Ready
+ loop {
+ logging_waker.woken.store(false, Ordering::SeqCst);
+ let res = runtime.poll_event_loop(&mut Context::from_waker(&waker), false);
+ let ready = matches!(res, Poll::Ready(Ok(())));
+ assert!(ready || logging_waker.woken.load(Ordering::SeqCst));
+ if ready {
+ break;
+ }
+ }
+
+ // Start the AIIFE
+ runtime
+ .execute_script(
+ "",
+ FastString::from_static(
+ "(async () => { await Deno.core.opAsync('op_async_sleep'); })()",
+ ),
+ )
+ .unwrap();
+
+ // Wait for future to finish
+ while STATE.load(Ordering::SeqCst) < 2 {
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+
+ // This shouldn't take one minute, but if it does, things are definitely locked up
+ for _ in 0..Duration::from_secs(60).as_millis() {
+ if logging_waker.woken.load(Ordering::SeqCst) {
+ // Success
+ return;
+ }
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+
+ panic!("The waker was never woken after the future completed");
+}
+
#[tokio::test]
async fn test_poll_value() {
let mut runtime = JsRuntime::new(Default::default());