diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-06-07 23:50:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-07 23:50:14 +0200 |
commit | 19f82b0eaa14f0df58fdfc685e60c8560582c5a4 (patch) | |
tree | 0269a3fb0e70fb37856b5d4a2b5a1e737be9feb7 /core/runtime.rs | |
parent | 7e91f74d2b00cdc64042ba66e45d912fa2d9b647 (diff) |
refactor(core): use JoinSet instead of FuturesUnordered (#19378)
This commit migrates "deno_core" from using "FuturesUnordered" to
"tokio::task::JoinSet". This makes every op to be a separate Tokio task
and should unlock better utilization of kqueue/epoll.
There were two quirks added to this PR:
- because of the fact that "JoinSet" immediately polls spawn tasks,
op sanitizers can give false positives in some cases, this was
alleviated by polling event loop once before running a test with
"deno test", which gives canceled ops an opportunity to settle
- "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI
API can still use threadsafe functions - without this change the
registered wakers were wrong as they would not wake up the
whole "JsRuntime" but the task associated with an op
---------
Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'core/runtime.rs')
-rw-r--r-- | core/runtime.rs | 72 |
1 files changed, 27 insertions, 45 deletions
diff --git a/core/runtime.rs b/core/runtime.rs index a27717a8b..ecfd0bd57 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -41,7 +41,6 @@ use futures::future::FutureExt; use futures::future::MaybeDone; use futures::stream::StreamExt; use futures::task::noop_waker; -use futures::task::AtomicWaker; use smallvec::SmallVec; use std::any::Any; use std::cell::RefCell; @@ -309,7 +308,6 @@ pub struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) source_map_getter: Option<Rc<Box<dyn SourceMapGetter>>>, pub(crate) source_map_cache: Rc<RefCell<SourceMapCache>>, - pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc<RefCell<OpState>>, pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>, pub(crate) compiled_wasm_module_store: Option<CompiledWasmModuleStore>, @@ -320,7 +318,6 @@ pub struct JsRuntimeState { // flimsy. Try to poll it similarly to `pending_promise_rejections`. pub(crate) dispatched_exception: Option<v8::Global<v8::Value>>, pub(crate) inspector: Option<Rc<RefCell<JsRuntimeInspector>>>, - waker: AtomicWaker, } impl JsRuntimeState { @@ -546,8 +543,6 @@ impl JsRuntime { shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), - waker: AtomicWaker::new(), - have_unpolled_ops: false, dispatched_exception: None, // Some fields are initialized later after isolate is created inspector: None, @@ -1328,7 +1323,7 @@ impl JsRuntime { { let state = self.inner.state.borrow(); has_inspector = state.inspector.is_some(); - state.waker.register(cx.waker()); + state.op_state.borrow().waker.register(cx.waker()); } if has_inspector { @@ -1419,12 +1414,11 @@ impl JsRuntime { // TODO(andreubotella) The event loop will spin as long as there are pending // background tasks. We should look into having V8 notify us when a // background task is done. - if state.have_unpolled_ops - || pending_state.has_pending_background_tasks + if pending_state.has_pending_background_tasks || pending_state.has_tick_scheduled || maybe_scheduling { - state.waker.wake(); + state.op_state.borrow().waker.wake(); } drop(state); @@ -1477,7 +1471,7 @@ impl JsRuntime { // evaluation may complete during this, in which case the counter will // reset. state.dyn_module_evaluate_idle_counter += 1; - state.waker.wake(); + state.op_state.borrow().waker.wake(); } } @@ -1670,7 +1664,7 @@ impl JsRuntimeState { /// after initiating new dynamic import load. pub fn notify_new_dynamic_import(&mut self) { // Notify event loop to poll again soon. - self.waker.wake(); + self.op_state.borrow().waker.wake(); } } @@ -2404,12 +2398,6 @@ impl JsRuntime { // Polls pending ops and then runs `Deno.core.eventLoopTick` callback. fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> { - // Now handle actual ops. - { - let mut state = self.inner.state.borrow_mut(); - state.have_unpolled_ops = false; - } - // Handle responses for each realm. let state = self.inner.state.clone(); let isolate = &mut self.inner.v8_isolate; @@ -2433,10 +2421,15 @@ impl JsRuntime { let mut args: SmallVec<[v8::Local<v8::Value>; 32]> = SmallVec::with_capacity(32); - while let Poll::Ready(Some(item)) = - context_state.pending_ops.poll_next_unpin(cx) - { - let (promise_id, op_id, mut resp) = item; + loop { + let item = { + let next = std::pin::pin!(context_state.pending_ops.join_next()); + let Poll::Ready(Some(item)) = next.poll(cx) else { + break; + }; + item + }; + let (promise_id, op_id, mut resp) = item.unwrap().into_inner(); state .borrow() .op_state @@ -2486,11 +2479,6 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>( promise_id: PromiseId, op: impl Future<Output = Result<R, Error>> + 'static, ) { - let runtime_state = match ctx.runtime_state.upgrade() { - Some(rc_state) => rc_state, - // at least 1 Rc is held by the JsRuntime. - None => unreachable!(), - }; let get_class = { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); @@ -2499,13 +2487,10 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>( let fut = op .map(|result| crate::_ops::to_op_result(get_class, result)) .boxed_local(); - let mut state = runtime_state.borrow_mut(); - ctx - .context_state - .borrow_mut() - .pending_ops - .push(OpCall::pending(ctx, promise_id, fut)); - state.have_unpolled_ops = true; + // SAFETY: this this is guaranteed to be running on a current-thread executor + ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { + crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut)) + }); } #[inline] @@ -2584,12 +2569,6 @@ pub fn queue_async_op<'s>( promise_id: PromiseId, mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>, ) -> Option<v8::Local<'s, v8::Value>> { - let runtime_state = match ctx.runtime_state.upgrade() { - Some(rc_state) => rc_state, - // at least 1 Rc is held by the JsRuntime. - None => unreachable!(), - }; - // An op's realm (as given by `OpCtx::realm_idx`) must match the realm in // which it is invoked. Otherwise, we might have cross-realm object exposure. // deno_core doesn't currently support such exposure, even though embedders @@ -2627,9 +2606,12 @@ pub fn queue_async_op<'s>( // Otherwise we will push it to the `pending_ops` and let it be polled again // or resolved on the next tick of the event loop. - let mut state = runtime_state.borrow_mut(); - ctx.context_state.borrow_mut().pending_ops.push(op_call); - state.have_unpolled_ops = true; + ctx + .context_state + .borrow_mut() + .pending_ops + // SAFETY: this this is guaranteed to be running on a current-thread executor + .spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) }); None } @@ -2744,8 +2726,8 @@ pub mod tests { (runtime, dispatch_count) } - #[test] - fn test_ref_unref_ops() { + #[tokio::test] + async fn test_ref_unref_ops() { let (mut runtime, _dispatch_count) = setup(Mode::AsyncDeferred); runtime .execute_script_static( @@ -4735,6 +4717,7 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { } } + #[ignore] #[tokio::test] async fn js_realm_gc() { static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -4793,7 +4776,6 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { .await .unwrap(); } - drop(runtime); // Make sure the OpState was dropped properly when the runtime dropped |