summaryrefslogtreecommitdiff
path: root/ext/napi/lib.rs
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-12-11 10:52:55 -0700
committerGitHub <noreply@github.com>2023-12-11 10:52:55 -0700
commitd13e45f2b3e50b85953c31d9c16e35d0cd87545f (patch)
tree1b206fa8dafc4ee23cebfd586a58a192b3557d66 /ext/napi/lib.rs
parenta272bc1bd0793b2f522e4e611963231dc344531f (diff)
perf(ext/napi): port NAPI to v8 tasks (#21406)
Part 2 of removing middleware. This is somewhat awkward because `V8CrossThreadTaskSpawner` requires tasks to be `Send`, but NAPI makes heavy use of `!Send` pointers. In addition, Rust causes a closure to be `!Send` if you pull a `!Send` value out of a struct. --------- Signed-off-by: Matt Mastracci <matthew@mastracci.com> Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
Diffstat (limited to 'ext/napi/lib.rs')
-rw-r--r--ext/napi/lib.rs77
1 files changed, 9 insertions, 68 deletions
diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs
index e897e149d..782635a27 100644
--- a/ext/napi/lib.rs
+++ b/ext/napi/lib.rs
@@ -9,10 +9,10 @@ use core::ptr::NonNull;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
-use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::parking_lot::Mutex;
use deno_core::OpState;
+use deno_core::V8CrossThreadTaskSpawner;
use std::cell::RefCell;
use std::ffi::CString;
use std::path::Path;
@@ -20,7 +20,6 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
-use std::task::Poll;
use std::thread_local;
#[cfg(unix)]
@@ -231,13 +230,11 @@ pub struct napi_node_version {
pub release: *const c_char,
}
-pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
+pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {}
+impl<T> PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {}
+
pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
pub struct NapiState {
- // Async tasks.
- pub pending_async_work: Vec<PendingNapiAsyncWork>,
- pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
- pub async_work_receiver: mpsc::UnboundedReceiver<PendingNapiAsyncWork>,
// Thread safe functions.
pub active_threadsafe_functions: usize,
pub threadsafe_function_receiver:
@@ -318,7 +315,7 @@ pub struct Env {
pub isolate_ptr: *mut v8::OwnedIsolate,
pub open_handle_scopes: usize,
pub shared: *mut EnvShared,
- pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
+ pub async_work_sender: V8CrossThreadTaskSpawner,
pub threadsafe_function_sender:
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub cleanup_hooks:
@@ -336,7 +333,7 @@ impl Env {
isolate_ptr: *mut v8::OwnedIsolate,
context: v8::Global<v8::Context>,
global: v8::Global<v8::Value>,
- sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
+ sender: V8CrossThreadTaskSpawner,
threadsafe_function_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
cleanup_hooks: Rc<
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
@@ -372,8 +369,8 @@ impl Env {
unsafe { &mut *self.shared }
}
- pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) {
- self.async_work_sender.unbounded_send(async_work).unwrap();
+ pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) {
+ self.async_work_sender.spawn(|_| async_work());
}
#[inline]
@@ -418,14 +415,9 @@ deno_core::extension!(deno_napi,
op_napi_open<P>
],
state = |state| {
- let (async_work_sender, async_work_receiver) =
- mpsc::unbounded::<PendingNapiAsyncWork>();
let (threadsafe_function_sender, threadsafe_function_receiver) =
mpsc::unbounded::<ThreadSafeFunctionStatus>();
state.put(NapiState {
- pending_async_work: Vec::new(),
- async_work_sender,
- async_work_receiver,
threadsafe_function_sender,
threadsafe_function_receiver,
active_threadsafe_functions: 0,
@@ -433,59 +425,8 @@ deno_core::extension!(deno_napi,
tsfn_ref_counters: Arc::new(Mutex::new(vec![])),
});
},
- event_loop_middleware = event_loop_middleware,
);
-fn event_loop_middleware(
- op_state_rc: Rc<RefCell<OpState>>,
- cx: &mut std::task::Context,
-) -> bool {
- // `work` can call back into the runtime. It can also schedule an async task
- // but we don't know that now. We need to make the runtime re-poll to make
- // sure no pending NAPI tasks exist.
- let mut maybe_scheduling = false;
-
- {
- let mut op_state = op_state_rc.borrow_mut();
- let napi_state = op_state.borrow_mut::<NapiState>();
-
- while let Poll::Ready(Some(async_work_fut)) =
- napi_state.async_work_receiver.poll_next_unpin(cx)
- {
- napi_state.pending_async_work.push(async_work_fut);
- }
-
- if napi_state.active_threadsafe_functions > 0 {
- maybe_scheduling = true;
- }
-
- let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone();
- for (_id, counter) in tsfn_ref_counters.iter() {
- if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
- maybe_scheduling = true;
- break;
- }
- }
- }
-
- loop {
- let maybe_work = {
- let mut op_state = op_state_rc.borrow_mut();
- let napi_state = op_state.borrow_mut::<NapiState>();
- napi_state.pending_async_work.pop()
- };
-
- if let Some(work) = maybe_work {
- work();
- maybe_scheduling = true;
- } else {
- break;
- }
- }
-
- maybe_scheduling
-}
-
pub trait NapiPermissions {
fn check(&mut self, path: Option<&Path>)
-> std::result::Result<(), AnyError>;
@@ -557,7 +498,7 @@ where
let napi_state = op_state.borrow::<NapiState>();
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
(
- napi_state.async_work_sender.clone(),
+ op_state.borrow::<V8CrossThreadTaskSpawner>().clone(),
napi_state.threadsafe_function_sender.clone(),
*isolate_ptr,
napi_state.env_cleanup_hooks.clone(),