diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2023-01-12 04:47:55 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-12 13:47:55 +0100 |
commit | dd2829be0c1f5c40ec38a045ea0a14bec34a82c5 (patch) | |
tree | 83cfa62a784ecb0fdada7ce556998d1d7247fa0a | |
parent | cc806cdf2121878ae4c10b1fd0c4c03b14ba33c7 (diff) |
fix(napi): Implement `napi_threadsafe_function` ref and unref (#17304)
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
-rw-r--r-- | cli/napi/threadsafe_functions.rs | 54 | ||||
-rw-r--r-- | ext/napi/lib.rs | 55 | ||||
-rw-r--r-- | test_napi/src/lib.rs | 2 | ||||
-rw-r--r-- | test_napi/src/tsfn.rs | 108 |
4 files changed, 201 insertions, 18 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs index d692b5bb2..119ee81da 100644 --- a/cli/napi/threadsafe_functions.rs +++ b/cli/napi/threadsafe_functions.rs @@ -2,19 +2,33 @@ use deno_core::futures::channel::mpsc; use deno_runtime::deno_napi::*; +use once_cell::sync::Lazy; use std::mem::forget; +use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; +use std::sync::Arc; + +static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0)); pub struct TsFn { + pub id: usize, pub env: *mut Env, pub maybe_func: Option<v8::Global<v8::Function>>, pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>, pub context: *mut c_void, pub thread_counter: usize, + pub ref_counter: Arc<AtomicUsize>, sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, } +impl Drop for TsFn { + fn drop(&mut self) { + let env = unsafe { self.env.as_mut().unwrap() }; + env.remove_threadsafe_function_ref_counter(self.id) + } +} + impl TsFn { pub fn acquire(&mut self) -> Result { self.thread_counter += 1; @@ -35,6 +49,29 @@ impl TsFn { Ok(()) } + pub fn ref_(&mut self) -> Result { + self + .ref_counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + + pub fn unref(&mut self) -> Result { + let _ = self.ref_counter.fetch_update( + std::sync::atomic::Ordering::SeqCst, + std::sync::atomic::Ordering::SeqCst, + |x| { + if x == 0 { + None + } else { + Some(x - 1) + } + }, + ); + + Ok(()) + } + pub fn call(&self, data: *mut c_void, is_blocking: bool) { let js_func = self.maybe_func.clone(); let (tx, rx) = channel(); @@ -107,15 +144,21 @@ fn napi_create_threadsafe_function( }) .transpose()?; + let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let tsfn = TsFn { + id, maybe_func, maybe_call_js_cb, context, thread_counter: initial_thread_count, sender: env_ref.async_work_sender.clone(), tsfn_sender: env_ref.threadsafe_function_sender.clone(), + ref_counter: Arc::new(AtomicUsize::new(1)), env, }; + env_ref + .add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone()); env_ref .threadsafe_function_sender @@ -142,7 +185,8 @@ fn napi_unref_threadsafe_function( _env: &mut Env, tsfn: napi_threadsafe_function, ) -> Result { - let _tsfn: &TsFn = &*(tsfn as *const TsFn); + let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn); + tsfn.unref()?; Ok(()) } @@ -170,8 +214,12 @@ fn napi_call_threadsafe_function( } #[napi_sym::napi_sym] -fn napi_ref_threadsafe_function() -> Result { - // TODO +fn napi_ref_threadsafe_function( + _env: &mut Env, + func: napi_threadsafe_function, +) -> Result { + let tsfn: &mut TsFn = &mut *(func as *mut TsFn); + tsfn.ref_()?; Ok(()) } diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs index 882f7c19d..57f73a0ca 100644 --- a/ext/napi/lib.rs +++ b/ext/napi/lib.rs @@ -19,6 +19,8 @@ use std::ffi::CString; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::task::Poll; use std::thread_local; @@ -322,7 +324,7 @@ pub struct napi_node_version { } pub type PendingNapiAsyncWork = Box<dyn FnOnce()>; - +pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>; pub struct NapiState { // Async tasks. pub pending_async_work: Vec<PendingNapiAsyncWork>, @@ -336,6 +338,7 @@ pub struct NapiState { mpsc::UnboundedSender<ThreadSafeFunctionStatus>, pub env_cleanup_hooks: Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>, + pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>, } impl Drop for NapiState { @@ -391,6 +394,7 @@ pub struct Env { mpsc::UnboundedSender<ThreadSafeFunctionStatus>, pub cleanup_hooks: Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>, + pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>, } unsafe impl Send for Env {} @@ -405,6 +409,7 @@ impl Env { cleanup_hooks: Rc< RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>, >, + tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>, ) -> Self { let sc = sender.clone(); ASYNC_WORK_SENDER.with(|s| { @@ -423,6 +428,7 @@ impl Env { async_work_sender: sender, threadsafe_function_sender, cleanup_hooks, + tsfn_ref_counters, } } @@ -458,6 +464,22 @@ impl Env { // using `napi_open_handle_scope`. unsafe { v8::CallbackScope::new(context) } } + + pub fn add_threadsafe_function_ref_counter( + &mut self, + id: usize, + counter: Arc<AtomicUsize>, + ) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + assert!(!counters.iter().any(|(i, _)| *i == id)); + counters.push((id, counter)); + } + + pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + let index = counters.iter().position(|(i, _)| *i == id).unwrap(); + counters.remove(index); + } } pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension { @@ -479,22 +501,16 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension { napi_state.pending_async_work.push(async_work_fut); } - while let Poll::Ready(Some(tsfn_status)) = - napi_state.threadsafe_function_receiver.poll_next_unpin(cx) - { - match tsfn_status { - ThreadSafeFunctionStatus::Alive => { - napi_state.active_threadsafe_functions += 1 - } - ThreadSafeFunctionStatus::Dead => { - napi_state.active_threadsafe_functions -= 1 - } - }; - } - if napi_state.active_threadsafe_functions > 0 { maybe_scheduling = true; } + + for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() { + if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { + maybe_scheduling = true; + break; + } + } } loop { @@ -527,6 +543,7 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension { threadsafe_function_receiver, active_threadsafe_functions: 0, env_cleanup_hooks: Rc::new(RefCell::new(vec![])), + tsfn_ref_counters: Rc::new(RefCell::new(vec![])), }); state.put(Unstable(unstable)); Ok(()) @@ -563,7 +580,13 @@ where let permissions = op_state.borrow_mut::<NP>(); permissions.check(Some(&PathBuf::from(&path)))?; - let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = { + let ( + async_work_sender, + tsfn_sender, + isolate_ptr, + cleanup_hooks, + tsfn_ref_counters, + ) = { let napi_state = op_state.borrow::<NapiState>(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); ( @@ -571,6 +594,7 @@ where napi_state.threadsafe_function_sender.clone(), *isolate_ptr, napi_state.env_cleanup_hooks.clone(), + napi_state.tsfn_ref_counters.clone(), ) }; @@ -593,6 +617,7 @@ where async_work_sender, tsfn_sender, cleanup_hooks, + tsfn_ref_counters, ); env.shared = Box::into_raw(Box::new(env_shared)); let env_ptr = Box::into_raw(Box::new(env)) as _; diff --git a/test_napi/src/lib.rs b/test_napi/src/lib.rs index 3a28e4471..025fbf5d2 100644 --- a/test_napi/src/lib.rs +++ b/test_napi/src/lib.rs @@ -17,6 +17,7 @@ pub mod primitives; pub mod promise; pub mod properties; pub mod strings; +pub mod tsfn; pub mod typedarray; #[macro_export] @@ -126,6 +127,7 @@ unsafe extern "C" fn napi_register_module_v1( object_wrap::init(env, exports); callback::init(env, exports); r#async::init(env, exports); + tsfn::init(env, exports); init_cleanup_hook(env, exports); exports diff --git a/test_napi/src/tsfn.rs b/test_napi/src/tsfn.rs new file mode 100644 index 000000000..314975f39 --- /dev/null +++ b/test_napi/src/tsfn.rs @@ -0,0 +1,108 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// This test performs initilization similar to napi-rs. +// https://github.com/napi-rs/napi-rs/commit/a5a04a4e545f268769cc78e2bd6c45af4336aac3 + +use napi_sys as sys; +use std::ffi::c_char; +use std::ffi::c_void; +use std::ptr; + +macro_rules! check_status_or_panic { + ($code:expr, $msg:expr) => {{ + let c = $code; + match c { + sys::Status::napi_ok => {} + _ => panic!($msg), + } + }}; +} + +fn create_custom_gc(env: sys::napi_env) { + let mut custom_gc_fn = ptr::null_mut(); + check_status_or_panic!( + unsafe { + sys::napi_create_function( + env, + "custom_gc".as_ptr() as *const c_char, + 9, + Some(empty), + ptr::null_mut(), + &mut custom_gc_fn, + ) + }, + "Create Custom GC Function in napi_register_module_v1 failed" + ); + let mut async_resource_name = ptr::null_mut(); + check_status_or_panic!( + unsafe { + sys::napi_create_string_utf8( + env, + "CustomGC".as_ptr() as *const c_char, + 8, + &mut async_resource_name, + ) + }, + "Create async resource string in napi_register_module_v1 napi_register_module_v1" + ); + let mut custom_gc_tsfn = ptr::null_mut(); + check_status_or_panic!( + unsafe { + sys::napi_create_threadsafe_function( + env, + custom_gc_fn, + ptr::null_mut(), + async_resource_name, + 0, + 1, + ptr::null_mut(), + Some(custom_gc_finalize), + ptr::null_mut(), + Some(custom_gc), + &mut custom_gc_tsfn, + ) + }, + "Create Custom GC ThreadsafeFunction in napi_register_module_v1 failed" + ); + check_status_or_panic!( + unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) }, + "Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed" + ); +} + +unsafe extern "C" fn empty( + _env: sys::napi_env, + _info: sys::napi_callback_info, +) -> sys::napi_value { + ptr::null_mut() +} + +unsafe extern "C" fn custom_gc_finalize( + _env: sys::napi_env, + _finalize_data: *mut c_void, + _finalize_hint: *mut c_void, +) { +} + +extern "C" fn custom_gc( + env: sys::napi_env, + _js_callback: sys::napi_value, + _context: *mut c_void, + data: *mut c_void, +) { + let mut ref_count = 0; + check_status_or_panic!( + unsafe { + sys::napi_reference_unref(env, data as sys::napi_ref, &mut ref_count) + }, + "Failed to unref Buffer reference in Custom GC" + ); + check_status_or_panic!( + unsafe { sys::napi_delete_reference(env, data as sys::napi_ref) }, + "Failed to delete Buffer reference in Custom GC" + ); +} + +pub fn init(env: sys::napi_env, _exports: sys::napi_value) { + create_custom_gc(env); +} |