diff options
Diffstat (limited to 'ext/napi/lib.rs')
-rw-r--r-- | ext/napi/lib.rs | 77 |
1 files changed, 9 insertions, 68 deletions
diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs index e897e149d..782635a27 100644 --- a/ext/napi/lib.rs +++ b/ext/napi/lib.rs @@ -9,10 +9,10 @@ use core::ptr::NonNull; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; -use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::parking_lot::Mutex; use deno_core::OpState; +use deno_core::V8CrossThreadTaskSpawner; use std::cell::RefCell; use std::ffi::CString; use std::path::Path; @@ -20,7 +20,6 @@ use std::path::PathBuf; use std::rc::Rc; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::task::Poll; use std::thread_local; #[cfg(unix)] @@ -231,13 +230,11 @@ pub struct napi_node_version { pub release: *const c_char, } -pub type PendingNapiAsyncWork = Box<dyn FnOnce()>; +pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {} +impl<T> PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {} + pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>; pub struct NapiState { - // Async tasks. - pub pending_async_work: Vec<PendingNapiAsyncWork>, - pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, - pub async_work_receiver: mpsc::UnboundedReceiver<PendingNapiAsyncWork>, // Thread safe functions. pub active_threadsafe_functions: usize, pub threadsafe_function_receiver: @@ -318,7 +315,7 @@ pub struct Env { pub isolate_ptr: *mut v8::OwnedIsolate, pub open_handle_scopes: usize, pub shared: *mut EnvShared, - pub async_work_sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, + pub async_work_sender: V8CrossThreadTaskSpawner, pub threadsafe_function_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, pub cleanup_hooks: @@ -336,7 +333,7 @@ impl Env { isolate_ptr: *mut v8::OwnedIsolate, context: v8::Global<v8::Context>, global: v8::Global<v8::Value>, - sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, + sender: V8CrossThreadTaskSpawner, threadsafe_function_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, cleanup_hooks: Rc< RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>, @@ -372,8 +369,8 @@ impl Env { unsafe { &mut *self.shared } } - pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) { - self.async_work_sender.unbounded_send(async_work).unwrap(); + pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) { + self.async_work_sender.spawn(|_| async_work()); } #[inline] @@ -418,14 +415,9 @@ deno_core::extension!(deno_napi, op_napi_open<P> ], state = |state| { - let (async_work_sender, async_work_receiver) = - mpsc::unbounded::<PendingNapiAsyncWork>(); let (threadsafe_function_sender, threadsafe_function_receiver) = mpsc::unbounded::<ThreadSafeFunctionStatus>(); state.put(NapiState { - pending_async_work: Vec::new(), - async_work_sender, - async_work_receiver, threadsafe_function_sender, threadsafe_function_receiver, active_threadsafe_functions: 0, @@ -433,59 +425,8 @@ deno_core::extension!(deno_napi, tsfn_ref_counters: Arc::new(Mutex::new(vec![])), }); }, - event_loop_middleware = event_loop_middleware, ); -fn event_loop_middleware( - op_state_rc: Rc<RefCell<OpState>>, - cx: &mut std::task::Context, -) -> bool { - // `work` can call back into the runtime. It can also schedule an async task - // but we don't know that now. We need to make the runtime re-poll to make - // sure no pending NAPI tasks exist. - let mut maybe_scheduling = false; - - { - let mut op_state = op_state_rc.borrow_mut(); - let napi_state = op_state.borrow_mut::<NapiState>(); - - while let Poll::Ready(Some(async_work_fut)) = - napi_state.async_work_receiver.poll_next_unpin(cx) - { - napi_state.pending_async_work.push(async_work_fut); - } - - if napi_state.active_threadsafe_functions > 0 { - maybe_scheduling = true; - } - - let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone(); - for (_id, counter) in tsfn_ref_counters.iter() { - if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { - maybe_scheduling = true; - break; - } - } - } - - loop { - let maybe_work = { - let mut op_state = op_state_rc.borrow_mut(); - let napi_state = op_state.borrow_mut::<NapiState>(); - napi_state.pending_async_work.pop() - }; - - if let Some(work) = maybe_work { - work(); - maybe_scheduling = true; - } else { - break; - } - } - - maybe_scheduling -} - pub trait NapiPermissions { fn check(&mut self, path: Option<&Path>) -> std::result::Result<(), AnyError>; @@ -557,7 +498,7 @@ where let napi_state = op_state.borrow::<NapiState>(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); ( - napi_state.async_work_sender.clone(), + op_state.borrow::<V8CrossThreadTaskSpawner>().clone(), napi_state.threadsafe_function_sender.clone(), *isolate_ptr, napi_state.env_cleanup_hooks.clone(), |