From d13e45f2b3e50b85953c31d9c16e35d0cd87545f Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Mon, 11 Dec 2023 10:52:55 -0700 Subject: perf(ext/napi): port NAPI to v8 tasks (#21406) Part 2 of removing middleware. This is somewhat awkward because `V8CrossThreadTaskSpawner` requires tasks to be `Send`, but NAPI makes heavy use of `!Send` pointers. In addition, Rust causes a closure to be `!Send` if you pull a `!Send` value out of a struct. --------- Signed-off-by: Matt Mastracci Co-authored-by: Divy Srivastava --- ext/napi/lib.rs | 77 +++++++-------------------------------------------------- 1 file changed, 9 insertions(+), 68 deletions(-) (limited to 'ext/napi/lib.rs') diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs index e897e149d..782635a27 100644 --- a/ext/napi/lib.rs +++ b/ext/napi/lib.rs @@ -9,10 +9,10 @@ use core::ptr::NonNull; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; -use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::parking_lot::Mutex; use deno_core::OpState; +use deno_core::V8CrossThreadTaskSpawner; use std::cell::RefCell; use std::ffi::CString; use std::path::Path; @@ -20,7 +20,6 @@ use std::path::PathBuf; use std::rc::Rc; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::task::Poll; use std::thread_local; #[cfg(unix)] @@ -231,13 +230,11 @@ pub struct napi_node_version { pub release: *const c_char, } -pub type PendingNapiAsyncWork = Box; +pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {} +impl PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {} + pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc)>; pub struct NapiState { - // Async tasks. - pub pending_async_work: Vec, - pub async_work_sender: mpsc::UnboundedSender, - pub async_work_receiver: mpsc::UnboundedReceiver, // Thread safe functions. pub active_threadsafe_functions: usize, pub threadsafe_function_receiver: @@ -318,7 +315,7 @@ pub struct Env { pub isolate_ptr: *mut v8::OwnedIsolate, pub open_handle_scopes: usize, pub shared: *mut EnvShared, - pub async_work_sender: mpsc::UnboundedSender, + pub async_work_sender: V8CrossThreadTaskSpawner, pub threadsafe_function_sender: mpsc::UnboundedSender, pub cleanup_hooks: @@ -336,7 +333,7 @@ impl Env { isolate_ptr: *mut v8::OwnedIsolate, context: v8::Global, global: v8::Global, - sender: mpsc::UnboundedSender, + sender: V8CrossThreadTaskSpawner, threadsafe_function_sender: mpsc::UnboundedSender, cleanup_hooks: Rc< RefCell>, @@ -372,8 +369,8 @@ impl Env { unsafe { &mut *self.shared } } - pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) { - self.async_work_sender.unbounded_send(async_work).unwrap(); + pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) { + self.async_work_sender.spawn(|_| async_work()); } #[inline] @@ -418,14 +415,9 @@ deno_core::extension!(deno_napi, op_napi_open

], state = |state| { - let (async_work_sender, async_work_receiver) = - mpsc::unbounded::(); let (threadsafe_function_sender, threadsafe_function_receiver) = mpsc::unbounded::(); state.put(NapiState { - pending_async_work: Vec::new(), - async_work_sender, - async_work_receiver, threadsafe_function_sender, threadsafe_function_receiver, active_threadsafe_functions: 0, @@ -433,59 +425,8 @@ deno_core::extension!(deno_napi, tsfn_ref_counters: Arc::new(Mutex::new(vec![])), }); }, - event_loop_middleware = event_loop_middleware, ); -fn event_loop_middleware( - op_state_rc: Rc>, - cx: &mut std::task::Context, -) -> bool { - // `work` can call back into the runtime. It can also schedule an async task - // but we don't know that now. We need to make the runtime re-poll to make - // sure no pending NAPI tasks exist. - let mut maybe_scheduling = false; - - { - let mut op_state = op_state_rc.borrow_mut(); - let napi_state = op_state.borrow_mut::(); - - while let Poll::Ready(Some(async_work_fut)) = - napi_state.async_work_receiver.poll_next_unpin(cx) - { - napi_state.pending_async_work.push(async_work_fut); - } - - if napi_state.active_threadsafe_functions > 0 { - maybe_scheduling = true; - } - - let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone(); - for (_id, counter) in tsfn_ref_counters.iter() { - if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { - maybe_scheduling = true; - break; - } - } - } - - loop { - let maybe_work = { - let mut op_state = op_state_rc.borrow_mut(); - let napi_state = op_state.borrow_mut::(); - napi_state.pending_async_work.pop() - }; - - if let Some(work) = maybe_work { - work(); - maybe_scheduling = true; - } else { - break; - } - } - - maybe_scheduling -} - pub trait NapiPermissions { fn check(&mut self, path: Option<&Path>) -> std::result::Result<(), AnyError>; @@ -557,7 +498,7 @@ where let napi_state = op_state.borrow::(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); ( - napi_state.async_work_sender.clone(), + op_state.borrow::().clone(), napi_state.threadsafe_function_sender.clone(), *isolate_ptr, napi_state.env_cleanup_hooks.clone(), -- cgit v1.2.3