author     Bartek Iwańczuk <biwanczuk@gmail.com>  2019-10-14 23:46:27 +0200
committer  Ryan Dahl <ry@tinyclouds.org>          2019-10-14 17:46:27 -0400
commit     4221b90c3febbe03a4b47e47248263741a0fdd4a
tree       0c16487c197863fb1223f37a9c589905517dfdcd
parent     605659535794ca0d8fe3ee4ea5857b418d7ce091
perf: eager poll async ops in Isolate (#3046)
-rw-r--r--  cli/js/dispatch_json.ts      |  12
-rw-r--r--  cli/js/dispatch_minimal.ts   |  16
-rw-r--r--  cli/lib.rs                   |  32
-rw-r--r--  cli/tokio_util.rs            |  29
-rw-r--r--  cli/worker.rs                |   8
-rw-r--r--  core/examples/http_bench.js  |  25
-rw-r--r--  core/isolate.rs              | 209
-rw-r--r--  core/libdeno/api.cc          |   2
8 files changed, 240 insertions(+), 93 deletions(-)
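
The gist of this change lives in core/isolate.rs: when a dispatched op comes back as Op::Async, the isolate now polls the future once before returning to V8; if it is already ready, the op is downgraded to Op::Sync and the result buffer is handed back on the same dispatch call, saving a boundary crossing. A minimal standalone sketch of that downgrade step follows, assuming the futures 0.1 API this code targets; the Op enum and eager_poll function here are simplified illustrations, not the actual deno_core definitions.

    extern crate futures; // futures = "0.1"

    use futures::{Async, Future};

    // Simplified stand-in for deno_core's Op type.
    enum Op {
        Sync(Box<[u8]>),
        Async(Box<dyn Future<Item = Box<[u8]>, Error = ()> + Send>),
    }

    // Poll an async op exactly once at dispatch time. If it is already ready,
    // turn it into a sync op so V8 gets the buffer immediately; otherwise keep
    // it as a pending async op to be driven by the event loop.
    fn eager_poll(op: Op) -> Op {
        match op {
            Op::Async(mut fut) => match fut.poll() {
                Ok(Async::Ready(buf)) => Op::Sync(buf),
                Ok(Async::NotReady) => Op::Async(fut),
                Err(_) => panic!("unexpected op error"),
            },
            sync_op => sync_op,
        }
    }

    fn main() {
        // A future that is ready on its first poll gets turned into a sync op.
        let ready_op = Op::Async(Box::new(futures::future::ok(
            vec![43u8, 0, 0, 0].into_boxed_slice(),
        )));
        match eager_poll(ready_op) {
            Op::Sync(buf) => assert_eq!(buf[0], 43),
            Op::Async(_) => unreachable!("future was already ready"),
        }
    }

The JavaScript side mirrors this: core.dispatch() may now return a buffer for an op that resolved during the eager poll, which is why dispatch_json.ts, dispatch_minimal.ts, and http_bench.js below resolve the promise immediately when a buffer comes back and only register it in the promise table otherwise.
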
diff --git a/cli/js/dispatch_json.ts b/cli/js/dispatch_json.ts
index 572ec855a..890568409 100644
--- a/cli/js/dispatch_json.ts
+++ b/cli/js/dispatch_json.ts
@@ -75,11 +75,17 @@ export async function sendAsync(
const promiseId = nextPromiseId();
args = Object.assign(args, { promiseId });
const promise = util.createResolvable<Ok>();
- promiseTable.set(promiseId, promise);
const argsUi8 = encode(args);
- const resUi8 = core.dispatch(opId, argsUi8, zeroCopy);
- util.assert(resUi8 == null);
+ const buf = core.dispatch(opId, argsUi8, zeroCopy);
+ if (buf) {
+ // Sync result.
+ const res = decode(buf);
+ promise.resolve(res);
+ } else {
+ // Async result.
+ promiseTable.set(promiseId, promise);
+ }
const res = await promise;
return unwrapResponse(res);
diff --git a/cli/js/dispatch_minimal.ts b/cli/js/dispatch_minimal.ts
index 98636f85b..74a5e211c 100644
--- a/cli/js/dispatch_minimal.ts
+++ b/cli/js/dispatch_minimal.ts
@@ -61,8 +61,20 @@ export function sendAsyncMinimal(
scratch32[1] = arg;
scratch32[2] = 0; // result
const promise = util.createResolvable<number>();
- promiseTableMin.set(promiseId, promise);
- core.dispatch(opId, scratchBytes, zeroCopy);
+ const buf = core.dispatch(opId, scratchBytes, zeroCopy);
+ if (buf) {
+ const buf32 = new Int32Array(
+ buf.buffer,
+ buf.byteOffset,
+ buf.byteLength / 4
+ );
+ const record = recordFromBufMinimal(opId, buf32);
+ // Sync result.
+ promise.resolve(record.result);
+ } else {
+ // Async result.
+ promiseTableMin.set(promiseId, promise);
+ }
return promise;
}
diff --git a/cli/lib.rs b/cli/lib.rs
index 8d0904ddb..3c093cda4 100644
--- a/cli/lib.rs
+++ b/cli/lib.rs
@@ -298,25 +298,31 @@ fn eval_command(flags: DenoFlags, argv: Vec<String>) {
}
fn bundle_command(flags: DenoFlags, argv: Vec<String>) {
- let (mut _worker, state) = create_worker_and_state(flags, argv);
+ let (worker, state) = create_worker_and_state(flags, argv);
let main_module = state.main_module().unwrap();
assert!(state.argv.len() >= 3);
let out_file = state.argv[2].clone();
debug!(">>>>> bundle_async START");
- let bundle_future = state
- .ts_compiler
- .bundle_async(state.clone(), main_module.to_string(), out_file)
- .map_err(|err| {
- debug!("diagnostics returned, exiting!");
- eprintln!("");
- print_err_and_exit(err);
+ // NOTE: we need to poll `worker` otherwise TS compiler worker won't run properly
+ let main_future = lazy(move || {
+ worker.then(move |result| {
+ js_check(result);
+ state
+ .ts_compiler
+ .bundle_async(state.clone(), main_module.to_string(), out_file)
+ .map_err(|err| {
+ debug!("diagnostics returned, exiting!");
+ eprintln!("");
+ print_err_and_exit(err);
+ })
+ .and_then(move |_| {
+ debug!(">>>>> bundle_async END");
+ Ok(())
+ })
})
- .and_then(move |_| {
- debug!(">>>>> bundle_async END");
- Ok(())
- });
- tokio_util::run(bundle_future);
+ });
+ tokio_util::run(main_future);
}
fn run_repl(flags: DenoFlags, argv: Vec<String>) {
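
For context on the bundle_command hunk above: per the NOTE, the worker future must be polled on the runtime or the TS compiler worker never makes progress, so the compiler step is now sequenced after the worker by wrapping the whole chain in futures::lazy and continuing with then. A rough standalone sketch of that sequencing pattern under futures 0.1 / tokio 0.1 follows; worker and bundle_step are illustrative placeholders, not the real deno types.

    extern crate futures; // futures = "0.1"
    extern crate tokio;   // tokio = "0.1"

    use futures::future::lazy;
    use futures::Future;

    fn main() {
        // Stand-ins for the worker future and the ts_compiler.bundle_async step.
        let worker = futures::future::ok::<(), ()>(());
        let bundle_step = |_| futures::future::ok::<(), ()>(());

        // Build everything lazily so it is constructed on the runtime, then run
        // the bundle step only after the worker future has resolved.
        let main_future = lazy(move || {
            worker.then(move |result| {
                result.expect("worker failed");
                bundle_step(())
            })
        });

        tokio::run(main_future);
    }
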
diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs
index 678bb8e66..4ee73eef9 100644
--- a/cli/tokio_util.rs
+++ b/cli/tokio_util.rs
@@ -7,6 +7,7 @@ use futures::Poll;
use std::io;
use std::mem;
use std::net::SocketAddr;
+use std::ops::FnOnce;
use tokio;
use tokio::net::TcpStream;
use tokio::runtime;
@@ -78,6 +79,7 @@ where
#[derive(Debug)]
enum AcceptState {
+ Eager(Resource),
Pending(Resource),
Empty,
}
@@ -85,7 +87,7 @@ enum AcceptState {
/// Simply accepts a connection.
pub fn accept(r: Resource) -> Accept {
Accept {
- state: AcceptState::Pending(r),
+ state: AcceptState::Eager(r),
}
}
@@ -107,6 +109,16 @@ impl Future for Accept {
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
+ AcceptState::Eager(ref mut r) => match r.poll_accept() {
+ Ok(futures::prelude::Async::Ready(t)) => t,
+ Ok(futures::prelude::Async::NotReady) => {
+ self.state = AcceptState::Pending(r.to_owned());
+ return Ok(futures::prelude::Async::NotReady);
+ }
+ Err(e) => {
+ return Err(e);
+ }
+ },
AcceptState::Pending(ref mut r) => match r.poll_accept() {
Ok(futures::prelude::Async::Ready(t)) => {
r.untrack_task();
@@ -126,8 +138,8 @@ impl Future for Accept {
};
match mem::replace(&mut self.state, AcceptState::Empty) {
- AcceptState::Pending(_) => Ok((stream, addr).into()),
AcceptState::Empty => panic!("invalid internal state"),
+ _ => Ok((stream, addr).into()),
}
}
}
@@ -166,3 +178,16 @@ where
{
f.map_err(|err| panic!("Future got unexpected error: {:?}", err))
}
+
+#[cfg(test)]
+pub fn run_in_task<F>(f: F)
+where
+ F: FnOnce() + Send + 'static,
+{
+ let fut = futures::future::lazy(move || {
+ f();
+ futures::future::ok(())
+ });
+
+ run(fut)
+}
diff --git a/cli/worker.rs b/cli/worker.rs
index 6ea17d915..990dd613a 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -263,7 +263,7 @@ mod tests {
#[test]
fn test_worker_messages() {
- tokio_util::init(|| {
+ tokio_util::run_in_task(|| {
let mut worker = create_test_worker();
let source = r#"
onmessage = function(e) {
@@ -314,7 +314,7 @@ mod tests {
#[test]
fn removed_from_resource_table_on_close() {
- tokio_util::init(|| {
+ tokio_util::run_in_task(|| {
let mut worker = create_test_worker();
worker
.execute("onmessage = () => { delete window.onmessage; }")
@@ -349,7 +349,7 @@ mod tests {
#[test]
fn execute_mod_resolve_error() {
- tokio_util::init(|| {
+ tokio_util::run_in_task(|| {
// "foo" is not a valid module specifier so this should return an error.
let mut worker = create_test_worker();
let module_specifier =
@@ -361,7 +361,7 @@ mod tests {
#[test]
fn execute_mod_002_hello() {
- tokio_util::init(|| {
+ tokio_util::run_in_task(|| {
// This assumes cwd is project root (an assumption made throughout the
// tests).
let mut worker = create_test_worker();
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index a7142b09d..f553d4800 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -43,11 +43,25 @@ function send(promiseId, opId, arg, zeroCopy = null) {
function sendAsync(opId, arg, zeroCopy = null) {
const promiseId = nextPromiseId++;
const p = createResolvable();
- promiseMap.set(promiseId, p);
- send(promiseId, opId, arg, zeroCopy);
+ const buf = send(promiseId, opId, arg, zeroCopy);
+ if (buf) {
+ const record = recordFromBuf(buf);
+ // Sync result.
+ p.resolve(record.result);
+ } else {
+ // Async result.
+ promiseMap.set(promiseId, p);
+ }
return p;
}
+/** Returns i32 number */
+function sendSync(opId, arg) {
+ const buf = send(0, opId, arg);
+ const record = recordFromBuf(buf);
+ return record.result;
+}
+
function recordFromBuf(buf) {
assert(buf.byteLength === 3 * 4);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
@@ -58,13 +72,6 @@ function recordFromBuf(buf) {
};
}
-/** Returns i32 number */
-function sendSync(opId, arg) {
- const buf = send(0, opId, arg);
- const record = recordFromBuf(buf);
- return record.result;
-}
-
function handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
diff --git a/core/isolate.rs b/core/isolate.rs
index 2f544a20a..ac5c8402a 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -315,6 +315,21 @@ impl Isolate {
PinnedBuf::new(zero_copy_buf),
);
+ let op = match op {
+ Op::Async(mut fut) => {
+ // Tries to greedily poll async ops once. Often they are immediately ready, in
+ // which case they can be turned into a sync op before we return to V8. This
+ // can save a boundary crossing.
+ #[allow(clippy::match_wild_err_arm)]
+ match fut.poll() {
+ Err(_) => panic!("unexpected op error"),
+ Ok(Ready(buf)) => Op::Sync(buf),
+ Ok(NotReady) => Op::Async(fut),
+ }
+ }
+ Op::Sync(buf) => Op::Sync(buf),
+ };
+
debug_assert_eq!(isolate.shared.size(), 0);
match op {
Op::Sync(buf) => {
@@ -748,8 +763,34 @@ pub mod tests {
)
}
+ struct DelayedFuture {
+ counter: u32,
+ buf: Box<[u8]>,
+ }
+
+ impl DelayedFuture {
+ pub fn new(buf: Box<[u8]>) -> Self {
+ DelayedFuture { counter: 0, buf }
+ }
+ }
+
+ impl Future for DelayedFuture {
+ type Item = Box<[u8]>;
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+ if self.counter > 0 {
+ return Ok(Async::Ready(self.buf.clone()));
+ }
+
+ self.counter += 1;
+ Ok(Async::NotReady)
+ }
+ }
+
pub enum Mode {
AsyncImmediate,
+ AsyncDelayed,
OverflowReqSync,
OverflowResSync,
OverflowReqAsync,
@@ -772,6 +813,12 @@ pub mod tests {
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(Box::new(futures::future::ok(buf)))
}
+ Mode::AsyncDelayed => {
+ assert_eq!(control.len(), 1);
+ assert_eq!(control[0], 42);
+ let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
+ Op::Async(Box::new(DelayedFuture::new(buf)))
+ }
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
@@ -789,7 +836,7 @@ pub mod tests {
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
- Op::Async(Box::new(futures::future::ok(buf)))
+ Op::Async(Box::new(DelayedFuture::new(buf)))
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
@@ -798,7 +845,7 @@ pub mod tests {
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
- Op::Async(Box::new(futures::future::ok(buf)))
+ Op::Async(Box::new(DelayedFuture::new(buf)))
}
}
};
@@ -892,21 +939,65 @@ pub mod tests {
js_check(isolate.execute(
"setup2.js",
r#"
- let nrecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
- nrecv++;
- });
- "#,
+ let nrecv = 0;
+ Deno.core.setAsyncHandler((opId, buf) => {
+ nrecv++;
+ });
+ "#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 0);
js_check(isolate.execute(
"check1.js",
r#"
- assert(nrecv == 0);
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
- assert(nrecv == 0);
- "#,
+ assert(nrecv == 0);
+ let control = new Uint8Array([42]);
+ const res1 = Deno.core.send(1, control);
+ assert(res1);
+ assert(nrecv == 0);
+ "#,
+ ));
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
+ js_check(isolate.execute(
+ "check2.js",
+ r#"
+ assert(nrecv == 0);
+ Deno.core.send(1, control);
+ assert(nrecv == 0);
+ "#,
+ ));
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
+ assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ js_check(isolate.execute("check3.js", "assert(nrecv == 0)"));
+ // We are idle, so the next poll should be the last.
+ assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ });
+ }
+
+ #[test]
+ fn test_poll_async_delayed_ops() {
+ run_in_task(|| {
+ let (mut isolate, dispatch_count) = setup(Mode::AsyncDelayed);
+
+ js_check(isolate.execute(
+ "setup2.js",
+ r#"
+ let nrecv = 0;
+ Deno.core.setAsyncHandler((opId, buf) => {
+ nrecv++;
+ });
+ "#,
+ ));
+ assert_eq!(dispatch_count.load(Ordering::Relaxed), 0);
+ js_check(isolate.execute(
+ "check1.js",
+ r#"
+ assert(nrecv == 0);
+ let control = new Uint8Array([42]);
+ Deno.core.send(1, control);
+ assert(nrecv == 0);
+ "#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
assert_eq!(Async::Ready(()), isolate.poll().unwrap());
@@ -914,10 +1005,10 @@ pub mod tests {
js_check(isolate.execute(
"check2.js",
r#"
- assert(nrecv == 1);
- Deno.core.send(1, control);
- assert(nrecv == 1);
- "#,
+ assert(nrecv == 1);
+ Deno.core.send(1, control);
+ assert(nrecv == 1);
+ "#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
assert_eq!(Async::Ready(()), isolate.poll().unwrap());
@@ -1235,20 +1326,20 @@ pub mod tests {
js_check(isolate.execute(
"overflow_req_async.js",
r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
- assert(opId == 1);
- assert(buf.byteLength === 4);
- assert(buf[0] === 43);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(1, control);
- // Async messages always have null response.
- assert(response == null);
- assert(asyncRecv == 0);
- "#,
+ let asyncRecv = 0;
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId == 1);
+ assert(buf.byteLength === 4);
+ assert(buf[0] === 43);
+ asyncRecv++;
+ });
+ // Large message that will overflow the shared space.
+ let control = new Uint8Array(100 * 1024 * 1024);
+ let response = Deno.core.dispatch(1, control);
+ // Async messages always have null response.
+ assert(response == null);
+ assert(asyncRecv == 0);
+ "#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
assert_eq!(Async::Ready(()), js_check(isolate.poll()));
@@ -1265,19 +1356,19 @@ pub mod tests {
js_check(isolate.execute(
"overflow_res_async.js",
r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
- assert(opId == 1);
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response == null);
- assert(asyncRecv == 0);
- "#,
+ let asyncRecv = 0;
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId == 1);
+ assert(buf.byteLength === 100 * 1024 * 1024);
+ assert(buf[0] === 4);
+ asyncRecv++;
+ });
+ // Large message that will overflow the shared space.
+ let control = new Uint8Array([42]);
+ let response = Deno.core.dispatch(1, control);
+ assert(response == null);
+ assert(asyncRecv == 0);
+ "#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
poll_until_ready(&mut isolate, 3).unwrap();
@@ -1294,22 +1385,22 @@ pub mod tests {
js_check(isolate.execute(
"overflow_res_multiple_dispatch_async.js",
r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
- assert(opId === 1);
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response == null);
- assert(asyncRecv == 0);
- // Dispatch another message to verify that pending ops
- // are done even if shared space overflows
- Deno.core.dispatch(1, control);
- "#,
+ let asyncRecv = 0;
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId === 1);
+ assert(buf.byteLength === 100 * 1024 * 1024);
+ assert(buf[0] === 4);
+ asyncRecv++;
+ });
+ // Large message that will overflow the shared space.
+ let control = new Uint8Array([42]);
+ let response = Deno.core.dispatch(1, control);
+ assert(response == null);
+ assert(asyncRecv == 0);
+ // Dispatch another message to verify that pending ops
+ // are done even if shared space overflows
+ Deno.core.dispatch(1, control);
+ "#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
poll_until_ready(&mut isolate, 3).unwrap();
diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc
index 18dc1d43e..061638cb5 100644
--- a/core/libdeno/api.cc
+++ b/core/libdeno/api.cc
@@ -165,7 +165,7 @@ void deno_respond(Deno* d_, void* user_data, deno_op_id op_id, deno_buf buf) {
if (d->current_args_ != nullptr) {
// Synchronous response.
// Note op_id is not passed back in the case of synchronous response.
- if (buf.data_ptr != nullptr) {
+ if (buf.data_ptr != nullptr && buf.data_len > 0) {
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
}