summaryrefslogtreecommitdiff
path: root/core/examples/http_bench.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r--core/examples/http_bench.rs140
1 files changed, 83 insertions, 57 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 8635d4f23..6a9213cbe 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -13,14 +13,20 @@ extern crate log;
extern crate lazy_static;
use deno::*;
-use futures::future::lazy;
+use futures::future::FutureExt;
+use futures::future::TryFutureExt;
use std::env;
+use std::future::Future;
use std::io::Error;
use std::io::ErrorKind;
use std::net::SocketAddr;
+use std::pin::Pin;
use std::sync::Mutex;
use std::sync::MutexGuard;
-use tokio::prelude::*;
+use std::task::Poll;
+use tokio::prelude::Async;
+use tokio::prelude::AsyncRead;
+use tokio::prelude::AsyncWrite;
static LOGGER: Logger = Logger;
struct Logger;
@@ -98,10 +104,10 @@ fn test_record_from() {
// TODO test From<&[u8]> for Record
}
-pub type HttpOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
+pub type HttpOp = dyn Future<Output = Result<i32, std::io::Error>> + Send;
pub type HttpOpHandler =
- fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp>;
+ fn(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Pin<Box<HttpOp>>;
fn http_op(
handler: HttpOpHandler,
@@ -117,53 +123,28 @@ fn http_op(
let fut = Box::new(
op.and_then(move |result| {
record_a.result = result;
- Ok(record_a)
+ futures::future::ok(record_a)
})
- .or_else(|err| -> Result<Record, ()> {
+ .or_else(|err| {
eprintln!("unexpected err {}", err);
record_b.result = -1;
- Ok(record_b)
+ futures::future::ok(record_b)
})
- .then(|result| -> Result<Buf, ()> {
+ .then(|result: Result<Record, ()>| {
let record = result.unwrap();
- Ok(record.into())
+ futures::future::ok(record.into())
}),
);
if is_sync {
- Op::Sync(fut.wait().unwrap())
+ Op::Sync(futures::executor::block_on(fut).unwrap())
} else {
- Op::Async(fut)
+ Op::Async(fut.boxed())
}
}
}
fn main() {
- let main_future = lazy(move || {
- // TODO currently isolate.execute() must be run inside tokio, hence the
- // lazy(). It would be nice to not have that contraint. Probably requires
- // using v8::MicrotasksPolicy::kExplicit
-
- let js_source = include_str!("http_bench.js");
-
- let startup_data = StartupData::Script(Script {
- source: js_source,
- filename: "http_bench.js",
- });
-
- let mut isolate = deno::Isolate::new(startup_data, false);
- isolate.register_op("listen", http_op(op_listen));
- isolate.register_op("accept", http_op(op_accept));
- isolate.register_op("read", http_op(op_read));
- isolate.register_op("write", http_op(op_write));
- isolate.register_op("close", http_op(op_close));
-
- isolate.then(|r| {
- js_check(r);
- Ok(())
- })
- });
-
let args: Vec<String> = env::args().collect();
// NOTE: `--help` arg will display V8 help and exit
let args = deno::v8_set_flags(args);
@@ -175,12 +156,33 @@ fn main() {
log::LevelFilter::Warn
});
+ let js_source = include_str!("http_bench.js");
+
+ let startup_data = StartupData::Script(Script {
+ source: js_source,
+ filename: "http_bench.js",
+ });
+
+ let mut isolate = deno::Isolate::new(startup_data, false);
+ isolate.register_op("listen", http_op(op_listen));
+ isolate.register_op("accept", http_op(op_accept));
+ isolate.register_op("read", http_op(op_read));
+ isolate.register_op("write", http_op(op_write));
+ isolate.register_op("close", http_op(op_close));
+
+ let main_future = isolate
+ .then(|r| {
+ js_check(r);
+ futures::future::ok(())
+ })
+ .boxed();
+
if args.iter().any(|a| a == "--multi-thread") {
println!("multi-thread");
- tokio::run(main_future);
+ tokio::run(main_future.compat());
} else {
println!("single-thread");
- tokio::runtime::current_thread::run(main_future);
+ tokio::runtime::current_thread::run(main_future.compat());
}
}
@@ -205,37 +207,47 @@ fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
RESOURCE_TABLE.lock().unwrap()
}
-fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_accept(
+ record: Record,
+ _zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("accept {}", rid);
- let fut = futures::future::poll_fn(move || {
+ let fut = futures::future::poll_fn(move |_cx| {
let mut table = lock_resource_table();
let listener =
table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?;
- listener.0.poll_accept()
+ match listener.0.poll_accept() {
+ Err(e) => Poll::Ready(Err(e)),
+ Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Async::NotReady) => Poll::Pending,
+ }
})
.and_then(move |(stream, addr)| {
debug!("accept success {}", addr);
let mut table = lock_resource_table();
let rid = table.add("tcpStream", Box::new(TcpStream(stream)));
- Ok(rid as i32)
+ futures::future::ok(rid as i32)
});
- Box::new(fut)
+ fut.boxed()
}
fn op_listen(
_record: Record,
_zero_copy_buf: Option<PinnedBuf>,
-) -> Box<HttpOp> {
+) -> Pin<Box<HttpOp>> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
let mut table = lock_resource_table();
let rid = table.add("tcpListener", Box::new(TcpListener(listener)));
- Box::new(futures::future::ok(rid as i32))
+ futures::future::ok(rid as i32).boxed()
}
-fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_close(
+ record: Record,
+ _zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
debug!("close");
let rid = record.arg as u32;
let mut table = lock_resource_table();
@@ -243,39 +255,53 @@ fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
Some(_) => futures::future::ok(0),
None => futures::future::err(bad_resource()),
};
- Box::new(fut)
+ fut.boxed()
}
-fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_read(
+ record: Record,
+ zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
- let fut = futures::future::poll_fn(move || {
+ let fut = futures::future::poll_fn(move |_cx| {
let mut table = lock_resource_table();
let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- stream.0.poll_read(&mut zero_copy_buf)
+ match stream.0.poll_read(&mut zero_copy_buf) {
+ Err(e) => Poll::Ready(Err(e)),
+ Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Async::NotReady) => Poll::Pending,
+ }
})
.and_then(move |nread| {
debug!("read success {}", nread);
- Ok(nread as i32)
+ futures::future::ok(nread as i32)
});
- Box::new(fut)
+ fut.boxed()
}
-fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
+fn op_write(
+ record: Record,
+ zero_copy_buf: Option<PinnedBuf>,
+) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
- let fut = futures::future::poll_fn(move || {
+ let fut = futures::future::poll_fn(move |_cx| {
let mut table = lock_resource_table();
let stream = table.get_mut::<TcpStream>(rid).ok_or_else(bad_resource)?;
- stream.0.poll_write(&zero_copy_buf)
+ match stream.0.poll_write(&zero_copy_buf) {
+ Err(e) => Poll::Ready(Err(e)),
+ Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
+ Ok(Async::NotReady) => Poll::Pending,
+ }
})
.and_then(move |nwritten| {
debug!("write success {}", nwritten);
- Ok(nwritten as i32)
+ futures::future::ok(nwritten as i32)
});
- Box::new(fut)
+ fut.boxed()
}
fn js_check(r: Result<(), ErrBox>) {