summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cli/bench/main.rs9
-rw-r--r--cli/tests/workers/large_message_worker.js14
-rw-r--r--cli/tests/workers_large_message_bench.ts35
-rw-r--r--core/bindings.rs6
-rw-r--r--core/core.js11
-rw-r--r--core/runtime.rs129
6 files changed, 156 insertions, 48 deletions
diff --git a/cli/bench/main.rs b/cli/bench/main.rs
index b69bbc7ac..c6f786e31 100644
--- a/cli/bench/main.rs
+++ b/cli/bench/main.rs
@@ -68,6 +68,15 @@ const EXEC_TIME_BENCHMARKS: &[(&str, &[&str], Option<i32>)] = &[
None,
),
(
+ "workers_large_message",
+ &[
+ "run",
+ "--allow-read",
+ "cli/tests/workers_large_message_bench.ts",
+ ],
+ None,
+ ),
+ (
"text_decoder",
&["run", "cli/tests/text_decoder_perf.js"],
None,
diff --git a/cli/tests/workers/large_message_worker.js b/cli/tests/workers/large_message_worker.js
new file mode 100644
index 000000000..f7b7da8a0
--- /dev/null
+++ b/cli/tests/workers/large_message_worker.js
@@ -0,0 +1,14 @@
+// Copyright 2020 the Deno authors. All rights reserved. MIT license.
+
+const dataSmall = "";
+const dataLarge = "x".repeat(10 * 1024);
+
+onmessage = function (e) {
+ for (let i = 0; i <= 10; i++) {
+ if (i % 2 == 0) {
+ postMessage(dataLarge);
+ } else {
+ postMessage(dataSmall);
+ }
+ }
+};
diff --git a/cli/tests/workers_large_message_bench.ts b/cli/tests/workers_large_message_bench.ts
new file mode 100644
index 000000000..9cda5a40d
--- /dev/null
+++ b/cli/tests/workers_large_message_bench.ts
@@ -0,0 +1,35 @@
+// Copyright 2020 the Deno authors. All rights reserved. MIT license.
+
+// deno-lint-ignore-file
+
+import { deferred } from "../../test_util/std/async/deferred.ts";
+
+function oneWorker(i: any): Promise<void> {
+ return new Promise<void>((resolve) => {
+ let countDown = 10;
+ const worker = new Worker(
+ new URL("workers/large_message_worker.js", import.meta.url).href,
+ { type: "module" },
+ );
+ worker.onmessage = (e): void => {
+ if (countDown > 0) {
+ countDown--;
+ return;
+ }
+ worker.terminate();
+ resolve();
+ };
+ worker.postMessage("hi " + i);
+ });
+}
+
+function bench(): Promise<any> {
+ let promises = [];
+ for (let i = 0; i < 50; i++) {
+ promises.push(oneWorker(i));
+ }
+
+ return Promise.all(promises);
+}
+
+bench();
diff --git a/core/bindings.rs b/core/bindings.rs
index bb5589080..157b58a9d 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -368,7 +368,7 @@ fn send<'s>(
mut rv: v8::ReturnValue,
) {
let state_rc = JsRuntime::state(scope);
- let state = state_rc.borrow_mut();
+ let mut state = state_rc.borrow_mut();
let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
.map_err(AnyError::from)
@@ -412,12 +412,12 @@ fn send<'s>(
Op::Async(fut) => {
let fut2 = fut.map(move |buf| (op_id, buf));
state.pending_ops.push(fut2.boxed_local());
- state.have_unpolled_ops.set(true);
+ state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
let fut2 = fut.map(move |buf| (op_id, buf));
state.pending_unref_ops.push(fut2.boxed_local());
- state.have_unpolled_ops.set(true);
+ state.have_unpolled_ops = true;
}
Op::NotFound => {
let msg = format!("Unknown op id: {}", op_id);
diff --git a/core/core.js b/core/core.js
index a96ce81d7..bda6739a2 100644
--- a/core/core.js
+++ b/core/core.js
@@ -155,12 +155,7 @@ SharedQueue Binary Layout
asyncHandlers[opId] = cb;
}
- function handleAsyncMsgFromRust(opId, buf) {
- if (buf) {
- // This is the overflow_response case of deno::JsRuntime::poll().
- asyncHandlers[opId](buf);
- return;
- }
+ function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
if (opIdBuf == null) {
@@ -169,6 +164,10 @@ SharedQueue Binary Layout
assert(asyncHandlers[opIdBuf[0]] != null);
asyncHandlers[opIdBuf[0]](opIdBuf[1]);
}
+
+ for (let i = 0; i < arguments.length; i += 2) {
+ asyncHandlers[arguments[i]](arguments[i + 1]);
+ }
}
function dispatch(opName, control, ...zeroCopy) {
diff --git a/core/runtime.rs b/core/runtime.rs
index f7cc0fa99..67161d5e7 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -32,7 +32,6 @@ use futures::stream::StreamFuture;
use futures::task::AtomicWaker;
use futures::Future;
use std::any::Any;
-use std::cell::Cell;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryFrom;
@@ -110,8 +109,7 @@ pub(crate) struct JsRuntimeState {
pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
- pub(crate) have_unpolled_ops: Cell<bool>,
- //pub(crate) op_table: OpTable,
+ pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub loader: Rc<dyn ModuleLoader>,
pub modules: Modules,
@@ -287,7 +285,7 @@ impl JsRuntime {
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
op_state: Rc::new(RefCell::new(op_state)),
- have_unpolled_ops: Cell::new(false),
+ have_unpolled_ops: false,
modules: Modules::new(),
loader,
dyn_import_map: HashMap::new(),
@@ -562,7 +560,7 @@ impl JsRuntime {
// Check if more async ops have been dispatched
// during this turn of event loop.
- if state.have_unpolled_ops.get() {
+ if state.have_unpolled_ops {
state.waker.wake();
}
@@ -1346,18 +1344,16 @@ impl JsRuntime {
self.mod_instantiate(root_id, None).map(|_| root_id)
}
- fn poll_pending_ops(
- &mut self,
- cx: &mut Context,
- ) -> Option<(OpId, Box<[u8]>)> {
+ fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> {
let state_rc = Self::state(self.v8_isolate());
- let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
+ let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new();
- loop {
- let mut state = state_rc.borrow_mut();
- // Now handle actual ops.
- state.have_unpolled_ops.set(false);
+ let mut state = state_rc.borrow_mut();
+
+ // Now handle actual ops.
+ state.have_unpolled_ops = false;
+ loop {
let pending_r = state.pending_ops.poll_next_unpin(cx);
match pending_r {
Poll::Ready(None) => break,
@@ -1365,31 +1361,21 @@ impl JsRuntime {
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
- // If we couldn't push the response to the shared queue, because
- // there wasn't enough size, we will return the buffer via the
- // legacy route, using the argument of deno_respond.
- overflow_response = Some((op_id, buf));
- break;
+ overflow_response.push((op_id, buf));
}
}
};
}
loop {
- let mut state = state_rc.borrow_mut();
let unref_r = state.pending_unref_ops.poll_next_unpin(cx);
- #[allow(clippy::match_wild_err_arm)]
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
Poll::Ready(Some((op_id, buf))) => {
let successful_push = state.shared.push(op_id, &buf);
if !successful_push {
- // If we couldn't push the response to the shared queue, because
- // there wasn't enough size, we will return the buffer via the
- // legacy route, using the argument of deno_respond.
- overflow_response = Some((op_id, buf));
- break;
+ overflow_response.push((op_id, buf));
}
}
};
@@ -1427,13 +1413,14 @@ impl JsRuntime {
// Respond using shared queue and optionally overflown response
fn async_op_response(
&mut self,
- maybe_overflown_response: Option<(OpId, Box<[u8]>)>,
+ overflown_responses: Vec<(OpId, Box<[u8]>)>,
) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let shared_queue_size = state_rc.borrow().shared.size();
+ let overflown_responses_size = overflown_responses.len();
- if shared_queue_size == 0 && maybe_overflown_response.is_none() {
+ if shared_queue_size == 0 && overflown_responses_size == 0 {
return Ok(());
}
@@ -1454,22 +1441,21 @@ impl JsRuntime {
let tc_scope = &mut v8::TryCatch::new(scope);
- if shared_queue_size > 0 {
- js_recv_cb.call(tc_scope, global, &[]);
+ let mut args: Vec<v8::Local<v8::Value>> =
+ Vec::with_capacity(2 * overflown_responses_size);
+ for overflown_response in overflown_responses {
+ let (op_id, buf) = overflown_response;
+ args.push(v8::Integer::new(tc_scope, op_id as i32).into());
+ args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into());
+ }
+
+ if shared_queue_size > 0 || overflown_responses_size > 0 {
+ js_recv_cb.call(tc_scope, global, args.as_slice());
// The other side should have shifted off all the messages.
let shared_queue_size = state_rc.borrow().shared.size();
assert_eq!(shared_queue_size, 0);
}
- if let Some(overflown_response) = maybe_overflown_response {
- let (op_id, buf) = overflown_response;
- let op_id: v8::Local<v8::Value> =
- v8::Integer::new(tc_scope, op_id as i32).into();
- let ui8: v8::Local<v8::Value> =
- bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
- js_recv_cb.call(tc_scope, global, &[op_id, ui8]);
- }
-
match tc_scope.exception() {
None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception, false),
@@ -1925,6 +1911,71 @@ pub mod tests {
}
#[test]
+ fn overflow_res_async_combined_with_unref() {
+ run_in_task(|cx| {
+ let mut runtime = JsRuntime::new(Default::default());
+
+ runtime.register_op(
+ "test1",
+ |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
+ let mut vec = vec![0u8; 100 * 1024 * 1024];
+ vec[0] = 4;
+ let buf = vec.into_boxed_slice();
+ Op::Async(futures::future::ready(buf).boxed())
+ },
+ );
+
+ runtime.register_op(
+ "test2",
+ |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
+ let mut vec = vec![0u8; 100 * 1024 * 1024];
+ vec[0] = 4;
+ let buf = vec.into_boxed_slice();
+ Op::AsyncUnref(futures::future::ready(buf).boxed())
+ },
+ );
+
+ runtime
+ .execute(
+ "overflow_res_async_combined_with_unref.js",
+ r#"
+ function assert(cond) {
+ if (!cond) {
+ throw Error("assert");
+ }
+ }
+
+ let asyncRecv = 0;
+ Deno.core.setAsyncHandler(1, (buf) => {
+ assert(buf.byteLength === 100 * 1024 * 1024);
+ assert(buf[0] === 4);
+ asyncRecv++;
+ });
+ Deno.core.setAsyncHandler(2, (buf) => {
+ assert(buf.byteLength === 100 * 1024 * 1024);
+ assert(buf[0] === 4);
+ asyncRecv++;
+ });
+ let control = new Uint8Array(1);
+ let response1 = Deno.core.dispatch(1, control);
+ // Async messages always have null response.
+ assert(response1 == null);
+ assert(asyncRecv == 0);
+ let response2 = Deno.core.dispatch(2, control);
+ // Async messages always have null response.
+ assert(response2 == null);
+ assert(asyncRecv == 0);
+ "#,
+ )
+ .unwrap();
+ assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
+ runtime
+ .execute("check.js", "assert(asyncRecv == 2);")
+ .unwrap();
+ });
+ }
+
+ #[test]
fn overflow_res_async() {
run_in_task(|_cx| {
// TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We