diff options
| author | Matt Mastracci <matthew@mastracci.com> | 2023-06-29 10:01:54 -0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-06-29 16:01:54 +0000 |
| commit | 98df69fd4cbe3687e2ff3519fbd6bff4e5f3101f (patch) | |
| tree | 0c959f6b987f0398c29d7048ab1ec3f0734877cb /core/runtime | |
| parent | 93b3ff017078b2c1e993457ef43af6b52e715ba6 (diff) | |
fix(core): Ensure we don't lose the waker when polling an empty JoinSet (#19655)
This is a reproduction and fix for a very obscure bug where the Deno
runtime locks up we end up polling an empty JoinSet and attempt to
resolve ops after-the-fact. There's a small footgun in the JoinSet API
where polling it while empty returns Ready(None), which means that it
never holds on to the waker. This means that if we aren't testing for
this particular return value and don't stash the waker ourselves for a
future async op to eventually queue, we can end up losing the waker
entirely and the op wakes up, notifies tokio, which notifies the
JoinSet, which then has nobody to notify 😢.
Co-authored-by: Luca Casonato <hello@lcas.dev>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
Diffstat (limited to 'core/runtime')
| -rw-r--r-- | core/runtime/jsrealm.rs | 6 | ||||
| -rw-r--r-- | core/runtime/jsruntime.rs | 12 | ||||
| -rw-r--r-- | core/runtime/ops.rs | 23 | ||||
| -rw-r--r-- | core/runtime/tests.rs | 90 |
4 files changed, 103 insertions, 28 deletions
diff --git a/core/runtime/jsrealm.rs b/core/runtime/jsrealm.rs index f41f2bbb2..72818eebe 100644 --- a/core/runtime/jsrealm.rs +++ b/core/runtime/jsrealm.rs @@ -1,10 +1,10 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use super::bindings; use crate::error::exception_to_err_result; +use crate::joinset::JoinSet; use crate::modules::ModuleCode; use crate::ops::OpCtx; use crate::runtime::JsRuntimeState; -use crate::task::MaskResultAsSend; use crate::JsRuntime; use crate::OpId; use crate::OpResult; @@ -17,7 +17,6 @@ use std::hash::BuildHasherDefault; use std::hash::Hasher; use std::option::Option; use std::rc::Rc; -use tokio::task::JoinSet; use v8::HandleScope; use v8::Local; @@ -50,8 +49,7 @@ pub(crate) struct ContextState { pub(crate) pending_promise_rejections: VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>, pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>, - pub(crate) pending_ops: - JoinSet<MaskResultAsSend<(PromiseId, OpId, OpResult)>>, + pub(crate) pending_ops: JoinSet<(PromiseId, OpId, OpResult)>, // We don't explicitly re-read this prop but need the slice to live alongside // the context pub(crate) op_ctxs: Box<[OpCtx]>, diff --git a/core/runtime/jsruntime.rs b/core/runtime/jsruntime.rs index 7f9e2dcd8..3b2ac49aa 100644 --- a/core/runtime/jsruntime.rs +++ b/core/runtime/jsruntime.rs @@ -37,7 +37,6 @@ use anyhow::Context as AnyhowContext; use anyhow::Error; use futures::channel::oneshot; use futures::future::poll_fn; -use futures::future::Future; use futures::stream::StreamExt; use smallvec::SmallVec; use std::any::Any; @@ -2261,14 +2260,11 @@ impl JsRuntime { SmallVec::with_capacity(32); loop { - let item = { - let next = std::pin::pin!(context_state.pending_ops.join_next()); - let Poll::Ready(Some(item)) = next.poll(cx) else { - break; - }; - item + let Poll::Ready(item) = context_state.pending_ops.poll_join_next(cx) else { + break; }; - let (promise_id, op_id, mut resp) = item.unwrap().into_inner(); + // TODO(mmastrac): If this task is really errored, things could be pretty bad + let (promise_id, op_id, mut resp) = item.unwrap(); state .borrow() .op_state diff --git a/core/runtime/ops.rs b/core/runtime/ops.rs index 5e51414d3..1c871cda0 100644 --- a/core/runtime/ops.rs +++ b/core/runtime/ops.rs @@ -25,10 +25,11 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>( state.get_error_class_fn }; let fut = op.map(|result| crate::_ops::to_op_result(get_class, result)); - // SAFETY: this is guaranteed to be running on a current-thread executor - ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { - crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut)) - }); + ctx + .context_state + .borrow_mut() + .pending_ops + .spawn(OpCall::new(ctx, promise_id, fut)); } #[inline] @@ -123,12 +124,7 @@ pub fn queue_async_op<'s>( Poll::Pending => {} Poll::Ready(mut res) => { if deferred { - ctx - .context_state - .borrow_mut() - .pending_ops - // SAFETY: this is guaranteed to be running on a current-thread executor - .spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) }); + ctx.context_state.borrow_mut().pending_ops.spawn(ready(res)); return None; } else { ctx.state.borrow_mut().tracker.track_async_completed(ctx.id); @@ -137,12 +133,7 @@ pub fn queue_async_op<'s>( } } - ctx - .context_state - .borrow_mut() - .pending_ops - // SAFETY: this is guaranteed to be running on a current-thread executor - .spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) }); + ctx.context_state.borrow_mut().pending_ops.spawn(pinned); None } diff --git a/core/runtime/tests.rs b/core/runtime/tests.rs index dbfeecf3c..663645bb1 100644 --- a/core/runtime/tests.rs +++ b/core/runtime/tests.rs @@ -21,6 +21,9 @@ use crate::Extension; use crate::JsBuffer; use crate::*; use anyhow::Error; +use cooked_waker::IntoWaker; +use cooked_waker::Wake; +use cooked_waker::WakeRef; use deno_ops::op; use futures::future::poll_fn; use futures::future::Future; @@ -28,11 +31,14 @@ use futures::FutureExt; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicI8; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::Duration; // deno_ops macros generate code assuming deno_core in scope. mod deno_core { @@ -264,6 +270,90 @@ fn test_execute_script_return_value() { } } +#[derive(Default)] +struct LoggingWaker { + woken: AtomicBool, +} + +impl Wake for LoggingWaker { + fn wake(self) { + self.woken.store(true, Ordering::SeqCst); + } +} + +impl WakeRef for LoggingWaker { + fn wake_by_ref(&self) { + self.woken.store(true, Ordering::SeqCst); + } +} + +/// This is a reproduction for a very obscure bug where the Deno runtime locks up we end up polling +/// an empty JoinSet and attempt to resolve ops after-the-fact. There's a small footgun in the JoinSet +/// API where polling it while empty returns Ready(None), which means that it never holds on to the +/// waker. This means that if we aren't testing for this particular return value and don't stash the waker +/// ourselves for a future async op to eventually queue, we can end up losing the waker entirely and the +/// op wakes up, notifies tokio, which notifies the JoinSet, which then has nobody to notify )`:. +#[tokio::test] +async fn test_wakers_for_async_ops() { + static STATE: AtomicI8 = AtomicI8::new(0); + + #[op] + async fn op_async_sleep() -> Result<(), Error> { + STATE.store(1, Ordering::SeqCst); + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + STATE.store(2, Ordering::SeqCst); + Ok(()) + } + + STATE.store(0, Ordering::SeqCst); + + let logging_waker = Arc::new(LoggingWaker::default()); + let waker = logging_waker.clone().into_waker(); + + deno_core::extension!(test_ext, ops = [op_async_sleep]); + let mut runtime = JsRuntime::new(RuntimeOptions { + extensions: vec![test_ext::init_ops()], + ..Default::default() + }); + + // Drain events until we get to Ready + loop { + logging_waker.woken.store(false, Ordering::SeqCst); + let res = runtime.poll_event_loop(&mut Context::from_waker(&waker), false); + let ready = matches!(res, Poll::Ready(Ok(()))); + assert!(ready || logging_waker.woken.load(Ordering::SeqCst)); + if ready { + break; + } + } + + // Start the AIIFE + runtime + .execute_script( + "", + FastString::from_static( + "(async () => { await Deno.core.opAsync('op_async_sleep'); })()", + ), + ) + .unwrap(); + + // Wait for future to finish + while STATE.load(Ordering::SeqCst) < 2 { + tokio::time::sleep(Duration::from_millis(1)).await; + } + + // This shouldn't take one minute, but if it does, things are definitely locked up + for _ in 0..Duration::from_secs(60).as_millis() { + if logging_waker.woken.load(Ordering::SeqCst) { + // Success + return; + } + tokio::time::sleep(Duration::from_millis(1)).await; + } + + panic!("The waker was never woken after the future completed"); +} + #[tokio::test] async fn test_poll_value() { let mut runtime = JsRuntime::new(Default::default()); |
