summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ext')
-rw-r--r--ext/ffi/00_ffi.js18
-rw-r--r--ext/ffi/lib.rs132
2 files changed, 136 insertions, 14 deletions
diff --git a/ext/ffi/00_ffi.js b/ext/ffi/00_ffi.js
index 190873307..f308ecad9 100644
--- a/ext/ffi/00_ffi.js
+++ b/ext/ffi/00_ffi.js
@@ -201,6 +201,7 @@
}
class UnsafeCallback {
+ #refcount;
#rid;
definition;
callback;
@@ -217,13 +218,30 @@
definition,
callback,
);
+ this.#refcount = 0;
this.#rid = rid;
this.pointer = pointer;
this.definition = definition;
this.callback = callback;
}
+ ref() {
+ if (this.#refcount++ === 0) {
+ core.opSync("op_ffi_unsafe_callback_ref", true);
+ }
+ }
+
+ unref() {
+ if (--this.#refcount === 0) {
+ core.opSync("op_ffi_unsafe_callback_ref", false);
+ }
+ }
+
close() {
+ if (this.#refcount) {
+ this.#refcount = 0;
+ core.opSync("op_ffi_unsafe_callback_ref", false);
+ }
core.close(this.#rid);
}
}
diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs
index 41b2f0a3b..d90f20a29 100644
--- a/ext/ffi/lib.rs
+++ b/ext/ffi/lib.rs
@@ -6,9 +6,11 @@ use deno_core::error::generic_error;
use deno_core::error::range_error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::channel::mpsc;
use deno_core::futures::Future;
use deno_core::include_js_files;
use deno_core::op;
+use std::sync::mpsc::sync_channel;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
@@ -37,7 +39,7 @@ use std::ptr;
use std::rc::Rc;
thread_local! {
- static IS_ISOLATE_THREAD: RefCell<bool> = RefCell::new(false);
+ static LOCAL_ISOLATE_POINTER: RefCell<*const v8::Isolate> = RefCell::new(ptr::null());
}
pub struct Unstable(pub bool);
@@ -122,7 +124,6 @@ impl DynamicLibraryResource {
name: String,
foreign_fn: ForeignFunction,
) -> Result<(), AnyError> {
- IS_ISOLATE_THREAD.with(|s| s.replace(true));
let symbol = match &foreign_fn.name {
Some(symbol) => symbol,
None => &name,
@@ -178,6 +179,14 @@ impl DynamicLibraryResource {
}
}
+type PendingFfiAsyncWork = Box<dyn FnOnce()>;
+
+struct FfiState {
+ async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
+ async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
+ active_refed_functions: usize,
+}
+
pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
Extension::builder()
.js(include_js_files!(
@@ -204,10 +213,51 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
op_ffi_read_f32::decl::<P>(),
op_ffi_read_f64::decl::<P>(),
op_ffi_unsafe_callback_create::decl::<P>(),
+ op_ffi_unsafe_callback_ref::decl(),
])
+ .event_loop_middleware(|op_state_rc, _cx| {
+ // FFI callbacks coming in from other threads will call in and get queued.
+ let mut maybe_scheduling = false;
+
+ let mut work_items: Vec<PendingFfiAsyncWork> = vec![];
+
+ {
+ let mut op_state = op_state_rc.borrow_mut();
+ let ffi_state = op_state.borrow_mut::<FfiState>();
+
+ while let Ok(Some(async_work_fut)) =
+ ffi_state.async_work_receiver.try_next()
+ {
+ // Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work.
+ work_items.push(async_work_fut);
+ maybe_scheduling = true;
+ }
+
+ if ffi_state.active_refed_functions > 0 {
+ maybe_scheduling = true;
+ }
+
+ drop(op_state);
+ }
+ while let Some(async_work_fut) = work_items.pop() {
+ async_work_fut();
+ }
+
+ maybe_scheduling
+ })
.state(move |state| {
// Stolen from deno_webgpu, is there a better option?
state.put(Unstable(unstable));
+
+ let (async_work_sender, async_work_receiver) =
+ mpsc::unbounded::<PendingFfiAsyncWork>();
+
+ state.put(FfiState {
+ active_refed_functions: 0,
+ async_work_receiver,
+ async_work_sender,
+ });
+
Ok(())
})
.build()
@@ -831,6 +881,7 @@ impl Resource for UnsafeCallbackResource {
}
struct CallbackInfo {
+ pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
pub callback: NonNull<v8::Function>,
pub context: NonNull<v8::Context>,
pub isolate: *mut v8::Isolate,
@@ -842,21 +893,55 @@ unsafe extern "C" fn deno_ffi_callback(
args: *const *const c_void,
info: &CallbackInfo,
) {
- let isolate = &mut *info.isolate;
- let callback = v8::Global::from_raw(isolate, info.callback);
- let context = std::mem::transmute::<
- NonNull<v8::Context>,
- v8::Local<v8::Context>,
- >(info.context);
- IS_ISOLATE_THREAD.with(|is_event_loop_thread| {
- if !(*is_event_loop_thread.borrow()) {
- // Call from another thread, not yet supported.
- eprintln!(
- "Calling Deno FFI's callbacks from other threads is not supported"
+ LOCAL_ISOLATE_POINTER.with(|s| {
+ if ptr::eq(*s.borrow(), info.isolate) {
+ // Own isolate thread, okay to call directly
+ do_ffi_callback(
+ cif,
+ result,
+ args,
+ info.callback,
+ info.context,
+ info.isolate,
);
- std::process::exit(1);
+ } else {
+ let async_work_sender = &info.async_work_sender;
+ // SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received.
+ let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif);
+ let result: &'static mut c_void = std::mem::transmute(result);
+ let info: &'static CallbackInfo = std::mem::transmute(info);
+ let (response_sender, response_receiver) = sync_channel::<()>(0);
+ let fut = Box::new(move || {
+ do_ffi_callback(
+ cif,
+ result,
+ args,
+ info.callback,
+ info.context,
+ info.isolate,
+ );
+ response_sender.send(()).unwrap();
+ });
+ async_work_sender.unbounded_send(fut).unwrap();
+ response_receiver.recv().unwrap();
}
});
+}
+
+unsafe fn do_ffi_callback(
+ cif: &libffi::low::ffi_cif,
+ result: &mut c_void,
+ args: *const *const c_void,
+ callback: NonNull<v8::Function>,
+ context: NonNull<v8::Context>,
+ isolate: *mut v8::Isolate,
+) {
+ let isolate = &mut *isolate;
+ let callback = v8::Global::from_raw(isolate, callback);
+ let context = std::mem::transmute::<
+ NonNull<v8::Context>,
+ v8::Local<v8::Context>,
+ >(context);
// Call from main thread. If this callback is being triggered due to a
// function call coming from Deno itself, then this callback will build
// ontop of that stack.
@@ -1096,11 +1181,20 @@ where
let cb = v8::Local::<v8::Function>::try_from(v8_value)?;
let isolate: *mut v8::Isolate = &mut *scope as &mut v8::Isolate;
+ LOCAL_ISOLATE_POINTER.with(|s| {
+ if s.borrow().is_null() {
+ s.replace(isolate);
+ }
+ });
+
+ let async_work_sender =
+ state.borrow_mut::<FfiState>().async_work_sender.clone();
let callback = v8::Global::new(scope, cb).into_raw();
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
let info = Box::leak(Box::new(CallbackInfo {
+ async_work_sender,
callback,
context,
isolate,
@@ -1158,6 +1252,16 @@ where
Ok(result)
}
+#[op]
+fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) {
+ let ffi_state = state.borrow_mut::<FfiState>();
+ if inc_dec {
+ ffi_state.active_refed_functions += 1;
+ } else {
+ ffi_state.active_refed_functions -= 1;
+ }
+}
+
#[op(v8)]
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
scope: &mut v8::HandleScope<'scope>,