diff options
author | Aapo Alasuutari <aapo.alasuutari@gmail.com> | 2022-10-15 16:49:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-15 19:19:46 +0530 |
commit | 75acec0aea3eb39afe9240d7952cc6106363c8de (patch) | |
tree | 09850aa0adb92261de65fe0a6fdefbb58a7e825f /ext/ffi/lib.rs | |
parent | 8283d37c51a8fd5f92ba3f8be0bb26f241f864b6 (diff) |
fix(ext/ffi): Fix UnsafeCallback ref'ing making Deno enter a live-loop (#16216)
Fixes #15136
Currently `UnsafeCallback` class' `ref()` and `unref()` methods rely on
the `event_loop_middleware` implementation in core. If even a single
`UnsafeCallback` is ref'ed, then the FFI event loop middleware will
always return `true` to signify that there may still be more work for
the event loop to do.
The middleware handling in core does not wait a moment to check again,
but will instead synchronously directly re-poll the event loop and
middlewares for more work. This becomes a live-loop.
This PR introduces a `Future` implementation for the `CallbackInfo`
struct that acts as the intermediary data storage between an
`UnsafeCallback` and the `libffi` C callback. Ref'ing a callback now
means calling an async op that binds to the `CallbackInfo` Future and
only resolves once the callback is unref'ed. The `libffi` C callback
will call the waker of this Future when it fires to make sure that the
main thread wakes up to receive the callback.
Diffstat (limited to 'ext/ffi/lib.rs')
-rw-r--r-- | ext/ffi/lib.rs | 94 |
1 files changed, 73 insertions, 21 deletions
diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs index 9342abf6b..183fd9214 100644 --- a/ext/ffi/lib.rs +++ b/ext/ffi/lib.rs @@ -13,6 +13,8 @@ use deno_core::op; use deno_core::serde_json::Value; use deno_core::serde_v8; use deno_core::v8; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::Extension; use deno_core::OpState; use deno_core::Resource; @@ -26,14 +28,18 @@ use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; use std::ffi::CStr; +use std::future::IntoFuture; use std::mem::size_of; use std::os::raw::c_char; use std::os::raw::c_short; use std::path::Path; use std::path::PathBuf; +use std::pin::Pin; use std::ptr; use std::rc::Rc; use std::sync::mpsc::sync_channel; +use std::task::Poll; +use std::task::Waker; mod fast_call; @@ -156,7 +162,6 @@ type PendingFfiAsyncWork = Box<dyn FnOnce()>; struct FfiState { async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>, async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>, - active_refed_functions: usize, } pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension { @@ -188,6 +193,7 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension { op_ffi_read_f64::decl::<P>(), op_ffi_unsafe_callback_create::decl::<P>(), op_ffi_unsafe_callback_ref::decl(), + op_ffi_unsafe_callback_unref::decl(), ]) .event_loop_middleware(|op_state_rc, _cx| { // FFI callbacks coming in from other threads will call in and get queued. @@ -207,10 +213,6 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension { maybe_scheduling = true; } - if ffi_state.active_refed_functions > 0 { - maybe_scheduling = true; - } - drop(op_state); } while let Some(async_work_fut) = work_items.pop() { @@ -227,7 +229,6 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension { mpsc::unbounded::<PendingFfiAsyncWork>(); state.put(FfiState { - active_refed_functions: 0, async_work_receiver, async_work_sender, }); @@ -1320,11 +1321,12 @@ fn ffi_call( } struct UnsafeCallbackResource { + cancel: Rc<CancelHandle>, // Closure is never directly touched, but it keeps the C callback alive // until `close()` method is called. #[allow(dead_code)] closure: libffi::middle::Closure<'static>, - info: *const CallbackInfo, + info: *mut CallbackInfo, } impl Resource for UnsafeCallbackResource { @@ -1333,15 +1335,16 @@ impl Resource for UnsafeCallbackResource { } fn close(self: Rc<Self>) { + self.cancel.cancel(); // SAFETY: This drops the closure and the callback info associated with it. // Any retained function pointers to the closure become dangling pointers. // It is up to the user to know that it is safe to call the `close()` on the // UnsafeCallback instance. unsafe { - let info = Box::from_raw(self.info as *mut CallbackInfo); + let info = Box::from_raw(self.info); let isolate = info.isolate.as_mut().unwrap(); - v8::Global::from_raw(isolate, info.callback); - v8::Global::from_raw(isolate, info.context); + let _ = v8::Global::from_raw(isolate, info.callback); + let _ = v8::Global::from_raw(isolate, info.context); } } } @@ -1353,6 +1356,7 @@ struct CallbackInfo { pub callback: NonNull<v8::Function>, pub context: NonNull<v8::Context>, pub isolate: *mut v8::Isolate, + pub waker: Option<Waker>, } unsafe extern "C" fn deno_ffi_callback( @@ -1376,6 +1380,10 @@ unsafe extern "C" fn deno_ffi_callback( response_sender.send(()).unwrap(); }); async_work_sender.unbounded_send(fut).unwrap(); + if let Some(waker) = info.waker.as_ref() { + // Make sure event loop wakes up to receive our message before we start waiting for a response. + waker.wake_by_ref(); + } response_receiver.recv().unwrap(); } }); @@ -1725,22 +1733,30 @@ where let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); - let info = Box::leak(Box::new(CallbackInfo { + let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { parameters: args.parameters.clone(), result: args.result, async_work_sender, callback, context, isolate, + waker: None, })); let cif = Cif::new( args.parameters.into_iter().map(libffi::middle::Type::from), libffi::middle::Type::from(args.result), ); - let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, info); + // SAFETY: CallbackInfo is leaked, is not null and stays valid as long as the callback exists. + let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, unsafe { + info.as_ref().unwrap() + }); let ptr = *closure.code_ptr() as usize; - let resource = UnsafeCallbackResource { closure, info }; + let resource = UnsafeCallbackResource { + cancel: CancelHandle::new_rc(), + closure, + info, + }; let rid = state.resource_table.add(resource); let rid_local = v8::Integer::new_from_unsigned(scope, rid); @@ -1790,17 +1806,53 @@ where Ok(result) } -#[op] -fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) { - check_unstable(state, "Deno.dlopen"); - let ffi_state = state.borrow_mut::<FfiState>(); - if inc_dec { - ffi_state.active_refed_functions += 1; - } else { - ffi_state.active_refed_functions -= 1; +impl Future for CallbackInfo { + type Output = (); + fn poll( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll<Self::Output> { + // Always replace the waker to make sure it's bound to the proper Future. + self.waker.replace(cx.waker().clone()); + // The future for the CallbackInfo never resolves: It can only be canceled. + Poll::Pending } } +#[op] +fn op_ffi_unsafe_callback_ref( + state: &mut deno_core::OpState, + rid: ResourceId, +) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> { + let callback_resource = + state.resource_table.get::<UnsafeCallbackResource>(rid)?; + + Ok(async move { + let info: &mut CallbackInfo = + // SAFETY: CallbackInfo pointer stays valid as long as the resource is still alive. + unsafe { callback_resource.info.as_mut().unwrap() }; + // Ignore cancellation rejection + let _ = info + .into_future() + .or_cancel(callback_resource.cancel.clone()) + .await; + Ok(()) + }) +} + +#[op(fast)] +fn op_ffi_unsafe_callback_unref( + state: &mut deno_core::OpState, + rid: u32, +) -> Result<(), AnyError> { + state + .resource_table + .get::<UnsafeCallbackResource>(rid)? + .cancel + .cancel(); + Ok(()) +} + #[op(v8)] fn op_ffi_call_ptr_nonblocking<'scope, FP>( scope: &mut v8::HandleScope<'scope>, |