diff options
-rw-r--r-- | cli/js/dispatch_json.ts | 12 | ||||
-rw-r--r-- | cli/js/dispatch_minimal.ts | 16 | ||||
-rw-r--r-- | cli/lib.rs | 32 | ||||
-rw-r--r-- | cli/tokio_util.rs | 29 | ||||
-rw-r--r-- | cli/worker.rs | 8 | ||||
-rw-r--r-- | core/examples/http_bench.js | 25 | ||||
-rw-r--r-- | core/isolate.rs | 209 | ||||
-rw-r--r-- | core/libdeno/api.cc | 2 |
8 files changed, 240 insertions, 93 deletions
diff --git a/cli/js/dispatch_json.ts b/cli/js/dispatch_json.ts index 572ec855a..890568409 100644 --- a/cli/js/dispatch_json.ts +++ b/cli/js/dispatch_json.ts @@ -75,11 +75,17 @@ export async function sendAsync( const promiseId = nextPromiseId(); args = Object.assign(args, { promiseId }); const promise = util.createResolvable<Ok>(); - promiseTable.set(promiseId, promise); const argsUi8 = encode(args); - const resUi8 = core.dispatch(opId, argsUi8, zeroCopy); - util.assert(resUi8 == null); + const buf = core.dispatch(opId, argsUi8, zeroCopy); + if (buf) { + // Sync result. + const res = decode(buf); + promise.resolve(res); + } else { + // Async result. + promiseTable.set(promiseId, promise); + } const res = await promise; return unwrapResponse(res); diff --git a/cli/js/dispatch_minimal.ts b/cli/js/dispatch_minimal.ts index 98636f85b..74a5e211c 100644 --- a/cli/js/dispatch_minimal.ts +++ b/cli/js/dispatch_minimal.ts @@ -61,8 +61,20 @@ export function sendAsyncMinimal( scratch32[1] = arg; scratch32[2] = 0; // result const promise = util.createResolvable<number>(); - promiseTableMin.set(promiseId, promise); - core.dispatch(opId, scratchBytes, zeroCopy); + const buf = core.dispatch(opId, scratchBytes, zeroCopy); + if (buf) { + const buf32 = new Int32Array( + buf.buffer, + buf.byteOffset, + buf.byteLength / 4 + ); + const record = recordFromBufMinimal(opId, buf32); + // Sync result. + promise.resolve(record.result); + } else { + // Async result. + promiseTableMin.set(promiseId, promise); + } return promise; } diff --git a/cli/lib.rs b/cli/lib.rs index 8d0904ddb..3c093cda4 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -298,25 +298,31 @@ fn eval_command(flags: DenoFlags, argv: Vec<String>) { } fn bundle_command(flags: DenoFlags, argv: Vec<String>) { - let (mut _worker, state) = create_worker_and_state(flags, argv); + let (worker, state) = create_worker_and_state(flags, argv); let main_module = state.main_module().unwrap(); assert!(state.argv.len() >= 3); let out_file = state.argv[2].clone(); debug!(">>>>> bundle_async START"); - let bundle_future = state - .ts_compiler - .bundle_async(state.clone(), main_module.to_string(), out_file) - .map_err(|err| { - debug!("diagnostics returned, exiting!"); - eprintln!(""); - print_err_and_exit(err); + // NOTE: we need to poll `worker` otherwise TS compiler worker won't run properly + let main_future = lazy(move || { + worker.then(move |result| { + js_check(result); + state + .ts_compiler + .bundle_async(state.clone(), main_module.to_string(), out_file) + .map_err(|err| { + debug!("diagnostics returned, exiting!"); + eprintln!(""); + print_err_and_exit(err); + }) + .and_then(move |_| { + debug!(">>>>> bundle_async END"); + Ok(()) + }) }) - .and_then(move |_| { - debug!(">>>>> bundle_async END"); - Ok(()) - }); - tokio_util::run(bundle_future); + }); + tokio_util::run(main_future); } fn run_repl(flags: DenoFlags, argv: Vec<String>) { diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index 678bb8e66..4ee73eef9 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -7,6 +7,7 @@ use futures::Poll; use std::io; use std::mem; use std::net::SocketAddr; +use std::ops::FnOnce; use tokio; use tokio::net::TcpStream; use tokio::runtime; @@ -78,6 +79,7 @@ where #[derive(Debug)] enum AcceptState { + Eager(Resource), Pending(Resource), Empty, } @@ -85,7 +87,7 @@ enum AcceptState { /// Simply accepts a connection. pub fn accept(r: Resource) -> Accept { Accept { - state: AcceptState::Pending(r), + state: AcceptState::Eager(r), } } @@ -107,6 +109,16 @@ impl Future for Accept { // in TcpListener resource. // In this way, when the listener is closed, the task can be // notified to error out (instead of stuck forever). + AcceptState::Eager(ref mut r) => match r.poll_accept() { + Ok(futures::prelude::Async::Ready(t)) => t, + Ok(futures::prelude::Async::NotReady) => { + self.state = AcceptState::Pending(r.to_owned()); + return Ok(futures::prelude::Async::NotReady); + } + Err(e) => { + return Err(e); + } + }, AcceptState::Pending(ref mut r) => match r.poll_accept() { Ok(futures::prelude::Async::Ready(t)) => { r.untrack_task(); @@ -126,8 +138,8 @@ impl Future for Accept { }; match mem::replace(&mut self.state, AcceptState::Empty) { - AcceptState::Pending(_) => Ok((stream, addr).into()), AcceptState::Empty => panic!("invalid internal state"), + _ => Ok((stream, addr).into()), } } } @@ -166,3 +178,16 @@ where { f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) } + +#[cfg(test)] +pub fn run_in_task<F>(f: F) +where + F: FnOnce() + Send + 'static, +{ + let fut = futures::future::lazy(move || { + f(); + futures::future::ok(()) + }); + + run(fut) +} diff --git a/cli/worker.rs b/cli/worker.rs index 6ea17d915..990dd613a 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -263,7 +263,7 @@ mod tests { #[test] fn test_worker_messages() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { let mut worker = create_test_worker(); let source = r#" onmessage = function(e) { @@ -314,7 +314,7 @@ mod tests { #[test] fn removed_from_resource_table_on_close() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { let mut worker = create_test_worker(); worker .execute("onmessage = () => { delete window.onmessage; }") @@ -349,7 +349,7 @@ mod tests { #[test] fn execute_mod_resolve_error() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { // "foo" is not a valid module specifier so this should return an error. let mut worker = create_test_worker(); let module_specifier = @@ -361,7 +361,7 @@ mod tests { #[test] fn execute_mod_002_hello() { - tokio_util::init(|| { + tokio_util::run_in_task(|| { // This assumes cwd is project root (an assumption made throughout the // tests). let mut worker = create_test_worker(); diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index a7142b09d..f553d4800 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -43,11 +43,25 @@ function send(promiseId, opId, arg, zeroCopy = null) { function sendAsync(opId, arg, zeroCopy = null) { const promiseId = nextPromiseId++; const p = createResolvable(); - promiseMap.set(promiseId, p); - send(promiseId, opId, arg, zeroCopy); + const buf = send(promiseId, opId, arg, zeroCopy); + if (buf) { + const record = recordFromBuf(buf); + // Sync result. + p.resolve(record.result); + } else { + // Async result. + promiseMap.set(promiseId, p); + } return p; } +/** Returns i32 number */ +function sendSync(opId, arg) { + const buf = send(0, opId, arg); + const record = recordFromBuf(buf); + return record.result; +} + function recordFromBuf(buf) { assert(buf.byteLength === 3 * 4); const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); @@ -58,13 +72,6 @@ function recordFromBuf(buf) { }; } -/** Returns i32 number */ -function sendSync(opId, arg) { - const buf = send(0, opId, arg); - const record = recordFromBuf(buf); - return record.result; -} - function handleAsyncMsgFromRust(opId, buf) { const record = recordFromBuf(buf); const { promiseId, result } = record; diff --git a/core/isolate.rs b/core/isolate.rs index 2f544a20a..ac5c8402a 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -315,6 +315,21 @@ impl Isolate { PinnedBuf::new(zero_copy_buf), ); + let op = match op { + Op::Async(mut fut) => { + // Tries to greedily poll async ops once. Often they are immediately ready, in + // which case they can be turned into a sync op before we return to V8. This + // can save a boundary crossing. + #[allow(clippy::match_wild_err_arm)] + match fut.poll() { + Err(_) => panic!("unexpected op error"), + Ok(Ready(buf)) => Op::Sync(buf), + Ok(NotReady) => Op::Async(fut), + } + } + Op::Sync(buf) => Op::Sync(buf), + }; + debug_assert_eq!(isolate.shared.size(), 0); match op { Op::Sync(buf) => { @@ -748,8 +763,34 @@ pub mod tests { ) } + struct DelayedFuture { + counter: u32, + buf: Box<[u8]>, + } + + impl DelayedFuture { + pub fn new(buf: Box<[u8]>) -> Self { + DelayedFuture { counter: 0, buf } + } + } + + impl Future for DelayedFuture { + type Item = Box<[u8]>; + type Error = (); + + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + if self.counter > 0 { + return Ok(Async::Ready(self.buf.clone())); + } + + self.counter += 1; + Ok(Async::NotReady) + } + } + pub enum Mode { AsyncImmediate, + AsyncDelayed, OverflowReqSync, OverflowResSync, OverflowReqAsync, @@ -772,6 +813,12 @@ pub mod tests { let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); Op::Async(Box::new(futures::future::ok(buf))) } + Mode::AsyncDelayed => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); + Op::Async(Box::new(DelayedFuture::new(buf))) + } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); @@ -789,7 +836,7 @@ pub mod tests { Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(Box::new(futures::future::ok(buf))) + Op::Async(Box::new(DelayedFuture::new(buf))) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -798,7 +845,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - Op::Async(Box::new(futures::future::ok(buf))) + Op::Async(Box::new(DelayedFuture::new(buf))) } } }; @@ -892,21 +939,65 @@ pub mod tests { js_check(isolate.execute( "setup2.js", r#" - let nrecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { - nrecv++; - }); - "#, + let nrecv = 0; + Deno.core.setAsyncHandler((opId, buf) => { + nrecv++; + }); + "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); js_check(isolate.execute( "check1.js", r#" - assert(nrecv == 0); - let control = new Uint8Array([42]); - Deno.core.send(1, control); - assert(nrecv == 0); - "#, + assert(nrecv == 0); + let control = new Uint8Array([42]); + const res1 = Deno.core.send(1, control); + assert(res1); + assert(nrecv == 0); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); + js_check(isolate.execute( + "check2.js", + r#" + assert(nrecv == 0); + Deno.core.send(1, control); + assert(nrecv == 0); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); + assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + js_check(isolate.execute("check3.js", "assert(nrecv == 0)")); + // We are idle, so the next poll should be the last. + assert_eq!(Async::Ready(()), isolate.poll().unwrap()); + }); + } + + #[test] + fn test_poll_async_delayed_ops() { + run_in_task(|| { + let (mut isolate, dispatch_count) = setup(Mode::AsyncDelayed); + + js_check(isolate.execute( + "setup2.js", + r#" + let nrecv = 0; + Deno.core.setAsyncHandler((opId, buf) => { + nrecv++; + }); + "#, + )); + assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); + js_check(isolate.execute( + "check1.js", + r#" + assert(nrecv == 0); + let control = new Uint8Array([42]); + Deno.core.send(1, control); + assert(nrecv == 0); + "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(Async::Ready(()), isolate.poll().unwrap()); @@ -914,10 +1005,10 @@ pub mod tests { js_check(isolate.execute( "check2.js", r#" - assert(nrecv == 1); - Deno.core.send(1, control); - assert(nrecv == 1); - "#, + assert(nrecv == 1); + Deno.core.send(1, control); + assert(nrecv == 1); + "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); assert_eq!(Async::Ready(()), isolate.poll().unwrap()); @@ -1235,20 +1326,20 @@ pub mod tests { js_check(isolate.execute( "overflow_req_async.js", r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { - assert(opId == 1); - assert(buf.byteLength === 4); - assert(buf[0] === 43); - asyncRecv++; - }); - // Large message that will overflow the shared space. - let control = new Uint8Array(100 * 1024 * 1024); - let response = Deno.core.dispatch(1, control); - // Async messages always have null response. - assert(response == null); - assert(asyncRecv == 0); - "#, + let asyncRecv = 0; + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId == 1); + assert(buf.byteLength === 4); + assert(buf[0] === 43); + asyncRecv++; + }); + // Large message that will overflow the shared space. + let control = new Uint8Array(100 * 1024 * 1024); + let response = Deno.core.dispatch(1, control); + // Async messages always have null response. + assert(response == null); + assert(asyncRecv == 0); + "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(Async::Ready(()), js_check(isolate.poll())); @@ -1265,19 +1356,19 @@ pub mod tests { js_check(isolate.execute( "overflow_res_async.js", r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { - assert(opId == 1); - assert(buf.byteLength === 100 * 1024 * 1024); - assert(buf[0] === 4); - asyncRecv++; - }); - // Large message that will overflow the shared space. - let control = new Uint8Array([42]); - let response = Deno.core.dispatch(1, control); - assert(response == null); - assert(asyncRecv == 0); - "#, + let asyncRecv = 0; + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId == 1); + assert(buf.byteLength === 100 * 1024 * 1024); + assert(buf[0] === 4); + asyncRecv++; + }); + // Large message that will overflow the shared space. + let control = new Uint8Array([42]); + let response = Deno.core.dispatch(1, control); + assert(response == null); + assert(asyncRecv == 0); + "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); poll_until_ready(&mut isolate, 3).unwrap(); @@ -1294,22 +1385,22 @@ pub mod tests { js_check(isolate.execute( "overflow_res_multiple_dispatch_async.js", r#" - let asyncRecv = 0; - Deno.core.setAsyncHandler((opId, buf) => { - assert(opId === 1); - assert(buf.byteLength === 100 * 1024 * 1024); - assert(buf[0] === 4); - asyncRecv++; - }); - // Large message that will overflow the shared space. - let control = new Uint8Array([42]); - let response = Deno.core.dispatch(1, control); - assert(response == null); - assert(asyncRecv == 0); - // Dispatch another message to verify that pending ops - // are done even if shared space overflows - Deno.core.dispatch(1, control); - "#, + let asyncRecv = 0; + Deno.core.setAsyncHandler((opId, buf) => { + assert(opId === 1); + assert(buf.byteLength === 100 * 1024 * 1024); + assert(buf[0] === 4); + asyncRecv++; + }); + // Large message that will overflow the shared space. + let control = new Uint8Array([42]); + let response = Deno.core.dispatch(1, control); + assert(response == null); + assert(asyncRecv == 0); + // Dispatch another message to verify that pending ops + // are done even if shared space overflows + Deno.core.dispatch(1, control); + "#, )); assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); poll_until_ready(&mut isolate, 3).unwrap(); diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc index 18dc1d43e..061638cb5 100644 --- a/core/libdeno/api.cc +++ b/core/libdeno/api.cc @@ -165,7 +165,7 @@ void deno_respond(Deno* d_, void* user_data, deno_op_id op_id, deno_buf buf) { if (d->current_args_ != nullptr) { // Synchronous response. // Note op_id is not passed back in the case of synchronous response. - if (buf.data_ptr != nullptr) { + if (buf.data_ptr != nullptr && buf.data_len > 0) { auto ab = deno::ImportBuf(d, buf); d->current_args_->GetReturnValue().Set(ab); } |