diff options
Diffstat (limited to 'cli')
-rw-r--r-- | cli/napi/threadsafe_functions.rs | 54 |
1 files changed, 51 insertions, 3 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs index d692b5bb2..119ee81da 100644 --- a/cli/napi/threadsafe_functions.rs +++ b/cli/napi/threadsafe_functions.rs @@ -2,19 +2,33 @@ use deno_core::futures::channel::mpsc; use deno_runtime::deno_napi::*; +use once_cell::sync::Lazy; use std::mem::forget; +use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; +use std::sync::Arc; + +static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0)); pub struct TsFn { + pub id: usize, pub env: *mut Env, pub maybe_func: Option<v8::Global<v8::Function>>, pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>, pub context: *mut c_void, pub thread_counter: usize, + pub ref_counter: Arc<AtomicUsize>, sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, } +impl Drop for TsFn { + fn drop(&mut self) { + let env = unsafe { self.env.as_mut().unwrap() }; + env.remove_threadsafe_function_ref_counter(self.id) + } +} + impl TsFn { pub fn acquire(&mut self) -> Result { self.thread_counter += 1; @@ -35,6 +49,29 @@ impl TsFn { Ok(()) } + pub fn ref_(&mut self) -> Result { + self + .ref_counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + + pub fn unref(&mut self) -> Result { + let _ = self.ref_counter.fetch_update( + std::sync::atomic::Ordering::SeqCst, + std::sync::atomic::Ordering::SeqCst, + |x| { + if x == 0 { + None + } else { + Some(x - 1) + } + }, + ); + + Ok(()) + } + pub fn call(&self, data: *mut c_void, is_blocking: bool) { let js_func = self.maybe_func.clone(); let (tx, rx) = channel(); @@ -107,15 +144,21 @@ fn napi_create_threadsafe_function( }) .transpose()?; + let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let tsfn = TsFn { + id, maybe_func, maybe_call_js_cb, context, thread_counter: initial_thread_count, sender: env_ref.async_work_sender.clone(), tsfn_sender: env_ref.threadsafe_function_sender.clone(), + ref_counter: Arc::new(AtomicUsize::new(1)), env, }; + env_ref + .add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone()); env_ref .threadsafe_function_sender @@ -142,7 +185,8 @@ fn napi_unref_threadsafe_function( _env: &mut Env, tsfn: napi_threadsafe_function, ) -> Result { - let _tsfn: &TsFn = &*(tsfn as *const TsFn); + let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn); + tsfn.unref()?; Ok(()) } @@ -170,8 +214,12 @@ fn napi_call_threadsafe_function( } #[napi_sym::napi_sym] -fn napi_ref_threadsafe_function() -> Result { - // TODO +fn napi_ref_threadsafe_function( + _env: &mut Env, + func: napi_threadsafe_function, +) -> Result { + let tsfn: &mut TsFn = &mut *(func as *mut TsFn); + tsfn.ref_()?; Ok(()) } |