summaryrefslogtreecommitdiff
path: root/ext/napi/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/napi/lib.rs')
-rw-r--r--ext/napi/lib.rs55
1 files changed, 40 insertions, 15 deletions
diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs
index 882f7c19d..57f73a0ca 100644
--- a/ext/napi/lib.rs
+++ b/ext/napi/lib.rs
@@ -19,6 +19,8 @@ use std::ffi::CString;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
use std::task::Poll;
use std::thread_local;
@@ -322,7 +324,7 @@ pub struct napi_node_version {
}
pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
-
+pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
pub struct NapiState {
// Async tasks.
pub pending_async_work: Vec<PendingNapiAsyncWork>,
@@ -336,6 +338,7 @@ pub struct NapiState {
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub env_cleanup_hooks:
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
+ pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
}
impl Drop for NapiState {
@@ -391,6 +394,7 @@ pub struct Env {
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub cleanup_hooks:
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
+ pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
}
unsafe impl Send for Env {}
@@ -405,6 +409,7 @@ impl Env {
cleanup_hooks: Rc<
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
>,
+ tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
) -> Self {
let sc = sender.clone();
ASYNC_WORK_SENDER.with(|s| {
@@ -423,6 +428,7 @@ impl Env {
async_work_sender: sender,
threadsafe_function_sender,
cleanup_hooks,
+ tsfn_ref_counters,
}
}
@@ -458,6 +464,22 @@ impl Env {
// using `napi_open_handle_scope`.
unsafe { v8::CallbackScope::new(context) }
}
+
+ pub fn add_threadsafe_function_ref_counter(
+ &mut self,
+ id: usize,
+ counter: Arc<AtomicUsize>,
+ ) {
+ let mut counters = self.tsfn_ref_counters.borrow_mut();
+ assert!(!counters.iter().any(|(i, _)| *i == id));
+ counters.push((id, counter));
+ }
+
+ pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) {
+ let mut counters = self.tsfn_ref_counters.borrow_mut();
+ let index = counters.iter().position(|(i, _)| *i == id).unwrap();
+ counters.remove(index);
+ }
}
pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
@@ -479,22 +501,16 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
napi_state.pending_async_work.push(async_work_fut);
}
- while let Poll::Ready(Some(tsfn_status)) =
- napi_state.threadsafe_function_receiver.poll_next_unpin(cx)
- {
- match tsfn_status {
- ThreadSafeFunctionStatus::Alive => {
- napi_state.active_threadsafe_functions += 1
- }
- ThreadSafeFunctionStatus::Dead => {
- napi_state.active_threadsafe_functions -= 1
- }
- };
- }
-
if napi_state.active_threadsafe_functions > 0 {
maybe_scheduling = true;
}
+
+ for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() {
+ if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
+ maybe_scheduling = true;
+ break;
+ }
+ }
}
loop {
@@ -527,6 +543,7 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
threadsafe_function_receiver,
active_threadsafe_functions: 0,
env_cleanup_hooks: Rc::new(RefCell::new(vec![])),
+ tsfn_ref_counters: Rc::new(RefCell::new(vec![])),
});
state.put(Unstable(unstable));
Ok(())
@@ -563,7 +580,13 @@ where
let permissions = op_state.borrow_mut::<NP>();
permissions.check(Some(&PathBuf::from(&path)))?;
- let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = {
+ let (
+ async_work_sender,
+ tsfn_sender,
+ isolate_ptr,
+ cleanup_hooks,
+ tsfn_ref_counters,
+ ) = {
let napi_state = op_state.borrow::<NapiState>();
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
(
@@ -571,6 +594,7 @@ where
napi_state.threadsafe_function_sender.clone(),
*isolate_ptr,
napi_state.env_cleanup_hooks.clone(),
+ napi_state.tsfn_ref_counters.clone(),
)
};
@@ -593,6 +617,7 @@ where
async_work_sender,
tsfn_sender,
cleanup_hooks,
+ tsfn_ref_counters,
);
env.shared = Box::into_raw(Box::new(env_shared));
let env_ptr = Box::into_raw(Box::new(env)) as _;