diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2023-01-12 04:47:55 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-12 13:47:55 +0100 |
commit | dd2829be0c1f5c40ec38a045ea0a14bec34a82c5 (patch) | |
tree | 83cfa62a784ecb0fdada7ce556998d1d7247fa0a /ext/napi/lib.rs | |
parent | cc806cdf2121878ae4c10b1fd0c4c03b14ba33c7 (diff) |
fix(napi): Implement `napi_threadsafe_function` ref and unref (#17304)
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/napi/lib.rs')
-rw-r--r-- | ext/napi/lib.rs | 55 |
1 files changed, 40 insertions, 15 deletions
diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs index 882f7c19d..57f73a0ca 100644 --- a/ext/napi/lib.rs +++ b/ext/napi/lib.rs @@ -19,6 +19,8 @@ use std::ffi::CString; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::task::Poll; use std::thread_local; @@ -322,7 +324,7 @@ pub struct napi_node_version { } pub type PendingNapiAsyncWork = Box<dyn FnOnce()>; - +pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>; pub struct NapiState { // Async tasks. pub pending_async_work: Vec<PendingNapiAsyncWork>, @@ -336,6 +338,7 @@ pub struct NapiState { mpsc::UnboundedSender<ThreadSafeFunctionStatus>, pub env_cleanup_hooks: Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>, + pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>, } impl Drop for NapiState { @@ -391,6 +394,7 @@ pub struct Env { mpsc::UnboundedSender<ThreadSafeFunctionStatus>, pub cleanup_hooks: Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>, + pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>, } unsafe impl Send for Env {} @@ -405,6 +409,7 @@ impl Env { cleanup_hooks: Rc< RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>, >, + tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>, ) -> Self { let sc = sender.clone(); ASYNC_WORK_SENDER.with(|s| { @@ -423,6 +428,7 @@ impl Env { async_work_sender: sender, threadsafe_function_sender, cleanup_hooks, + tsfn_ref_counters, } } @@ -458,6 +464,22 @@ impl Env { // using `napi_open_handle_scope`. unsafe { v8::CallbackScope::new(context) } } + + pub fn add_threadsafe_function_ref_counter( + &mut self, + id: usize, + counter: Arc<AtomicUsize>, + ) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + assert!(!counters.iter().any(|(i, _)| *i == id)); + counters.push((id, counter)); + } + + pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + let index = counters.iter().position(|(i, _)| *i == id).unwrap(); + counters.remove(index); + } } pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension { @@ -479,22 +501,16 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension { napi_state.pending_async_work.push(async_work_fut); } - while let Poll::Ready(Some(tsfn_status)) = - napi_state.threadsafe_function_receiver.poll_next_unpin(cx) - { - match tsfn_status { - ThreadSafeFunctionStatus::Alive => { - napi_state.active_threadsafe_functions += 1 - } - ThreadSafeFunctionStatus::Dead => { - napi_state.active_threadsafe_functions -= 1 - } - }; - } - if napi_state.active_threadsafe_functions > 0 { maybe_scheduling = true; } + + for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() { + if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { + maybe_scheduling = true; + break; + } + } } loop { @@ -527,6 +543,7 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension { threadsafe_function_receiver, active_threadsafe_functions: 0, env_cleanup_hooks: Rc::new(RefCell::new(vec![])), + tsfn_ref_counters: Rc::new(RefCell::new(vec![])), }); state.put(Unstable(unstable)); Ok(()) @@ -563,7 +580,13 @@ where let permissions = op_state.borrow_mut::<NP>(); permissions.check(Some(&PathBuf::from(&path)))?; - let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = { + let ( + async_work_sender, + tsfn_sender, + isolate_ptr, + cleanup_hooks, + tsfn_ref_counters, + ) = { let napi_state = op_state.borrow::<NapiState>(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); ( @@ -571,6 +594,7 @@ where napi_state.threadsafe_function_sender.clone(), *isolate_ptr, napi_state.env_cleanup_hooks.clone(), + napi_state.tsfn_ref_counters.clone(), ) }; @@ -593,6 +617,7 @@ where async_work_sender, tsfn_sender, cleanup_hooks, + tsfn_ref_counters, ); env.shared = Box::into_raw(Box::new(env_shared)); let env_ptr = Box::into_raw(Box::new(env)) as _; |