Diffstat (limited to 'core/runtime')
-rw-r--r--  core/runtime/jsrealm.rs   |  6
-rw-r--r--  core/runtime/jsruntime.rs | 12
-rw-r--r--  core/runtime/ops.rs       | 23
-rw-r--r--  core/runtime/tests.rs     | 90
4 files changed, 103 insertions, 28 deletions
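This change swaps tokio::task::JoinSet for a crate-local crate::joinset::JoinSet in the op-dispatch path; because the diff is limited to core/runtime, the wrapper module itself does not appear below. Judging from the call-site changes (plain spawn calls, poll_join_next returning Poll<Result<..>>, and the waker discussion in the new test), a wrapper along the following lines would fit. This is only a sketch: the field and type names are invented here, and the !Send masking that replaces MaskFutureAsSend/MaskResultAsSend at the call sites is left out to keep it self-contained.

use std::future::Future;
use std::task::{Context, Poll, Waker};
use tokio::task::{JoinError, JoinSet as TokioJoinSet};

// Sketch of a JoinSet wrapper that never reports "empty" to its caller.
// Polling an empty set parks the caller's waker and returns Pending, so an
// op spawned later can re-notify the event loop.
pub struct JoinSet<T> {
  inner: TokioJoinSet<T>,
  waker: Option<Waker>,
}

impl<T: Send + 'static> JoinSet<T> {
  pub fn new() -> Self {
    Self { inner: TokioJoinSet::new(), waker: None }
  }

  pub fn spawn<F>(&mut self, fut: F)
  where
    F: Future<Output = T> + Send + 'static,
  {
    self.inner.spawn(fut);
    // A task just became available; wake whoever polled us while empty.
    if let Some(waker) = self.waker.take() {
      waker.wake();
    }
  }

  // Unlike tokio's poll_join_next, this never yields Ready(None): an empty
  // set stashes the waker and stays Pending.
  pub fn poll_join_next(
    &mut self,
    cx: &mut Context<'_>,
  ) -> Poll<Result<T, JoinError>> {
    match self.inner.poll_join_next(cx) {
      Poll::Ready(Some(result)) => Poll::Ready(result),
      Poll::Ready(None) | Poll::Pending => {
        self.waker = Some(cx.waker().clone());
        Poll::Pending
      }
    }
  }
}

With the empty-set handling (and, in the real module, the Send masking) centralized in one place, the per-call-site SAFETY comments and unsafe blocks in ops.rs become unnecessary, which is what the hunks below show.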
diff --git a/core/runtime/jsrealm.rs b/core/runtime/jsrealm.rs
index f41f2bbb2..72818eebe 100644
--- a/core/runtime/jsrealm.rs
+++ b/core/runtime/jsrealm.rs
@@ -1,10 +1,10 @@
 // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
 use super::bindings;
 use crate::error::exception_to_err_result;
+use crate::joinset::JoinSet;
 use crate::modules::ModuleCode;
 use crate::ops::OpCtx;
 use crate::runtime::JsRuntimeState;
-use crate::task::MaskResultAsSend;
 use crate::JsRuntime;
 use crate::OpId;
 use crate::OpResult;
@@ -17,7 +17,6 @@ use std::hash::BuildHasherDefault;
 use std::hash::Hasher;
 use std::option::Option;
 use std::rc::Rc;
-use tokio::task::JoinSet;
 use v8::HandleScope;
 use v8::Local;
 
@@ -50,8 +49,7 @@ pub(crate) struct ContextState {
   pub(crate) pending_promise_rejections:
     VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>,
   pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>,
-  pub(crate) pending_ops:
-    JoinSet<MaskResultAsSend<(PromiseId, OpId, OpResult)>>,
+  pub(crate) pending_ops: JoinSet<(PromiseId, OpId, OpResult)>,
   // We don't explicitly re-read this prop but need the slice to live alongside
   // the context
   pub(crate) op_ctxs: Box<[OpCtx]>,
diff --git a/core/runtime/jsruntime.rs b/core/runtime/jsruntime.rs
index 7f9e2dcd8..3b2ac49aa 100644
--- a/core/runtime/jsruntime.rs
+++ b/core/runtime/jsruntime.rs
@@ -37,7 +37,6 @@ use anyhow::Context as AnyhowContext;
 use anyhow::Error;
 use futures::channel::oneshot;
 use futures::future::poll_fn;
-use futures::future::Future;
 use futures::stream::StreamExt;
 use smallvec::SmallVec;
 use std::any::Any;
@@ -2261,14 +2260,11 @@ impl JsRuntime {
       SmallVec::with_capacity(32);
 
     loop {
-      let item = {
-        let next = std::pin::pin!(context_state.pending_ops.join_next());
-        let Poll::Ready(Some(item)) = next.poll(cx) else {
-          break;
-        };
-        item
+      let Poll::Ready(item) = context_state.pending_ops.poll_join_next(cx) else {
+        break;
       };
-      let (promise_id, op_id, mut resp) = item.unwrap().into_inner();
+      // TODO(mmastrac): If this task is really errored, things could be pretty bad
+      let (promise_id, op_id, mut resp) = item.unwrap();
       state
         .borrow()
         .op_state
diff --git a/core/runtime/ops.rs b/core/runtime/ops.rs
index 5e51414d3..1c871cda0 100644
--- a/core/runtime/ops.rs
+++ b/core/runtime/ops.rs
@@ -25,10 +25,11 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
     state.get_error_class_fn
   };
   let fut = op.map(|result| crate::_ops::to_op_result(get_class, result));
-  // SAFETY: this is guaranteed to be running on a current-thread executor
-  ctx.context_state.borrow_mut().pending_ops.spawn(unsafe {
-    crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut))
-  });
+  ctx
+    .context_state
+    .borrow_mut()
+    .pending_ops
+    .spawn(OpCall::new(ctx, promise_id, fut));
 }
 
 #[inline]
@@ -123,12 +124,7 @@ pub fn queue_async_op<'s>(
     Poll::Pending => {}
     Poll::Ready(mut res) => {
      if deferred {
-        ctx
-          .context_state
-          .borrow_mut()
-          .pending_ops
-          // SAFETY: this is guaranteed to be running on a current-thread executor
-          .spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) });
+        ctx.context_state.borrow_mut().pending_ops.spawn(ready(res));
         return None;
       } else {
         ctx.state.borrow_mut().tracker.track_async_completed(ctx.id);
@@ -137,12 +133,7 @@
       }
     }
   }
 
-  ctx
-    .context_state
-    .borrow_mut()
-    .pending_ops
-    // SAFETY: this is guaranteed to be running on a current-thread executor
-    .spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) });
+  ctx.context_state.borrow_mut().pending_ops.spawn(pinned);
   None
 }
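The jsruntime.rs hunk above now drains completed ops by calling poll_join_next(cx) directly, instead of pinning a fresh join_next() future on every pass. A rough, illustrative version of that drain loop, written against the wrapper sketched after the diffstat above; the function name and the (i32, u16, String) stand-in for (PromiseId, OpId, OpResult) are placeholders, not code from this commit:

use std::task::{Context, Poll};

// Illustrative drain loop in the shape of the jsruntime.rs change: completed
// ops are pulled with poll_join_next(cx) until it reports Pending.
fn drain_completed_ops(
  pending_ops: &mut JoinSet<(i32, u16, String)>,
  cx: &mut Context<'_>,
) -> Vec<(i32, u16, String)> {
  let mut completed = Vec::new();
  loop {
    let Poll::Ready(item) = pending_ops.poll_join_next(cx) else {
      break;
    };
    // A JoinError here would mean the op task panicked or was aborted.
    completed.push(item.unwrap());
  }
  completed
}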
diff --git a/core/runtime/tests.rs b/core/runtime/tests.rs
index dbfeecf3c..663645bb1 100644
--- a/core/runtime/tests.rs
+++ b/core/runtime/tests.rs
@@ -21,6 +21,9 @@ use crate::Extension;
 use crate::JsBuffer;
 use crate::*;
 use anyhow::Error;
+use cooked_waker::IntoWaker;
+use cooked_waker::Wake;
+use cooked_waker::WakeRef;
 use deno_ops::op;
 use futures::future::poll_fn;
 use futures::future::Future;
@@ -28,11 +31,14 @@ use futures::FutureExt;
 use std::cell::RefCell;
 use std::pin::Pin;
 use std::rc::Rc;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::AtomicI8;
 use std::sync::atomic::AtomicUsize;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
+use std::time::Duration;
 
 // deno_ops macros generate code assuming deno_core in scope.
 mod deno_core {
@@ -264,6 +270,90 @@ fn test_execute_script_return_value() {
   }
 }
 
+#[derive(Default)]
+struct LoggingWaker {
+  woken: AtomicBool,
+}
+
+impl Wake for LoggingWaker {
+  fn wake(self) {
+    self.woken.store(true, Ordering::SeqCst);
+  }
+}
+
+impl WakeRef for LoggingWaker {
+  fn wake_by_ref(&self) {
+    self.woken.store(true, Ordering::SeqCst);
+  }
+}
+
+/// This is a reproduction for a very obscure bug where the Deno runtime locks up when we end up polling
+/// an empty JoinSet and attempt to resolve ops after-the-fact. There's a small footgun in the JoinSet
+/// API where polling it while empty returns Ready(None), which means that it never holds on to the
+/// waker. This means that if we aren't testing for this particular return value and don't stash the waker
+/// ourselves for a future async op to eventually queue, we can end up losing the waker entirely and the
+/// op wakes up, notifies tokio, which notifies the JoinSet, which then has nobody to notify )`:.
+#[tokio::test]
+async fn test_wakers_for_async_ops() {
+  static STATE: AtomicI8 = AtomicI8::new(0);
+
+  #[op]
+  async fn op_async_sleep() -> Result<(), Error> {
+    STATE.store(1, Ordering::SeqCst);
+    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
+    STATE.store(2, Ordering::SeqCst);
+    Ok(())
+  }
+
+  STATE.store(0, Ordering::SeqCst);
+
+  let logging_waker = Arc::new(LoggingWaker::default());
+  let waker = logging_waker.clone().into_waker();
+
+  deno_core::extension!(test_ext, ops = [op_async_sleep]);
+  let mut runtime = JsRuntime::new(RuntimeOptions {
+    extensions: vec![test_ext::init_ops()],
+    ..Default::default()
+  });
+
+  // Drain events until we get to Ready
+  loop {
+    logging_waker.woken.store(false, Ordering::SeqCst);
+    let res = runtime.poll_event_loop(&mut Context::from_waker(&waker), false);
+    let ready = matches!(res, Poll::Ready(Ok(())));
+    assert!(ready || logging_waker.woken.load(Ordering::SeqCst));
+    if ready {
+      break;
+    }
+  }
+
+  // Start the AIIFE
+  runtime
+    .execute_script(
+      "",
+      FastString::from_static(
+        "(async () => { await Deno.core.opAsync('op_async_sleep'); })()",
+      ),
+    )
+    .unwrap();
+
+  // Wait for future to finish
+  while STATE.load(Ordering::SeqCst) < 2 {
+    tokio::time::sleep(Duration::from_millis(1)).await;
+  }
+
+  // This shouldn't take one minute, but if it does, things are definitely locked up
+  for _ in 0..Duration::from_secs(60).as_millis() {
+    if logging_waker.woken.load(Ordering::SeqCst) {
+      // Success
+      return;
+    }
+    tokio::time::sleep(Duration::from_millis(1)).await;
+  }
+
+  panic!("The waker was never woken after the future completed");
+}
+
 #[tokio::test]
 async fn test_poll_value() {
   let mut runtime = JsRuntime::new(Default::default());
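The test above drives the runtime with a cooked_waker-backed waker to detect exactly this lost-wakeup case. The underlying footgun can also be seen in isolation: tokio's JoinSet reports Poll::Ready(None) as soon as it is polled while empty and does not hold on to the caller's waker. A minimal stand-alone illustration, not part of this commit, assuming tokio with the rt and macros features:

use std::future::poll_fn;
use std::task::Poll;
use tokio::task::JoinSet;

#[tokio::main(flavor = "current_thread")]
async fn main() {
  let mut set: JoinSet<()> = JoinSet::new();

  // Polling an empty JoinSet yields Ready(None) right away ("the set is
  // empty"), so the waker in `cx` is never stored for a later spawn to wake.
  let saw_empty = poll_fn(|cx| {
    Poll::Ready(matches!(set.poll_join_next(cx), Poll::Ready(None)))
  })
  .await;

  assert!(saw_empty, "an empty JoinSet reports Ready(None) on the first poll");
}

As the test's doc comment describes, the fix is to treat Ready(None) specially and stash the waker so a later spawn can re-notify the event loop, which the new test exercises end to end.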