summaryrefslogtreecommitdiff
path: root/cli/napi/threadsafe_functions.rs
diff options
context:
space:
mode:
authorDivy Srivastava <dj.srivastava23@gmail.com>2023-01-12 04:47:55 -0800
committerGitHub <noreply@github.com>2023-01-12 13:47:55 +0100
commitdd2829be0c1f5c40ec38a045ea0a14bec34a82c5 (patch)
tree83cfa62a784ecb0fdada7ce556998d1d7247fa0a /cli/napi/threadsafe_functions.rs
parentcc806cdf2121878ae4c10b1fd0c4c03b14ba33c7 (diff)
fix(napi): Implement `napi_threadsafe_function` ref and unref (#17304)
Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Diffstat (limited to 'cli/napi/threadsafe_functions.rs')
-rw-r--r--cli/napi/threadsafe_functions.rs54
1 files changed, 51 insertions, 3 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs
index d692b5bb2..119ee81da 100644
--- a/cli/napi/threadsafe_functions.rs
+++ b/cli/napi/threadsafe_functions.rs
@@ -2,19 +2,33 @@
use deno_core::futures::channel::mpsc;
use deno_runtime::deno_napi::*;
+use once_cell::sync::Lazy;
use std::mem::forget;
+use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
+use std::sync::Arc;
+
+static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
pub struct TsFn {
+ pub id: usize,
pub env: *mut Env,
pub maybe_func: Option<v8::Global<v8::Function>>,
pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>,
pub context: *mut c_void,
pub thread_counter: usize,
+ pub ref_counter: Arc<AtomicUsize>,
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
}
+impl Drop for TsFn {
+ fn drop(&mut self) {
+ let env = unsafe { self.env.as_mut().unwrap() };
+ env.remove_threadsafe_function_ref_counter(self.id)
+ }
+}
+
impl TsFn {
pub fn acquire(&mut self) -> Result {
self.thread_counter += 1;
@@ -35,6 +49,29 @@ impl TsFn {
Ok(())
}
+ pub fn ref_(&mut self) -> Result {
+ self
+ .ref_counter
+ .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ Ok(())
+ }
+
+ pub fn unref(&mut self) -> Result {
+ let _ = self.ref_counter.fetch_update(
+ std::sync::atomic::Ordering::SeqCst,
+ std::sync::atomic::Ordering::SeqCst,
+ |x| {
+ if x == 0 {
+ None
+ } else {
+ Some(x - 1)
+ }
+ },
+ );
+
+ Ok(())
+ }
+
pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone();
let (tx, rx) = channel();
@@ -107,15 +144,21 @@ fn napi_create_threadsafe_function(
})
.transpose()?;
+ let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+
let tsfn = TsFn {
+ id,
maybe_func,
maybe_call_js_cb,
context,
thread_counter: initial_thread_count,
sender: env_ref.async_work_sender.clone(),
tsfn_sender: env_ref.threadsafe_function_sender.clone(),
+ ref_counter: Arc::new(AtomicUsize::new(1)),
env,
};
+ env_ref
+ .add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone());
env_ref
.threadsafe_function_sender
@@ -142,7 +185,8 @@ fn napi_unref_threadsafe_function(
_env: &mut Env,
tsfn: napi_threadsafe_function,
) -> Result {
- let _tsfn: &TsFn = &*(tsfn as *const TsFn);
+ let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn);
+ tsfn.unref()?;
Ok(())
}
@@ -170,8 +214,12 @@ fn napi_call_threadsafe_function(
}
#[napi_sym::napi_sym]
-fn napi_ref_threadsafe_function() -> Result {
- // TODO
+fn napi_ref_threadsafe_function(
+ _env: &mut Env,
+ func: napi_threadsafe_function,
+) -> Result {
+ let tsfn: &mut TsFn = &mut *(func as *mut TsFn);
+ tsfn.ref_()?;
Ok(())
}