summaryrefslogtreecommitdiff
path: root/ext/ffi/lib.rs
diff options
context:
space:
mode:
authorAapo Alasuutari <aapo.alasuutari@gmail.com>2022-10-15 16:49:46 +0300
committerGitHub <noreply@github.com>2022-10-15 19:19:46 +0530
commit75acec0aea3eb39afe9240d7952cc6106363c8de (patch)
tree09850aa0adb92261de65fe0a6fdefbb58a7e825f /ext/ffi/lib.rs
parent8283d37c51a8fd5f92ba3f8be0bb26f241f864b6 (diff)
fix(ext/ffi): Fix UnsafeCallback ref'ing making Deno enter a live-loop (#16216)
Fixes #15136 Currently `UnsafeCallback` class' `ref()` and `unref()` methods rely on the `event_loop_middleware` implementation in core. If even a single `UnsafeCallback` is ref'ed, then the FFI event loop middleware will always return `true` to signify that there may still be more work for the event loop to do. The middleware handling in core does not wait a moment to check again, but will instead synchronously directly re-poll the event loop and middlewares for more work. This becomes a live-loop. This PR introduces a `Future` implementation for the `CallbackInfo` struct that acts as the intermediary data storage between an `UnsafeCallback` and the `libffi` C callback. Ref'ing a callback now means calling an async op that binds to the `CallbackInfo` Future and only resolves once the callback is unref'ed. The `libffi` C callback will call the waker of this Future when it fires to make sure that the main thread wakes up to receive the callback.
Diffstat (limited to 'ext/ffi/lib.rs')
-rw-r--r--ext/ffi/lib.rs94
1 files changed, 73 insertions, 21 deletions
diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs
index 9342abf6b..183fd9214 100644
--- a/ext/ffi/lib.rs
+++ b/ext/ffi/lib.rs
@@ -13,6 +13,8 @@ use deno_core::op;
use deno_core::serde_json::Value;
use deno_core::serde_v8;
use deno_core::v8;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::Resource;
@@ -26,14 +28,18 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::c_void;
use std::ffi::CStr;
+use std::future::IntoFuture;
use std::mem::size_of;
use std::os::raw::c_char;
use std::os::raw::c_short;
use std::path::Path;
use std::path::PathBuf;
+use std::pin::Pin;
use std::ptr;
use std::rc::Rc;
use std::sync::mpsc::sync_channel;
+use std::task::Poll;
+use std::task::Waker;
mod fast_call;
@@ -156,7 +162,6 @@ type PendingFfiAsyncWork = Box<dyn FnOnce()>;
struct FfiState {
async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
- active_refed_functions: usize,
}
pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
@@ -188,6 +193,7 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
op_ffi_read_f64::decl::<P>(),
op_ffi_unsafe_callback_create::decl::<P>(),
op_ffi_unsafe_callback_ref::decl(),
+ op_ffi_unsafe_callback_unref::decl(),
])
.event_loop_middleware(|op_state_rc, _cx| {
// FFI callbacks coming in from other threads will call in and get queued.
@@ -207,10 +213,6 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
maybe_scheduling = true;
}
- if ffi_state.active_refed_functions > 0 {
- maybe_scheduling = true;
- }
-
drop(op_state);
}
while let Some(async_work_fut) = work_items.pop() {
@@ -227,7 +229,6 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
mpsc::unbounded::<PendingFfiAsyncWork>();
state.put(FfiState {
- active_refed_functions: 0,
async_work_receiver,
async_work_sender,
});
@@ -1320,11 +1321,12 @@ fn ffi_call(
}
struct UnsafeCallbackResource {
+ cancel: Rc<CancelHandle>,
// Closure is never directly touched, but it keeps the C callback alive
// until `close()` method is called.
#[allow(dead_code)]
closure: libffi::middle::Closure<'static>,
- info: *const CallbackInfo,
+ info: *mut CallbackInfo,
}
impl Resource for UnsafeCallbackResource {
@@ -1333,15 +1335,16 @@ impl Resource for UnsafeCallbackResource {
}
fn close(self: Rc<Self>) {
+ self.cancel.cancel();
// SAFETY: This drops the closure and the callback info associated with it.
// Any retained function pointers to the closure become dangling pointers.
// It is up to the user to know that it is safe to call the `close()` on the
// UnsafeCallback instance.
unsafe {
- let info = Box::from_raw(self.info as *mut CallbackInfo);
+ let info = Box::from_raw(self.info);
let isolate = info.isolate.as_mut().unwrap();
- v8::Global::from_raw(isolate, info.callback);
- v8::Global::from_raw(isolate, info.context);
+ let _ = v8::Global::from_raw(isolate, info.callback);
+ let _ = v8::Global::from_raw(isolate, info.context);
}
}
}
@@ -1353,6 +1356,7 @@ struct CallbackInfo {
pub callback: NonNull<v8::Function>,
pub context: NonNull<v8::Context>,
pub isolate: *mut v8::Isolate,
+ pub waker: Option<Waker>,
}
unsafe extern "C" fn deno_ffi_callback(
@@ -1376,6 +1380,10 @@ unsafe extern "C" fn deno_ffi_callback(
response_sender.send(()).unwrap();
});
async_work_sender.unbounded_send(fut).unwrap();
+ if let Some(waker) = info.waker.as_ref() {
+ // Make sure event loop wakes up to receive our message before we start waiting for a response.
+ waker.wake_by_ref();
+ }
response_receiver.recv().unwrap();
}
});
@@ -1725,22 +1733,30 @@ where
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
- let info = Box::leak(Box::new(CallbackInfo {
+ let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
parameters: args.parameters.clone(),
result: args.result,
async_work_sender,
callback,
context,
isolate,
+ waker: None,
}));
let cif = Cif::new(
args.parameters.into_iter().map(libffi::middle::Type::from),
libffi::middle::Type::from(args.result),
);
- let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, info);
+ // SAFETY: CallbackInfo is leaked, is not null and stays valid as long as the callback exists.
+ let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, unsafe {
+ info.as_ref().unwrap()
+ });
let ptr = *closure.code_ptr() as usize;
- let resource = UnsafeCallbackResource { closure, info };
+ let resource = UnsafeCallbackResource {
+ cancel: CancelHandle::new_rc(),
+ closure,
+ info,
+ };
let rid = state.resource_table.add(resource);
let rid_local = v8::Integer::new_from_unsigned(scope, rid);
@@ -1790,17 +1806,53 @@ where
Ok(result)
}
-#[op]
-fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) {
- check_unstable(state, "Deno.dlopen");
- let ffi_state = state.borrow_mut::<FfiState>();
- if inc_dec {
- ffi_state.active_refed_functions += 1;
- } else {
- ffi_state.active_refed_functions -= 1;
+impl Future for CallbackInfo {
+ type Output = ();
+ fn poll(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ // Always replace the waker to make sure it's bound to the proper Future.
+ self.waker.replace(cx.waker().clone());
+ // The future for the CallbackInfo never resolves: It can only be canceled.
+ Poll::Pending
}
}
+#[op]
+fn op_ffi_unsafe_callback_ref(
+ state: &mut deno_core::OpState,
+ rid: ResourceId,
+) -> Result<impl Future<Output = Result<(), AnyError>>, AnyError> {
+ let callback_resource =
+ state.resource_table.get::<UnsafeCallbackResource>(rid)?;
+
+ Ok(async move {
+ let info: &mut CallbackInfo =
+ // SAFETY: CallbackInfo pointer stays valid as long as the resource is still alive.
+ unsafe { callback_resource.info.as_mut().unwrap() };
+ // Ignore cancellation rejection
+ let _ = info
+ .into_future()
+ .or_cancel(callback_resource.cancel.clone())
+ .await;
+ Ok(())
+ })
+}
+
+#[op(fast)]
+fn op_ffi_unsafe_callback_unref(
+ state: &mut deno_core::OpState,
+ rid: u32,
+) -> Result<(), AnyError> {
+ state
+ .resource_table
+ .get::<UnsafeCallbackResource>(rid)?
+ .cancel
+ .cancel();
+ Ok(())
+}
+
#[op(v8)]
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
scope: &mut v8::HandleScope<'scope>,