diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2023-06-07 23:50:14 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-07 23:50:14 +0200 |
commit | 19f82b0eaa14f0df58fdfc685e60c8560582c5a4 (patch) | |
tree | 0269a3fb0e70fb37856b5d4a2b5a1e737be9feb7 /ext/ffi/callback.rs | |
parent | 7e91f74d2b00cdc64042ba66e45d912fa2d9b647 (diff) |
refactor(core): use JoinSet instead of FuturesUnordered (#19378)
This commit migrates "deno_core" from using "FuturesUnordered" to
"tokio::task::JoinSet". This makes every op to be a separate Tokio task
and should unlock better utilization of kqueue/epoll.
There were two quirks added to this PR:
- because of the fact that "JoinSet" immediately polls spawn tasks,
op sanitizers can give false positives in some cases, this was
alleviated by polling event loop once before running a test with
"deno test", which gives canceled ops an opportunity to settle
- "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI
API can still use threadsafe functions - without this change the
registered wakers were wrong as they would not wake up the
whole "JsRuntime" but the task associated with an op
---------
Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/ffi/callback.rs')
-rw-r--r-- | ext/ffi/callback.rs | 21 |
1 files changed, 10 insertions, 11 deletions
diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs index 2d2cf491b..78a21ab8f 100644 --- a/ext/ffi/callback.rs +++ b/ext/ffi/callback.rs @@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER; use crate::MIN_SAFE_INTEGER; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; +use deno_core::futures::task::AtomicWaker; use deno_core::op; use deno_core::serde_v8; use deno_core::v8; @@ -32,8 +33,8 @@ use std::rc::Rc; use std::sync::atomic; use std::sync::atomic::AtomicU32; use std::sync::mpsc::sync_channel; +use std::sync::Arc; use std::task::Poll; -use std::task::Waker; static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1); @@ -99,21 +100,20 @@ struct CallbackInfo { pub parameters: Box<[NativeType]>, pub result: NativeType, pub thread_id: u32, - pub waker: Option<Waker>, + pub waker: Arc<AtomicWaker>, } impl Future for CallbackInfo { type Output = (); fn poll( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Self::Output> { - // Always replace the waker to make sure it's bound to the proper Future. - self.waker.replace(cx.waker().clone()); // The future for the CallbackInfo never resolves: It can only be canceled. Poll::Pending } } + unsafe extern "C" fn deno_ffi_callback( cif: &libffi::low::ffi_cif, result: &mut c_void, @@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback( response_sender.send(()).unwrap(); }); async_work_sender.unbounded_send(fut).unwrap(); - if let Some(waker) = info.waker.as_ref() { - // Make sure event loop wakes up to receive our message before we start waiting for a response. - waker.wake_by_ref(); - } + // Make sure event loop wakes up to receive our message before we start waiting for a response. + info.waker.wake(); response_receiver.recv().unwrap(); } }); @@ -574,6 +572,7 @@ where let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); + let waker = state.waker.clone(); let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { async_work_sender, callback, @@ -581,7 +580,7 @@ where parameters: args.parameters.clone().into(), result: args.result.clone(), thread_id, - waker: None, + waker, })); let cif = Cif::new( args |