diff options
author | Inteon <42113979+inteon@users.noreply.github.com> | 2021-02-23 13:08:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-02-23 13:08:50 +0100 |
commit | dccf5e0c5c7f04409809104dd23472bcc058e170 (patch) | |
tree | 23bad9b434c45fd08315abef66f1fe16add14a44 /core/runtime.rs | |
parent | 2e24af23002b6d77543861bf9b2a6027e0357a93 (diff) |
refactor(core): Allow multiple overflown responses in single poll (#9433)
This commit rewrites "JsRuntime::poll" function to fix a corner case that
might caused "overflown_response" to be overwritten by other overflown response.
The logic has been changed to allow returning multiple overflown response
alongside responses from shared queue.
Diffstat (limited to 'core/runtime.rs')
-rw-r--r-- | core/runtime.rs | 129 |
1 files changed, 90 insertions, 39 deletions
diff --git a/core/runtime.rs b/core/runtime.rs index f7cc0fa99..67161d5e7 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -32,7 +32,6 @@ use futures::stream::StreamFuture; use futures::task::AtomicWaker; use futures::Future; use std::any::Any; -use std::cell::Cell; use std::cell::RefCell; use std::collections::HashMap; use std::convert::TryFrom; @@ -110,8 +109,7 @@ pub(crate) struct JsRuntimeState { pub(crate) shared: SharedQueue, pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>, pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>, - pub(crate) have_unpolled_ops: Cell<bool>, - //pub(crate) op_table: OpTable, + pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc<RefCell<OpState>>, pub loader: Rc<dyn ModuleLoader>, pub modules: Modules, @@ -287,7 +285,7 @@ impl JsRuntime { pending_ops: FuturesUnordered::new(), pending_unref_ops: FuturesUnordered::new(), op_state: Rc::new(RefCell::new(op_state)), - have_unpolled_ops: Cell::new(false), + have_unpolled_ops: false, modules: Modules::new(), loader, dyn_import_map: HashMap::new(), @@ -562,7 +560,7 @@ impl JsRuntime { // Check if more async ops have been dispatched // during this turn of event loop. - if state.have_unpolled_ops.get() { + if state.have_unpolled_ops { state.waker.wake(); } @@ -1346,18 +1344,16 @@ impl JsRuntime { self.mod_instantiate(root_id, None).map(|_| root_id) } - fn poll_pending_ops( - &mut self, - cx: &mut Context, - ) -> Option<(OpId, Box<[u8]>)> { + fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> { let state_rc = Self::state(self.v8_isolate()); - let mut overflow_response: Option<(OpId, Box<[u8]>)> = None; + let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new(); - loop { - let mut state = state_rc.borrow_mut(); - // Now handle actual ops. - state.have_unpolled_ops.set(false); + let mut state = state_rc.borrow_mut(); + + // Now handle actual ops. + state.have_unpolled_ops = false; + loop { let pending_r = state.pending_ops.poll_next_unpin(cx); match pending_r { Poll::Ready(None) => break, @@ -1365,31 +1361,21 @@ impl JsRuntime { Poll::Ready(Some((op_id, buf))) => { let successful_push = state.shared.push(op_id, &buf); if !successful_push { - // If we couldn't push the response to the shared queue, because - // there wasn't enough size, we will return the buffer via the - // legacy route, using the argument of deno_respond. - overflow_response = Some((op_id, buf)); - break; + overflow_response.push((op_id, buf)); } } }; } loop { - let mut state = state_rc.borrow_mut(); let unref_r = state.pending_unref_ops.poll_next_unpin(cx); - #[allow(clippy::match_wild_err_arm)] match unref_r { Poll::Ready(None) => break, Poll::Pending => break, Poll::Ready(Some((op_id, buf))) => { let successful_push = state.shared.push(op_id, &buf); if !successful_push { - // If we couldn't push the response to the shared queue, because - // there wasn't enough size, we will return the buffer via the - // legacy route, using the argument of deno_respond. - overflow_response = Some((op_id, buf)); - break; + overflow_response.push((op_id, buf)); } } }; @@ -1427,13 +1413,14 @@ impl JsRuntime { // Respond using shared queue and optionally overflown response fn async_op_response( &mut self, - maybe_overflown_response: Option<(OpId, Box<[u8]>)>, + overflown_responses: Vec<(OpId, Box<[u8]>)>, ) -> Result<(), AnyError> { let state_rc = Self::state(self.v8_isolate()); let shared_queue_size = state_rc.borrow().shared.size(); + let overflown_responses_size = overflown_responses.len(); - if shared_queue_size == 0 && maybe_overflown_response.is_none() { + if shared_queue_size == 0 && overflown_responses_size == 0 { return Ok(()); } @@ -1454,22 +1441,21 @@ impl JsRuntime { let tc_scope = &mut v8::TryCatch::new(scope); - if shared_queue_size > 0 { - js_recv_cb.call(tc_scope, global, &[]); + let mut args: Vec<v8::Local<v8::Value>> = + Vec::with_capacity(2 * overflown_responses_size); + for overflown_response in overflown_responses { + let (op_id, buf) = overflown_response; + args.push(v8::Integer::new(tc_scope, op_id as i32).into()); + args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into()); + } + + if shared_queue_size > 0 || overflown_responses_size > 0 { + js_recv_cb.call(tc_scope, global, args.as_slice()); // The other side should have shifted off all the messages. let shared_queue_size = state_rc.borrow().shared.size(); assert_eq!(shared_queue_size, 0); } - if let Some(overflown_response) = maybe_overflown_response { - let (op_id, buf) = overflown_response; - let op_id: v8::Local<v8::Value> = - v8::Integer::new(tc_scope, op_id as i32).into(); - let ui8: v8::Local<v8::Value> = - bindings::boxed_slice_to_uint8array(tc_scope, buf).into(); - js_recv_cb.call(tc_scope, global, &[op_id, ui8]); - } - match tc_scope.exception() { None => Ok(()), Some(exception) => exception_to_err_result(tc_scope, exception, false), @@ -1925,6 +1911,71 @@ pub mod tests { } #[test] + fn overflow_res_async_combined_with_unref() { + run_in_task(|cx| { + let mut runtime = JsRuntime::new(Default::default()); + + runtime.register_op( + "test1", + |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op { + let mut vec = vec![0u8; 100 * 1024 * 1024]; + vec[0] = 4; + let buf = vec.into_boxed_slice(); + Op::Async(futures::future::ready(buf).boxed()) + }, + ); + + runtime.register_op( + "test2", + |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op { + let mut vec = vec![0u8; 100 * 1024 * 1024]; + vec[0] = 4; + let buf = vec.into_boxed_slice(); + Op::AsyncUnref(futures::future::ready(buf).boxed()) + }, + ); + + runtime + .execute( + "overflow_res_async_combined_with_unref.js", + r#" + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + let asyncRecv = 0; + Deno.core.setAsyncHandler(1, (buf) => { + assert(buf.byteLength === 100 * 1024 * 1024); + assert(buf[0] === 4); + asyncRecv++; + }); + Deno.core.setAsyncHandler(2, (buf) => { + assert(buf.byteLength === 100 * 1024 * 1024); + assert(buf[0] === 4); + asyncRecv++; + }); + let control = new Uint8Array(1); + let response1 = Deno.core.dispatch(1, control); + // Async messages always have null response. + assert(response1 == null); + assert(asyncRecv == 0); + let response2 = Deno.core.dispatch(2, control); + // Async messages always have null response. + assert(response2 == null); + assert(asyncRecv == 0); + "#, + ) + .unwrap(); + assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_)))); + runtime + .execute("check.js", "assert(asyncRecv == 2);") + .unwrap(); + }); + } + + #[test] fn overflow_res_async() { run_in_task(|_cx| { // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We |