diff options
Diffstat (limited to 'ext')
-rw-r--r-- | ext/ffi/00_ffi.js | 18 | ||||
-rw-r--r-- | ext/ffi/lib.rs | 132 |
2 files changed, 136 insertions, 14 deletions
diff --git a/ext/ffi/00_ffi.js b/ext/ffi/00_ffi.js index 190873307..f308ecad9 100644 --- a/ext/ffi/00_ffi.js +++ b/ext/ffi/00_ffi.js @@ -201,6 +201,7 @@ } class UnsafeCallback { + #refcount; #rid; definition; callback; @@ -217,13 +218,30 @@ definition, callback, ); + this.#refcount = 0; this.#rid = rid; this.pointer = pointer; this.definition = definition; this.callback = callback; } + ref() { + if (this.#refcount++ === 0) { + core.opSync("op_ffi_unsafe_callback_ref", true); + } + } + + unref() { + if (--this.#refcount === 0) { + core.opSync("op_ffi_unsafe_callback_ref", false); + } + } + close() { + if (this.#refcount) { + this.#refcount = 0; + core.opSync("op_ffi_unsafe_callback_ref", false); + } core.close(this.#rid); } } diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs index 41b2f0a3b..d90f20a29 100644 --- a/ext/ffi/lib.rs +++ b/ext/ffi/lib.rs @@ -6,9 +6,11 @@ use deno_core::error::generic_error; use deno_core::error::range_error; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::channel::mpsc; use deno_core::futures::Future; use deno_core::include_js_files; use deno_core::op; +use std::sync::mpsc::sync_channel; use deno_core::serde_json::json; use deno_core::serde_json::Value; @@ -37,7 +39,7 @@ use std::ptr; use std::rc::Rc; thread_local! { - static IS_ISOLATE_THREAD: RefCell<bool> = RefCell::new(false); + static LOCAL_ISOLATE_POINTER: RefCell<*const v8::Isolate> = RefCell::new(ptr::null()); } pub struct Unstable(pub bool); @@ -122,7 +124,6 @@ impl DynamicLibraryResource { name: String, foreign_fn: ForeignFunction, ) -> Result<(), AnyError> { - IS_ISOLATE_THREAD.with(|s| s.replace(true)); let symbol = match &foreign_fn.name { Some(symbol) => symbol, None => &name, @@ -178,6 +179,14 @@ impl DynamicLibraryResource { } } +type PendingFfiAsyncWork = Box<dyn FnOnce()>; + +struct FfiState { + async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>, + async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>, + active_refed_functions: usize, +} + pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension { Extension::builder() .js(include_js_files!( @@ -204,10 +213,51 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension { op_ffi_read_f32::decl::<P>(), op_ffi_read_f64::decl::<P>(), op_ffi_unsafe_callback_create::decl::<P>(), + op_ffi_unsafe_callback_ref::decl(), ]) + .event_loop_middleware(|op_state_rc, _cx| { + // FFI callbacks coming in from other threads will call in and get queued. + let mut maybe_scheduling = false; + + let mut work_items: Vec<PendingFfiAsyncWork> = vec![]; + + { + let mut op_state = op_state_rc.borrow_mut(); + let ffi_state = op_state.borrow_mut::<FfiState>(); + + while let Ok(Some(async_work_fut)) = + ffi_state.async_work_receiver.try_next() + { + // Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work. + work_items.push(async_work_fut); + maybe_scheduling = true; + } + + if ffi_state.active_refed_functions > 0 { + maybe_scheduling = true; + } + + drop(op_state); + } + while let Some(async_work_fut) = work_items.pop() { + async_work_fut(); + } + + maybe_scheduling + }) .state(move |state| { // Stolen from deno_webgpu, is there a better option? state.put(Unstable(unstable)); + + let (async_work_sender, async_work_receiver) = + mpsc::unbounded::<PendingFfiAsyncWork>(); + + state.put(FfiState { + active_refed_functions: 0, + async_work_receiver, + async_work_sender, + }); + Ok(()) }) .build() @@ -831,6 +881,7 @@ impl Resource for UnsafeCallbackResource { } struct CallbackInfo { + pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>, pub callback: NonNull<v8::Function>, pub context: NonNull<v8::Context>, pub isolate: *mut v8::Isolate, @@ -842,21 +893,55 @@ unsafe extern "C" fn deno_ffi_callback( args: *const *const c_void, info: &CallbackInfo, ) { - let isolate = &mut *info.isolate; - let callback = v8::Global::from_raw(isolate, info.callback); - let context = std::mem::transmute::< - NonNull<v8::Context>, - v8::Local<v8::Context>, - >(info.context); - IS_ISOLATE_THREAD.with(|is_event_loop_thread| { - if !(*is_event_loop_thread.borrow()) { - // Call from another thread, not yet supported. - eprintln!( - "Calling Deno FFI's callbacks from other threads is not supported" + LOCAL_ISOLATE_POINTER.with(|s| { + if ptr::eq(*s.borrow(), info.isolate) { + // Own isolate thread, okay to call directly + do_ffi_callback( + cif, + result, + args, + info.callback, + info.context, + info.isolate, ); - std::process::exit(1); + } else { + let async_work_sender = &info.async_work_sender; + // SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received. + let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif); + let result: &'static mut c_void = std::mem::transmute(result); + let info: &'static CallbackInfo = std::mem::transmute(info); + let (response_sender, response_receiver) = sync_channel::<()>(0); + let fut = Box::new(move || { + do_ffi_callback( + cif, + result, + args, + info.callback, + info.context, + info.isolate, + ); + response_sender.send(()).unwrap(); + }); + async_work_sender.unbounded_send(fut).unwrap(); + response_receiver.recv().unwrap(); } }); +} + +unsafe fn do_ffi_callback( + cif: &libffi::low::ffi_cif, + result: &mut c_void, + args: *const *const c_void, + callback: NonNull<v8::Function>, + context: NonNull<v8::Context>, + isolate: *mut v8::Isolate, +) { + let isolate = &mut *isolate; + let callback = v8::Global::from_raw(isolate, callback); + let context = std::mem::transmute::< + NonNull<v8::Context>, + v8::Local<v8::Context>, + >(context); // Call from main thread. If this callback is being triggered due to a // function call coming from Deno itself, then this callback will build // ontop of that stack. @@ -1096,11 +1181,20 @@ where let cb = v8::Local::<v8::Function>::try_from(v8_value)?; let isolate: *mut v8::Isolate = &mut *scope as &mut v8::Isolate; + LOCAL_ISOLATE_POINTER.with(|s| { + if s.borrow().is_null() { + s.replace(isolate); + } + }); + + let async_work_sender = + state.borrow_mut::<FfiState>().async_work_sender.clone(); let callback = v8::Global::new(scope, cb).into_raw(); let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); let info = Box::leak(Box::new(CallbackInfo { + async_work_sender, callback, context, isolate, @@ -1158,6 +1252,16 @@ where Ok(result) } +#[op] +fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) { + let ffi_state = state.borrow_mut::<FfiState>(); + if inc_dec { + ffi_state.active_refed_functions += 1; + } else { + ffi_state.active_refed_functions -= 1; + } +} + #[op(v8)] fn op_ffi_call_ptr_nonblocking<'scope, FP>( scope: &mut v8::HandleScope<'scope>, |