summaryrefslogtreecommitdiff
path: root/core/examples/http_bench.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r--core/examples/http_bench.rs232
1 files changed, 145 insertions, 87 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index a7b26f4b1..c08c5b6e1 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -5,6 +5,7 @@
extern crate deno;
extern crate futures;
extern crate libc;
+extern crate num_cpus;
extern crate tokio;
#[macro_use]
@@ -13,25 +14,23 @@ extern crate log;
extern crate lazy_static;
use deno::*;
-use futures::compat::AsyncRead01CompatExt;
-use futures::compat::AsyncWrite01CompatExt;
+use futures::future::Future;
use futures::future::FutureExt;
-use futures::future::TryFutureExt;
-use futures::io::{AsyncRead, AsyncWrite};
-use futures::stream::StreamExt;
+use futures::task::{Context, Poll};
use std::env;
-use std::future::Future;
use std::io::Error;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Mutex;
use std::sync::MutexGuard;
-use std::task::Poll;
-use tokio::net::tcp::Incoming;
+use tokio::io::AsyncRead;
+use tokio::io::AsyncWrite;
static LOGGER: Logger = Logger;
+
struct Logger;
+
impl log::Log for Logger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
metadata.level() <= log::max_level()
@@ -118,25 +117,18 @@ fn http_op(
let record = Record::from(control);
let is_sync = record.promise_id == 0;
let op = handler(record.clone(), zero_copy_buf);
-
let mut record_a = record.clone();
- let mut record_b = record.clone();
-
- let fut = Box::new(
- op.and_then(move |result| {
- record_a.result = result;
- futures::future::ok(record_a)
- })
- .or_else(|err| {
- eprintln!("unexpected err {}", err);
- record_b.result = -1;
- futures::future::ok(record_b)
- })
- .then(|result: Result<Record, ()>| {
- let record = result.unwrap();
- futures::future::ok(record.into())
- }),
- );
+
+ let fut = async move {
+ match op.await {
+ Ok(result) => record_a.result = result,
+ Err(err) => {
+ eprintln!("unexpected err {}", err);
+ record_a.result = -1;
+ }
+ };
+ Ok(record_a.into())
+ };
if is_sync {
Op::Sync(futures::executor::block_on(fut).unwrap())
@@ -172,27 +164,35 @@ fn main() {
isolate.register_op("write", http_op(op_write));
isolate.register_op("close", http_op(op_close));
- let main_future = isolate
- .then(|r| {
- js_check(r);
- futures::future::ok(())
- })
- .boxed();
+ let multi_thread = args.iter().any(|a| a == "--multi-thread");
- if args.iter().any(|a| a == "--multi-thread") {
+ println!(
+ "num cpus; logical: {}; physical: {}",
+ num_cpus::get(),
+ num_cpus::get_physical()
+ );
+ let mut builder = tokio::runtime::Builder::new();
+ let builder = if multi_thread {
println!("multi-thread");
- tokio::run(main_future.compat());
+ builder.threaded_scheduler()
} else {
println!("single-thread");
- tokio::runtime::current_thread::run(main_future.compat());
- }
+ builder.basic_scheduler()
+ };
+
+ let mut runtime = builder
+ .enable_io()
+ .build()
+ .expect("Unable to create tokio runtime");
+ let result = runtime.block_on(isolate.boxed());
+ js_check(result);
}
pub fn bad_resource() -> Error {
Error::new(ErrorKind::NotFound, "bad resource id")
}
-struct TcpListener(Incoming);
+struct TcpListener(tokio::net::TcpListener);
impl Resource for TcpListener {}
@@ -209,33 +209,42 @@ fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
RESOURCE_TABLE.lock().unwrap()
}
+struct Accept {
+ rid: ResourceId,
+}
+
+impl Future for Accept {
+ type Output = Result<(tokio::net::TcpStream, SocketAddr), std::io::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+
+ let mut table = lock_resource_table();
+ match table.get_mut::<TcpListener>(inner.rid) {
+ None => Poll::Ready(Err(bad_resource())),
+ Some(listener) => {
+ let listener = &mut listener.0;
+ listener.poll_accept(cx)
+ }
+ }
+ }
+}
+
fn op_accept(
record: Record,
_zero_copy_buf: Option<PinnedBuf>,
) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("accept {}", rid);
- let fut = futures::future::poll_fn(move |cx| {
- let mut table = lock_resource_table();
- let listener =
- table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?;
- let mut listener = futures::compat::Compat01As03::new(&mut listener.0);
- match listener.poll_next_unpin(cx) {
- Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)),
- Poll::Ready(Some(Ok(stream))) => {
- let addr = stream.peer_addr().unwrap();
- Poll::Ready(Ok((stream, addr)))
- }
- Poll::Pending => Poll::Pending,
- _ => unreachable!(),
- }
- })
- .and_then(move |(stream, addr)| {
+
+ let fut = async move {
+ let (stream, addr) = Accept { rid }.await?;
debug!("accept success {}", addr);
let mut table = lock_resource_table();
let rid = table.add("tcpStream", Box::new(TcpStream(stream)));
- futures::future::ok(rid as i32)
- });
+ Ok(rid as i32)
+ };
+
fut.boxed()
}
@@ -244,12 +253,15 @@ fn op_listen(
_zero_copy_buf: Option<PinnedBuf>,
) -> Pin<Box<HttpOp>> {
debug!("listen");
- let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
- let listener = tokio::net::TcpListener::bind(&addr).unwrap();
- let mut table = lock_resource_table();
- let rid =
- table.add("tcpListener", Box::new(TcpListener(listener.incoming())));
- futures::future::ok(rid as i32).boxed()
+ let fut = async {
+ let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
+ let listener = tokio::net::TcpListener::bind(&addr).await?;
+ let mut table = lock_resource_table();
+ let rid = table.add("tcpListener", Box::new(TcpListener(listener)));
+ Ok(rid as i32)
+ };
+
+ fut.boxed()
}
fn op_close(
@@ -257,36 +269,82 @@ fn op_close(
_zero_copy_buf: Option<PinnedBuf>,
) -> Pin<Box<HttpOp>> {
debug!("close");
- let rid = record.arg as u32;
- let mut table = lock_resource_table();
- let fut = match table.close(rid) {
- Some(_) => futures::future::ok(0),
- None => futures::future::err(bad_resource()),
+ let fut = async move {
+ let rid = record.arg as u32;
+ let mut table = lock_resource_table();
+ match table.close(rid) {
+ Some(_) => Ok(0),
+ None => Err(bad_resource()),
+ }
};
fut.boxed()
}
+struct Read {
+ rid: ResourceId,
+ buf: PinnedBuf,
+}
+
+impl Future for Read {
+ type Output = Result<usize, std::io::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut table = lock_resource_table();
+
+ match table.get_mut::<TcpStream>(inner.rid) {
+ None => Poll::Ready(Err(bad_resource())),
+ Some(stream) => {
+ let pinned_stream = Pin::new(&mut stream.0);
+ pinned_stream.poll_read(cx, &mut inner.buf)
+ }
+ }
+ }
+}
+
fn op_read(
record: Record,
zero_copy_buf: Option<PinnedBuf>,
) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("read rid={}", rid);
- let mut zero_copy_buf = zero_copy_buf.unwrap();
- let fut = futures::future::poll_fn(move |cx| {
- let mut table = lock_resource_table();
- let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- let mut f: Box<dyn AsyncRead + Unpin> =
- Box::new(AsyncRead01CompatExt::compat(&stream.0));
- AsyncRead::poll_read(Pin::new(&mut f), cx, &mut zero_copy_buf)
- })
- .and_then(move |nread| {
+ let zero_copy_buf = zero_copy_buf.unwrap();
+
+ let fut = async move {
+ let nread = Read {
+ rid,
+ buf: zero_copy_buf,
+ }
+ .await?;
debug!("read success {}", nread);
- futures::future::ok(nread as i32)
- });
+ Ok(nread as i32)
+ };
+
fut.boxed()
}
+struct Write {
+ rid: ResourceId,
+ buf: PinnedBuf,
+}
+
+impl Future for Write {
+ type Output = Result<usize, std::io::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+ let inner = self.get_mut();
+ let mut table = lock_resource_table();
+
+ match table.get_mut::<TcpStream>(inner.rid) {
+ None => Poll::Ready(Err(bad_resource())),
+ Some(stream) => {
+ let pinned_stream = Pin::new(&mut stream.0);
+ pinned_stream.poll_write(cx, &inner.buf)
+ }
+ }
+ }
+}
+
fn op_write(
record: Record,
zero_copy_buf: Option<PinnedBuf>,
@@ -294,17 +352,17 @@ fn op_write(
let rid = record.arg as u32;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
- let fut = futures::future::poll_fn(move |cx| {
- let mut table = lock_resource_table();
- let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- let mut f: Box<dyn AsyncWrite + Unpin> =
- Box::new(AsyncWrite01CompatExt::compat(&stream.0));
- AsyncWrite::poll_write(Pin::new(&mut f), cx, &zero_copy_buf)
- })
- .and_then(move |nwritten| {
+
+ let fut = async move {
+ let nwritten = Write {
+ rid,
+ buf: zero_copy_buf,
+ }
+ .await?;
debug!("write success {}", nwritten);
- futures::future::ok(nwritten as i32)
- });
+ Ok(nwritten as i32)
+ };
+
fut.boxed()
}