diff options
Diffstat (limited to 'core/runtime.rs')
-rw-r--r-- | core/runtime.rs | 120 |
1 files changed, 83 insertions, 37 deletions
diff --git a/core/runtime.rs b/core/runtime.rs index 3af090a1c..ad7f16886 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -29,6 +29,7 @@ use futures::task::AtomicWaker; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; +use std::collections::HashSet; use std::ffi::c_void; use std::mem::forget; use std::option::Option; @@ -135,23 +136,6 @@ pub type SharedArrayBufferStore = pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>; -struct AsyncOpIterator<'a, 'b, 'c> { - ops: &'b mut FuturesUnordered<PendingOpFuture>, - cx: &'a mut Context<'c>, -} - -impl Iterator for AsyncOpIterator<'_, '_, '_> { - type Item = (PromiseId, OpId, OpResult); - - #[inline] - fn next(&mut self) -> Option<Self::Item> { - match self.ops.poll_next_unpin(self.cx) { - Poll::Ready(Some(item)) => Some(item), - _ => None, - } - } -} - /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -171,7 +155,7 @@ pub(crate) struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>, pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, - pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>, + pub(crate) unrefed_ops: HashSet<i32>, pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc<RefCell<OpState>>, pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>, @@ -371,7 +355,7 @@ impl JsRuntime { js_wasm_streaming_cb: None, js_error_create_fn, pending_ops: FuturesUnordered::new(), - pending_unref_ops: FuturesUnordered::new(), + unrefed_ops: HashSet::new(), shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), @@ -801,7 +785,8 @@ impl JsRuntime { let mut state = state_rc.borrow_mut(); let module_map = module_map_rc.borrow(); - let has_pending_ops = !state.pending_ops.is_empty(); + let has_pending_refed_ops = + state.pending_ops.len() > state.unrefed_ops.len(); let has_pending_dyn_imports = module_map.has_pending_dynamic_imports(); let has_pending_dyn_module_evaluation = !state.pending_dyn_mod_evaluate.is_empty(); @@ -815,7 +800,7 @@ impl JsRuntime { .map(|i| i.has_active_sessions()) .unwrap_or(false); - if !has_pending_ops + if !has_pending_refed_ops && !has_pending_dyn_imports && !has_pending_dyn_module_evaluation && !has_pending_module_evaluation @@ -841,7 +826,7 @@ impl JsRuntime { } if has_pending_module_evaluation { - if has_pending_ops + if has_pending_refed_ops || has_pending_dyn_imports || has_pending_dyn_module_evaluation || has_pending_background_tasks @@ -854,7 +839,7 @@ impl JsRuntime { } if has_pending_dyn_module_evaluation { - if has_pending_ops + if has_pending_refed_ops || has_pending_dyn_imports || has_pending_background_tasks { @@ -1529,21 +1514,12 @@ impl JsRuntime { state.have_unpolled_ops = false; let op_state = state.op_state.clone(); - let ops = AsyncOpIterator { - ops: &mut state.pending_ops, - cx, - }; - for (promise_id, op_id, resp) in ops { + + while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx) + { + let (promise_id, op_id, resp) = item; op_state.borrow().tracker.track_async_completed(op_id); - args.push(v8::Integer::new(scope, promise_id as i32).into()); - args.push(resp.to_v8(scope).unwrap()); - } - let ops = AsyncOpIterator { - ops: &mut state.pending_unref_ops, - cx, - }; - for (promise_id, op_id, resp) in ops { - op_state.borrow().tracker.track_unref_completed(op_id); + state.unrefed_ops.remove(&promise_id); args.push(v8::Integer::new(scope, promise_id as i32).into()); args.push(resp.to_v8(scope).unwrap()); } @@ -1744,6 +1720,76 @@ pub mod tests { } #[test] + fn test_op_async_promise_id() { + let (mut runtime, _dispatch_count) = setup(Mode::Async); + runtime + .execute_script( + "filename.js", + r#" + const p = Deno.core.opAsync("op_test", 42); + if (p[Symbol.for("Deno.core.internalPromiseId")] == undefined) { + throw new Error("missing id on returned promise"); + } + "#, + ) + .unwrap(); + } + + #[test] + fn test_ref_unref_ops() { + let (mut runtime, _dispatch_count) = setup(Mode::Async); + runtime + .execute_script( + "filename.js", + r#" + var promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId"); + var p1 = Deno.core.opAsync("op_test", 42); + var p2 = Deno.core.opAsync("op_test", 42); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 0); + } + runtime + .execute_script( + "filename.js", + r#" + Deno.core.unrefOp(p1[promiseIdSymbol]); + Deno.core.unrefOp(p2[promiseIdSymbol]); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 2); + } + runtime + .execute_script( + "filename.js", + r#" + Deno.core.refOp(p1[promiseIdSymbol]); + Deno.core.refOp(p2[promiseIdSymbol]); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 0); + } + } + + #[test] fn test_dispatch_no_zero_copy_buf() { let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false)); runtime |