summaryrefslogtreecommitdiff
path: root/ext/ffi/callback.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/ffi/callback.rs')
-rw-r--r--ext/ffi/callback.rs21
1 files changed, 10 insertions, 11 deletions
diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs
index 2d2cf491b..78a21ab8f 100644
--- a/ext/ffi/callback.rs
+++ b/ext/ffi/callback.rs
@@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER;
use crate::MIN_SAFE_INTEGER;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
+use deno_core::futures::task::AtomicWaker;
use deno_core::op;
use deno_core::serde_v8;
use deno_core::v8;
@@ -32,8 +33,8 @@ use std::rc::Rc;
use std::sync::atomic;
use std::sync::atomic::AtomicU32;
use std::sync::mpsc::sync_channel;
+use std::sync::Arc;
use std::task::Poll;
-use std::task::Waker;
static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
@@ -99,21 +100,20 @@ struct CallbackInfo {
pub parameters: Box<[NativeType]>,
pub result: NativeType,
pub thread_id: u32,
- pub waker: Option<Waker>,
+ pub waker: Arc<AtomicWaker>,
}
impl Future for CallbackInfo {
type Output = ();
fn poll(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
+ self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
- // Always replace the waker to make sure it's bound to the proper Future.
- self.waker.replace(cx.waker().clone());
// The future for the CallbackInfo never resolves: It can only be canceled.
Poll::Pending
}
}
+
unsafe extern "C" fn deno_ffi_callback(
cif: &libffi::low::ffi_cif,
result: &mut c_void,
@@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback(
response_sender.send(()).unwrap();
});
async_work_sender.unbounded_send(fut).unwrap();
- if let Some(waker) = info.waker.as_ref() {
- // Make sure event loop wakes up to receive our message before we start waiting for a response.
- waker.wake_by_ref();
- }
+ // Make sure event loop wakes up to receive our message before we start waiting for a response.
+ info.waker.wake();
response_receiver.recv().unwrap();
}
});
@@ -574,6 +572,7 @@ where
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
+ let waker = state.waker.clone();
let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
async_work_sender,
callback,
@@ -581,7 +580,7 @@ where
parameters: args.parameters.clone().into(),
result: args.result.clone(),
thread_id,
- waker: None,
+ waker,
}));
let cif = Cif::new(
args