diff options
-rw-r--r-- | Cargo.lock | 7 | ||||
-rw-r--r-- | core/Cargo.toml | 1 | ||||
-rw-r--r-- | core/joinset.rs | 92 | ||||
-rw-r--r-- | core/lib.rs | 1 | ||||
-rw-r--r-- | core/runtime/jsrealm.rs | 6 | ||||
-rw-r--r-- | core/runtime/jsruntime.rs | 12 | ||||
-rw-r--r-- | core/runtime/ops.rs | 23 | ||||
-rw-r--r-- | core/runtime/tests.rs | 90 |
8 files changed, 204 insertions, 28 deletions
diff --git a/Cargo.lock b/Cargo.lock index ba0e8929b..2a4389b30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -608,6 +608,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" [[package]] +name = "cooked-waker" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147be55d677052dabc6b22252d5dd0fd4c29c8c27aa4f2fbef0f94aa003b406f" + +[[package]] name = "core-foundation" version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -971,6 +977,7 @@ version = "0.191.0" dependencies = [ "anyhow", "bytes", + "cooked-waker", "deno_ast", "deno_ops", "futures", diff --git a/core/Cargo.toml b/core/Cargo.toml index 31f11ae3a..e8e659726 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -46,4 +46,5 @@ path = "examples/http_bench_json_ops/main.rs" # These dependencies are only used for the 'http_bench_*_ops' examples. [dev-dependencies] +cooked-waker = "5" deno_ast.workspace = true diff --git a/core/joinset.rs b/core/joinset.rs new file mode 100644 index 000000000..f80c95712 --- /dev/null +++ b/core/joinset.rs @@ -0,0 +1,92 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// Some code and comments under MIT license where adapted from Tokio code +// Copyright (c) 2023 Tokio Contributors + +use std::task::Context; +use std::task::Poll; +use std::task::Waker; + +use futures::Future; +use tokio::task::AbortHandle; +use tokio::task::JoinError; + +use crate::task::MaskFutureAsSend; +use crate::task::MaskResultAsSend; + +/// Wraps the tokio [`JoinSet`] to make it !Send-friendly and to make it easier and safer for us to +/// poll while empty. +pub(crate) struct JoinSet<T> { + joinset: tokio::task::JoinSet<MaskResultAsSend<T>>, + /// If join_next returns Ready(None), we stash the waker + waker: Option<Waker>, +} + +impl<T> Default for JoinSet<T> { + fn default() -> Self { + Self { + joinset: Default::default(), + waker: None, + } + } +} + +impl<T: 'static> JoinSet<T> { + /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] + /// that can be used to remotely cancel the task. + /// + /// The provided future will start running in the background immediately + /// when this method is called, even if you don't await anything on this + /// `JoinSet`. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`AbortHandle`]: tokio::task::AbortHandle + #[track_caller] + pub fn spawn<F>(&mut self, task: F) -> AbortHandle + where + F: Future<Output = T>, + F: 'static, + T: 'static, + { + // SAFETY: We only use this with the single-thread executor + let handle = self.joinset.spawn(unsafe { MaskFutureAsSend::new(task) }); + + // If someone had called poll_join_next while we were empty, ask them to poll again + // so we can properly register the waker with the underlying JoinSet. + if let Some(waker) = self.waker.take() { + waker.wake(); + } + handle + } + + /// Returns the number of tasks currently in the `JoinSet`. + pub fn len(&self) -> usize { + self.joinset.len() + } + + /// Waits until one of the tasks in the set completes and returns its output. + /// + /// # Cancel Safety + /// + /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` + /// statement and some other branch completes first, it is guaranteed that no tasks were + /// removed from this `JoinSet`. + pub fn poll_join_next( + &mut self, + cx: &mut Context, + ) -> Poll<Result<T, JoinError>> { + // TODO(mmastrac): Use poll_join_next from Tokio + let next = std::pin::pin!(self.joinset.join_next()); + match next.poll(cx) { + Poll::Ready(Some(res)) => Poll::Ready(res.map(|res| res.into_inner())), + Poll::Ready(None) => { + // Stash waker + self.waker = Some(cx.waker().clone()); + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + } +} diff --git a/core/lib.rs b/core/lib.rs index 1042bf55c..250d7dead 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -9,6 +9,7 @@ mod flags; mod gotham_state; mod inspector; mod io; +mod joinset; mod module_specifier; mod modules; mod normalize_path; diff --git a/core/runtime/jsrealm.rs b/core/runtime/jsrealm.rs index f41f2bbb2..72818eebe 100644 --- a/core/runtime/jsrealm.rs +++ b/core/runtime/jsrealm.rs @@ -1,10 +1,10 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use super::bindings; use crate::error::exception_to_err_result; +use crate::joinset::JoinSet; use crate::modules::ModuleCode; use crate::ops::OpCtx; use crate::runtime::JsRuntimeState; -use crate::task::MaskResultAsSend; use crate::JsRuntime; use crate::OpId; use crate::OpResult; @@ -17,7 +17,6 @@ use std::hash::BuildHasherDefault; use std::hash::Hasher; use std::option::Option; use std::rc::Rc; -use tokio::task::JoinSet; use v8::HandleScope; use v8::Local; @@ -50,8 +49,7 @@ pub(crate) struct ContextState { pub(crate) pending_promise_rejections: VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>, pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>, - pub(crate) pending_ops: - JoinSet<MaskResultAsSend<(PromiseId, OpId, OpResult)>>, + pub(crate) pending_ops: JoinSet<(PromiseId, OpId, OpResult)>, // We don't explicitly re-read this prop but need the slice to live alongside // the context pub(crate) op_ctxs: Box<[OpCtx]>, diff --git a/core/runtime/jsruntime.rs b/core/runtime/jsruntime.rs index 7f9e2dcd8..3b2ac49aa 100644 --- a/core/runtime/jsruntime.rs +++ b/core/runtime/jsruntime.rs @@ -37,7 +37,6 @@ use anyhow::Context as AnyhowContext; use anyhow::Error; use futures::channel::oneshot; use futures::future::poll_fn; -use futures::future::Future; use futures::stream::StreamExt; use smallvec::SmallVec; use std::any::Any; @@ -2261,14 +2260,11 @@ impl JsRuntime { SmallVec::with_capacity(32); loop { - let item = { - let next = std::pin::pin!(context_state.pending_ops.join_next()); - let Poll::Ready(Some(item)) = next.poll(cx) else { - break; - }; - item + let Poll::Ready(item) = context_state.pending_ops.poll_join_next(cx) else { + break; }; - let (promise_id, op_id, mut resp) = item.unwrap().into_inner(); + // TODO(mmastrac): If this task is really errored, things could be pretty bad + let (promise_id, op_id, mut resp) = item.unwrap(); state .borrow() .op_state diff --git a/core/runtime/ops.rs b/core/runtime/ops.rs index 5e51414d3..1c871cda0 100644 --- a/core/runtime/ops.rs +++ b/core/runtime/ops.rs @@ -25,10 +25,11 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>( state.get_error_class_fn }; let fut = op.map(|result| crate::_ops::to_op_result(get_class, result)); - // SAFETY: this is guaranteed to be running on a current-thread executor - ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { - crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut)) - }); + ctx + .context_state + .borrow_mut() + .pending_ops + .spawn(OpCall::new(ctx, promise_id, fut)); } #[inline] @@ -123,12 +124,7 @@ pub fn queue_async_op<'s>( Poll::Pending => {} Poll::Ready(mut res) => { if deferred { - ctx - .context_state - .borrow_mut() - .pending_ops - // SAFETY: this is guaranteed to be running on a current-thread executor - .spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) }); + ctx.context_state.borrow_mut().pending_ops.spawn(ready(res)); return None; } else { ctx.state.borrow_mut().tracker.track_async_completed(ctx.id); @@ -137,12 +133,7 @@ pub fn queue_async_op<'s>( } } - ctx - .context_state - .borrow_mut() - .pending_ops - // SAFETY: this is guaranteed to be running on a current-thread executor - .spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) }); + ctx.context_state.borrow_mut().pending_ops.spawn(pinned); None } diff --git a/core/runtime/tests.rs b/core/runtime/tests.rs index dbfeecf3c..663645bb1 100644 --- a/core/runtime/tests.rs +++ b/core/runtime/tests.rs @@ -21,6 +21,9 @@ use crate::Extension; use crate::JsBuffer; use crate::*; use anyhow::Error; +use cooked_waker::IntoWaker; +use cooked_waker::Wake; +use cooked_waker::WakeRef; use deno_ops::op; use futures::future::poll_fn; use futures::future::Future; @@ -28,11 +31,14 @@ use futures::FutureExt; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicI8; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::Duration; // deno_ops macros generate code assuming deno_core in scope. mod deno_core { @@ -264,6 +270,90 @@ fn test_execute_script_return_value() { } } +#[derive(Default)] +struct LoggingWaker { + woken: AtomicBool, +} + +impl Wake for LoggingWaker { + fn wake(self) { + self.woken.store(true, Ordering::SeqCst); + } +} + +impl WakeRef for LoggingWaker { + fn wake_by_ref(&self) { + self.woken.store(true, Ordering::SeqCst); + } +} + +/// This is a reproduction for a very obscure bug where the Deno runtime locks up we end up polling +/// an empty JoinSet and attempt to resolve ops after-the-fact. There's a small footgun in the JoinSet +/// API where polling it while empty returns Ready(None), which means that it never holds on to the +/// waker. This means that if we aren't testing for this particular return value and don't stash the waker +/// ourselves for a future async op to eventually queue, we can end up losing the waker entirely and the +/// op wakes up, notifies tokio, which notifies the JoinSet, which then has nobody to notify )`:. +#[tokio::test] +async fn test_wakers_for_async_ops() { + static STATE: AtomicI8 = AtomicI8::new(0); + + #[op] + async fn op_async_sleep() -> Result<(), Error> { + STATE.store(1, Ordering::SeqCst); + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + STATE.store(2, Ordering::SeqCst); + Ok(()) + } + + STATE.store(0, Ordering::SeqCst); + + let logging_waker = Arc::new(LoggingWaker::default()); + let waker = logging_waker.clone().into_waker(); + + deno_core::extension!(test_ext, ops = [op_async_sleep]); + let mut runtime = JsRuntime::new(RuntimeOptions { + extensions: vec![test_ext::init_ops()], + ..Default::default() + }); + + // Drain events until we get to Ready + loop { + logging_waker.woken.store(false, Ordering::SeqCst); + let res = runtime.poll_event_loop(&mut Context::from_waker(&waker), false); + let ready = matches!(res, Poll::Ready(Ok(()))); + assert!(ready || logging_waker.woken.load(Ordering::SeqCst)); + if ready { + break; + } + } + + // Start the AIIFE + runtime + .execute_script( + "", + FastString::from_static( + "(async () => { await Deno.core.opAsync('op_async_sleep'); })()", + ), + ) + .unwrap(); + + // Wait for future to finish + while STATE.load(Ordering::SeqCst) < 2 { + tokio::time::sleep(Duration::from_millis(1)).await; + } + + // This shouldn't take one minute, but if it does, things are definitely locked up + for _ in 0..Duration::from_secs(60).as_millis() { + if logging_waker.woken.load(Ordering::SeqCst) { + // Success + return; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + + panic!("The waker was never woken after the future completed"); +} + #[tokio::test] async fn test_poll_value() { let mut runtime = JsRuntime::new(Default::default()); |