diff options
Diffstat (limited to 'cli/napi/threadsafe_functions.rs')
-rw-r--r-- | cli/napi/threadsafe_functions.rs | 112 |
1 files changed, 76 insertions, 36 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs index f47da46e9..15395529d 100644 --- a/cli/napi/threadsafe_functions.rs +++ b/cli/napi/threadsafe_functions.rs @@ -1,11 +1,12 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use deno_core::futures::channel::mpsc; +use deno_core::V8CrossThreadTaskSpawner; use deno_runtime::deno_napi::*; use once_cell::sync::Lazy; use std::mem::forget; +use std::ptr::NonNull; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::channel; use std::sync::Arc; static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0)); @@ -20,7 +21,7 @@ pub struct TsFn { pub ref_counter: Arc<AtomicUsize>, finalizer: Option<napi_finalize>, finalizer_data: *mut c_void, - sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, + sender: V8CrossThreadTaskSpawner, tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, } @@ -84,47 +85,86 @@ impl TsFn { pub fn call(&self, data: *mut c_void, is_blocking: bool) { let js_func = self.maybe_func.clone(); - let (tx, rx) = channel(); + + #[repr(transparent)] + struct SendPtr<T>(*const T); + unsafe impl<T> Send for SendPtr<T> {} + unsafe impl<T> Sync for SendPtr<T> {} + + let env = SendPtr(self.env); + let context = SendPtr(self.context); + let data = SendPtr(data); + + #[inline(always)] + fn spawn( + sender: &V8CrossThreadTaskSpawner, + is_blocking: bool, + f: impl FnOnce(&mut v8::HandleScope) + Send + 'static, + ) { + if is_blocking { + sender.spawn_blocking(f); + } else { + sender.spawn(f); + } + } if let Some(call_js_cb) = self.maybe_call_js_cb { - let context = self.context; - let env = self.env; - let call = Box::new(move || { - let scope = &mut unsafe { (*env).scope() }; - match js_func { - Some(func) => { - let func: v8::Local<v8::Value> = - func.open(scope).to_object(scope).unwrap().into(); - unsafe { - call_js_cb(env as *mut c_void, func.into(), context, data) - }; + if let Some(func) = js_func { + let func = SendPtr(func.into_raw().as_ptr()); + #[inline(always)] + fn call( + scope: &mut v8::HandleScope, + call_js_cb: napi_threadsafe_function_call_js, + func: SendPtr<v8::Function>, + env: SendPtr<Env>, + context: SendPtr<c_void>, + data: SendPtr<c_void>, + ) { + // SAFETY: This is a valid global from above + let func: v8::Global<v8::Function> = unsafe { + v8::Global::<v8::Function>::from_raw( + scope, + NonNull::new_unchecked(func.0 as _), + ) + }; + let func: v8::Local<v8::Value> = + func.open(scope).to_object(scope).unwrap().into(); + // SAFETY: env is valid for the duration of the callback. + // data lifetime is users responsibility. + unsafe { + call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _) } - None => { - unsafe { - call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data) - }; + } + spawn(&self.sender, is_blocking, move |scope| { + call(scope, call_js_cb, func, env, context, data); + }); + } else { + #[inline(always)] + fn call( + call_js_cb: napi_threadsafe_function_call_js, + env: SendPtr<Env>, + context: SendPtr<c_void>, + data: SendPtr<c_void>, + ) { + // SAFETY: We're calling the provided callback with valid args + unsafe { + call_js_cb( + env.0 as _, + std::mem::zeroed(), + context.0 as _, + data.0 as _, + ) } } - - // Receiver might have been already dropped - let _ = tx.send(()); - }); - // This call should never fail - self.sender.unbounded_send(call).unwrap(); - } else if let Some(_js_func) = js_func { - let call = Box::new(move || { + spawn(&self.sender, is_blocking, move |_| { + call(call_js_cb, env, context, data); + }); + } + } else { + spawn(&self.sender, is_blocking, |_| { // TODO: func.call - // let func = js_func.open(scope); - // Receiver might have been already dropped - let _ = tx.send(()); }); - // This call should never fail - self.sender.unbounded_send(call).unwrap(); - } - - if is_blocking { - let _ = rx.recv(); - } + }; } } |