summaryrefslogtreecommitdiff
path: root/core/examples/http_bench.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r--core/examples/http_bench.rs69
1 files changed, 42 insertions, 27 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 159f23fb5..27fefc8bb 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -97,15 +97,38 @@ impl Isolate {
state: Default::default(),
};
- isolate.register_op("listen", op_listen);
+ isolate.register_sync_op("listen", op_listen);
isolate.register_op("accept", op_accept);
isolate.register_op("read", op_read);
isolate.register_op("write", op_write);
- isolate.register_op("close", op_close);
+ isolate.register_sync_op("close", op_close);
isolate
}
+ fn register_sync_op<F>(&mut self, name: &'static str, handler: F)
+ where
+ F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>,
+ {
+ let state = self.state.clone();
+ let core_handler =
+ move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
+ let state = state.clone();
+ let record = Record::from(control_buf);
+ let is_sync = record.promise_id == 0;
+ assert!(is_sync);
+
+ let result: i32 = match handler(state, record.rid, zero_copy_buf) {
+ Ok(r) => r as i32,
+ Err(_) => -1,
+ };
+ let buf = RecordBuf::from(Record { result, ..record })[..].into();
+ Op::Sync(buf)
+ };
+
+ self.core_isolate.register_op(name, core_handler);
+ }
+
fn register_op<F>(
&mut self,
name: &'static str,
@@ -117,10 +140,11 @@ impl Isolate {
{
let state = self.state.clone();
let core_handler =
- move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp {
+ move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op {
let state = state.clone();
let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
+ assert!(!is_sync);
let fut = async move {
let op = handler(state, record.rid, zero_copy_buf);
@@ -128,14 +152,10 @@ impl Isolate {
.map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1)
.await;
- Ok(RecordBuf::from(Record { result, ..record })[..].into())
+ RecordBuf::from(Record { result, ..record })[..].into()
};
- if is_sync {
- Op::Sync(futures::executor::block_on(fut).unwrap())
- } else {
- Op::Async(fut.boxed_local())
- }
+ Op::Async(fut.boxed_local())
};
self.core_isolate.register_op(name, core_handler);
@@ -154,32 +174,27 @@ fn op_close(
state: State,
rid: u32,
_buf: Option<ZeroCopyBuf>,
-) -> impl TryFuture<Ok = u32, Error = Error> {
+) -> Result<u32, Error> {
debug!("close rid={}", rid);
-
- async move {
- let resource_table = &mut state.borrow_mut().resource_table;
- resource_table
- .close(rid)
- .map(|_| 0)
- .ok_or_else(bad_resource)
- }
+ let resource_table = &mut state.borrow_mut().resource_table;
+ resource_table
+ .close(rid)
+ .map(|_| 0)
+ .ok_or_else(bad_resource)
}
fn op_listen(
state: State,
_rid: u32,
_buf: Option<ZeroCopyBuf>,
-) -> impl TryFuture<Ok = u32, Error = Error> {
+) -> Result<u32, Error> {
debug!("listen");
-
- async move {
- let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
- let listener = tokio::net::TcpListener::bind(&addr).await?;
- let resource_table = &mut state.borrow_mut().resource_table;
- let rid = resource_table.add("tcpListener", Box::new(listener));
- Ok(rid)
- }
+ let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
+ let std_listener = std::net::TcpListener::bind(&addr)?;
+ let listener = TcpListener::from_std(std_listener)?;
+ let resource_table = &mut state.borrow_mut().resource_table;
+ let rid = resource_table.add("tcpListener", Box::new(listener));
+ Ok(rid)
}
fn op_accept(