summaryrefslogtreecommitdiff
path: root/core/examples/http_bench.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r--core/examples/http_bench.rs144
1 files changed, 60 insertions, 84 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index c019d8a11..a6cc6d548 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -14,12 +14,10 @@ extern crate lazy_static;
use deno::*;
use futures::future::lazy;
-use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering;
use std::sync::Mutex;
+use std::sync::MutexGuard;
use tokio::prelude::*;
static LOGGER: Logger = Logger;
@@ -184,44 +182,38 @@ fn main() {
}
}
-enum Repr {
- TcpListener(tokio::net::TcpListener),
- TcpStream(tokio::net::TcpStream),
-}
+struct TcpListener(tokio::net::TcpListener);
+
+impl Resource for TcpListener {}
+
+struct TcpStream(tokio::net::TcpStream);
+
+impl Resource for TcpStream {}
-type ResourceTable = HashMap<i32, Repr>;
lazy_static! {
- static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new(HashMap::new());
- static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3);
+ static ref RESOURCE_TABLE: Mutex<ResourceTable> =
+ Mutex::new(ResourceTable::default());
}
-fn new_rid() -> i32 {
- let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst);
- rid as i32
+fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
+ RESOURCE_TABLE.lock().unwrap()
}
fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
- let listener_rid = record.arg;
- debug!("accept {}", listener_rid);
- Box::new(
- futures::future::poll_fn(move || {
- let mut table = RESOURCE_TABLE.lock().unwrap();
- let maybe_repr = table.get_mut(&listener_rid);
- match maybe_repr {
- Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(),
- _ => panic!("bad rid {}", listener_rid),
- }
- })
- .and_then(move |(stream, addr)| {
- debug!("accept success {}", addr);
- let rid = new_rid();
-
- let mut guard = RESOURCE_TABLE.lock().unwrap();
- guard.insert(rid, Repr::TcpStream(stream));
-
- Ok(rid as i32)
- }),
- )
+ let rid = record.arg as u32;
+ debug!("accept {}", rid);
+ let fut = futures::future::poll_fn(move || {
+ let mut table = lock_resource_table();
+ let listener = table.get_mut::<TcpListener>(rid)?;
+ listener.0.poll_accept()
+ })
+ .and_then(move |(stream, addr)| {
+ debug!("accept success {}", addr);
+ let mut table = lock_resource_table();
+ let rid = table.add(Box::new(TcpStream(stream)));
+ Ok(rid as i32)
+ });
+ Box::new(fut)
}
fn op_listen(
@@ -229,70 +221,54 @@ fn op_listen(
_zero_copy_buf: Option<PinnedBuf>,
) -> Box<HttpOp> {
debug!("listen");
- Box::new(lazy(move || {
- let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
- let listener = tokio::net::TcpListener::bind(&addr).unwrap();
- let rid = new_rid();
-
- let mut guard = RESOURCE_TABLE.lock().unwrap();
- guard.insert(rid, Repr::TcpListener(listener));
- futures::future::ok(rid)
- }))
+ let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
+ let listener = tokio::net::TcpListener::bind(&addr).unwrap();
+ let mut table = lock_resource_table();
+ let rid = table.add(Box::new(TcpListener(listener)));
+ Box::new(futures::future::ok(rid as i32))
}
fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
debug!("close");
- let rid = record.arg;
- Box::new(lazy(move || {
- let mut table = RESOURCE_TABLE.lock().unwrap();
- let r = table.remove(&rid);
- let result = if r.is_some() { 0 } else { -1 };
- futures::future::ok(result)
- }))
+ let rid = record.arg as u32;
+ let mut table = lock_resource_table();
+ let fut = match table.close(rid) {
+ Ok(_) => futures::future::ok(0),
+ Err(e) => futures::future::err(e),
+ };
+ Box::new(fut)
}
fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
- let rid = record.arg;
+ let rid = record.arg as u32;
debug!("read rid={}", rid);
let mut zero_copy_buf = zero_copy_buf.unwrap();
- Box::new(
- futures::future::poll_fn(move || {
- let mut table = RESOURCE_TABLE.lock().unwrap();
- let maybe_repr = table.get_mut(&rid);
- match maybe_repr {
- Some(Repr::TcpStream(ref mut stream)) => {
- stream.poll_read(&mut zero_copy_buf)
- }
- _ => panic!("bad rid"),
- }
- })
- .and_then(move |nread| {
- debug!("read success {}", nread);
- Ok(nread as i32)
- }),
- )
+ let fut = futures::future::poll_fn(move || {
+ let mut table = lock_resource_table();
+ let stream = table.get_mut::<TcpStream>(rid)?;
+ stream.0.poll_read(&mut zero_copy_buf)
+ })
+ .and_then(move |nread| {
+ debug!("read success {}", nread);
+ Ok(nread as i32)
+ });
+ Box::new(fut)
}
fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> {
- let rid = record.arg;
+ let rid = record.arg as u32;
debug!("write rid={}", rid);
let zero_copy_buf = zero_copy_buf.unwrap();
- Box::new(
- futures::future::poll_fn(move || {
- let mut table = RESOURCE_TABLE.lock().unwrap();
- let maybe_repr = table.get_mut(&rid);
- match maybe_repr {
- Some(Repr::TcpStream(ref mut stream)) => {
- stream.poll_write(&zero_copy_buf)
- }
- _ => panic!("bad rid"),
- }
- })
- .and_then(move |nwritten| {
- debug!("write success {}", nwritten);
- Ok(nwritten as i32)
- }),
- )
+ let fut = futures::future::poll_fn(move || {
+ let mut table = lock_resource_table();
+ let stream = table.get_mut::<TcpStream>(rid)?;
+ stream.0.poll_write(&zero_copy_buf)
+ })
+ .and_then(move |nwritten| {
+ debug!("write success {}", nwritten);
+ Ok(nwritten as i32)
+ });
+ Box::new(fut)
}
fn js_check(r: Result<(), ErrBox>) {