summaryrefslogtreecommitdiff
path: root/cli/napi/threadsafe_functions.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-11 10:52:55 -0700
committerGitHub <noreply@github.com>2023-12-11 10:52:55 -0700
commitd13e45f2b3e50b85953c31d9c16e35d0cd87545f (patch)
tree1b206fa8dafc4ee23cebfd586a58a192b3557d66 /cli/napi/threadsafe_functions.rs
parenta272bc1bd0793b2f522e4e611963231dc344531f (diff)
perf(ext/napi): port NAPI to v8 tasks (#21406)
Part 2 of removing middleware. This is somewhat awkward because `V8CrossThreadTaskSpawner` requires tasks to be `Send`, but NAPI makes heavy use of `!Send` pointers. In addition, Rust causes a closure to be `!Send` if you pull a `!Send` value out of a struct. --------- Signed-off-by: Matt Mastracci <matthew@mastracci.com> Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Diffstat (limited to 'cli/napi/threadsafe_functions.rs')
-rw-r--r--cli/napi/threadsafe_functions.rs112
1 files changed, 76 insertions, 36 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs
index f47da46e9..15395529d 100644
--- a/cli/napi/threadsafe_functions.rs
+++ b/cli/napi/threadsafe_functions.rs
@@ -1,11 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::futures::channel::mpsc;
+use deno_core::V8CrossThreadTaskSpawner;
use deno_runtime::deno_napi::*;
use once_cell::sync::Lazy;
use std::mem::forget;
+use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;
-use std::sync::mpsc::channel;
use std::sync::Arc;
static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
@@ -20,7 +21,7 @@ pub struct TsFn {
pub ref_counter: Arc<AtomicUsize>,
finalizer: Option<napi_finalize>,
finalizer_data: *mut c_void,
- sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
+ sender: V8CrossThreadTaskSpawner,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
}
@@ -84,47 +85,86 @@ impl TsFn {
pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone();
- let (tx, rx) = channel();
+
+ #[repr(transparent)]
+ struct SendPtr<T>(*const T);
+ unsafe impl<T> Send for SendPtr<T> {}
+ unsafe impl<T> Sync for SendPtr<T> {}
+
+ let env = SendPtr(self.env);
+ let context = SendPtr(self.context);
+ let data = SendPtr(data);
+
+ #[inline(always)]
+ fn spawn(
+ sender: &V8CrossThreadTaskSpawner,
+ is_blocking: bool,
+ f: impl FnOnce(&mut v8::HandleScope) + Send + 'static,
+ ) {
+ if is_blocking {
+ sender.spawn_blocking(f);
+ } else {
+ sender.spawn(f);
+ }
+ }
if let Some(call_js_cb) = self.maybe_call_js_cb {
- let context = self.context;
- let env = self.env;
- let call = Box::new(move || {
- let scope = &mut unsafe { (*env).scope() };
- match js_func {
- Some(func) => {
- let func: v8::Local<v8::Value> =
- func.open(scope).to_object(scope).unwrap().into();
- unsafe {
- call_js_cb(env as *mut c_void, func.into(), context, data)
- };
+ if let Some(func) = js_func {
+ let func = SendPtr(func.into_raw().as_ptr());
+ #[inline(always)]
+ fn call(
+ scope: &mut v8::HandleScope,
+ call_js_cb: napi_threadsafe_function_call_js,
+ func: SendPtr<v8::Function>,
+ env: SendPtr<Env>,
+ context: SendPtr<c_void>,
+ data: SendPtr<c_void>,
+ ) {
+ // SAFETY: This is a valid global from above
+ let func: v8::Global<v8::Function> = unsafe {
+ v8::Global::<v8::Function>::from_raw(
+ scope,
+ NonNull::new_unchecked(func.0 as _),
+ )
+ };
+ let func: v8::Local<v8::Value> =
+ func.open(scope).to_object(scope).unwrap().into();
+ // SAFETY: env is valid for the duration of the callback.
+ // data lifetime is users responsibility.
+ unsafe {
+ call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _)
}
- None => {
- unsafe {
- call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data)
- };
+ }
+ spawn(&self.sender, is_blocking, move |scope| {
+ call(scope, call_js_cb, func, env, context, data);
+ });
+ } else {
+ #[inline(always)]
+ fn call(
+ call_js_cb: napi_threadsafe_function_call_js,
+ env: SendPtr<Env>,
+ context: SendPtr<c_void>,
+ data: SendPtr<c_void>,
+ ) {
+ // SAFETY: We're calling the provided callback with valid args
+ unsafe {
+ call_js_cb(
+ env.0 as _,
+ std::mem::zeroed(),
+ context.0 as _,
+ data.0 as _,
+ )
}
}
-
- // Receiver might have been already dropped
- let _ = tx.send(());
- });
- // This call should never fail
- self.sender.unbounded_send(call).unwrap();
- } else if let Some(_js_func) = js_func {
- let call = Box::new(move || {
+ spawn(&self.sender, is_blocking, move |_| {
+ call(call_js_cb, env, context, data);
+ });
+ }
+ } else {
+ spawn(&self.sender, is_blocking, |_| {
// TODO: func.call
- // let func = js_func.open(scope);
- // Receiver might have been already dropped
- let _ = tx.send(());
});
- // This call should never fail
- self.sender.unbounded_send(call).unwrap();
- }
-
- if is_blocking {
- let _ = rx.recv();
- }
+ };
}
}