diff options
Diffstat (limited to 'core/examples/http_bench.rs')
-rw-r--r-- | core/examples/http_bench.rs | 144 |
1 files changed, 60 insertions, 84 deletions
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index c019d8a11..a6cc6d548 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -14,12 +14,10 @@ extern crate lazy_static; use deno::*; use futures::future::lazy; -use std::collections::HashMap; use std::env; use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; use std::sync::Mutex; +use std::sync::MutexGuard; use tokio::prelude::*; static LOGGER: Logger = Logger; @@ -184,44 +182,38 @@ fn main() { } } -enum Repr { - TcpListener(tokio::net::TcpListener), - TcpStream(tokio::net::TcpStream), -} +struct TcpListener(tokio::net::TcpListener); + +impl Resource for TcpListener {} + +struct TcpStream(tokio::net::TcpStream); + +impl Resource for TcpStream {} -type ResourceTable = HashMap<i32, Repr>; lazy_static! { - static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new(HashMap::new()); - static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3); + static ref RESOURCE_TABLE: Mutex<ResourceTable> = + Mutex::new(ResourceTable::default()); } -fn new_rid() -> i32 { - let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst); - rid as i32 +fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { + RESOURCE_TABLE.lock().unwrap() } fn op_accept(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { - let listener_rid = record.arg; - debug!("accept {}", listener_rid); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&listener_rid); - match maybe_repr { - Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), - _ => panic!("bad rid {}", listener_rid), - } - }) - .and_then(move |(stream, addr)| { - debug!("accept success {}", addr); - let rid = new_rid(); - - let mut guard = RESOURCE_TABLE.lock().unwrap(); - guard.insert(rid, Repr::TcpStream(stream)); - - Ok(rid as i32) - }), - ) + let rid = record.arg as u32; + debug!("accept {}", rid); + let fut = futures::future::poll_fn(move || { + let mut table = lock_resource_table(); + let listener = table.get_mut::<TcpListener>(rid)?; + listener.0.poll_accept() + }) + .and_then(move |(stream, addr)| { + debug!("accept success {}", addr); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(TcpStream(stream))); + Ok(rid as i32) + }); + Box::new(fut) } fn op_listen( @@ -229,70 +221,54 @@ fn op_listen( _zero_copy_buf: Option<PinnedBuf>, ) -> Box<HttpOp> { debug!("listen"); - Box::new(lazy(move || { - let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).unwrap(); - let rid = new_rid(); - - let mut guard = RESOURCE_TABLE.lock().unwrap(); - guard.insert(rid, Repr::TcpListener(listener)); - futures::future::ok(rid) - })) + let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).unwrap(); + let mut table = lock_resource_table(); + let rid = table.add(Box::new(TcpListener(listener))); + Box::new(futures::future::ok(rid as i32)) } fn op_close(record: Record, _zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { debug!("close"); - let rid = record.arg; - Box::new(lazy(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let r = table.remove(&rid); - let result = if r.is_some() { 0 } else { -1 }; - futures::future::ok(result) - })) + let rid = record.arg as u32; + let mut table = lock_resource_table(); + let fut = match table.close(rid) { + Ok(_) => futures::future::ok(0), + Err(e) => futures::future::err(e), + }; + Box::new(fut) } fn op_read(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { - let rid = record.arg; + let rid = record.arg as u32; debug!("read rid={}", rid); let mut zero_copy_buf = zero_copy_buf.unwrap(); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::TcpStream(ref mut stream)) => { - stream.poll_read(&mut zero_copy_buf) - } - _ => panic!("bad rid"), - } - }) - .and_then(move |nread| { - debug!("read success {}", nread); - Ok(nread as i32) - }), - ) + let fut = futures::future::poll_fn(move || { + let mut table = lock_resource_table(); + let stream = table.get_mut::<TcpStream>(rid)?; + stream.0.poll_read(&mut zero_copy_buf) + }) + .and_then(move |nread| { + debug!("read success {}", nread); + Ok(nread as i32) + }); + Box::new(fut) } fn op_write(record: Record, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpOp> { - let rid = record.arg; + let rid = record.arg as u32; debug!("write rid={}", rid); let zero_copy_buf = zero_copy_buf.unwrap(); - Box::new( - futures::future::poll_fn(move || { - let mut table = RESOURCE_TABLE.lock().unwrap(); - let maybe_repr = table.get_mut(&rid); - match maybe_repr { - Some(Repr::TcpStream(ref mut stream)) => { - stream.poll_write(&zero_copy_buf) - } - _ => panic!("bad rid"), - } - }) - .and_then(move |nwritten| { - debug!("write success {}", nwritten); - Ok(nwritten as i32) - }), - ) + let fut = futures::future::poll_fn(move || { + let mut table = lock_resource_table(); + let stream = table.get_mut::<TcpStream>(rid)?; + stream.0.poll_write(&zero_copy_buf) + }) + .and_then(move |nwritten| { + debug!("write success {}", nwritten); + Ok(nwritten as i32) + }); + Box::new(fut) } fn js_check(r: Result<(), ErrBox>) { |