summaryrefslogtreecommitdiff
path: root/cli/napi/node_api.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/napi/node_api.rs')
-rw-r--r--cli/napi/node_api.rs151
1 files changed, 69 insertions, 82 deletions
diff --git a/cli/napi/node_api.rs b/cli/napi/node_api.rs
index 28a11d614..268715fa8 100644
--- a/cli/napi/node_api.rs
+++ b/cli/napi/node_api.rs
@@ -9,10 +9,11 @@ use super::util::napi_set_last_error;
use super::util::SendPtr;
use crate::check_arg;
use crate::check_env;
+use deno_core::parking_lot::Condvar;
+use deno_core::parking_lot::Mutex;
use deno_core::V8CrossThreadTaskSpawner;
use deno_runtime::deno_napi::*;
use napi_sym::napi_sym;
-use std::ptr::NonNull;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::AtomicUsize;
@@ -40,14 +41,7 @@ fn napi_add_env_cleanup_hook(
let fun = fun.unwrap();
- let mut env_cleanup_hooks = env.cleanup_hooks.borrow_mut();
- if env_cleanup_hooks
- .iter()
- .any(|pair| pair.0 == fun && pair.1 == arg)
- {
- panic!("Cleanup hook with this data already registered");
- }
- env_cleanup_hooks.push((fun, arg));
+ env.add_cleanup_hook(fun, arg);
napi_ok
}
@@ -63,27 +57,21 @@ fn napi_remove_env_cleanup_hook(
let fun = fun.unwrap();
- let mut env_cleanup_hooks = env.cleanup_hooks.borrow_mut();
- // Hooks are supposed to be removed in LIFO order
- let maybe_index = env_cleanup_hooks
- .iter()
- .rposition(|&pair| pair.0 == fun && pair.1 == arg);
-
- if let Some(index) = maybe_index {
- env_cleanup_hooks.remove(index);
- } else {
- panic!("Cleanup hook with this data not found");
- }
+ env.remove_cleanup_hook(fun, arg);
napi_ok
}
-type AsyncCleanupHandle = (*mut Env, napi_async_cleanup_hook, *mut c_void);
+struct AsyncCleanupHandle {
+ env: *mut Env,
+ hook: napi_async_cleanup_hook,
+ data: *mut c_void,
+}
unsafe extern "C" fn async_cleanup_handler(arg: *mut c_void) {
unsafe {
- let (env, hook, arg) = *Box::<AsyncCleanupHandle>::from_raw(arg as _);
- hook(env as _, arg);
+ let handle = Box::<AsyncCleanupHandle>::from_raw(arg as _);
+ (handle.hook)(arg, handle.data);
}
}
@@ -99,12 +87,13 @@ fn napi_add_async_cleanup_hook(
let hook = hook.unwrap();
- let handle =
- Box::into_raw(Box::<AsyncCleanupHandle>::new((env, hook, arg))) as _;
+ let handle = Box::into_raw(Box::new(AsyncCleanupHandle {
+ env,
+ hook,
+ data: arg,
+ })) as *mut c_void;
- unsafe {
- napi_add_env_cleanup_hook(env, Some(async_cleanup_handler), handle);
- }
+ env.add_cleanup_hook(async_cleanup_handler, handle);
if !remove_handle.is_null() {
unsafe {
@@ -123,17 +112,12 @@ fn napi_remove_async_cleanup_hook(
return napi_invalid_arg;
}
- let handle = unsafe { &*(remove_handle as *mut AsyncCleanupHandle) };
+ let handle =
+ unsafe { Box::<AsyncCleanupHandle>::from_raw(remove_handle as _) };
- let env = unsafe { &mut *handle.0 };
+ let env = unsafe { &mut *handle.env };
- unsafe {
- napi_remove_env_cleanup_hook(
- env,
- Some(async_cleanup_handler),
- remove_handle,
- );
- }
+ env.remove_cleanup_hook(async_cleanup_handler, remove_handle);
napi_ok
}
@@ -142,11 +126,7 @@ fn napi_remove_async_cleanup_hook(
fn napi_fatal_exception(env: &mut Env, err: napi_value) -> napi_status {
check_arg!(env, err);
- let report_error = unsafe {
- std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>(
- env.report_error,
- )
- };
+ let report_error = v8::Local::new(&mut env.scope(), &env.report_error);
let this = v8::undefined(&mut env.scope());
if report_error
@@ -262,7 +242,7 @@ fn napi_make_callback<'s>(
recv: napi_value,
func: napi_value,
argc: usize,
- argv: *const napi_value,
+ argv: *const napi_value<'s>,
result: *mut napi_value<'s>,
) -> napi_status {
check_arg!(env, recv);
@@ -312,11 +292,8 @@ fn napi_create_buffer<'s>(
let ab = v8::ArrayBuffer::new(&mut env.scope(), length);
- let buffer_constructor = unsafe {
- std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>(
- env.buffer_constructor,
- )
- };
+ let buffer_constructor =
+ v8::Local::new(&mut env.scope(), &env.buffer_constructor);
let Some(buffer) =
buffer_constructor.new_instance(&mut env.scope(), &[ab.into()])
else {
@@ -325,7 +302,7 @@ fn napi_create_buffer<'s>(
if !data.is_null() {
unsafe {
- *data = get_array_buffer_ptr(ab) as _;
+ *data = get_array_buffer_ptr(ab);
}
}
@@ -359,11 +336,8 @@ fn napi_create_external_buffer<'s>(
let ab =
v8::ArrayBuffer::with_backing_store(&mut env.scope(), &store.make_shared());
- let buffer_constructor = unsafe {
- std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>(
- env.buffer_constructor,
- )
- };
+ let buffer_constructor =
+ v8::Local::new(&mut env.scope(), &env.buffer_constructor);
let Some(buffer) =
buffer_constructor.new_instance(&mut env.scope(), &[ab.into()])
else {
@@ -389,18 +363,15 @@ fn napi_create_buffer_copy<'s>(
let ab = v8::ArrayBuffer::new(&mut env.scope(), length);
- let buffer_constructor = unsafe {
- std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>(
- env.buffer_constructor,
- )
- };
+ let buffer_constructor =
+ v8::Local::new(&mut env.scope(), &env.buffer_constructor);
let Some(buffer) =
buffer_constructor.new_instance(&mut env.scope(), &[ab.into()])
else {
return napi_generic_failure;
};
- let ptr = get_array_buffer_ptr(ab) as *mut c_void;
+ let ptr = get_array_buffer_ptr(ab);
unsafe {
std::ptr::copy(data, ptr, length);
}
@@ -428,11 +399,8 @@ fn napi_is_buffer(
check_arg!(env, value);
check_arg!(env, result);
- let buffer_constructor = unsafe {
- std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>(
- env.buffer_constructor,
- )
- };
+ let buffer_constructor =
+ v8::Local::new(&mut env.scope(), &env.buffer_constructor);
let Some(is_buffer) = value
.unwrap()
@@ -464,11 +432,8 @@ fn napi_get_buffer_info(
return napi_set_last_error(env, napi_invalid_arg);
};
- let buffer_constructor = unsafe {
- std::mem::transmute::<NonNull<v8::Function>, v8::Local<v8::Function>>(
- env.buffer_constructor,
- )
- };
+ let buffer_constructor =
+ v8::Local::new(&mut env.scope(), &env.buffer_constructor);
if !ta
.instance_of(&mut env.scope(), buffer_constructor.into())
@@ -703,7 +668,9 @@ extern "C" fn default_call_js_cb(
struct TsFn {
env: *mut Env,
func: Option<v8::Global<v8::Function>>,
- _max_queue_size: usize,
+ max_queue_size: usize,
+ queue_size: Mutex<usize>,
+ queue_cond: Condvar,
thread_count: AtomicUsize,
thread_finalize_data: *mut c_void,
thread_finalize_cb: Option<napi_finalize>,
@@ -771,6 +738,7 @@ impl TsFn {
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
+ tsfn.queue_cond.notify_all();
let tsfnptr = SendPtr(tsfn);
// drop must be queued in order to preserve ordering consistent
// with Node.js and so that the finalizer runs on the main thread.
@@ -811,26 +779,34 @@ impl TsFn {
pub fn call(
&self,
data: *mut c_void,
- _mode: napi_threadsafe_function_call_mode,
+ mode: napi_threadsafe_function_call_mode,
) -> napi_status {
- // TODO:
- // if self.max_queue_size > 0 && queued >= self.max_queue_size {
- // if mode == napi_tsfn_blocking {
- // wait somehow
- // } else {
- // return napi_queue_full;
- // }
- // }
-
if self.is_closing.load(Ordering::SeqCst) {
return napi_closing;
}
+ if self.max_queue_size > 0 {
+ let mut queue_size = self.queue_size.lock();
+ while *queue_size >= self.max_queue_size {
+ if mode == napi_tsfn_blocking {
+ self.queue_cond.wait(&mut queue_size);
+
+ if self.is_closing.load(Ordering::SeqCst) {
+ return napi_closing;
+ }
+ } else {
+ return napi_queue_full;
+ }
+ }
+ *queue_size += 1;
+ }
+
let is_closed = self.is_closed.clone();
let tsfn = SendPtr(self);
let data = SendPtr(data);
let context = SendPtr(self.context);
let call_js_cb = self.call_js_cb;
+
self.sender.spawn(move |scope: &mut v8::HandleScope| {
let data = data.take();
@@ -849,6 +825,15 @@ impl TsFn {
let tsfn = unsafe { &*tsfn };
+ if tsfn.max_queue_size > 0 {
+ let mut queue_size = tsfn.queue_size.lock();
+ let size = *queue_size;
+ *queue_size -= 1;
+ if size == tsfn.max_queue_size {
+ tsfn.queue_cond.notify_one();
+ }
+ }
+
let func = tsfn.func.as_ref().map(|f| v8::Local::new(scope, f));
unsafe {
@@ -918,7 +903,9 @@ fn napi_create_threadsafe_function(
let mut tsfn = Box::new(TsFn {
env,
func,
- _max_queue_size: max_queue_size,
+ max_queue_size,
+ queue_size: Mutex::new(0),
+ queue_cond: Condvar::new(),
thread_count: AtomicUsize::new(initial_thread_count),
thread_finalize_data,
thread_finalize_cb,