diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-10-14 23:46:27 +0200 |
---|---|---|
committer | Ryan Dahl <ry@tinyclouds.org> | 2019-10-14 17:46:27 -0400 |
commit | 4221b90c3febbe03a4b47e47248263741a0fdd4a (patch) | |
tree | 0c16487c197863fb1223f37a9c589905517dfdcd /cli | |
parent | 605659535794ca0d8fe3ee4ea5857b418d7ce091 (diff) |
perf: eager poll async ops in Isolate (#3046)
Diffstat (limited to 'cli')
-rw-r--r-- | cli/js/dispatch_json.ts | 12 | ||||
-rw-r--r-- | cli/js/dispatch_minimal.ts | 16 | ||||
-rw-r--r-- | cli/lib.rs | 32 | ||||
-rw-r--r-- | cli/tokio_util.rs | 29 | ||||
-rw-r--r-- | cli/worker.rs | 8 |
5 files changed, 73 insertions, 24 deletions
diff --git a/cli/js/dispatch_json.ts b/cli/js/dispatch_json.ts index 572ec855a..890568409 100644 --- a/cli/js/dispatch_json.ts +++ b/cli/js/dispatch_json.ts @@ -75,11 +75,17 @@ export async function sendAsync( const promiseId = nextPromiseId(); args = Object.assign(args, { promiseId }); const promise = util.createResolvable<Ok>(); - promiseTable.set(promiseId, promise); const argsUi8 = encode(args); - const resUi8 = core.dispatch(opId, argsUi8, zeroCopy); - util.assert(resUi8 == null); + const buf = core.dispatch(opId, argsUi8, zeroCopy); + if (buf) { + // Sync result. + const res = decode(buf); + promise.resolve(res); + } else { + // Async result. + promiseTable.set(promiseId, promise); + } const res = await promise; return unwrapResponse(res); diff --git a/cli/js/dispatch_minimal.ts b/cli/js/dispatch_minimal.ts index 98636f85b..74a5e211c 100644 --- a/cli/js/dispatch_minimal.ts +++ b/cli/js/dispatch_minimal.ts @@ -61,8 +61,20 @@ export function sendAsyncMinimal( scratch32[1] = arg; scratch32[2] = 0; // result const promise = util.createResolvable<number>(); - promiseTableMin.set(promiseId, promise); - core.dispatch(opId, scratchBytes, zeroCopy); + const buf = core.dispatch(opId, scratchBytes, zeroCopy); + if (buf) { + const buf32 = new Int32Array( + buf.buffer, + buf.byteOffset, + buf.byteLength / 4 + ); + const record = recordFromBufMinimal(opId, buf32); + // Sync result. + promise.resolve(record.result); + } else { + // Async result. + promiseTableMin.set(promiseId, promise); + } return promise; } diff --git a/cli/lib.rs b/cli/lib.rs index 8d0904ddb..3c093cda4 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -298,25 +298,31 @@ fn eval_command(flags: DenoFlags, argv: Vec<String>) { } fn bundle_command(flags: DenoFlags, argv: Vec<String>) { - let (mut _worker, state) = create_worker_and_state(flags, argv); + let (worker, state) = create_worker_and_state(flags, argv); let main_module = state.main_module().unwrap(); assert!(state.argv.len() >= 3); let out_file = state.argv[2].clone(); debug!(">>>>> bundle_async START"); - let bundle_future = state - .ts_compiler - .bundle_async(state.clone(), main_module.to_string(), out_file) - .map_err(|err| { - debug!("diagnostics returned, exiting!"); - eprintln!(""); - print_err_and_exit(err); + // NOTE: we need to poll `worker` otherwise TS compiler worker won't run properly + let main_future = lazy(move || { + worker.then(move |result| { + js_check(result); + state + .ts_compiler + .bundle_async(state.clone(), main_module.to_string(), out_file) + .map_err(|err| { + debug!("diagnostics returned, exiting!"); + eprintln!(""); + print_err_and_exit(err); + }) + .and_then(move |_| { + debug!(">>>>> bundle_async END"); + Ok(()) + }) }) - .and_then(move |_| { - debug!(">>>>> bundle_async END"); - Ok(()) - }); - tokio_util::run(bundle_future); + }); + tokio_util::run(main_future); } fn run_repl(flags: DenoFlags, argv: Vec<String>) { diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index 678bb8e66..4ee73eef9 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -7,6 +7,7 @@ use futures::Poll; use std::io; use std::mem; use std::net::SocketAddr; +use std::ops::FnOnce; use tokio; use tokio::net::TcpStream; use tokio::runtime; @@ -78,6 +79,7 @@ where #[derive(Debug)] enum AcceptState { + Eager(Resource), Pending(Resource), Empty, } @@ -85,7 +87,7 @@ enum AcceptState { /// Simply accepts a connection. pub fn accept(r: Resource) -> Accept { Accept { - state: AcceptState::Pending(r), + state: AcceptState::Eager(r), } } @@ -107,6 +109,16 @@ impl Future for Accept { // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). + AcceptState::Eager(ref mut r) => match r.poll_accept() { + Ok(futures::prelude::Async::Ready(t)) => t, + Ok(futures::prelude::Async::NotReady) => { + self.state = AcceptState::Pending(r.to_owned()); + return Ok(futures::prelude::Async::NotReady); + } + Err(e) => { + return Err(e); + } + }, AcceptState::Pending(ref mut r) => match r.poll_accept() { Ok(futures::prelude::Async::Ready(t)) => { r.untrack_task(); @@ -126,8 +138,8 @@ impl Future for Accept { }; match mem::replace(&mut self.state, AcceptState::Empty) { - AcceptState::Pending(_) => Ok((stream, addr).into()), AcceptState::Empty => panic!("invalid internal state"), + _ => Ok((stream, addr).into()), } } } @@ -166,3 +178,16 @@ where { f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) } + +#[cfg(test)] +pub fn run_in_task<F>(f: F) +where + F: FnOnce() + Send + 'static, +{ + let fut = futures::future::lazy(move || { + f(); + futures::future::ok(()) + }); + + run(fut) +} diff --git a/cli/worker.rs b/cli/worker.rs index 6ea17d915..990dd613a 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -263,7 +263,7 @@ mod tests { #[test] fn test_worker_messages() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { let mut worker = create_test_worker(); let source = r#" onmessage = function(e) { @@ -314,7 +314,7 @@ mod tests { #[test] fn removed_from_resource_table_on_close() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { let mut worker = create_test_worker(); worker .execute("onmessage = () => { delete window.onmessage; }") @@ -349,7 +349,7 @@ mod tests { #[test] fn execute_mod_resolve_error() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { // "foo" is not a valid module specifier so this should return an error. let mut worker = create_test_worker(); let module_specifier = @@ -361,7 +361,7 @@ mod tests { #[test] fn execute_mod_002_hello() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { // This assumes cwd is project root (an assumption made throughout the // tests). let mut worker = create_test_worker(); |