summaryrefslogtreecommitdiff
path: root/ext/ffi/callback.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-06-07 23:50:14 +0200
committerGitHub <noreply@github.com>2023-06-07 23:50:14 +0200
commit19f82b0eaa14f0df58fdfc685e60c8560582c5a4 (patch)
tree0269a3fb0e70fb37856b5d4a2b5a1e737be9feb7 /ext/ffi/callback.rs
parent7e91f74d2b00cdc64042ba66e45d912fa2d9b647 (diff)
refactor(core): use JoinSet instead of FuturesUnordered (#19378)
This commit migrates "deno_core" from using "FuturesUnordered" to "tokio::task::JoinSet". This makes every op to be a separate Tokio task and should unlock better utilization of kqueue/epoll. There were two quirks added to this PR: - because of the fact that "JoinSet" immediately polls spawn tasks, op sanitizers can give false positives in some cases, this was alleviated by polling event loop once before running a test with "deno test", which gives canceled ops an opportunity to settle - "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI API can still use threadsafe functions - without this change the registered wakers were wrong as they would not wake up the whole "JsRuntime" but the task associated with an op --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/ffi/callback.rs')
-rw-r--r--ext/ffi/callback.rs21
1 files changed, 10 insertions, 11 deletions
diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs
index 2d2cf491b..78a21ab8f 100644
--- a/ext/ffi/callback.rs
+++ b/ext/ffi/callback.rs
@@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER;
use crate::MIN_SAFE_INTEGER;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
+use deno_core::futures::task::AtomicWaker;
use deno_core::op;
use deno_core::serde_v8;
use deno_core::v8;
@@ -32,8 +33,8 @@ use std::rc::Rc;
use std::sync::atomic;
use std::sync::atomic::AtomicU32;
use std::sync::mpsc::sync_channel;
+use std::sync::Arc;
use std::task::Poll;
-use std::task::Waker;
static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
@@ -99,21 +100,20 @@ struct CallbackInfo {
pub parameters: Box<[NativeType]>,
pub result: NativeType,
pub thread_id: u32,
- pub waker: Option<Waker>,
+ pub waker: Arc<AtomicWaker>,
}
impl Future for CallbackInfo {
type Output = ();
fn poll(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
+ self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
- // Always replace the waker to make sure it's bound to the proper Future.
- self.waker.replace(cx.waker().clone());
// The future for the CallbackInfo never resolves: It can only be canceled.
Poll::Pending
}
}
+
unsafe extern "C" fn deno_ffi_callback(
cif: &libffi::low::ffi_cif,
result: &mut c_void,
@@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback(
response_sender.send(()).unwrap();
});
async_work_sender.unbounded_send(fut).unwrap();
- if let Some(waker) = info.waker.as_ref() {
- // Make sure event loop wakes up to receive our message before we start waiting for a response.
- waker.wake_by_ref();
- }
+ // Make sure event loop wakes up to receive our message before we start waiting for a response.
+ info.waker.wake();
response_receiver.recv().unwrap();
}
});
@@ -574,6 +572,7 @@ where
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
+ let waker = state.waker.clone();
let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
async_work_sender,
callback,
@@ -581,7 +580,7 @@ where
parameters: args.parameters.clone().into(),
result: args.result.clone(),
thread_id,
- waker: None,
+ waker,
}));
let cif = Cif::new(
args