summaryrefslogtreecommitdiff
path: root/core/runtime.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/runtime.rs')
-rw-r--r--core/runtime.rs120
1 files changed, 83 insertions, 37 deletions
diff --git a/core/runtime.rs b/core/runtime.rs
index 3af090a1c..ad7f16886 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -29,6 +29,7 @@ use futures::task::AtomicWaker;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
+use std::collections::HashSet;
use std::ffi::c_void;
use std::mem::forget;
use std::option::Option;
@@ -135,23 +136,6 @@ pub type SharedArrayBufferStore =
pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
-struct AsyncOpIterator<'a, 'b, 'c> {
- ops: &'b mut FuturesUnordered<PendingOpFuture>,
- cx: &'a mut Context<'c>,
-}
-
-impl Iterator for AsyncOpIterator<'_, '_, '_> {
- type Item = (PromiseId, OpId, OpResult);
-
- #[inline]
- fn next(&mut self) -> Option<Self::Item> {
- match self.ops.poll_next_unpin(self.cx) {
- Poll::Ready(Some(item)) => Some(item),
- _ => None,
- }
- }
-}
-
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@@ -171,7 +155,7 @@ pub(crate) struct JsRuntimeState {
dyn_module_evaluate_idle_counter: u32,
pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
- pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
+ pub(crate) unrefed_ops: HashSet<i32>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
@@ -371,7 +355,7 @@ impl JsRuntime {
js_wasm_streaming_cb: None,
js_error_create_fn,
pending_ops: FuturesUnordered::new(),
- pending_unref_ops: FuturesUnordered::new(),
+ unrefed_ops: HashSet::new(),
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
@@ -801,7 +785,8 @@ impl JsRuntime {
let mut state = state_rc.borrow_mut();
let module_map = module_map_rc.borrow();
- let has_pending_ops = !state.pending_ops.is_empty();
+ let has_pending_refed_ops =
+ state.pending_ops.len() > state.unrefed_ops.len();
let has_pending_dyn_imports = module_map.has_pending_dynamic_imports();
let has_pending_dyn_module_evaluation =
!state.pending_dyn_mod_evaluate.is_empty();
@@ -815,7 +800,7 @@ impl JsRuntime {
.map(|i| i.has_active_sessions())
.unwrap_or(false);
- if !has_pending_ops
+ if !has_pending_refed_ops
&& !has_pending_dyn_imports
&& !has_pending_dyn_module_evaluation
&& !has_pending_module_evaluation
@@ -841,7 +826,7 @@ impl JsRuntime {
}
if has_pending_module_evaluation {
- if has_pending_ops
+ if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_dyn_module_evaluation
|| has_pending_background_tasks
@@ -854,7 +839,7 @@ impl JsRuntime {
}
if has_pending_dyn_module_evaluation {
- if has_pending_ops
+ if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_background_tasks
{
@@ -1529,21 +1514,12 @@ impl JsRuntime {
state.have_unpolled_ops = false;
let op_state = state.op_state.clone();
- let ops = AsyncOpIterator {
- ops: &mut state.pending_ops,
- cx,
- };
- for (promise_id, op_id, resp) in ops {
+
+ while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx)
+ {
+ let (promise_id, op_id, resp) = item;
op_state.borrow().tracker.track_async_completed(op_id);
- args.push(v8::Integer::new(scope, promise_id as i32).into());
- args.push(resp.to_v8(scope).unwrap());
- }
- let ops = AsyncOpIterator {
- ops: &mut state.pending_unref_ops,
- cx,
- };
- for (promise_id, op_id, resp) in ops {
- op_state.borrow().tracker.track_unref_completed(op_id);
+ state.unrefed_ops.remove(&promise_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
@@ -1744,6 +1720,76 @@ pub mod tests {
}
#[test]
+ fn test_op_async_promise_id() {
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ const p = Deno.core.opAsync("op_test", 42);
+ if (p[Symbol.for("Deno.core.internalPromiseId")] == undefined) {
+ throw new Error("missing id on returned promise");
+ }
+ "#,
+ )
+ .unwrap();
+ }
+
+ #[test]
+ fn test_ref_unref_ops() {
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ var promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId");
+ var p1 = Deno.core.opAsync("op_test", 42);
+ var p2 = Deno.core.opAsync("op_test", 42);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 0);
+ }
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ Deno.core.unrefOp(p1[promiseIdSymbol]);
+ Deno.core.unrefOp(p2[promiseIdSymbol]);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 2);
+ }
+ runtime
+ .execute_script(
+ "filename.js",
+ r#"
+ Deno.core.refOp(p1[promiseIdSymbol]);
+ Deno.core.refOp(p2[promiseIdSymbol]);
+ "#,
+ )
+ .unwrap();
+ {
+ let isolate = runtime.v8_isolate();
+ let state_rc = JsRuntime::state(isolate);
+ let state = state_rc.borrow();
+ assert_eq!(state.pending_ops.len(), 2);
+ assert_eq!(state.unrefed_ops.len(), 0);
+ }
+ }
+
+ #[test]
fn test_dispatch_no_zero_copy_buf() {
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false));
runtime