diff options
Diffstat (limited to 'cli/napi/node_api.rs')
-rw-r--r-- | cli/napi/node_api.rs | 151 |
1 files changed, 69 insertions, 82 deletions
diff --git a/cli/napi/node_api.rs b/cli/napi/node_api.rs index 28a11d614..268715fa8 100644 --- a/cli/napi/node_api.rs +++ b/cli/napi/node_api.rs @@ -9,10 +9,11 @@ use super::util::napi_set_last_error; use super::util::SendPtr; use crate::check_arg; use crate::check_env; +use deno_core::parking_lot::Condvar; +use deno_core::parking_lot::Mutex; use deno_core::V8CrossThreadTaskSpawner; use deno_runtime::deno_napi::*; use napi_sym::napi_sym; -use std::ptr::NonNull; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU8; use std::sync::atomic::AtomicUsize; @@ -40,14 +41,7 @@ fn napi_add_env_cleanup_hook( let fun = fun.unwrap(); - let mut env_cleanup_hooks = env.cleanup_hooks.borrow_mut(); - if env_cleanup_hooks - .iter() - .any(|pair| pair.0 == fun && pair.1 == arg) - { - panic!("Cleanup hook with this data already registered"); - } - env_cleanup_hooks.push((fun, arg)); + env.add_cleanup_hook(fun, arg); napi_ok } @@ -63,27 +57,21 @@ fn napi_remove_env_cleanup_hook( let fun = fun.unwrap(); - let mut env_cleanup_hooks = env.cleanup_hooks.borrow_mut(); - // Hooks are supposed to be removed in LIFO order - let maybe_index = env_cleanup_hooks - .iter() - .rposition(|&pair| pair.0 == fun && pair.1 == arg); - - if let Some(index) = maybe_index { - env_cleanup_hooks.remove(index); - } else { - panic!("Cleanup hook with this data not found"); - } + env.remove_cleanup_hook(fun, arg); napi_ok } -type AsyncCleanupHandle = (*mut Env, napi_async_cleanup_hook, *mut c_void); +struct AsyncCleanupHandle { + env: *mut Env, + hook: napi_async_cleanup_hook, + data: *mut c_void, +} unsafe extern "C" fn async_cleanup_handler(arg: *mut c_void) { unsafe { - let (env, hook, arg) = *Box::<AsyncCleanupHandle>::from_raw(arg as _); - hook(env as _, arg); + let handle = Box::<AsyncCleanupHandle>::from_raw(arg as _); + (handle.hook)(arg, handle.data); } } @@ -99,12 +87,13 @@ fn napi_add_async_cleanup_hook( let hook = hook.unwrap(); - let handle = - Box::into_raw(Box::<AsyncCleanupHandle>::new((env, hook, arg))) as _; + let handle = Box::into_raw(Box::new(AsyncCleanupHandle { + env, + hook, + data: arg, + })) as *mut c_void; - unsafe { - napi_add_env_cleanup_hook(env, Some(async_cleanup_handler), handle); - } + env.add_cleanup_hook(async_cleanup_handler, handle); if !remove_handle.is_null() { unsafe { @@ -123,17 +112,12 @@ fn napi_remove_async_cleanup_hook( return napi_invalid_arg; } - let handle = unsafe { &*(remove_handle as *mut AsyncCleanupHandle) }; + let handle = + unsafe { Box::<AsyncCleanupHandle>::from_raw(remove_handle as _) }; - let env = unsafe { &mut *handle.0 }; + let env = unsafe { &mut *handle.env }; - unsafe { - napi_remove_env_cleanup_hook( - env, - Some(async_cleanup_handler), - remove_handle, - ); - } + env.remove_cleanup_hook(async_cleanup_handler, remove_handle); napi_ok } @@ -142,11 +126,7 @@ fn napi_remove_async_cleanup_hook( fn napi_fatal_exception(env: &mut Env, err: napi_value) -> napi_status { check_arg!(env, err); - let report_error = unsafe { - std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>( - env.report_error, - ) - }; + let report_error = v8::Local::new(&mut env.scope(), &env.report_error); let this = v8::undefined(&mut env.scope()); if report_error @@ -262,7 +242,7 @@ fn napi_make_callback<'s>( recv: napi_value, func: napi_value, argc: usize, - argv: *const napi_value, + argv: *const napi_value<'s>, result: *mut napi_value<'s>, ) -> napi_status { check_arg!(env, recv); @@ -312,11 +292,8 @@ fn napi_create_buffer<'s>( let ab = v8::ArrayBuffer::new(&mut env.scope(), length); - let buffer_constructor = unsafe { - std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>( - env.buffer_constructor, - ) - }; + let buffer_constructor = + v8::Local::new(&mut env.scope(), &env.buffer_constructor); let Some(buffer) = buffer_constructor.new_instance(&mut env.scope(), &[ab.into()]) else { @@ -325,7 +302,7 @@ fn napi_create_buffer<'s>( if !data.is_null() { unsafe { - *data = get_array_buffer_ptr(ab) as _; + *data = get_array_buffer_ptr(ab); } } @@ -359,11 +336,8 @@ fn napi_create_external_buffer<'s>( let ab = v8::ArrayBuffer::with_backing_store(&mut env.scope(), &store.make_shared()); - let buffer_constructor = unsafe { - std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>( - env.buffer_constructor, - ) - }; + let buffer_constructor = + v8::Local::new(&mut env.scope(), &env.buffer_constructor); let Some(buffer) = buffer_constructor.new_instance(&mut env.scope(), &[ab.into()]) else { @@ -389,18 +363,15 @@ fn napi_create_buffer_copy<'s>( let ab = v8::ArrayBuffer::new(&mut env.scope(), length); - let buffer_constructor = unsafe { - std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>( - env.buffer_constructor, - ) - }; + let buffer_constructor = + v8::Local::new(&mut env.scope(), &env.buffer_constructor); let Some(buffer) = buffer_constructor.new_instance(&mut env.scope(), &[ab.into()]) else { return napi_generic_failure; }; - let ptr = get_array_buffer_ptr(ab) as *mut c_void; + let ptr = get_array_buffer_ptr(ab); unsafe { std::ptr::copy(data, ptr, length); } @@ -428,11 +399,8 @@ fn napi_is_buffer( check_arg!(env, value); check_arg!(env, result); - let buffer_constructor = unsafe { - std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>( - env.buffer_constructor, - ) - }; + let buffer_constructor = + v8::Local::new(&mut env.scope(), &env.buffer_constructor); let Some(is_buffer) = value .unwrap() @@ -464,11 +432,8 @@ fn napi_get_buffer_info( return napi_set_last_error(env, napi_invalid_arg); }; - let buffer_constructor = unsafe { - std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>( - env.buffer_constructor, - ) - }; + let buffer_constructor = + v8::Local::new(&mut env.scope(), &env.buffer_constructor); if !ta .instance_of(&mut env.scope(), buffer_constructor.into()) @@ -703,7 +668,9 @@ extern "C" fn default_call_js_cb( struct TsFn { env: *mut Env, func: Option<v8::Global<v8::Function>>, - _max_queue_size: usize, + max_queue_size: usize, + queue_size: Mutex<usize>, + queue_cond: Condvar, thread_count: AtomicUsize, thread_finalize_data: *mut c_void, thread_finalize_cb: Option<napi_finalize>, @@ -771,6 +738,7 @@ impl TsFn { .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_ok() { + tsfn.queue_cond.notify_all(); let tsfnptr = SendPtr(tsfn); // drop must be queued in order to preserve ordering consistent // with Node.js and so that the finalizer runs on the main thread. @@ -811,26 +779,34 @@ impl TsFn { pub fn call( &self, data: *mut c_void, - _mode: napi_threadsafe_function_call_mode, + mode: napi_threadsafe_function_call_mode, ) -> napi_status { - // TODO: - // if self.max_queue_size > 0 && queued >= self.max_queue_size { - // if mode == napi_tsfn_blocking { - // wait somehow - // } else { - // return napi_queue_full; - // } - // } - if self.is_closing.load(Ordering::SeqCst) { return napi_closing; } + if self.max_queue_size > 0 { + let mut queue_size = self.queue_size.lock(); + while *queue_size >= self.max_queue_size { + if mode == napi_tsfn_blocking { + self.queue_cond.wait(&mut queue_size); + + if self.is_closing.load(Ordering::SeqCst) { + return napi_closing; + } + } else { + return napi_queue_full; + } + } + *queue_size += 1; + } + let is_closed = self.is_closed.clone(); let tsfn = SendPtr(self); let data = SendPtr(data); let context = SendPtr(self.context); let call_js_cb = self.call_js_cb; + self.sender.spawn(move |scope: &mut v8::HandleScope| { let data = data.take(); @@ -849,6 +825,15 @@ impl TsFn { let tsfn = unsafe { &*tsfn }; + if tsfn.max_queue_size > 0 { + let mut queue_size = tsfn.queue_size.lock(); + let size = *queue_size; + *queue_size -= 1; + if size == tsfn.max_queue_size { + tsfn.queue_cond.notify_one(); + } + } + let func = tsfn.func.as_ref().map(|f| v8::Local::new(scope, f)); unsafe { @@ -918,7 +903,9 @@ fn napi_create_threadsafe_function( let mut tsfn = Box::new(TsFn { env, func, - _max_queue_size: max_queue_size, + max_queue_size, + queue_size: Mutex::new(0), + queue_cond: Condvar::new(), thread_count: AtomicUsize::new(initial_thread_count), thread_finalize_data, thread_finalize_cb, |