diff options
author | Divy Srivastava <dj.srivastava23@gmail.com> | 2022-10-05 07:06:44 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-05 19:36:44 +0530 |
commit | 0b016a7fb8639ce49603c8c339539174b191a4b1 (patch) | |
tree | c511060d701db60ede0214b7280e89c5749bbe62 /cli/napi/threadsafe_functions.rs | |
parent | 3a3a8484069c9c6955fcb83ea761f9f74638175a (diff) |
feat(npm): implement Node API (#13633)
This PR implements the NAPI for loading native modules into Deno.
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
Co-authored-by: DjDeveloper <43033058+DjDeveloperr@users.noreply.github.com>
Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
Diffstat (limited to 'cli/napi/threadsafe_functions.rs')
-rw-r--r-- | cli/napi/threadsafe_functions.rs | 199 |
1 files changed, 199 insertions, 0 deletions
diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs new file mode 100644 index 000000000..5374b6159 --- /dev/null +++ b/cli/napi/threadsafe_functions.rs @@ -0,0 +1,199 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use deno_core::futures::channel::mpsc; +use deno_runtime::deno_napi::*; +use std::mem::forget; +use std::sync::mpsc::channel; + +pub struct TsFn { + pub env: *mut Env, + pub maybe_func: Option<v8::Global<v8::Function>>, + pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>, + pub context: *mut c_void, + pub thread_counter: usize, + sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, + tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, +} + +impl TsFn { + pub fn acquire(&mut self) -> Result { + self.thread_counter += 1; + Ok(()) + } + + pub fn release(mut self) -> Result { + self.thread_counter -= 1; + if self.thread_counter == 0 { + self + .tsfn_sender + .unbounded_send(ThreadSafeFunctionStatus::Dead) + .map_err(|_| Error::GenericFailure)?; + drop(self); + } else { + forget(self); + } + Ok(()) + } + + pub fn call(&self, data: *mut c_void, is_blocking: bool) { + let js_func = self.maybe_func.clone(); + let (tx, rx) = channel(); + + if let Some(call_js_cb) = self.maybe_call_js_cb { + let context = self.context; + let env = self.env; + let call = Box::new(move || { + let scope = &mut unsafe { (*env).scope() }; + match js_func { + Some(func) => { + let func: v8::Local<v8::Value> = + func.open(scope).to_object(scope).unwrap().into(); + unsafe { + call_js_cb( + env as *mut c_void, + transmute::<v8::Local<v8::Value>, napi_value>(func), + context, + data, + ) + }; + } + None => { + unsafe { + call_js_cb( + env as *mut c_void, + std::ptr::null_mut(), + context, + data, + ) + }; + } + } + + // Receiver might have been already dropped + let _ = tx.send(()); + }); + // This call should never fail + self.sender.unbounded_send(call).unwrap(); + } else if let Some(_js_func) = js_func { + let call = Box::new(move || { + // TODO: func.call + // let func = js_func.open(scope); + // Receiver might have been already dropped + let _ = tx.send(()); + }); + // This call should never fail + self.sender.unbounded_send(call).unwrap(); + } + + if is_blocking { + rx.recv().unwrap(); + } + } +} + +#[napi_sym::napi_sym] +fn napi_create_threadsafe_function( + env: *mut Env, + func: napi_value, + _async_resource: napi_value, + _async_resource_name: napi_value, + _max_queue_size: usize, + initial_thread_count: usize, + _thread_finialize_data: *mut c_void, + _thread_finalize_cb: napi_finalize, + context: *mut c_void, + maybe_call_js_cb: Option<napi_threadsafe_function_call_js>, + result: *mut napi_threadsafe_function, +) -> Result { + let env_ref = env.as_mut().ok_or(Error::GenericFailure)?; + if initial_thread_count == 0 { + return Err(Error::InvalidArg); + } + let maybe_func = func + .as_mut() + .map(|func| { + let value = transmute::<napi_value, v8::Local<v8::Value>>(func); + let func = v8::Local::<v8::Function>::try_from(value) + .map_err(|_| Error::FunctionExpected)?; + Ok(v8::Global::new(&mut env_ref.scope(), func)) + }) + .transpose()?; + + let tsfn = TsFn { + maybe_func, + maybe_call_js_cb, + context, + thread_counter: initial_thread_count, + sender: env_ref.async_work_sender.clone(), + tsfn_sender: env_ref.threadsafe_function_sender.clone(), + env, + }; + + env_ref + .threadsafe_function_sender + .unbounded_send(ThreadSafeFunctionStatus::Alive) + .map_err(|_| Error::GenericFailure)?; + *result = transmute::<Box<TsFn>, _>(Box::new(tsfn)); + + Ok(()) +} + +#[napi_sym::napi_sym] +fn napi_acquire_threadsafe_function( + tsfn: napi_threadsafe_function, + _mode: napi_threadsafe_function_release_mode, +) -> Result { + let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn); + tsfn.acquire()?; + + Ok(()) +} + +#[napi_sym::napi_sym] +fn napi_unref_threadsafe_function( + _env: &mut Env, + tsfn: napi_threadsafe_function, +) -> Result { + let _tsfn: &TsFn = &*(tsfn as *const TsFn); + + Ok(()) +} + +/// Maybe called from any thread. +#[napi_sym::napi_sym] +pub fn napi_get_threadsafe_function_context( + func: napi_threadsafe_function, + result: *mut *const c_void, +) -> Result { + let tsfn: &TsFn = &*(func as *const TsFn); + *result = tsfn.context; + Ok(()) +} + +#[napi_sym::napi_sym] +fn napi_call_threadsafe_function( + func: napi_threadsafe_function, + data: *mut c_void, + is_blocking: napi_threadsafe_function_call_mode, +) -> Result { + let tsfn: &TsFn = &*(func as *const TsFn); + tsfn.call(data, is_blocking != 0); + Ok(()) +} + +#[napi_sym::napi_sym] +fn napi_ref_threadsafe_function() -> Result { + // TODO + Ok(()) +} + +#[napi_sym::napi_sym] +fn napi_release_threadsafe_function( + tsfn: napi_threadsafe_function, + _mode: napi_threadsafe_function_release_mode, +) -> Result { + let tsfn: Box<TsFn> = Box::from_raw(tsfn as *mut TsFn); + tsfn.release()?; + + Ok(()) +} |