summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-11 10:52:55 -0700
committerGitHub <noreply@github.com>2023-12-11 10:52:55 -0700
commitd13e45f2b3e50b85953c31d9c16e35d0cd87545f (patch)
tree1b206fa8dafc4ee23cebfd586a58a192b3557d66
parenta272bc1bd0793b2f522e4e611963231dc344531f (diff)
perf(ext/napi): port NAPI to v8 tasks (#21406)
Part 2 of removing middleware. This is somewhat awkward because `V8CrossThreadTaskSpawner` requires tasks to be `Send`, but NAPI makes heavy use of `!Send` pointers. In addition, Rust causes a closure to be `!Send` if you pull a `!Send` value out of a struct. --------- Signed-off-by: Matt Mastracci <matthew@mastracci.com> Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
-rw-r--r--cli/napi/async.rs25
-rw-r--r--cli/napi/threadsafe_functions.rs112
-rw-r--r--ext/napi/lib.rs77
3 files changed, 104 insertions, 110 deletions
diff --git a/cli/napi/async.rs b/cli/napi/async.rs
index 48de36728..1fb0c6374 100644
--- a/cli/napi/async.rs
+++ b/cli/napi/async.rs
@@ -11,6 +11,9 @@ pub struct AsyncWork {
pub complete: napi_async_complete_callback,
}
+unsafe impl Send for AsyncWork {}
+unsafe impl Sync for AsyncWork {}
+
#[napi_sym::napi_sym]
fn napi_create_async_work(
_env: *mut Env,
@@ -61,12 +64,22 @@ fn napi_queue_async_work(
return napi_invalid_arg;
};
- let fut = Box::new(move || {
- (work.execute)(env_ptr as napi_env, work.data);
- // Note: Must be called from the loop thread.
- (work.complete)(env_ptr as napi_env, napi_ok, work.data);
- });
- env.add_async_work(fut);
+ #[repr(transparent)]
+ struct SendPtr<T>(*const T);
+ unsafe impl<T> Send for SendPtr<T> {}
+ unsafe impl<T> Sync for SendPtr<T> {}
+ let send_env = SendPtr(env_ptr);
+
+ #[inline(always)]
+ fn do_work(ptr: SendPtr<Env>, work: &AsyncWork) {
+ // SAFETY: This is a valid async work queue call and it runs on the event loop thread
+ unsafe {
+ (work.execute)(ptr.0 as napi_env, work.data);
+ (work.complete)(ptr.0 as napi_env, napi_ok, work.data);
+ }
+ }
+
+ env.add_async_work(move || do_work(send_env, work));
napi_ok
}
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs
index f47da46e9..15395529d 100644
--- a/cli/napi/threadsafe_functions.rs
+++ b/cli/napi/threadsafe_functions.rs
@@ -1,11 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::futures::channel::mpsc;
+use deno_core::V8CrossThreadTaskSpawner;
use deno_runtime::deno_napi::*;
use once_cell::sync::Lazy;
use std::mem::forget;
+use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;
-use std::sync::mpsc::channel;
use std::sync::Arc;
static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
@@ -20,7 +21,7 @@ pub struct TsFn {
pub ref_counter: Arc<AtomicUsize>,
finalizer: Option<napi_finalize>,
finalizer_data: *mut c_void,
- sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
+ sender: V8CrossThreadTaskSpawner,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
}
@@ -84,47 +85,86 @@ impl TsFn {
pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone();
- let (tx, rx) = channel();
+
+ #[repr(transparent)]
+ struct SendPtr<T>(*const T);
+ unsafe impl<T> Send for SendPtr<T> {}
+ unsafe impl<T> Sync for SendPtr<T> {}
+
+ let env = SendPtr(self.env);
+ let context = SendPtr(self.context);
+ let data = SendPtr(data);
+
+ #[inline(always)]
+ fn spawn(
+ sender: &V8CrossThreadTaskSpawner,
+ is_blocking: bool,
+ f: impl FnOnce(&mut v8::HandleScope) + Send + 'static,
+ ) {
+ if is_blocking {
+ sender.spawn_blocking(f);
+ } else {
+ sender.spawn(f);
+ }
+ }
if let Some(call_js_cb) = self.maybe_call_js_cb {
- let context = self.context;
- let env = self.env;
- let call = Box::new(move || {
- let scope = &mut unsafe { (*env).scope() };
- match js_func {
- Some(func) => {
- let func: v8::Local<v8::Value> =
- func.open(scope).to_object(scope).unwrap().into();
- unsafe {
- call_js_cb(env as *mut c_void, func.into(), context, data)
- };
+ if let Some(func) = js_func {
+ let func = SendPtr(func.into_raw().as_ptr());
+ #[inline(always)]
+ fn call(
+ scope: &mut v8::HandleScope,
+ call_js_cb: napi_threadsafe_function_call_js,
+ func: SendPtr<v8::Function>,
+ env: SendPtr<Env>,
+ context: SendPtr<c_void>,
+ data: SendPtr<c_void>,
+ ) {
+ // SAFETY: This is a valid global from above
+ let func: v8::Global<v8::Function> = unsafe {
+ v8::Global::<v8::Function>::from_raw(
+ scope,
+ NonNull::new_unchecked(func.0 as _),
+ )
+ };
+ let func: v8::Local<v8::Value> =
+ func.open(scope).to_object(scope).unwrap().into();
+ // SAFETY: env is valid for the duration of the callback.
+ // data lifetime is users responsibility.
+ unsafe {
+ call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _)
}
- None => {
- unsafe {
- call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data)
- };
+ }
+ spawn(&self.sender, is_blocking, move |scope| {
+ call(scope, call_js_cb, func, env, context, data);
+ });
+ } else {
+ #[inline(always)]
+ fn call(
+ call_js_cb: napi_threadsafe_function_call_js,
+ env: SendPtr<Env>,
+ context: SendPtr<c_void>,
+ data: SendPtr<c_void>,
+ ) {
+ // SAFETY: We're calling the provided callback with valid args
+ unsafe {
+ call_js_cb(
+ env.0 as _,
+ std::mem::zeroed(),
+ context.0 as _,
+ data.0 as _,
+ )
}
}
-
- // Receiver might have been already dropped
- let _ = tx.send(());
- });
- // This call should never fail
- self.sender.unbounded_send(call).unwrap();
- } else if let Some(_js_func) = js_func {
- let call = Box::new(move || {
+ spawn(&self.sender, is_blocking, move |_| {
+ call(call_js_cb, env, context, data);
+ });
+ }
+ } else {
+ spawn(&self.sender, is_blocking, |_| {
// TODO: func.call
- // let func = js_func.open(scope);
- // Receiver might have been already dropped
- let _ = tx.send(());
});
- // This call should never fail
- self.sender.unbounded_send(call).unwrap();
- }
-
- if is_blocking {
- let _ = rx.recv();
- }
+ };
}
}
diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs
index e897e149d..782635a27 100644
--- a/ext/napi/lib.rs
+++ b/ext/napi/lib.rs
@@ -9,10 +9,10 @@ use core::ptr::NonNull;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
-use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::parking_lot::Mutex;
use deno_core::OpState;
+use deno_core::V8CrossThreadTaskSpawner;
use std::cell::RefCell;
use std::ffi::CString;
use std::path::Path;
@@ -20,7 +20,6 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
-use std::task::Poll;
use std::thread_local;
#[cfg(unix)]
@@ -231,13 +230,11 @@ pub struct napi_node_version {
pub release: *const c_char,
}
-pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
+pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {}
+impl<T> PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {}
+
pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
pub struct NapiState {
- // Async tasks.
- pub pending_async_work: Vec<PendingNapiAsyncWork>,
- pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
- pub async_work_receiver: mpsc::UnboundedReceiver<PendingNapiAsyncWork>,
// Thread safe functions.
pub active_threadsafe_functions: usize,
pub threadsafe_function_receiver:
@@ -318,7 +315,7 @@ pub struct Env {
pub isolate_ptr: *mut v8::OwnedIsolate,
pub open_handle_scopes: usize,
pub shared: *mut EnvShared,
- pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
+ pub async_work_sender: V8CrossThreadTaskSpawner,
pub threadsafe_function_sender:
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub cleanup_hooks:
@@ -336,7 +333,7 @@ impl Env {
isolate_ptr: *mut v8::OwnedIsolate,
context: v8::Global<v8::Context>,
global: v8::Global<v8::Value>,
- sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
+ sender: V8CrossThreadTaskSpawner,
threadsafe_function_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
cleanup_hooks: Rc<
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
@@ -372,8 +369,8 @@ impl Env {
unsafe { &mut *self.shared }
}
- pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) {
- self.async_work_sender.unbounded_send(async_work).unwrap();
+ pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) {
+ self.async_work_sender.spawn(|_| async_work());
}
#[inline]
@@ -418,14 +415,9 @@ deno_core::extension!(deno_napi,
op_napi_open<P>
],
state = |state| {
- let (async_work_sender, async_work_receiver) =
- mpsc::unbounded::<PendingNapiAsyncWork>();
let (threadsafe_function_sender, threadsafe_function_receiver) =
mpsc::unbounded::<ThreadSafeFunctionStatus>();
state.put(NapiState {
- pending_async_work: Vec::new(),
- async_work_sender,
- async_work_receiver,
threadsafe_function_sender,
threadsafe_function_receiver,
active_threadsafe_functions: 0,
@@ -433,59 +425,8 @@ deno_core::extension!(deno_napi,
tsfn_ref_counters: Arc::new(Mutex::new(vec![])),
});
},
- event_loop_middleware = event_loop_middleware,
);
-fn event_loop_middleware(
- op_state_rc: Rc<RefCell<OpState>>,
- cx: &mut std::task::Context,
-) -> bool {
- // `work` can call back into the runtime. It can also schedule an async task
- // but we don't know that now. We need to make the runtime re-poll to make
- // sure no pending NAPI tasks exist.
- let mut maybe_scheduling = false;
-
- {
- let mut op_state = op_state_rc.borrow_mut();
- let napi_state = op_state.borrow_mut::<NapiState>();
-
- while let Poll::Ready(Some(async_work_fut)) =
- napi_state.async_work_receiver.poll_next_unpin(cx)
- {
- napi_state.pending_async_work.push(async_work_fut);
- }
-
- if napi_state.active_threadsafe_functions > 0 {
- maybe_scheduling = true;
- }
-
- let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone();
- for (_id, counter) in tsfn_ref_counters.iter() {
- if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
- maybe_scheduling = true;
- break;
- }
- }
- }
-
- loop {
- let maybe_work = {
- let mut op_state = op_state_rc.borrow_mut();
- let napi_state = op_state.borrow_mut::<NapiState>();
- napi_state.pending_async_work.pop()
- };
-
- if let Some(work) = maybe_work {
- work();
- maybe_scheduling = true;
- } else {
- break;
- }
- }
-
- maybe_scheduling
-}
-
pub trait NapiPermissions {
fn check(&mut self, path: Option<&Path>)
-> std::result::Result<(), AnyError>;
@@ -557,7 +498,7 @@ where
let napi_state = op_state.borrow::<NapiState>();
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
(
- napi_state.async_work_sender.clone(),
+ op_state.borrow::<V8CrossThreadTaskSpawner>().clone(),
napi_state.threadsafe_function_sender.clone(),
*isolate_ptr,
napi_state.env_cleanup_hooks.clone(),