summaryrefslogtreecommitdiff
path: root/core/runtime.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/runtime.rs')
-rw-r--r--core/runtime.rs129
1 files changed, 90 insertions, 39 deletions
diff --git a/core/runtime.rs b/core/runtime.rs
index f7cc0fa99..67161d5e7 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -32,7 +32,6 @@ use futures::stream::StreamFuture;
use futures::task::AtomicWaker;
use futures::Future;
use std::any::Any;
-use std::cell::Cell;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryFrom;
@@ -110,8 +109,7 @@ pub(crate) struct JsRuntimeState {
pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
- pub(crate) have_unpolled_ops: Cell<bool>,
- //pub(crate) op_table: OpTable,
+ pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub loader: Rc<dyn ModuleLoader>,
pub modules: Modules,
@@ -287,7 +285,7 @@ impl JsRuntime {
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
op_state: Rc::new(RefCell::new(op_state)),
- have_unpolled_ops: Cell::new(false),
+ have_unpolled_ops: false,
modules: Modules::new(),
loader,
dyn_import_map: HashMap::new(),
@@ -562,7 +560,7 @@ impl JsRuntime {
// Check if more async ops have been dispatched
// during this turn of event loop.
- if state.have_unpolled_ops.get() {
+ if state.have_unpolled_ops {
state.waker.wake();
}
@@ -1346,18 +1344,16 @@ impl JsRuntime {
self.mod_instantiate(root_id, None).map(|_| root_id)
}
- fn poll_pending_ops(
- &mut self,
- cx: &mut Context,
- ) -> Option<(OpId, Box<[u8]>)> {
+ fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> {
let state_rc = Self::state(self.v8_isolate());
- let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
+ let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new();
- loop {
- let mut state = state_rc.borrow_mut();
- // Now handle actual ops.
- state.have_unpolled_ops.set(false);
+ let mut state = state_rc.borrow_mut();
+
+ // Now handle actual ops.
+ state.have_unpolled_ops = false;
+ loop {
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
Poll::Ready(None) => break,
@@ -1365,31 +1361,21 @@ impl JsRuntime {
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
- // If we couldn't push the response to the shared queue, because
- // there wasn't enough size, we will return the buffer via the
- // legacy route, using the argument of deno_respond.
- overflow_response = Some((op_id, buf));
- break;
+ overflow_response.push((op_id, buf));
}
}
};
}
loop {
- let mut state = state_rc.borrow_mut();
let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
- #[allow(clippy::match_wild_err_arm)]
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
- // If we couldn't push the response to the shared queue, because
- // there wasn't enough size, we will return the buffer via the
- // legacy route, using the argument of deno_respond.
- overflow_response = Some((op_id, buf));
- break;
+ overflow_response.push((op_id, buf));
}
}
};
@@ -1427,13 +1413,14 @@ impl JsRuntime {
// Respond using shared queue and optionally overflown response
fn async_op_response(
&mut self,
- maybe_overflown_response: Option<(OpId, Box<[u8]>)>,
+ overflown_responses: Vec<(OpId, Box<[u8]>)>,
) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let shared_queue_size = state_rc.borrow().shared.size();
+ let overflown_responses_size = overflown_responses.len();
- if shared_queue_size == 0 && maybe_overflown_response.is_none() {
+ if shared_queue_size == 0 && overflown_responses_size == 0 {
return Ok(());
}
@@ -1454,22 +1441,21 @@ impl JsRuntime {
let tc_scope = &mut v8::TryCatch::new(scope);
- if shared_queue_size > 0 {
- js_recv_cb.call(tc_scope, global, &[]);
+ let mut args: Vec<v8::Local<v8::Value>> =
+ Vec::with_capacity(2 * overflown_responses_size);
+ for overflown_response in overflown_responses {
+ let (op_id, buf) = overflown_response;
+ args.push(v8::Integer::new(tc_scope, op_id as i32).into());
+ args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into());
+ }
+
+ if shared_queue_size > 0 || overflown_responses_size > 0 {
+ js_recv_cb.call(tc_scope, global, args.as_slice());
// The other side should have shifted off all the messages.
let shared_queue_size = state_rc.borrow().shared.size();
assert_eq!(shared_queue_size, 0);
}
- if let Some(overflown_response) = maybe_overflown_response {
- let (op_id, buf) = overflown_response;
- let op_id: v8::Local<v8::Value> =
- v8::Integer::new(tc_scope, op_id as i32).into();
- let ui8: v8::Local<v8::Value> =
- bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
- js_recv_cb.call(tc_scope, global, &[op_id, ui8]);
- }
-
match tc_scope.exception() {
None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception, false),
@@ -1925,6 +1911,71 @@ pub mod tests {
}
#[test]
+ fn overflow_res_async_combined_with_unref() {
+ run_in_task(|cx| {
+ let mut runtime = JsRuntime::new(Default::default());
+
+ runtime.register_op(
+ "test1",
+ |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
+ let mut vec = vec![0u8; 100 * 1024 * 1024];
+ vec[0] = 4;
+ let buf = vec.into_boxed_slice();
+ Op::Async(futures::future::ready(buf).boxed())
+ },
+ );
+
+ runtime.register_op(
+ "test2",
+ |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
+ let mut vec = vec![0u8; 100 * 1024 * 1024];
+ vec[0] = 4;
+ let buf = vec.into_boxed_slice();
+ Op::AsyncUnref(futures::future::ready(buf).boxed())
+ },
+ );
+
+ runtime
+ .execute(
+ "overflow_res_async_combined_with_unref.js",
+ r#"
+ function assert(cond) {
+ if (!cond) {
+ throw Error("assert");
+ }
+ }
+
+ let asyncRecv = 0;
+ Deno.core.setAsyncHandler(1, (buf) => {
+ assert(buf.byteLength === 100 * 1024 * 1024);
+ assert(buf[0] === 4);
+ asyncRecv++;
+ });
+ Deno.core.setAsyncHandler(2, (buf) => {
+ assert(buf.byteLength === 100 * 1024 * 1024);
+ assert(buf[0] === 4);
+ asyncRecv++;
+ });
+ let control = new Uint8Array(1);
+ let response1 = Deno.core.dispatch(1, control);
+ // Async messages always have null response.
+ assert(response1 == null);
+ assert(asyncRecv == 0);
+ let response2 = Deno.core.dispatch(2, control);
+ // Async messages always have null response.
+ assert(response2 == null);
+ assert(asyncRecv == 0);
+ "#,
+ )
+ .unwrap();
+ assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
+ runtime
+ .execute("check.js", "assert(asyncRecv == 2);")
+ .unwrap();
+ });
+ }
+
+ #[test]
fn overflow_res_async() {
run_in_task(|_cx| {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We