summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-01-12 04:47:55 -0800
committerGitHub <noreply@github.com>2023-01-12 13:47:55 +0100
commitdd2829be0c1f5c40ec38a045ea0a14bec34a82c5 (patch)
tree83cfa62a784ecb0fdada7ce556998d1d7247fa0a
parentcc806cdf2121878ae4c10b1fd0c4c03b14ba33c7 (diff)
fix(napi): Implement `napi_threadsafe_function` ref and unref (#17304)
Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
-rw-r--r--cli/napi/threadsafe_functions.rs54
-rw-r--r--ext/napi/lib.rs55
-rw-r--r--test_napi/src/lib.rs2
-rw-r--r--test_napi/src/tsfn.rs108
4 files changed, 201 insertions, 18 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs
index d692b5bb2..119ee81da 100644
--- a/cli/napi/threadsafe_functions.rs
+++ b/cli/napi/threadsafe_functions.rs
@@ -2,19 +2,33 @@
use deno_core::futures::channel::mpsc;
use deno_runtime::deno_napi::*;
+use once_cell::sync::Lazy;
use std::mem::forget;
+use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
+use std::sync::Arc;
+
+static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
pub struct TsFn {
+ pub id: usize,
pub env: *mut Env,
pub maybe_func: Option<v8::Global<v8::Function>>,
pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>,
pub context: *mut c_void,
pub thread_counter: usize,
+ pub ref_counter: Arc<AtomicUsize>,
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
}
+impl Drop for TsFn {
+ fn drop(&mut self) {
+ let env = unsafe { self.env.as_mut().unwrap() };
+ env.remove_threadsafe_function_ref_counter(self.id)
+ }
+}
+
impl TsFn {
pub fn acquire(&mut self) -> Result {
self.thread_counter += 1;
@@ -35,6 +49,29 @@ impl TsFn {
Ok(())
}
+ pub fn ref_(&mut self) -> Result {
+ self
+ .ref_counter
+ .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ Ok(())
+ }
+
+ pub fn unref(&mut self) -> Result {
+ let _ = self.ref_counter.fetch_update(
+ std::sync::atomic::Ordering::SeqCst,
+ std::sync::atomic::Ordering::SeqCst,
+ |x| {
+ if x == 0 {
+ None
+ } else {
+ Some(x - 1)
+ }
+ },
+ );
+
+ Ok(())
+ }
+
pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone();
let (tx, rx) = channel();
@@ -107,15 +144,21 @@ fn napi_create_threadsafe_function(
})
.transpose()?;
+ let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+
let tsfn = TsFn {
+ id,
maybe_func,
maybe_call_js_cb,
context,
thread_counter: initial_thread_count,
sender: env_ref.async_work_sender.clone(),
tsfn_sender: env_ref.threadsafe_function_sender.clone(),
+ ref_counter: Arc::new(AtomicUsize::new(1)),
env,
};
+ env_ref
+ .add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone());
env_ref
.threadsafe_function_sender
@@ -142,7 +185,8 @@ fn napi_unref_threadsafe_function(
_env: &mut Env,
tsfn: napi_threadsafe_function,
) -> Result {
- let _tsfn: &TsFn = &*(tsfn as *const TsFn);
+ let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn);
+ tsfn.unref()?;
Ok(())
}
@@ -170,8 +214,12 @@ fn napi_call_threadsafe_function(
}
#[napi_sym::napi_sym]
-fn napi_ref_threadsafe_function() -> Result {
- // TODO
+fn napi_ref_threadsafe_function(
+ _env: &mut Env,
+ func: napi_threadsafe_function,
+) -> Result {
+ let tsfn: &mut TsFn = &mut *(func as *mut TsFn);
+ tsfn.ref_()?;
Ok(())
}
diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs
index 882f7c19d..57f73a0ca 100644
--- a/ext/napi/lib.rs
+++ b/ext/napi/lib.rs
@@ -19,6 +19,8 @@ use std::ffi::CString;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
use std::task::Poll;
use std::thread_local;
@@ -322,7 +324,7 @@ pub struct napi_node_version {
}
pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
-
+pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
pub struct NapiState {
// Async tasks.
pub pending_async_work: Vec<PendingNapiAsyncWork>,
@@ -336,6 +338,7 @@ pub struct NapiState {
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub env_cleanup_hooks:
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
+ pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
}
impl Drop for NapiState {
@@ -391,6 +394,7 @@ pub struct Env {
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub cleanup_hooks:
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
+ pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
}
unsafe impl Send for Env {}
@@ -405,6 +409,7 @@ impl Env {
cleanup_hooks: Rc<
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
>,
+ tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
) -> Self {
let sc = sender.clone();
ASYNC_WORK_SENDER.with(|s| {
@@ -423,6 +428,7 @@ impl Env {
async_work_sender: sender,
threadsafe_function_sender,
cleanup_hooks,
+ tsfn_ref_counters,
}
}
@@ -458,6 +464,22 @@ impl Env {
// using `napi_open_handle_scope`.
unsafe { v8::CallbackScope::new(context) }
}
+
+ pub fn add_threadsafe_function_ref_counter(
+ &mut self,
+ id: usize,
+ counter: Arc<AtomicUsize>,
+ ) {
+ let mut counters = self.tsfn_ref_counters.borrow_mut();
+ assert!(!counters.iter().any(|(i, _)| *i == id));
+ counters.push((id, counter));
+ }
+
+ pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) {
+ let mut counters = self.tsfn_ref_counters.borrow_mut();
+ let index = counters.iter().position(|(i, _)| *i == id).unwrap();
+ counters.remove(index);
+ }
}
pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
@@ -479,22 +501,16 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
napi_state.pending_async_work.push(async_work_fut);
}
- while let Poll::Ready(Some(tsfn_status)) =
- napi_state.threadsafe_function_receiver.poll_next_unpin(cx)
- {
- match tsfn_status {
- ThreadSafeFunctionStatus::Alive => {
- napi_state.active_threadsafe_functions += 1
- }
- ThreadSafeFunctionStatus::Dead => {
- napi_state.active_threadsafe_functions -= 1
- }
- };
- }
-
if napi_state.active_threadsafe_functions > 0 {
maybe_scheduling = true;
}
+
+ for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() {
+ if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
+ maybe_scheduling = true;
+ break;
+ }
+ }
}
loop {
@@ -527,6 +543,7 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
threadsafe_function_receiver,
active_threadsafe_functions: 0,
env_cleanup_hooks: Rc::new(RefCell::new(vec![])),
+ tsfn_ref_counters: Rc::new(RefCell::new(vec![])),
});
state.put(Unstable(unstable));
Ok(())
@@ -563,7 +580,13 @@ where
let permissions = op_state.borrow_mut::<NP>();
permissions.check(Some(&PathBuf::from(&path)))?;
- let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = {
+ let (
+ async_work_sender,
+ tsfn_sender,
+ isolate_ptr,
+ cleanup_hooks,
+ tsfn_ref_counters,
+ ) = {
let napi_state = op_state.borrow::<NapiState>();
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
(
@@ -571,6 +594,7 @@ where
napi_state.threadsafe_function_sender.clone(),
*isolate_ptr,
napi_state.env_cleanup_hooks.clone(),
+ napi_state.tsfn_ref_counters.clone(),
)
};
@@ -593,6 +617,7 @@ where
async_work_sender,
tsfn_sender,
cleanup_hooks,
+ tsfn_ref_counters,
);
env.shared = Box::into_raw(Box::new(env_shared));
let env_ptr = Box::into_raw(Box::new(env)) as _;
diff --git a/test_napi/src/lib.rs b/test_napi/src/lib.rs
index 3a28e4471..025fbf5d2 100644
--- a/test_napi/src/lib.rs
+++ b/test_napi/src/lib.rs
@@ -17,6 +17,7 @@ pub mod primitives;
pub mod promise;
pub mod properties;
pub mod strings;
+pub mod tsfn;
pub mod typedarray;
#[macro_export]
@@ -126,6 +127,7 @@ unsafe extern "C" fn napi_register_module_v1(
object_wrap::init(env, exports);
callback::init(env, exports);
r#async::init(env, exports);
+ tsfn::init(env, exports);
init_cleanup_hook(env, exports);
exports
diff --git a/test_napi/src/tsfn.rs b/test_napi/src/tsfn.rs
new file mode 100644
index 000000000..314975f39
--- /dev/null
+++ b/test_napi/src/tsfn.rs
@@ -0,0 +1,108 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// This test performs initilization similar to napi-rs.
+// https://github.com/napi-rs/napi-rs/commit/a5a04a4e545f268769cc78e2bd6c45af4336aac3
+
+use napi_sys as sys;
+use std::ffi::c_char;
+use std::ffi::c_void;
+use std::ptr;
+
+macro_rules! check_status_or_panic {
+ ($code:expr, $msg:expr) => {{
+ let c = $code;
+ match c {
+ sys::Status::napi_ok => {}
+ _ => panic!($msg),
+ }
+ }};
+}
+
+fn create_custom_gc(env: sys::napi_env) {
+ let mut custom_gc_fn = ptr::null_mut();
+ check_status_or_panic!(
+ unsafe {
+ sys::napi_create_function(
+ env,
+ "custom_gc".as_ptr() as *const c_char,
+ 9,
+ Some(empty),
+ ptr::null_mut(),
+ &mut custom_gc_fn,
+ )
+ },
+ "Create Custom GC Function in napi_register_module_v1 failed"
+ );
+ let mut async_resource_name = ptr::null_mut();
+ check_status_or_panic!(
+ unsafe {
+ sys::napi_create_string_utf8(
+ env,
+ "CustomGC".as_ptr() as *const c_char,
+ 8,
+ &mut async_resource_name,
+ )
+ },
+ "Create async resource string in napi_register_module_v1 napi_register_module_v1"
+ );
+ let mut custom_gc_tsfn = ptr::null_mut();
+ check_status_or_panic!(
+ unsafe {
+ sys::napi_create_threadsafe_function(
+ env,
+ custom_gc_fn,
+ ptr::null_mut(),
+ async_resource_name,
+ 0,
+ 1,
+ ptr::null_mut(),
+ Some(custom_gc_finalize),
+ ptr::null_mut(),
+ Some(custom_gc),
+ &mut custom_gc_tsfn,
+ )
+ },
+ "Create Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
+ );
+ check_status_or_panic!(
+ unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) },
+ "Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
+ );
+}
+
+unsafe extern "C" fn empty(
+ _env: sys::napi_env,
+ _info: sys::napi_callback_info,
+) -> sys::napi_value {
+ ptr::null_mut()
+}
+
+unsafe extern "C" fn custom_gc_finalize(
+ _env: sys::napi_env,
+ _finalize_data: *mut c_void,
+ _finalize_hint: *mut c_void,
+) {
+}
+
+extern "C" fn custom_gc(
+ env: sys::napi_env,
+ _js_callback: sys::napi_value,
+ _context: *mut c_void,
+ data: *mut c_void,
+) {
+ let mut ref_count = 0;
+ check_status_or_panic!(
+ unsafe {
+ sys::napi_reference_unref(env, data as sys::napi_ref, &mut ref_count)
+ },
+ "Failed to unref Buffer reference in Custom GC"
+ );
+ check_status_or_panic!(
+ unsafe { sys::napi_delete_reference(env, data as sys::napi_ref) },
+ "Failed to delete Buffer reference in Custom GC"
+ );
+}
+
+pub fn init(env: sys::napi_env, _exports: sys::napi_value) {
+ create_custom_gc(env);
+}