summaryrefslogtreecommitdiff
path: root/core/runtime.rs
diff options
context:
space:
mode:
authorAaron O'Mullan <aaron.omullan@gmail.com>2021-10-24 21:41:57 +0200
committerGitHub <noreply@github.com>2021-10-24 21:41:57 +0200
commit53d38ad1e5e2e4ea4482f5bfaacded6b9a2e9f4d (patch)
tree4425a8703f2efbbe6f7067ab37474eb93e172e46 /core/runtime.rs
parent439a2914dbdd0b71a4af06cde402d80653d71d04 (diff)
cleanup(core): AsyncOpIterator (#11860)
Diffstat (limited to 'core/runtime.rs')
-rw-r--r--core/runtime.rs106
1 files changed, 49 insertions, 57 deletions
diff --git a/core/runtime.rs b/core/runtime.rs
index b0304884d..e453b453f 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -139,6 +139,23 @@ pub type SharedArrayBufferStore =
pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
+struct AsyncOpIterator<'a, 'b, 'c> {
+ ops: &'b mut FuturesUnordered<PendingOpFuture>,
+ cx: &'a mut Context<'c>,
+}
+
+impl Iterator for AsyncOpIterator<'_, '_, '_> {
+ type Item = (PromiseId, OpId, OpResult);
+
+ #[inline]
+ fn next(&mut self) -> Option<Self::Item> {
+ match self.ops.poll_next_unpin(self.cx) {
+ Poll::Ready(Some(item)) => Some(item),
+ _ => None,
+ }
+ }
+}
+
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@@ -759,8 +776,7 @@ impl JsRuntime {
// Ops
{
- let async_responses = self.poll_pending_ops(cx);
- self.async_op_response(async_responses)?;
+ self.resolve_async_ops(cx)?;
self.drain_macrotasks()?;
self.check_promise_exceptions()?;
}
@@ -1458,45 +1474,6 @@ impl JsRuntime {
Ok(root_id)
}
- fn poll_pending_ops(
- &mut self,
- cx: &mut Context,
- ) -> Vec<(PromiseId, OpResult)> {
- let state_rc = Self::state(self.v8_isolate());
- let mut async_responses: Vec<(PromiseId, OpResult)> = Vec::new();
-
- let mut state = state_rc.borrow_mut();
-
- // Now handle actual ops.
- state.have_unpolled_ops = false;
-
- loop {
- let pending_r = state.pending_ops.poll_next_unpin(cx);
- match pending_r {
- Poll::Ready(None) => break,
- Poll::Pending => break,
- Poll::Ready(Some((promise_id, op_id, resp))) => {
- state.op_state.borrow().tracker.track_async_completed(op_id);
- async_responses.push((promise_id, resp));
- }
- };
- }
-
- loop {
- let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
- match unref_r {
- Poll::Ready(None) => break,
- Poll::Pending => break,
- Poll::Ready(Some((promise_id, op_id, resp))) => {
- state.op_state.borrow().tracker.track_unref_completed(op_id);
- async_responses.push((promise_id, resp));
- }
- };
- }
-
- async_responses
- }
-
fn check_promise_exceptions(&mut self) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let mut state = state_rc.borrow_mut();
@@ -1522,19 +1499,10 @@ impl JsRuntime {
}
// Send finished responses to JS
- fn async_op_response(
- &mut self,
- async_responses: Vec<(PromiseId, OpResult)>,
- ) -> Result<(), AnyError> {
+ fn resolve_async_ops(&mut self, cx: &mut Context) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
- let async_responses_size = async_responses.len();
- if async_responses_size == 0 {
- return Ok(());
- }
-
let js_recv_cb_handle = state_rc.borrow().js_recv_cb.clone().unwrap();
-
let scope = &mut self.handle_scope();
// We return async responses to JS in unbounded batches (may change),
@@ -1544,12 +1512,36 @@ impl JsRuntime {
// which contains a value OR an error, encoded as a tuple.
// This batch is received in JS via the special `arguments` variable
// and then each tuple is used to resolve or reject promises
- let mut args: Vec<v8::Local<v8::Value>> =
- Vec::with_capacity(2 * async_responses_size);
- for overflown_response in async_responses {
- let (promise_id, resp) = overflown_response;
- args.push(v8::Integer::new(scope, promise_id as i32).into());
- args.push(resp.to_v8(scope).unwrap());
+ let mut args: Vec<v8::Local<v8::Value>> = vec![];
+
+ // Now handle actual ops.
+ {
+ let mut state = state_rc.borrow_mut();
+ state.have_unpolled_ops = false;
+
+ let op_state = state.op_state.clone();
+ let ops = AsyncOpIterator {
+ ops: &mut state.pending_ops,
+ cx,
+ };
+ for (promise_id, op_id, resp) in ops {
+ op_state.borrow().tracker.track_async_completed(op_id);
+ args.push(v8::Integer::new(scope, promise_id as i32).into());
+ args.push(resp.to_v8(scope).unwrap());
+ }
+ let ops = AsyncOpIterator {
+ ops: &mut state.pending_unref_ops,
+ cx,
+ };
+ for (promise_id, op_id, resp) in ops {
+ op_state.borrow().tracker.track_unref_completed(op_id);
+ args.push(v8::Integer::new(scope, promise_id as i32).into());
+ args.push(resp.to_v8(scope).unwrap());
+ }
+ }
+
+ if args.is_empty() {
+ return Ok(());
}
let tc_scope = &mut v8::TryCatch::new(scope);