diff options
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r-- | core/examples/http_bench.rs | 140 |
1 files changed, 83 insertions, 57 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 8635d4f23..6a9213cbe 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -13,14 +13,20 @@ extern crate log; extern crate lazy_static; use deno::*; -use futures::future::lazy; +use futures::future::FutureExt; +use futures::future::TryFutureExt; use std::env; +use std::future::Future; use std::io::Error; use std::io::ErrorKind; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Mutex; use std::sync::MutexGuard; -use tokio::prelude::*; +use std::task::Poll; +use tokio::prelude::Async; +use tokio::prelude::AsyncRead; +use tokio::prelude::AsyncWrite; static LOGGER: Logger = Logger; struct Logger; @@ -98,10 +104,10 @@ fn test_record_from() { // TODO test From<&[u8]> for Record } -pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send; +pub type HttpOp = dyn Future<Output = Result<i32, std::io::Error>> + Send; pub type HttpOpHandler = - fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>; + fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Pin<Box<HttpOp>>; fn http_op( handler: HttpOpHandler, @@ -117,53 +123,28 @@ fn http_op( let fut = Box::new( op.and_then(move |result| { record_a.result = result; - Ok(record_a) + futures::future::ok(record_a) }) - .or_else(|err| -> Result<Record, ()> { + .or_else(|err| { eprintln!("unexpected err {}", err); record_b.result = -1; - Ok(record_b) + futures::future::ok(record_b) }) - .then(|result| -> Result<Buf, ()> { + .then(|result: Result<Record, ()>| { let record = result.unwrap(); - Ok(record.into()) + futures::future::ok(record.into()) }), ); if is_sync { - Op::Sync(fut.wait().unwrap()) + Op::Sync(futures::executor::block_on(fut).unwrap()) } else { - Op::Async(fut) + Op::Async(fut.boxed()) } } } fn main() { - let main_future = lazy(move || { - // TODO currently isolate.execute() must be run inside tokio, hence the - // lazy(). It would be nice to not have that contraint. Probably requires - // using v8::MicrotasksPolicy::kExplicit - - let js_source = include_str!("http_bench.js"); - - let startup_data = StartupData::Script(Script { - source: js_source, - filename: "http_bench.js", - }); - - let mut isolate = deno::Isolate::new(startup_data, false); - isolate.register_op("listen", http_op(op_listen)); - isolate.register_op("accept", http_op(op_accept)); - isolate.register_op("read", http_op(op_read)); - isolate.register_op("write", http_op(op_write)); - isolate.register_op("close", http_op(op_close)); - - isolate.then(|r| { - js_check(r); - Ok(()) - }) - }); - let args: Vec<String> = env::args().collect(); // NOTE: `--help` arg will display V8 help and exit let args = deno::v8_set_flags(args); @@ -175,12 +156,33 @@ fn main() { log::LevelFilter::Warn }); + let js_source = include_str!("http_bench.js"); + + let startup_data = StartupData::Script(Script { + source: js_source, + filename: "http_bench.js", + }); + + let mut isolate = deno::Isolate::new(startup_data, false); + isolate.register_op("listen", http_op(op_listen)); + isolate.register_op("accept", http_op(op_accept)); + isolate.register_op("read", http_op(op_read)); + isolate.register_op("write", http_op(op_write)); + isolate.register_op("close", http_op(op_close)); + + let main_future = isolate + .then(|r| { + js_check(r); + futures::future::ok(()) + }) + .boxed(); + if args.iter().any(|a| a == "--multi-thread") { println!("multi-thread"); - tokio::run(main_future); + tokio::run(main_future.compat()); } else { println!("single-thread"); - tokio::runtime::current_thread::run(main_future); + tokio::runtime::current_thread::run(main_future.compat()); } } @@ -205,37 +207,47 @@ fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { RESOURCE_TABLE.lock().unwrap() } -fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_accept( + record: Record, + _zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("accept {}", rid); - let fut = futures::future::poll_fn(move || { + let fut = futures::future::poll_fn(move |_cx| { let mut table = lock_resource_table(); let listener = table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?; - listener.0.poll_accept() + match listener.0.poll_accept() { + Err(e) => Poll::Ready(Err(e)), + Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Async::NotReady) => Poll::Pending, + } }) .and_then(move |(stream, addr)| { debug!("accept success {}", addr); let mut table = lock_resource_table(); let rid = table.add("tcpStream", Box::new(TcpStream(stream))); - Ok(rid as i32) + futures::future::ok(rid as i32) }); - Box::new(fut) + fut.boxed() } fn op_listen( _record: Record, _zero_copy_buf: Option<PinnedBuf>, -) -> Box<HttpOp> { +) -> Pin<Box<HttpOp>> { debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let listener = tokio::net::TcpListener::bind(&addr).unwrap(); let mut table = lock_resource_table(); let rid = table.add("tcpListener", Box::new(TcpListener(listener))); - Box::new(futures::future::ok(rid as i32)) + futures::future::ok(rid as i32).boxed() } -fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_close( + record: Record, + _zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { debug!("close"); let rid = record.arg as u32; let mut table = lock_resource_table(); @@ -243,39 +255,53 @@ fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { Some(_) => futures::future::ok(0), None => futures::future::err(bad_resource()), }; - Box::new(fut) + fut.boxed() } -fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_read( + record: Record, + zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("read rid={}", rid); let mut zero_copy_buf = zero_copy_buf.unwrap(); - let fut = futures::future::poll_fn(move || { + let fut = futures::future::poll_fn(move |_cx| { let mut table = lock_resource_table(); let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - stream.0.poll_read(&mut zero_copy_buf) + match stream.0.poll_read(&mut zero_copy_buf) { + Err(e) => Poll::Ready(Err(e)), + Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Async::NotReady) => Poll::Pending, + } }) .and_then(move |nread| { debug!("read success {}", nread); - Ok(nread as i32) + futures::future::ok(nread as i32) }); - Box::new(fut) + fut.boxed() } -fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { +fn op_write( + record: Record, + zero_copy_buf: Option<PinnedBuf>, +) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); - let fut = futures::future::poll_fn(move || { + let fut = futures::future::poll_fn(move |_cx| { let mut table = lock_resource_table(); let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - stream.0.poll_write(&zero_copy_buf) + match stream.0.poll_write(&zero_copy_buf) { + Err(e) => Poll::Ready(Err(e)), + Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), + Ok(Async::NotReady) => Poll::Pending, + } }) .and_then(move |nwritten| { debug!("write success {}", nwritten); - Ok(nwritten as i32) + futures::future::ok(nwritten as i32) }); - Box::new(fut) + fut.boxed() } fn js_check(r: Result<(), ErrBox>) { |