diff options
author | Aaron O'Mullan <aaron.omullan@gmail.com> | 2021-10-24 21:41:57 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-24 21:41:57 +0200 |
commit | 53d38ad1e5e2e4ea4482f5bfaacded6b9a2e9f4d (patch) | |
tree | 4425a8703f2efbbe6f7067ab37474eb93e172e46 /core/runtime.rs | |
parent | 439a2914dbdd0b71a4af06cde402d80653d71d04 (diff) |
cleanup(core): AsyncOpIterator (#11860)
Diffstat (limited to 'core/runtime.rs')
-rw-r--r-- | core/runtime.rs | 106 |
1 files changed, 49 insertions, 57 deletions
diff --git a/core/runtime.rs b/core/runtime.rs index b0304884d..e453b453f 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -139,6 +139,23 @@ pub type SharedArrayBufferStore = pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>; +struct AsyncOpIterator<'a, 'b, 'c> { + ops: &'b mut FuturesUnordered<PendingOpFuture>, + cx: &'a mut Context<'c>, +} + +impl Iterator for AsyncOpIterator<'_, '_, '_> { + type Item = (PromiseId, OpId, OpResult); + + #[inline] + fn next(&mut self) -> Option<Self::Item> { + match self.ops.poll_next_unpin(self.cx) { + Poll::Ready(Some(item)) => Some(item), + _ => None, + } + } +} + /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -759,8 +776,7 @@ impl JsRuntime { // Ops { - let async_responses = self.poll_pending_ops(cx); - self.async_op_response(async_responses)?; + self.resolve_async_ops(cx)?; self.drain_macrotasks()?; self.check_promise_exceptions()?; } @@ -1458,45 +1474,6 @@ impl JsRuntime { Ok(root_id) } - fn poll_pending_ops( - &mut self, - cx: &mut Context, - ) -> Vec<(PromiseId, OpResult)> { - let state_rc = Self::state(self.v8_isolate()); - let mut async_responses: Vec<(PromiseId, OpResult)> = Vec::new(); - - let mut state = state_rc.borrow_mut(); - - // Now handle actual ops. - state.have_unpolled_ops = false; - - loop { - let pending_r = state.pending_ops.poll_next_unpin(cx); - match pending_r { - Poll::Ready(None) => break, - Poll::Pending => break, - Poll::Ready(Some((promise_id, op_id, resp))) => { - state.op_state.borrow().tracker.track_async_completed(op_id); - async_responses.push((promise_id, resp)); - } - }; - } - - loop { - let unref_r = state.pending_unref_ops.poll_next_unpin(cx); - match unref_r { - Poll::Ready(None) => break, - Poll::Pending => break, - Poll::Ready(Some((promise_id, op_id, resp))) => { - state.op_state.borrow().tracker.track_unref_completed(op_id); - async_responses.push((promise_id, resp)); - } - }; - } - - async_responses - } - fn check_promise_exceptions(&mut self) -> Result<(), AnyError> { let state_rc = Self::state(self.v8_isolate()); let mut state = state_rc.borrow_mut(); @@ -1522,19 +1499,10 @@ impl JsRuntime { } // Send finished responses to JS - fn async_op_response( - &mut self, - async_responses: Vec<(PromiseId, OpResult)>, - ) -> Result<(), AnyError> { + fn resolve_async_ops(&mut self, cx: &mut Context) -> Result<(), AnyError> { let state_rc = Self::state(self.v8_isolate()); - let async_responses_size = async_responses.len(); - if async_responses_size == 0 { - return Ok(()); - } - let js_recv_cb_handle = state_rc.borrow().js_recv_cb.clone().unwrap(); - let scope = &mut self.handle_scope(); // We return async responses to JS in unbounded batches (may change), @@ -1544,12 +1512,36 @@ impl JsRuntime { // which contains a value OR an error, encoded as a tuple. // This batch is received in JS via the special `arguments` variable // and then each tuple is used to resolve or reject promises - let mut args: Vec<v8::Local<v8::Value>> = - Vec::with_capacity(2 * async_responses_size); - for overflown_response in async_responses { - let (promise_id, resp) = overflown_response; - args.push(v8::Integer::new(scope, promise_id as i32).into()); - args.push(resp.to_v8(scope).unwrap()); + let mut args: Vec<v8::Local<v8::Value>> = vec![]; + + // Now handle actual ops. + { + let mut state = state_rc.borrow_mut(); + state.have_unpolled_ops = false; + + let op_state = state.op_state.clone(); + let ops = AsyncOpIterator { + ops: &mut state.pending_ops, + cx, + }; + for (promise_id, op_id, resp) in ops { + op_state.borrow().tracker.track_async_completed(op_id); + args.push(v8::Integer::new(scope, promise_id as i32).into()); + args.push(resp.to_v8(scope).unwrap()); + } + let ops = AsyncOpIterator { + ops: &mut state.pending_unref_ops, + cx, + }; + for (promise_id, op_id, resp) in ops { + op_state.borrow().tracker.track_unref_completed(op_id); + args.push(v8::Integer::new(scope, promise_id as i32).into()); + args.push(resp.to_v8(scope).unwrap()); + } + } + + if args.is_empty() { + return Ok(()); } let tc_scope = &mut v8::TryCatch::new(scope); |