diff options
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r-- | core/examples/http_bench.rs | 232 |
1 files changed, 145 insertions, 87 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index a7b26f4b1..c08c5b6e1 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -5,6 +5,7 @@ extern crate deno; extern crate futures; extern crate libc; +extern crate num_cpus; extern crate tokio; #[macro_use] @@ -13,25 +14,23 @@ extern crate log; extern crate lazy_static; use deno::*; -use futures::compat::AsyncRead01CompatExt; -use futures::compat::AsyncWrite01CompatExt; +use futures::future::Future; use futures::future::FutureExt; -use futures::future::TryFutureExt; -use futures::io::{AsyncRead, AsyncWrite}; -use futures::stream::StreamExt; +use futures::task::{Context, Poll}; use std::env; -use std::future::Future; use std::io::Error; use std::io::ErrorKind; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Mutex; use std::sync::MutexGuard; -use std::task::Poll; -use tokio::net::tcp::Incoming; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; static LOGGER: Logger = Logger; + struct Logger; + impl log::Log for Logger { fn enabled(&self, metadata: &log::Metadata) -> bool { metadata.level() <= log::max_level() @@ -118,25 +117,18 @@ fn http_op( let record = Record::from(control); let is_sync = record.promise_id == 0; let op = handler(record.clone(), zero_copy_buf); - let mut record_a = record.clone(); - let mut record_b = record.clone(); - - let fut = Box::new( - op.and_then(move |result| { - record_a.result = result; - futures::future::ok(record_a) - }) - .or_else(|err| { - eprintln!("unexpected err {}", err); - record_b.result = -1; - futures::future::ok(record_b) - }) - .then(|result: Result<Record, ()>| { - let record = result.unwrap(); - futures::future::ok(record.into()) - }), - ); + + let fut = async move { + match op.await { + Ok(result) => record_a.result = result, + Err(err) => { + eprintln!("unexpected err {}", err); + record_a.result = -1; + } + }; + Ok(record_a.into()) + }; if is_sync { Op::Sync(futures::executor::block_on(fut).unwrap()) @@ -172,27 +164,35 @@ fn main() { isolate.register_op("write", http_op(op_write)); isolate.register_op("close", http_op(op_close)); - let main_future = isolate - .then(|r| { - js_check(r); - futures::future::ok(()) - }) - .boxed(); + let multi_thread = args.iter().any(|a| a == "--multi-thread"); - if args.iter().any(|a| a == "--multi-thread") { + println!( + "num cpus; logical: {}; physical: {}", + num_cpus::get(), + num_cpus::get_physical() + ); + let mut builder = tokio::runtime::Builder::new(); + let builder = if multi_thread { println!("multi-thread"); - tokio::run(main_future.compat()); + builder.threaded_scheduler() } else { println!("single-thread"); - tokio::runtime::current_thread::run(main_future.compat()); - } + builder.basic_scheduler() + }; + + let mut runtime = builder + .enable_io() + .build() + .expect("Unable to create tokio runtime"); + let result = runtime.block_on(isolate.boxed()); + js_check(result); } pub fn bad_resource() -> Error { Error::new(ErrorKind::NotFound, "bad resource id") } -struct TcpListener(Incoming); +struct TcpListener(tokio::net::TcpListener); impl Resource for TcpListener {} @@ -209,33 +209,42 @@ fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { RESOURCE_TABLE.lock().unwrap() } +struct Accept { + rid: ResourceId, +} + +impl Future for Accept { + type Output = Result<(tokio::net::TcpStream, SocketAddr), std::io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + + let mut table = lock_resource_table(); + match table.get_mut::<TcpListener>(inner.rid) { + None => Poll::Ready(Err(bad_resource())), + Some(listener) => { + let listener = &mut listener.0; + listener.poll_accept(cx) + } + } + } +} + fn op_accept( record: Record, _zero_copy_buf: Option<PinnedBuf>, ) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("accept {}", rid); - let fut = futures::future::poll_fn(move |cx| { - let mut table = lock_resource_table(); - let listener = - table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?; - let mut listener = futures::compat::Compat01As03::new(&mut listener.0); - match listener.poll_next_unpin(cx) { - Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)), - Poll::Ready(Some(Ok(stream))) => { - let addr = stream.peer_addr().unwrap(); - Poll::Ready(Ok((stream, addr))) - } - Poll::Pending => Poll::Pending, - _ => unreachable!(), - } - }) - .and_then(move |(stream, addr)| { + + let fut = async move { + let (stream, addr) = Accept { rid }.await?; debug!("accept success {}", addr); let mut table = lock_resource_table(); let rid = table.add("tcpStream", Box::new(TcpStream(stream))); - futures::future::ok(rid as i32) - }); + Ok(rid as i32) + }; + fut.boxed() } @@ -244,12 +253,15 @@ fn op_listen( _zero_copy_buf: Option<PinnedBuf>, ) -> Pin<Box<HttpOp>> { debug!("listen"); - let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).unwrap(); - let mut table = lock_resource_table(); - let rid = - table.add("tcpListener", Box::new(TcpListener(listener.incoming()))); - futures::future::ok(rid as i32).boxed() + let fut = async { + let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).await?; + let mut table = lock_resource_table(); + let rid = table.add("tcpListener", Box::new(TcpListener(listener))); + Ok(rid as i32) + }; + + fut.boxed() } fn op_close( @@ -257,36 +269,82 @@ fn op_close( _zero_copy_buf: Option<PinnedBuf>, ) -> Pin<Box<HttpOp>> { debug!("close"); - let rid = record.arg as u32; - let mut table = lock_resource_table(); - let fut = match table.close(rid) { - Some(_) => futures::future::ok(0), - None => futures::future::err(bad_resource()), + let fut = async move { + let rid = record.arg as u32; + let mut table = lock_resource_table(); + match table.close(rid) { + Some(_) => Ok(0), + None => Err(bad_resource()), + } }; fut.boxed() } +struct Read { + rid: ResourceId, + buf: PinnedBuf, +} + +impl Future for Read { + type Output = Result<usize, std::io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut table = lock_resource_table(); + + match table.get_mut::<TcpStream>(inner.rid) { + None => Poll::Ready(Err(bad_resource())), + Some(stream) => { + let pinned_stream = Pin::new(&mut stream.0); + pinned_stream.poll_read(cx, &mut inner.buf) + } + } + } +} + fn op_read( record: Record, zero_copy_buf: Option<PinnedBuf>, ) -> Pin<Box<HttpOp>> { let rid = record.arg as u32; debug!("read rid={}", rid); - let mut zero_copy_buf = zero_copy_buf.unwrap(); - let fut = futures::future::poll_fn(move |cx| { - let mut table = lock_resource_table(); - let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - let mut f: Box<dyn AsyncRead + Unpin> = - Box::new(AsyncRead01CompatExt::compat(&stream.0)); - AsyncRead::poll_read(Pin::new(&mut f), cx, &mut zero_copy_buf) - }) - .and_then(move |nread| { + let zero_copy_buf = zero_copy_buf.unwrap(); + + let fut = async move { + let nread = Read { + rid, + buf: zero_copy_buf, + } + .await?; debug!("read success {}", nread); - futures::future::ok(nread as i32) - }); + Ok(nread as i32) + }; + fut.boxed() } +struct Write { + rid: ResourceId, + buf: PinnedBuf, +} + +impl Future for Write { + type Output = Result<usize, std::io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + let inner = self.get_mut(); + let mut table = lock_resource_table(); + + match table.get_mut::<TcpStream>(inner.rid) { + None => Poll::Ready(Err(bad_resource())), + Some(stream) => { + let pinned_stream = Pin::new(&mut stream.0); + pinned_stream.poll_write(cx, &inner.buf) + } + } + } +} + fn op_write( record: Record, zero_copy_buf: Option<PinnedBuf>, @@ -294,17 +352,17 @@ fn op_write( let rid = record.arg as u32; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); - let fut = futures::future::poll_fn(move |cx| { - let mut table = lock_resource_table(); - let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?; - let mut f: Box<dyn AsyncWrite + Unpin> = - Box::new(AsyncWrite01CompatExt::compat(&stream.0)); - AsyncWrite::poll_write(Pin::new(&mut f), cx, &zero_copy_buf) - }) - .and_then(move |nwritten| { + + let fut = async move { + let nwritten = Write { + rid, + buf: zero_copy_buf, + } + .await?; debug!("write success {}", nwritten); - futures::future::ok(nwritten as i32) - }); + Ok(nwritten as i32) + }; + fut.boxed() } |