From 19f82b0eaa14f0df58fdfc685e60c8560582c5a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 7 Jun 2023 23:50:14 +0200 Subject: refactor(core): use JoinSet instead of FuturesUnordered (#19378) This commit migrates "deno_core" from using "FuturesUnordered" to "tokio::task::JoinSet". This makes every op to be a separate Tokio task and should unlock better utilization of kqueue/epoll. There were two quirks added to this PR: - because of the fact that "JoinSet" immediately polls spawn tasks, op sanitizers can give false positives in some cases, this was alleviated by polling event loop once before running a test with "deno test", which gives canceled ops an opportunity to settle - "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI API can still use threadsafe functions - without this change the registered wakers were wrong as they would not wake up the whole "JsRuntime" but the task associated with an op --------- Co-authored-by: Matt Mastracci --- ext/ffi/callback.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) (limited to 'ext') diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs index 2d2cf491b..78a21ab8f 100644 --- a/ext/ffi/callback.rs +++ b/ext/ffi/callback.rs @@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER; use crate::MIN_SAFE_INTEGER; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; +use deno_core::futures::task::AtomicWaker; use deno_core::op; use deno_core::serde_v8; use deno_core::v8; @@ -32,8 +33,8 @@ use std::rc::Rc; use std::sync::atomic; use std::sync::atomic::AtomicU32; use std::sync::mpsc::sync_channel; +use std::sync::Arc; use std::task::Poll; -use std::task::Waker; static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1); @@ -99,21 +100,20 @@ struct CallbackInfo { pub parameters: Box<[NativeType]>, pub result: NativeType, pub thread_id: u32, - pub waker: Option, + pub waker: Arc, } impl Future for CallbackInfo { type Output = (); fn poll( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - // Always replace the waker to make sure it's bound to the proper Future. - self.waker.replace(cx.waker().clone()); // The future for the CallbackInfo never resolves: It can only be canceled. Poll::Pending } } + unsafe extern "C" fn deno_ffi_callback( cif: &libffi::low::ffi_cif, result: &mut c_void, @@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback( response_sender.send(()).unwrap(); }); async_work_sender.unbounded_send(fut).unwrap(); - if let Some(waker) = info.waker.as_ref() { - // Make sure event loop wakes up to receive our message before we start waiting for a response. - waker.wake_by_ref(); - } + // Make sure event loop wakes up to receive our message before we start waiting for a response. + info.waker.wake(); response_receiver.recv().unwrap(); } }); @@ -574,6 +572,7 @@ where let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); + let waker = state.waker.clone(); let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { async_work_sender, callback, @@ -581,7 +580,7 @@ where parameters: args.parameters.clone().into(), result: args.result.clone(), thread_id, - waker: None, + waker, })); let cif = Cif::new( args -- cgit v1.2.3