From dd2829be0c1f5c40ec38a045ea0a14bec34a82c5 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Thu, 12 Jan 2023 04:47:55 -0800 Subject: fix(napi): Implement `napi_threadsafe_function` ref and unref (#17304) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bartek IwaƄczuk --- ext/napi/lib.rs | 55 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 15 deletions(-) (limited to 'ext') diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs index 882f7c19d..57f73a0ca 100644 --- a/ext/napi/lib.rs +++ b/ext/napi/lib.rs @@ -19,6 +19,8 @@ use std::ffi::CString; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::task::Poll; use std::thread_local; @@ -322,7 +324,7 @@ pub struct napi_node_version { } pub type PendingNapiAsyncWork = Box; - +pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc)>; pub struct NapiState { // Async tasks. pub pending_async_work: Vec, @@ -336,6 +338,7 @@ pub struct NapiState { mpsc::UnboundedSender, pub env_cleanup_hooks: Rc>>, + pub tsfn_ref_counters: Rc>, } impl Drop for NapiState { @@ -391,6 +394,7 @@ pub struct Env { mpsc::UnboundedSender, pub cleanup_hooks: Rc>>, + pub tsfn_ref_counters: Rc>, } unsafe impl Send for Env {} @@ -405,6 +409,7 @@ impl Env { cleanup_hooks: Rc< RefCell>, >, + tsfn_ref_counters: Rc>, ) -> Self { let sc = sender.clone(); ASYNC_WORK_SENDER.with(|s| { @@ -423,6 +428,7 @@ impl Env { async_work_sender: sender, threadsafe_function_sender, cleanup_hooks, + tsfn_ref_counters, } } @@ -458,6 +464,22 @@ impl Env { // using `napi_open_handle_scope`. unsafe { v8::CallbackScope::new(context) } } + + pub fn add_threadsafe_function_ref_counter( + &mut self, + id: usize, + counter: Arc, + ) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + assert!(!counters.iter().any(|(i, _)| *i == id)); + counters.push((id, counter)); + } + + pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + let index = counters.iter().position(|(i, _)| *i == id).unwrap(); + counters.remove(index); + } } pub fn init(unstable: bool) -> Extension { @@ -479,22 +501,16 @@ pub fn init(unstable: bool) -> Extension { napi_state.pending_async_work.push(async_work_fut); } - while let Poll::Ready(Some(tsfn_status)) = - napi_state.threadsafe_function_receiver.poll_next_unpin(cx) - { - match tsfn_status { - ThreadSafeFunctionStatus::Alive => { - napi_state.active_threadsafe_functions += 1 - } - ThreadSafeFunctionStatus::Dead => { - napi_state.active_threadsafe_functions -= 1 - } - }; - } - if napi_state.active_threadsafe_functions > 0 { maybe_scheduling = true; } + + for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() { + if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { + maybe_scheduling = true; + break; + } + } } loop { @@ -527,6 +543,7 @@ pub fn init(unstable: bool) -> Extension { threadsafe_function_receiver, active_threadsafe_functions: 0, env_cleanup_hooks: Rc::new(RefCell::new(vec![])), + tsfn_ref_counters: Rc::new(RefCell::new(vec![])), }); state.put(Unstable(unstable)); Ok(()) @@ -563,7 +580,13 @@ where let permissions = op_state.borrow_mut::(); permissions.check(Some(&PathBuf::from(&path)))?; - let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = { + let ( + async_work_sender, + tsfn_sender, + isolate_ptr, + cleanup_hooks, + tsfn_ref_counters, + ) = { let napi_state = op_state.borrow::(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); ( @@ -571,6 +594,7 @@ where napi_state.threadsafe_function_sender.clone(), *isolate_ptr, napi_state.env_cleanup_hooks.clone(), + napi_state.tsfn_ref_counters.clone(), ) }; @@ -593,6 +617,7 @@ where async_work_sender, tsfn_sender, cleanup_hooks, + tsfn_ref_counters, ); env.shared = Box::into_raw(Box::new(env_shared)); let env_ptr = Box::into_raw(Box::new(env)) as _; -- cgit v1.2.3