diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-12-11 20:10:33 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-12-12 03:10:33 +0000 |
commit | a4f45f709278208cb61501df2792412f11aed3c4 (patch) | |
tree | c8e0eaff17141fd5a8097fe2870efb4bca05e431 /ext/ffi/callback.rs | |
parent | d13e45f2b3e50b85953c31d9c16e35d0cd87545f (diff) |
perf(ext/ffi): switch from middleware to tasks (#21239)
Deno-side changes for https://github.com/denoland/deno_core/pull/350
---------
Co-authored-by: Aapo Alasuutari <aapo.alasuutari@gmail.com>
Diffstat (limited to 'ext/ffi/callback.rs')
-rw-r--r-- | ext/ffi/callback.rs | 119 |
1 files changed, 65 insertions, 54 deletions
diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs index 4974c7f9f..ea18189d8 100644 --- a/ext/ffi/callback.rs +++ b/ext/ffi/callback.rs @@ -3,21 +3,19 @@ use crate::check_unstable; use crate::symbol::NativeType; use crate::FfiPermissions; -use crate::FfiState; use crate::ForeignFunction; -use crate::PendingFfiAsyncWork; use crate::MAX_SAFE_INTEGER; use crate::MIN_SAFE_INTEGER; use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::futures::task::AtomicWaker; use deno_core::op2; use deno_core::v8; +use deno_core::v8::TryCatch; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::OpState; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::V8CrossThreadTaskSpawner; use libffi::middle::Cif; use serde::Deserialize; use std::borrow::Cow; @@ -31,8 +29,6 @@ use std::ptr::NonNull; use std::rc::Rc; use std::sync::atomic; use std::sync::atomic::AtomicU32; -use std::sync::mpsc::sync_channel; -use std::sync::Arc; use std::task::Poll; static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1); @@ -93,13 +89,12 @@ impl Resource for UnsafeCallbackResource { } struct CallbackInfo { - pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>, + pub async_work_sender: V8CrossThreadTaskSpawner, pub callback: NonNull<v8::Function>, pub context: NonNull<v8::Context>, pub parameters: Box<[NativeType]>, pub result: NativeType, pub thread_id: u32, - pub waker: Arc<AtomicWaker>, } impl Future for CallbackInfo { @@ -113,6 +108,32 @@ impl Future for CallbackInfo { } } +struct TaskArgs { + cif: NonNull<libffi::low::ffi_cif>, + result: NonNull<c_void>, + args: *const *const c_void, + info: NonNull<CallbackInfo>, +} + +// SAFETY: we know these are valid Send-safe pointers as they are for FFI +unsafe impl Send for TaskArgs {} + +impl TaskArgs { + fn run(&mut self, scope: &mut v8::HandleScope) { + // SAFETY: making a call using Send-safe pointers turned back into references. We know the + // lifetime of these will last because we block on the result of the spawn call. + unsafe { + do_ffi_callback( + scope, + self.cif.as_ref(), + self.info.as_ref(), + self.result.as_mut(), + self.args, + ) + } + } +} + unsafe extern "C" fn deno_ffi_callback( cif: &libffi::low::ffi_cif, result: &mut c_void, @@ -121,51 +142,55 @@ unsafe extern "C" fn deno_ffi_callback( ) { LOCAL_THREAD_ID.with(|s| { if *s.borrow() == info.thread_id { - // Own isolate thread, okay to call directly - do_ffi_callback(cif, info, result, args); + // Call from main thread. If this callback is being triggered due to a + // function call coming from Deno itself, then this callback will build + // ontop of that stack. + // If this callback is being triggered outside of Deno (for example from a + // signal handler) then this will either create an empty new stack if + // Deno currently has nothing running and is waiting for promises to resolve, + // or will (very incorrectly) build ontop of whatever stack exists. + // The callback will even be called through from a `while (true)` liveloop, but + // it somehow cannot change the values that the loop sees, even if they both + // refer the same `let bool_value`. + let context: NonNull<v8::Context> = info.context; + let context = std::mem::transmute::< + NonNull<v8::Context>, + v8::Local<v8::Context>, + >(context); + let mut cb_scope = v8::CallbackScope::new(context); + let scope = &mut v8::HandleScope::new(&mut cb_scope); + + do_ffi_callback(scope, cif, info, result, args); } else { let async_work_sender = &info.async_work_sender; - // SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received. - let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif); - let result: &'static mut c_void = std::mem::transmute(result); - let info: &'static CallbackInfo = std::mem::transmute(info); - let (response_sender, response_receiver) = sync_channel::<()>(0); - let fut = Box::new(move || { - do_ffi_callback(cif, info, result, args); - response_sender.send(()).unwrap(); + + let mut args = TaskArgs { + cif: NonNull::from(cif), + result: NonNull::from(result), + args, + info: NonNull::from(info), + }; + + async_work_sender.spawn_blocking(move |scope| { + // We don't have a lot of choice here, so just print an unhandled exception message + let tc_scope = &mut TryCatch::new(scope); + args.run(tc_scope); + if tc_scope.exception().is_some() { + eprintln!("Illegal unhandled exception in nonblocking callback."); + } }); - async_work_sender.unbounded_send(fut).unwrap(); - // Make sure event loop wakes up to receive our message before we start waiting for a response. - info.waker.wake(); - response_receiver.recv().unwrap(); } }); } unsafe fn do_ffi_callback( + scope: &mut v8::HandleScope, cif: &libffi::low::ffi_cif, info: &CallbackInfo, result: &mut c_void, args: *const *const c_void, ) { let callback: NonNull<v8::Function> = info.callback; - let context: NonNull<v8::Context> = info.context; - let context = std::mem::transmute::< - NonNull<v8::Context>, - v8::Local<v8::Context>, - >(context); - // Call from main thread. If this callback is being triggered due to a - // function call coming from Deno itself, then this callback will build - // ontop of that stack. - // If this callback is being triggered outside of Deno (for example from a - // signal handler) then this will either create an empty new stack if - // Deno currently has nothing running and is waiting for promises to resolve, - // or will (very incorrectly) build ontop of whatever stack exists. - // The callback will even be called through from a `while (true)` liveloop, but - // it somehow cannot change the values that the loop sees, even if they both - // refer the same `let bool_value`. - let mut cb_scope = v8::CallbackScope::new(context); - let scope = &mut v8::HandleScope::new(&mut cb_scope); let func = std::mem::transmute::< NonNull<v8::Function>, v8::Local<v8::Function>, @@ -562,25 +587,12 @@ where panic!("Isolate ID counter overflowed u32"); } - let async_work_sender = - if let Some(ffi_state) = state.try_borrow_mut::<FfiState>() { - ffi_state.async_work_sender.clone() - } else { - let (async_work_sender, async_work_receiver) = - mpsc::unbounded::<PendingFfiAsyncWork>(); - - state.put(FfiState { - async_work_receiver, - async_work_sender: async_work_sender.clone(), - }); + let async_work_sender = state.borrow::<V8CrossThreadTaskSpawner>().clone(); - async_work_sender - }; let callback = v8::Global::new(scope, cb).into_raw(); let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); - let waker = state.waker.clone(); let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { async_work_sender, callback, @@ -588,7 +600,6 @@ where parameters: args.parameters.clone().into(), result: args.result.clone(), thread_id, - waker, })); let cif = Cif::new( args |