summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/dts/lib.deno.unstable.d.ts27
-rw-r--r--core/examples/schedule_task.rs3
-rw-r--r--core/extensions.rs10
-rw-r--r--core/runtime.rs2
-rw-r--r--ext/ffi/00_ffi.js18
-rw-r--r--ext/ffi/lib.rs132
-rw-r--r--test_ffi/src/lib.rs13
-rw-r--r--test_ffi/tests/integration_tests.rs40
-rw-r--r--test_ffi/tests/test.js41
-rw-r--r--test_ffi/tests/thread_safe_test.js101
-rw-r--r--test_ffi/tests/thread_safe_test_worker.js41
11 files changed, 400 insertions, 28 deletions
diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts
index ab6f8634f..ac27648fe 100644
--- a/cli/dts/lib.deno.unstable.d.ts
+++ b/cli/dts/lib.deno.unstable.d.ts
@@ -565,6 +565,9 @@ declare namespace Deno {
* as C function pointers to ffi calls.
*
* The function pointer remains valid until the `close()` method is called.
+ *
+ * The callback can be explicitly ref'ed and deref'ed to stop Deno's
+ * process from exiting.
*/
export class UnsafeCallback<
Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition,
@@ -584,6 +587,30 @@ declare namespace Deno {
Definition["result"]
>;
+ /**
+ * Adds one to this callback's reference counting.
+ *
+ * If the callback's reference count becomes non-zero, it will keep
+ * Deno's process from exiting.
+ */
+ ref(): void;
+
+ /**
+ * Removes one from this callback's reference counting.
+ *
+ * If the callback's reference counter becomes zero, it will no longer
+ * keep Deno's process from exiting.
+ */
+ unref(): void;
+
+ /**
+ * Removes the C function pointer associated with the UnsafeCallback.
+ * Continuing to use the instance after calling this object will lead to errors
+ * and crashes.
+ *
+ * Calling this method will also immediately set the callback's reference
+ * counting to zero and it will no longer keep Deno's process from exiting.
+ */
close(): void;
}
diff --git a/core/examples/schedule_task.rs b/core/examples/schedule_task.rs
index 7812dcb49..bd4bcb028 100644
--- a/core/examples/schedule_task.rs
+++ b/core/examples/schedule_task.rs
@@ -20,7 +20,8 @@ type Task = Box<dyn FnOnce()>;
fn main() {
let my_ext = Extension::builder()
.ops(vec![op_schedule_task::decl()])
- .event_loop_middleware(|state, cx| {
+ .event_loop_middleware(|state_rc, cx| {
+ let mut state = state_rc.borrow_mut();
let recv = state.borrow_mut::<mpsc::UnboundedReceiver<Task>>();
let mut ref_loop = false;
while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) {
diff --git a/core/extensions.rs b/core/extensions.rs
index 682987124..ce6957875 100644
--- a/core/extensions.rs
+++ b/core/extensions.rs
@@ -1,13 +1,13 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use crate::OpState;
use anyhow::Error;
-use std::task::Context;
+use std::{cell::RefCell, rc::Rc, task::Context};
pub type SourcePair = (&'static str, &'static str);
pub type OpFnRef = v8::FunctionCallback;
pub type OpMiddlewareFn = dyn Fn(OpDecl) -> OpDecl;
pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>;
-pub type OpEventLoopFn = dyn Fn(&mut OpState, &mut Context) -> bool;
+pub type OpEventLoopFn = dyn Fn(Rc<RefCell<OpState>>, &mut Context) -> bool;
#[derive(Clone, Copy)]
pub struct OpDecl {
@@ -90,13 +90,13 @@ impl Extension {
pub fn run_event_loop_middleware(
&self,
- op_state: &mut OpState,
+ op_state_rc: Rc<RefCell<OpState>>,
cx: &mut Context,
) -> bool {
self
.event_loop_middleware
.as_ref()
- .map(|f| f(op_state, cx))
+ .map(|f| f(op_state_rc, cx))
.unwrap_or(false)
}
@@ -148,7 +148,7 @@ impl ExtensionBuilder {
pub fn event_loop_middleware<F>(&mut self, middleware_fn: F) -> &mut Self
where
- F: Fn(&mut OpState, &mut Context) -> bool + 'static,
+ F: Fn(Rc<RefCell<OpState>>, &mut Context) -> bool + 'static,
{
self.event_loop_middleware = Some(Box::new(middleware_fn));
self
diff --git a/core/runtime.rs b/core/runtime.rs
index 7cb556fd3..23fe73013 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -921,7 +921,7 @@ impl JsRuntime {
let state = state_rc.borrow();
let op_state = state.op_state.clone();
for f in &self.event_loop_middlewares {
- if f(&mut op_state.borrow_mut(), cx) {
+ if f(op_state.clone(), cx) {
maybe_scheduling = true;
}
}
diff --git a/ext/ffi/00_ffi.js b/ext/ffi/00_ffi.js
index 190873307..f308ecad9 100644
--- a/ext/ffi/00_ffi.js
+++ b/ext/ffi/00_ffi.js
@@ -201,6 +201,7 @@
}
class UnsafeCallback {
+ #refcount;
#rid;
definition;
callback;
@@ -217,13 +218,30 @@
definition,
callback,
);
+ this.#refcount = 0;
this.#rid = rid;
this.pointer = pointer;
this.definition = definition;
this.callback = callback;
}
+ ref() {
+ if (this.#refcount++ === 0) {
+ core.opSync("op_ffi_unsafe_callback_ref", true);
+ }
+ }
+
+ unref() {
+ if (--this.#refcount === 0) {
+ core.opSync("op_ffi_unsafe_callback_ref", false);
+ }
+ }
+
close() {
+ if (this.#refcount) {
+ this.#refcount = 0;
+ core.opSync("op_ffi_unsafe_callback_ref", false);
+ }
core.close(this.#rid);
}
}
diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs
index 41b2f0a3b..d90f20a29 100644
--- a/ext/ffi/lib.rs
+++ b/ext/ffi/lib.rs
@@ -6,9 +6,11 @@ use deno_core::error::generic_error;
use deno_core::error::range_error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::channel::mpsc;
use deno_core::futures::Future;
use deno_core::include_js_files;
use deno_core::op;
+use std::sync::mpsc::sync_channel;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
@@ -37,7 +39,7 @@ use std::ptr;
use std::rc::Rc;
thread_local! {
- static IS_ISOLATE_THREAD: RefCell<bool> = RefCell::new(false);
+ static LOCAL_ISOLATE_POINTER: RefCell<*const v8::Isolate> = RefCell::new(ptr::null());
}
pub struct Unstable(pub bool);
@@ -122,7 +124,6 @@ impl DynamicLibraryResource {
name: String,
foreign_fn: ForeignFunction,
) -> Result<(), AnyError> {
- IS_ISOLATE_THREAD.with(|s| s.replace(true));
let symbol = match &foreign_fn.name {
Some(symbol) => symbol,
None => &name,
@@ -178,6 +179,14 @@ impl DynamicLibraryResource {
}
}
+type PendingFfiAsyncWork = Box<dyn FnOnce()>;
+
+struct FfiState {
+ async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
+ async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
+ active_refed_functions: usize,
+}
+
pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
Extension::builder()
.js(include_js_files!(
@@ -204,10 +213,51 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
op_ffi_read_f32::decl::<P>(),
op_ffi_read_f64::decl::<P>(),
op_ffi_unsafe_callback_create::decl::<P>(),
+ op_ffi_unsafe_callback_ref::decl(),
])
+ .event_loop_middleware(|op_state_rc, _cx| {
+ // FFI callbacks coming in from other threads will call in and get queued.
+ let mut maybe_scheduling = false;
+
+ let mut work_items: Vec<PendingFfiAsyncWork> = vec![];
+
+ {
+ let mut op_state = op_state_rc.borrow_mut();
+ let ffi_state = op_state.borrow_mut::<FfiState>();
+
+ while let Ok(Some(async_work_fut)) =
+ ffi_state.async_work_receiver.try_next()
+ {
+ // Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work.
+ work_items.push(async_work_fut);
+ maybe_scheduling = true;
+ }
+
+ if ffi_state.active_refed_functions > 0 {
+ maybe_scheduling = true;
+ }
+
+ drop(op_state);
+ }
+ while let Some(async_work_fut) = work_items.pop() {
+ async_work_fut();
+ }
+
+ maybe_scheduling
+ })
.state(move |state| {
// Stolen from deno_webgpu, is there a better option?
state.put(Unstable(unstable));
+
+ let (async_work_sender, async_work_receiver) =
+ mpsc::unbounded::<PendingFfiAsyncWork>();
+
+ state.put(FfiState {
+ active_refed_functions: 0,
+ async_work_receiver,
+ async_work_sender,
+ });
+
Ok(())
})
.build()
@@ -831,6 +881,7 @@ impl Resource for UnsafeCallbackResource {
}
struct CallbackInfo {
+ pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
pub callback: NonNull<v8::Function>,
pub context: NonNull<v8::Context>,
pub isolate: *mut v8::Isolate,
@@ -842,21 +893,55 @@ unsafe extern "C" fn deno_ffi_callback(
args: *const *const c_void,
info: &CallbackInfo,
) {
- let isolate = &mut *info.isolate;
- let callback = v8::Global::from_raw(isolate, info.callback);
- let context = std::mem::transmute::<
- NonNull<v8::Context>,
- v8::Local<v8::Context>,
- >(info.context);
- IS_ISOLATE_THREAD.with(|is_event_loop_thread| {
- if !(*is_event_loop_thread.borrow()) {
- // Call from another thread, not yet supported.
- eprintln!(
- "Calling Deno FFI's callbacks from other threads is not supported"
+ LOCAL_ISOLATE_POINTER.with(|s| {
+ if ptr::eq(*s.borrow(), info.isolate) {
+ // Own isolate thread, okay to call directly
+ do_ffi_callback(
+ cif,
+ result,
+ args,
+ info.callback,
+ info.context,
+ info.isolate,
);
- std::process::exit(1);
+ } else {
+ let async_work_sender = &info.async_work_sender;
+ // SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received.
+ let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif);
+ let result: &'static mut c_void = std::mem::transmute(result);
+ let info: &'static CallbackInfo = std::mem::transmute(info);
+ let (response_sender, response_receiver) = sync_channel::<()>(0);
+ let fut = Box::new(move || {
+ do_ffi_callback(
+ cif,
+ result,
+ args,
+ info.callback,
+ info.context,
+ info.isolate,
+ );
+ response_sender.send(()).unwrap();
+ });
+ async_work_sender.unbounded_send(fut).unwrap();
+ response_receiver.recv().unwrap();
}
});
+}
+
+unsafe fn do_ffi_callback(
+ cif: &libffi::low::ffi_cif,
+ result: &mut c_void,
+ args: *const *const c_void,
+ callback: NonNull<v8::Function>,
+ context: NonNull<v8::Context>,
+ isolate: *mut v8::Isolate,
+) {
+ let isolate = &mut *isolate;
+ let callback = v8::Global::from_raw(isolate, callback);
+ let context = std::mem::transmute::<
+ NonNull<v8::Context>,
+ v8::Local<v8::Context>,
+ >(context);
// Call from main thread. If this callback is being triggered due to a
// function call coming from Deno itself, then this callback will build
// ontop of that stack.
@@ -1096,11 +1181,20 @@ where
let cb = v8::Local::<v8::Function>::try_from(v8_value)?;
let isolate: *mut v8::Isolate = &mut *scope as &mut v8::Isolate;
+ LOCAL_ISOLATE_POINTER.with(|s| {
+ if s.borrow().is_null() {
+ s.replace(isolate);
+ }
+ });
+
+ let async_work_sender =
+ state.borrow_mut::<FfiState>().async_work_sender.clone();
let callback = v8::Global::new(scope, cb).into_raw();
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
let info = Box::leak(Box::new(CallbackInfo {
+ async_work_sender,
callback,
context,
isolate,
@@ -1158,6 +1252,16 @@ where
Ok(result)
}
+#[op]
+fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) {
+ let ffi_state = state.borrow_mut::<FfiState>();
+ if inc_dec {
+ ffi_state.active_refed_functions += 1;
+ } else {
+ ffi_state.active_refed_functions -= 1;
+ }
+}
+
#[op(v8)]
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
scope: &mut v8::HandleScope<'scope>,
diff --git a/test_ffi/src/lib.rs b/test_ffi/src/lib.rs
index b4908d9cd..d6f29cbb8 100644
--- a/test_ffi/src/lib.rs
+++ b/test_ffi/src/lib.rs
@@ -211,6 +211,19 @@ pub extern "C" fn call_stored_function_2(arg: u8) {
}
}
+#[no_mangle]
+pub extern "C" fn call_stored_function_thread_safe() {
+ std::thread::spawn(move || {
+ std::thread::sleep(std::time::Duration::from_millis(1500));
+ unsafe {
+ if STORED_FUNCTION.is_none() {
+ return;
+ }
+ STORED_FUNCTION.unwrap()();
+ }
+ });
+}
+
// FFI performance helper functions
#[no_mangle]
pub extern "C" fn nop() {}
diff --git a/test_ffi/tests/integration_tests.rs b/test_ffi/tests/integration_tests.rs
index 77bcc758e..35f37aa14 100644
--- a/test_ffi/tests/integration_tests.rs
+++ b/test_ffi/tests/integration_tests.rs
@@ -77,6 +77,8 @@ fn basic() {
true\n\
Before\n\
true\n\
+ After\n\
+ true\n\
logCallback\n\
1 -1 2 -2 3 -3 4n -4n 0.5 -0.5 1 2 3 4 5 6 7 8\n\
u8: 8\n\
@@ -85,12 +87,14 @@ fn basic() {
30\n\
STORED_FUNCTION cleared\n\
STORED_FUNCTION_2 cleared\n\
+ Thread safe call counter: 0\n\
+ logCallback\n\
+ Thread safe call counter: 1\n\
+ u8: 8\n\
Static u32: 42\n\
Static i64: -1242464576485n\n\
Static ptr: true\n\
Static ptr value: 42\n\
- After\n\
- true\n\
Correct number of resources\n";
assert_eq!(stdout, expected);
assert_eq!(stderr, "");
@@ -118,3 +122,35 @@ fn symbol_types() {
assert!(output.status.success());
assert_eq!(stderr, "");
}
+
+#[test]
+fn thread_safe_callback() {
+ build();
+
+ let output = deno_cmd()
+ .arg("run")
+ .arg("--allow-ffi")
+ .arg("--allow-read")
+ .arg("--unstable")
+ .arg("--quiet")
+ .arg("tests/thread_safe_test.js")
+ .env("NO_COLOR", "1")
+ .output()
+ .unwrap();
+ let stdout = std::str::from_utf8(&output.stdout).unwrap();
+ let stderr = std::str::from_utf8(&output.stderr).unwrap();
+ if !output.status.success() {
+ println!("stdout {}", stdout);
+ println!("stderr {}", stderr);
+ }
+ println!("{:?}", output.status);
+ assert!(output.status.success());
+ let expected = "\
+ Callback on main thread\n\
+ Callback on worker thread\n\
+ Calling callback, isolate should stay asleep until callback is called\n\
+ Callback being called\n\
+ Isolate should now exit\n";
+ assert_eq!(stdout, expected);
+ assert_eq!(stderr, "");
+}
diff --git a/test_ffi/tests/test.js b/test_ffi/tests/test.js
index ab31dcb83..03c166a7c 100644
--- a/test_ffi/tests/test.js
+++ b/test_ffi/tests/test.js
@@ -130,6 +130,12 @@ const dylib = Deno.dlopen(libPath, {
parameters: ["function"],
result: "void",
},
+ call_fn_ptr_thread_safe: {
+ name: "call_fn_ptr",
+ parameters: ["function"],
+ result: "void",
+ nonblocking: true,
+ },
call_fn_ptr_many_parameters: {
parameters: ["function"],
result: "void",
@@ -138,6 +144,11 @@ const dylib = Deno.dlopen(libPath, {
parameters: ["function"],
result: "void",
},
+ call_fn_ptr_return_u8_thread_safe: {
+ name: "call_fn_ptr_return_u8",
+ parameters: ["function"],
+ result: "void",
+ },
call_fn_ptr_return_buffer: {
parameters: ["function"],
result: "void",
@@ -292,15 +303,16 @@ console.log("After sleep_blocking");
console.log(performance.now() - start >= 100);
start = performance.now();
-dylib.symbols.sleep_nonblocking(100).then(() => {
+const promise_2 = dylib.symbols.sleep_nonblocking(100).then(() => {
console.log("After");
console.log(performance.now() - start >= 100);
- // Close after task is complete.
- cleanup();
});
console.log("Before");
console.log(performance.now() - start < 100);
+// Await to make sure `sleep_nonblocking` calls and logs before we proceed
+await promise_2;
+
// Test calls with callback parameters
const logCallback = new Deno.UnsafeCallback(
{ parameters: [], result: "void" },
@@ -376,6 +388,24 @@ dylib.symbols.store_function(ptr(nestedCallback));
dylib.symbols.store_function(null);
dylib.symbols.store_function_2(null);
+let counter = 0;
+const addToFooCallback = new Deno.UnsafeCallback({
+ parameters: [],
+ result: "void",
+}, () => counter++);
+
+// Test thread safe callbacks
+console.log("Thread safe call counter:", counter);
+addToFooCallback.ref();
+await dylib.symbols.call_fn_ptr_thread_safe(ptr(addToFooCallback));
+addToFooCallback.unref();
+logCallback.ref();
+await dylib.symbols.call_fn_ptr_thread_safe(ptr(logCallback));
+logCallback.unref();
+console.log("Thread safe call counter:", counter);
+returnU8Callback.ref();
+await dylib.symbols.call_fn_ptr_return_u8_thread_safe(ptr(returnU8Callback));
+
// Test statics
console.log("Static u32:", dylib.symbols.static_u32);
console.log("Static i64:", dylib.symbols.static_i64);
@@ -386,7 +416,7 @@ console.log(
const view = new Deno.UnsafePointerView(dylib.symbols.static_ptr);
console.log("Static ptr value:", view.getUint32());
-function cleanup() {
+(function cleanup() {
dylib.close();
throwCallback.close();
logCallback.close();
@@ -395,6 +425,7 @@ function cleanup() {
returnBufferCallback.close();
add10Callback.close();
nestedCallback.close();
+ addToFooCallback.close();
const resourcesPost = Deno.resources();
@@ -409,4 +440,4 @@ After: ${postStr}`,
}
console.log("Correct number of resources");
-}
+})();
diff --git a/test_ffi/tests/thread_safe_test.js b/test_ffi/tests/thread_safe_test.js
new file mode 100644
index 000000000..e54114055
--- /dev/null
+++ b/test_ffi/tests/thread_safe_test.js
@@ -0,0 +1,101 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+const targetDir = Deno.execPath().replace(/[^\/\\]+$/, "");
+const [libPrefix, libSuffix] = {
+ darwin: ["lib", "dylib"],
+ linux: ["lib", "so"],
+ windows: ["", "dll"],
+}[Deno.build.os];
+const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`;
+
+const resourcesPre = Deno.resources();
+
+const dylib = Deno.dlopen(libPath, {
+ store_function: {
+ parameters: ["function"],
+ result: "void",
+ },
+ call_stored_function: {
+ parameters: [],
+ result: "void",
+ },
+ call_stored_function_thread_safe: {
+ parameters: [],
+ result: "void",
+ },
+});
+
+let resolveWorker;
+let workerResponsePromise;
+
+const worker = new Worker(
+ new URL("./thread_safe_test_worker.js", import.meta.url).href,
+ { type: "module" },
+);
+
+worker.addEventListener("message", () => {
+ if (resolveWorker) {
+ resolveWorker();
+ }
+});
+
+const sendWorkerMessage = async (data) => {
+ workerResponsePromise = new Promise((res) => {
+ resolveWorker = res;
+ });
+ worker.postMessage(data);
+ await workerResponsePromise;
+};
+
+// Test step 1: Register main thread callback, trigger on worker thread
+
+const mainThreadCallback = new Deno.UnsafeCallback(
+ { parameters: [], result: "void" },
+ () => {
+ console.log("Callback on main thread");
+ },
+);
+
+mainThreadCallback.ref();
+
+dylib.symbols.store_function(mainThreadCallback.pointer);
+
+await sendWorkerMessage("call");
+
+// Test step 2: Register on worker thread, trigger on main thread
+
+await sendWorkerMessage("register");
+
+dylib.symbols.call_stored_function();
+
+// Unref both main and worker thread callbacks and terminate the wrorker: Note, the stored function pointer in lib is now dangling.
+
+mainThreadCallback.unref();
+await sendWorkerMessage("unref");
+worker.terminate();
+
+// Test step 3: Register a callback that will be the only thing left keeping the isolate from exiting.
+// Rely on it to keep Deno running until the callback comes in and unrefs the callback, after which Deno should exit.
+
+const cleanupCallback = new Deno.UnsafeCallback(
+ { parameters: [], result: "void" },
+ () => {
+ console.log("Callback being called");
+ Promise.resolve().then(() => cleanup());
+ },
+);
+
+cleanupCallback.ref();
+
+function cleanup() {
+ cleanupCallback.unref();
+ console.log("Isolate should now exit");
+}
+
+dylib.symbols.store_function(cleanupCallback.pointer);
+
+console.log(
+ "Calling callback, isolate should stay asleep until callback is called",
+);
+dylib.symbols.call_stored_function_thread_safe();
diff --git a/test_ffi/tests/thread_safe_test_worker.js b/test_ffi/tests/thread_safe_test_worker.js
new file mode 100644
index 000000000..067004469
--- /dev/null
+++ b/test_ffi/tests/thread_safe_test_worker.js
@@ -0,0 +1,41 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+// deno-lint-ignore-file
+
+const targetDir = Deno.execPath().replace(/[^\/\\]+$/, "");
+const [libPrefix, libSuffix] = {
+ darwin: ["lib", "dylib"],
+ linux: ["lib", "so"],
+ windows: ["", "dll"],
+}[Deno.build.os];
+const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`;
+
+const dylib = Deno.dlopen(libPath, {
+ store_function: {
+ parameters: ["function"],
+ result: "void",
+ },
+ call_stored_function: {
+ parameters: [],
+ result: "void",
+ },
+});
+
+const callback = new Deno.UnsafeCallback(
+ { parameters: [], result: "void" },
+ () => {
+ console.log("Callback on worker thread");
+ },
+);
+
+callback.ref();
+
+self.addEventListener("message", ({ data }) => {
+ if (data === "register") {
+ dylib.symbols.store_function(callback.pointer);
+ } else if (data === "call") {
+ dylib.symbols.call_stored_function();
+ } else if (data === "unref") {
+ callback.unref();
+ }
+ self.postMessage("done");
+});