summaryrefslogtreecommitdiff
path: root/cli/napi/threadsafe_functions.rs
diff options
context:
space:
mode:
Diffstat (limited to 'cli/napi/threadsafe_functions.rs')
-rw-r--r--cli/napi/threadsafe_functions.rs112
1 files changed, 76 insertions, 36 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs
index f47da46e9..15395529d 100644
--- a/cli/napi/threadsafe_functions.rs
+++ b/cli/napi/threadsafe_functions.rs
@@ -1,11 +1,12 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::futures::channel::mpsc;
+use deno_core::V8CrossThreadTaskSpawner;
use deno_runtime::deno_napi::*;
use once_cell::sync::Lazy;
use std::mem::forget;
+use std::ptr::NonNull;
use std::sync::atomic::AtomicUsize;
-use std::sync::mpsc::channel;
use std::sync::Arc;
static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
@@ -20,7 +21,7 @@ pub struct TsFn {
pub ref_counter: Arc<AtomicUsize>,
finalizer: Option<napi_finalize>,
finalizer_data: *mut c_void,
- sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
+ sender: V8CrossThreadTaskSpawner,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
}
@@ -84,47 +85,86 @@ impl TsFn {
pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone();
- let (tx, rx) = channel();
+
+ #[repr(transparent)]
+ struct SendPtr<T>(*const T);
+ unsafe impl<T> Send for SendPtr<T> {}
+ unsafe impl<T> Sync for SendPtr<T> {}
+
+ let env = SendPtr(self.env);
+ let context = SendPtr(self.context);
+ let data = SendPtr(data);
+
+ #[inline(always)]
+ fn spawn(
+ sender: &V8CrossThreadTaskSpawner,
+ is_blocking: bool,
+ f: impl FnOnce(&mut v8::HandleScope) + Send + 'static,
+ ) {
+ if is_blocking {
+ sender.spawn_blocking(f);
+ } else {
+ sender.spawn(f);
+ }
+ }
if let Some(call_js_cb) = self.maybe_call_js_cb {
- let context = self.context;
- let env = self.env;
- let call = Box::new(move || {
- let scope = &mut unsafe { (*env).scope() };
- match js_func {
- Some(func) => {
- let func: v8::Local<v8::Value> =
- func.open(scope).to_object(scope).unwrap().into();
- unsafe {
- call_js_cb(env as *mut c_void, func.into(), context, data)
- };
+ if let Some(func) = js_func {
+ let func = SendPtr(func.into_raw().as_ptr());
+ #[inline(always)]
+ fn call(
+ scope: &mut v8::HandleScope,
+ call_js_cb: napi_threadsafe_function_call_js,
+ func: SendPtr<v8::Function>,
+ env: SendPtr<Env>,
+ context: SendPtr<c_void>,
+ data: SendPtr<c_void>,
+ ) {
+ // SAFETY: This is a valid global from above
+ let func: v8::Global<v8::Function> = unsafe {
+ v8::Global::<v8::Function>::from_raw(
+ scope,
+ NonNull::new_unchecked(func.0 as _),
+ )
+ };
+ let func: v8::Local<v8::Value> =
+ func.open(scope).to_object(scope).unwrap().into();
+ // SAFETY: env is valid for the duration of the callback.
+ // data lifetime is users responsibility.
+ unsafe {
+ call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _)
}
- None => {
- unsafe {
- call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data)
- };
+ }
+ spawn(&self.sender, is_blocking, move |scope| {
+ call(scope, call_js_cb, func, env, context, data);
+ });
+ } else {
+ #[inline(always)]
+ fn call(
+ call_js_cb: napi_threadsafe_function_call_js,
+ env: SendPtr<Env>,
+ context: SendPtr<c_void>,
+ data: SendPtr<c_void>,
+ ) {
+ // SAFETY: We're calling the provided callback with valid args
+ unsafe {
+ call_js_cb(
+ env.0 as _,
+ std::mem::zeroed(),
+ context.0 as _,
+ data.0 as _,
+ )
}
}
-
- // Receiver might have been already dropped
- let _ = tx.send(());
- });
- // This call should never fail
- self.sender.unbounded_send(call).unwrap();
- } else if let Some(_js_func) = js_func {
- let call = Box::new(move || {
+ spawn(&self.sender, is_blocking, move |_| {
+ call(call_js_cb, env, context, data);
+ });
+ }
+ } else {
+ spawn(&self.sender, is_blocking, |_| {
// TODO: func.call
- // let func = js_func.open(scope);
- // Receiver might have been already dropped
- let _ = tx.send(());
});
- // This call should never fail
- self.sender.unbounded_send(call).unwrap();
- }
-
- if is_blocking {
- let _ = rx.recv();
- }
+ };
}
}