diff options
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r-- | core/examples/http_bench.rs | 69 |
1 files changed, 42 insertions, 27 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 159f23fb5..27fefc8bb 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -97,15 +97,38 @@ impl Isolate { state: Default::default(), }; - isolate.register_op("listen", op_listen); + isolate.register_sync_op("listen", op_listen); isolate.register_op("accept", op_accept); isolate.register_op("read", op_read); isolate.register_op("write", op_write); - isolate.register_op("close", op_close); + isolate.register_sync_op("close", op_close); isolate } + fn register_sync_op<F>(&mut self, name: &'static str, handler: F) + where + F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>, + { + let state = self.state.clone(); + let core_handler = + move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op { + let state = state.clone(); + let record = Record::from(control_buf); + let is_sync = record.promise_id == 0; + assert!(is_sync); + + let result: i32 = match handler(state, record.rid, zero_copy_buf) { + Ok(r) => r as i32, + Err(_) => -1, + }; + let buf = RecordBuf::from(Record { result, ..record })[..].into(); + Op::Sync(buf) + }; + + self.core_isolate.register_op(name, core_handler); + } + fn register_op<F>( &mut self, name: &'static str, @@ -117,10 +140,11 @@ impl Isolate { { let state = self.state.clone(); let core_handler = - move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp { + move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op { let state = state.clone(); let record = Record::from(control_buf); let is_sync = record.promise_id == 0; + assert!(!is_sync); let fut = async move { let op = handler(state, record.rid, zero_copy_buf); @@ -128,14 +152,10 @@ impl Isolate { .map_ok(|r| r.try_into().expect("op result does not fit in i32")) .unwrap_or_else(|_| -1) .await; - Ok(RecordBuf::from(Record { result, ..record })[..].into()) + RecordBuf::from(Record { result, ..record })[..].into() }; - if is_sync { - Op::Sync(futures::executor::block_on(fut).unwrap()) - } else { - Op::Async(fut.boxed_local()) - } + Op::Async(fut.boxed_local()) }; self.core_isolate.register_op(name, core_handler); @@ -154,32 +174,27 @@ fn op_close( state: State, rid: u32, _buf: Option<ZeroCopyBuf>, -) -> impl TryFuture<Ok = u32, Error = Error> { +) -> Result<u32, Error> { debug!("close rid={}", rid); - - async move { - let resource_table = &mut state.borrow_mut().resource_table; - resource_table - .close(rid) - .map(|_| 0) - .ok_or_else(bad_resource) - } + let resource_table = &mut state.borrow_mut().resource_table; + resource_table + .close(rid) + .map(|_| 0) + .ok_or_else(bad_resource) } fn op_listen( state: State, _rid: u32, _buf: Option<ZeroCopyBuf>, -) -> impl TryFuture<Ok = u32, Error = Error> { +) -> Result<u32, Error> { debug!("listen"); - - async move { - let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).await?; - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add("tcpListener", Box::new(listener)); - Ok(rid) - } + let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add("tcpListener", Box::new(listener)); + Ok(rid) } fn op_accept( |