summaryrefslogtreecommitdiff
path: root/ext/ffi/callback.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-11 20:10:33 -0700
committerGitHub <noreply@github.com>2023-12-12 03:10:33 +0000
commita4f45f709278208cb61501df2792412f11aed3c4 (patch)
treec8e0eaff17141fd5a8097fe2870efb4bca05e431 /ext/ffi/callback.rs
parentd13e45f2b3e50b85953c31d9c16e35d0cd87545f (diff)
perf(ext/ffi): switch from middleware to tasks (#21239)
Deno-side changes for https://github.com/denoland/deno_core/pull/350 --------- Co-authored-by: Aapo Alasuutari <aapo.alasuutari@gmail.com>
Diffstat (limited to 'ext/ffi/callback.rs')
-rw-r--r--ext/ffi/callback.rs119
1 files changed, 65 insertions, 54 deletions
diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs
index 4974c7f9f..ea18189d8 100644
--- a/ext/ffi/callback.rs
+++ b/ext/ffi/callback.rs
@@ -3,21 +3,19 @@
use crate::check_unstable;
use crate::symbol::NativeType;
use crate::FfiPermissions;
-use crate::FfiState;
use crate::ForeignFunction;
-use crate::PendingFfiAsyncWork;
use crate::MAX_SAFE_INTEGER;
use crate::MIN_SAFE_INTEGER;
use deno_core::error::AnyError;
-use deno_core::futures::channel::mpsc;
-use deno_core::futures::task::AtomicWaker;
use deno_core::op2;
use deno_core::v8;
+use deno_core::v8::TryCatch;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
+use deno_core::V8CrossThreadTaskSpawner;
use libffi::middle::Cif;
use serde::Deserialize;
use std::borrow::Cow;
@@ -31,8 +29,6 @@ use std::ptr::NonNull;
use std::rc::Rc;
use std::sync::atomic;
use std::sync::atomic::AtomicU32;
-use std::sync::mpsc::sync_channel;
-use std::sync::Arc;
use std::task::Poll;
static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
@@ -93,13 +89,12 @@ impl Resource for UnsafeCallbackResource {
}
struct CallbackInfo {
- pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
+ pub async_work_sender: V8CrossThreadTaskSpawner,
pub callback: NonNull<v8::Function>,
pub context: NonNull<v8::Context>,
pub parameters: Box<[NativeType]>,
pub result: NativeType,
pub thread_id: u32,
- pub waker: Arc<AtomicWaker>,
}
impl Future for CallbackInfo {
@@ -113,6 +108,32 @@ impl Future for CallbackInfo {
}
}
+struct TaskArgs {
+ cif: NonNull<libffi::low::ffi_cif>,
+ result: NonNull<c_void>,
+ args: *const *const c_void,
+ info: NonNull<CallbackInfo>,
+}
+
+// SAFETY: we know these are valid Send-safe pointers as they are for FFI
+unsafe impl Send for TaskArgs {}
+
+impl TaskArgs {
+ fn run(&mut self, scope: &mut v8::HandleScope) {
+ // SAFETY: making a call using Send-safe pointers turned back into references. We know the
+ // lifetime of these will last because we block on the result of the spawn call.
+ unsafe {
+ do_ffi_callback(
+ scope,
+ self.cif.as_ref(),
+ self.info.as_ref(),
+ self.result.as_mut(),
+ self.args,
+ )
+ }
+ }
+}
+
unsafe extern "C" fn deno_ffi_callback(
cif: &libffi::low::ffi_cif,
result: &mut c_void,
@@ -121,51 +142,55 @@ unsafe extern "C" fn deno_ffi_callback(
) {
LOCAL_THREAD_ID.with(|s| {
if *s.borrow() == info.thread_id {
- // Own isolate thread, okay to call directly
- do_ffi_callback(cif, info, result, args);
+ // Call from main thread. If this callback is being triggered due to a
+ // function call coming from Deno itself, then this callback will build
+ // ontop of that stack.
+ // If this callback is being triggered outside of Deno (for example from a
+ // signal handler) then this will either create an empty new stack if
+ // Deno currently has nothing running and is waiting for promises to resolve,
+ // or will (very incorrectly) build ontop of whatever stack exists.
+ // The callback will even be called through from a `while (true)` liveloop, but
+ // it somehow cannot change the values that the loop sees, even if they both
+ // refer the same `let bool_value`.
+ let context: NonNull<v8::Context> = info.context;
+ let context = std::mem::transmute::<
+ NonNull<v8::Context>,
+ v8::Local<v8::Context>,
+ >(context);
+ let mut cb_scope = v8::CallbackScope::new(context);
+ let scope = &mut v8::HandleScope::new(&mut cb_scope);
+
+ do_ffi_callback(scope, cif, info, result, args);
} else {
let async_work_sender = &info.async_work_sender;
- // SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received.
- let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif);
- let result: &'static mut c_void = std::mem::transmute(result);
- let info: &'static CallbackInfo = std::mem::transmute(info);
- let (response_sender, response_receiver) = sync_channel::<()>(0);
- let fut = Box::new(move || {
- do_ffi_callback(cif, info, result, args);
- response_sender.send(()).unwrap();
+
+ let mut args = TaskArgs {
+ cif: NonNull::from(cif),
+ result: NonNull::from(result),
+ args,
+ info: NonNull::from(info),
+ };
+
+ async_work_sender.spawn_blocking(move |scope| {
+ // We don't have a lot of choice here, so just print an unhandled exception message
+ let tc_scope = &mut TryCatch::new(scope);
+ args.run(tc_scope);
+ if tc_scope.exception().is_some() {
+ eprintln!("Illegal unhandled exception in nonblocking callback.");
+ }
});
- async_work_sender.unbounded_send(fut).unwrap();
- // Make sure event loop wakes up to receive our message before we start waiting for a response.
- info.waker.wake();
- response_receiver.recv().unwrap();
}
});
}
unsafe fn do_ffi_callback(
+ scope: &mut v8::HandleScope,
cif: &libffi::low::ffi_cif,
info: &CallbackInfo,
result: &mut c_void,
args: *const *const c_void,
) {
let callback: NonNull<v8::Function> = info.callback;
- let context: NonNull<v8::Context> = info.context;
- let context = std::mem::transmute::<
- NonNull<v8::Context>,
- v8::Local<v8::Context>,
- >(context);
- // Call from main thread. If this callback is being triggered due to a
- // function call coming from Deno itself, then this callback will build
- // ontop of that stack.
- // If this callback is being triggered outside of Deno (for example from a
- // signal handler) then this will either create an empty new stack if
- // Deno currently has nothing running and is waiting for promises to resolve,
- // or will (very incorrectly) build ontop of whatever stack exists.
- // The callback will even be called through from a `while (true)` liveloop, but
- // it somehow cannot change the values that the loop sees, even if they both
- // refer the same `let bool_value`.
- let mut cb_scope = v8::CallbackScope::new(context);
- let scope = &mut v8::HandleScope::new(&mut cb_scope);
let func = std::mem::transmute::<
NonNull<v8::Function>,
v8::Local<v8::Function>,
@@ -562,25 +587,12 @@ where
panic!("Isolate ID counter overflowed u32");
}
- let async_work_sender =
- if let Some(ffi_state) = state.try_borrow_mut::<FfiState>() {
- ffi_state.async_work_sender.clone()
- } else {
- let (async_work_sender, async_work_receiver) =
- mpsc::unbounded::<PendingFfiAsyncWork>();
-
- state.put(FfiState {
- async_work_receiver,
- async_work_sender: async_work_sender.clone(),
- });
+ let async_work_sender = state.borrow::<V8CrossThreadTaskSpawner>().clone();
- async_work_sender
- };
let callback = v8::Global::new(scope, cb).into_raw();
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
- let waker = state.waker.clone();
let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
async_work_sender,
callback,
@@ -588,7 +600,6 @@ where
parameters: args.parameters.clone().into(),
result: args.result.clone(),
thread_id,
- waker,
}));
let cif = Cif::new(
args