summaryrefslogtreecommitdiff
path: root/core/runtime.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2021-11-25 19:49:09 +0100
committerGitHub <noreply@github.com>2021-11-25 19:49:09 +0100
commitf3c0f0565bbf43b4cc31979b05e729d4f4a1538f (patch)
tree6554dfdecd89e076166531b91824d6750d5b76e0 /core/runtime.rs
parent2853e3760471560c1812b46007e1a525966b1365 (diff)
feat(core): Add ability to "ref" and "unref" pending ops (#12889)
This commit adds an ability to "ref" or "unref" pending ops. Up to this point Deno had a notion of "async ops" and "unref async ops"; the former keep event loop alive, while the latter do not block event loop from finishing. It was not possible to change between op types after dispatching, one had to decide which type to use before dispatch. Instead of storing ops in two separate "FuturesUnordered" collections, now ops are stored in a single collection, with supplemental "HashSet" storing ids of promises that were "unrefed". Two APIs were added to "Deno.core": "Deno.core.refOp(promiseId)" which allows to mark promise id to be "refed" and keep event loop alive (the default behavior) "Deno.core.unrefOp(promiseId)" which allows to mark promise id as "unrefed" which won't block event loop from exiting
Diffstat (limited to 'core/runtime.rs')
-rw-r--r--core/runtime.rs120
1 files changed, 83 insertions, 37 deletions
diff --git a/core/runtime.rs b/core/runtime.rs
index 3af090a1c..ad7f16886 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -29,6 +29,7 @@ use futures::task::AtomicWaker;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
+use std::collections::HashSet;
use std::ffi::c_void;
use std::mem::forget;
use std::option::Option;
@@ -135,23 +136,6 @@ pub type SharedArrayBufferStore =
pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
-struct AsyncOpIterator<'a, 'b, 'c> {
- ops: &'b mut FuturesUnordered<PendingOpFuture>,
- cx: &'a mut Context<'c>,
-}
-
-impl Iterator for AsyncOpIterator<'_, '_, '_> {
- type Item = (PromiseId, OpId, OpResult);
-
- #[inline]
- fn next(&mut self) -> Option<Self::Item> {
- match self.ops.poll_next_unpin(self.cx) {
- Poll::Ready(Some(item)) => Some(item),
- _ => None,
- }
- }
-}
-
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@@ -171,7 +155,7 @@ pub(crate) struct JsRuntimeState {
dyn_module_evaluate_idle_counter: u32,
pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
- pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
+ pub(crate) unrefed_ops: HashSet<i32>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
@@ -371,7 +355,7 @@ impl JsRuntime {
js_wasm_streaming_cb: None,
js_error_create_fn,
pending_ops: FuturesUnordered::new(),
- pending_unref_ops: FuturesUnordered::new(),
+ unrefed_ops: HashSet::new(),
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
@@ -801,7 +785,8 @@ impl JsRuntime {
let mut state = state_rc.borrow_mut();
let module_map = module_map_rc.borrow();
- let has_pending_ops = !state.pending_ops.is_empty();
+ let has_pending_refed_ops =
+ state.pending_ops.len() > state.unrefed_ops.len();
let has_pending_dyn_imports = module_map.has_pending_dynamic_imports();
let has_pending_dyn_module_evaluation =
!state.pending_dyn_mod_evaluate.is_empty();
@@ -815,7 +800,7 @@ impl JsRuntime {
.map(|i| i.has_active_sessions())
.unwrap_or(false);
- if !has_pending_ops
+ if !has_pending_refed_ops
&& !has_pending_dyn_imports
&& !has_pending_dyn_module_evaluation
&& !has_pending_module_evaluation
@@ -841,7 +826,7 @@ impl JsRuntime {
}
if has_pending_module_evaluation {
- if has_pending_ops
+ if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_dyn_module_evaluation
|| has_pending_background_tasks
@@ -854,7 +839,7 @@ impl JsRuntime {
}
if has_pending_dyn_module_evaluation {
- if has_pending_ops
+ if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_background_tasks
{
@@ -1529,21 +1514,12 @@ impl JsRuntime {
state.have_unpolled_ops = false;
let op_state = state.op_state.clone();
- let ops = AsyncOpIterator {
- ops: &mut state.pending_ops,
- cx,
- };
- for (promise_id, op_id, resp) in ops {
+
+ while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx)
+ {
+ let (promise_id, op_id, resp) = item;
op_state.borrow().tracker.track_async_completed(op_id);
- args.push(v8::Integer::new(scope, promise_id as i32).into());
- args.push(resp.to_v8(scope).unwrap());
- }
- let ops = AsyncOpIterator {
- ops: &mut state.pending_unref_ops,
- cx,
- };
- for (promise_id, op_id, resp) in ops {
- op_state.borrow().tracker.track_unref_completed(op_id);
+ state.unrefed_ops.remove(&promise_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
@@ -1744,6 +1720,76 @@ pub mod tests {
}
#[test]
+ fn test_op_async_promise_id() {
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ const p = Deno.core.opAsync("op_test", 42);
+ if (p[Symbol.for("Deno.core.internalPromiseId")] == undefined) {
+ throw new Error("missing id on returned promise");
+ }
+ "#,
+ )
+ .unwrap();
+ }
+
+ #[test]
+ fn test_ref_unref_ops() {
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ var promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId");
+ var p1 = Deno.core.opAsync("op_test", 42);
+ var p2 = Deno.core.opAsync("op_test", 42);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 0);
+ }
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ Deno.core.unrefOp(p1[promiseIdSymbol]);
+ Deno.core.unrefOp(p2[promiseIdSymbol]);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 2);
+ }
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ Deno.core.refOp(p1[promiseIdSymbol]);
+ Deno.core.refOp(p2[promiseIdSymbol]);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 0);
+ }
+ }
+
+ #[test]
fn test_dispatch_no_zero_copy_buf() {
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false));
runtime