summaryrefslogtreecommitdiff
path: root/core/runtime.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-06-07 23:50:14 +0200
committerGitHub <noreply@github.com>2023-06-07 23:50:14 +0200
commit19f82b0eaa14f0df58fdfc685e60c8560582c5a4 (patch)
tree0269a3fb0e70fb37856b5d4a2b5a1e737be9feb7 /core/runtime.rs
parent7e91f74d2b00cdc64042ba66e45d912fa2d9b647 (diff)
refactor(core): use JoinSet instead of FuturesUnordered (#19378)
This commit migrates "deno_core" from using "FuturesUnordered" to "tokio::task::JoinSet". This makes every op to be a separate Tokio task and should unlock better utilization of kqueue/epoll. There were two quirks added to this PR: - because of the fact that "JoinSet" immediately polls spawn tasks, op sanitizers can give false positives in some cases, this was alleviated by polling event loop once before running a test with "deno test", which gives canceled ops an opportunity to settle - "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI API can still use threadsafe functions - without this change the registered wakers were wrong as they would not wake up the whole "JsRuntime" but the task associated with an op --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'core/runtime.rs')
-rw-r--r--core/runtime.rs72
1 files changed, 27 insertions, 45 deletions
diff --git a/core/runtime.rs b/core/runtime.rs
index a27717a8b..ecfd0bd57 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -41,7 +41,6 @@ use futures::future::FutureExt;
use futures::future::MaybeDone;
use futures::stream::StreamExt;
use futures::task::noop_waker;
-use futures::task::AtomicWaker;
use smallvec::SmallVec;
use std::any::Any;
use std::cell::RefCell;
@@ -309,7 +308,6 @@ pub struct JsRuntimeState {
dyn_module_evaluate_idle_counter: u32,
pub(crate) source_map_getter: Option<Rc<Box<dyn SourceMapGetter>>>,
pub(crate) source_map_cache: Rc<RefCell<SourceMapCache>>,
- pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
pub(crate) compiled_wasm_module_store: Option<CompiledWasmModuleStore>,
@@ -320,7 +318,6 @@ pub struct JsRuntimeState {
// flimsy. Try to poll it similarly to `pending_promise_rejections`.
pub(crate) dispatched_exception: Option<v8::Global<v8::Value>>,
pub(crate) inspector: Option<Rc<RefCell<JsRuntimeInspector>>>,
- waker: AtomicWaker,
}
impl JsRuntimeState {
@@ -546,8 +543,6 @@ impl JsRuntime {
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
- waker: AtomicWaker::new(),
- have_unpolled_ops: false,
dispatched_exception: None,
// Some fields are initialized later after isolate is created
inspector: None,
@@ -1328,7 +1323,7 @@ impl JsRuntime {
{
let state = self.inner.state.borrow();
has_inspector = state.inspector.is_some();
- state.waker.register(cx.waker());
+ state.op_state.borrow().waker.register(cx.waker());
}
if has_inspector {
@@ -1419,12 +1414,11 @@ impl JsRuntime {
// TODO(andreubotella) The event loop will spin as long as there are pending
// background tasks. We should look into having V8 notify us when a
// background task is done.
- if state.have_unpolled_ops
- || pending_state.has_pending_background_tasks
+ if pending_state.has_pending_background_tasks
|| pending_state.has_tick_scheduled
|| maybe_scheduling
{
- state.waker.wake();
+ state.op_state.borrow().waker.wake();
}
drop(state);
@@ -1477,7 +1471,7 @@ impl JsRuntime {
// evaluation may complete during this, in which case the counter will
// reset.
state.dyn_module_evaluate_idle_counter += 1;
- state.waker.wake();
+ state.op_state.borrow().waker.wake();
}
}
@@ -1670,7 +1664,7 @@ impl JsRuntimeState {
/// after initiating new dynamic import load.
pub fn notify_new_dynamic_import(&mut self) {
// Notify event loop to poll again soon.
- self.waker.wake();
+ self.op_state.borrow().waker.wake();
}
}
@@ -2404,12 +2398,6 @@ impl JsRuntime {
// Polls pending ops and then runs `Deno.core.eventLoopTick` callback.
fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> {
- // Now handle actual ops.
- {
- let mut state = self.inner.state.borrow_mut();
- state.have_unpolled_ops = false;
- }
-
// Handle responses for each realm.
let state = self.inner.state.clone();
let isolate = &mut self.inner.v8_isolate;
@@ -2433,10 +2421,15 @@ impl JsRuntime {
let mut args: SmallVec<[v8::Local<v8::Value>; 32]> =
SmallVec::with_capacity(32);
- while let Poll::Ready(Some(item)) =
- context_state.pending_ops.poll_next_unpin(cx)
- {
- let (promise_id, op_id, mut resp) = item;
+ loop {
+ let item = {
+ let next = std::pin::pin!(context_state.pending_ops.join_next());
+ let Poll::Ready(Some(item)) = next.poll(cx) else {
+ break;
+ };
+ item
+ };
+ let (promise_id, op_id, mut resp) = item.unwrap().into_inner();
state
.borrow()
.op_state
@@ -2486,11 +2479,6 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
promise_id: PromiseId,
op: impl Future<Output = Result<R, Error>> + 'static,
) {
- let runtime_state = match ctx.runtime_state.upgrade() {
- Some(rc_state) => rc_state,
- // at least 1 Rc is held by the JsRuntime.
- None => unreachable!(),
- };
let get_class = {
let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id);
@@ -2499,13 +2487,10 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
let fut = op
.map(|result| crate::_ops::to_op_result(get_class, result))
.boxed_local();
- let mut state = runtime_state.borrow_mut();
- ctx
- .context_state
- .borrow_mut()
- .pending_ops
- .push(OpCall::pending(ctx, promise_id, fut));
- state.have_unpolled_ops = true;
+ // SAFETY: this this is guaranteed to be running on a current-thread executor
+ ctx.context_state.borrow_mut().pending_ops.spawn(unsafe {
+ crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut))
+ });
}
#[inline]
@@ -2584,12 +2569,6 @@ pub fn queue_async_op<'s>(
promise_id: PromiseId,
mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>,
) -> Option<v8::Local<'s, v8::Value>> {
- let runtime_state = match ctx.runtime_state.upgrade() {
- Some(rc_state) => rc_state,
- // at least 1 Rc is held by the JsRuntime.
- None => unreachable!(),
- };
-
// An op's realm (as given by `OpCtx::realm_idx`) must match the realm in
// which it is invoked. Otherwise, we might have cross-realm object exposure.
// deno_core doesn't currently support such exposure, even though embedders
@@ -2627,9 +2606,12 @@ pub fn queue_async_op<'s>(
// Otherwise we will push it to the `pending_ops` and let it be polled again
// or resolved on the next tick of the event loop.
- let mut state = runtime_state.borrow_mut();
- ctx.context_state.borrow_mut().pending_ops.push(op_call);
- state.have_unpolled_ops = true;
+ ctx
+ .context_state
+ .borrow_mut()
+ .pending_ops
+ // SAFETY: this this is guaranteed to be running on a current-thread executor
+ .spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) });
None
}
@@ -2744,8 +2726,8 @@ pub mod tests {
(runtime, dispatch_count)
}
- #[test]
- fn test_ref_unref_ops() {
+ #[tokio::test]
+ async fn test_ref_unref_ops() {
let (mut runtime, _dispatch_count) = setup(Mode::AsyncDeferred);
runtime
.execute_script_static(
@@ -4735,6 +4717,7 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", {
}
}
+ #[ignore]
#[tokio::test]
async fn js_realm_gc() {
static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -4793,7 +4776,6 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", {
.await
.unwrap();
}
-
drop(runtime);
// Make sure the OpState was dropped properly when the runtime dropped