diff options
Diffstat (limited to 'core/http_bench.rs')
-rw-r--r-- | core/http_bench.rs | 254 |
1 files changed, 171 insertions, 83 deletions
diff --git a/core/http_bench.rs b/core/http_bench.rs index 3da30433a..8e5a0e10c 100644 --- a/core/http_bench.rs +++ b/core/http_bench.rs @@ -12,18 +12,11 @@ extern crate log; #[macro_use] extern crate lazy_static; -use deno_core::deno_buf; -use deno_core::AsyncResult; -use deno_core::Isolate; -use deno_core::JSError; -use deno_core::Op; -use deno_core::RECORD_OFFSET_ARG; -use deno_core::RECORD_OFFSET_OP; -use deno_core::RECORD_OFFSET_PROMISE_ID; -use deno_core::RECORD_OFFSET_RESULT; +use deno_core::*; use futures::future::lazy; use std::collections::HashMap; use std::env; +use std::mem; use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -36,11 +29,148 @@ const OP_READ: i32 = 3; const OP_WRITE: i32 = 4; const OP_CLOSE: i32 = 5; +const INDEX_START: usize = 0; +const INDEX_END: usize = 1; + +const NUM_RECORDS: usize = 128; +const RECORD_SIZE: usize = 4; + +#[derive(Clone, Debug)] +pub struct Record { + pub promise_id: i32, + pub op_id: i32, + pub arg: i32, + pub result: i32, +} + +pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send; + +struct HttpBench { + shared32: Vec<i32>, +} + +impl HttpBench { + fn new() -> Self { + let mut shared32 = Vec::<i32>::new(); + let n = 2 + 4 * NUM_RECORDS; + shared32.resize(n, 0); + shared32[INDEX_START] = 0; + shared32[INDEX_END] = 0; + Self { shared32 } + } +} + +fn idx(i: usize, off: usize) -> usize { + 2 + i * RECORD_SIZE + off +} + +impl Behavior<Record> for HttpBench { + fn startup_snapshot(&mut self) -> Option<deno_buf> { + None + } + + fn startup_shared(&mut self) -> Option<deno_buf> { + let ptr = self.shared32.as_ptr() as *const u8; + let len = mem::size_of::<i32>() * self.shared32.len(); + Some(unsafe { deno_buf::from_raw_parts(ptr, len) }) + } + + fn resolve(&mut self, _specifier: &str, _referrer: deno_mod) -> deno_mod { + // HttpBench doesn't do ES modules. + unimplemented!() + } + + fn recv( + &mut self, + record: Record, + zero_copy_buf: deno_buf, + ) -> (bool, Box<Op<Record>>) { + let is_sync = record.promise_id == 0; + let http_bench_op = match record.op_id { + OP_LISTEN => { + assert!(is_sync); + op_listen() + } + OP_CLOSE => { + assert!(is_sync); + let rid = record.arg; + op_close(rid) + } + OP_ACCEPT => { + assert!(!is_sync); + let listener_rid = record.arg; + op_accept(listener_rid) + } + OP_READ => { + assert!(!is_sync); + let rid = record.arg; + op_read(rid, zero_copy_buf) + } + OP_WRITE => { + assert!(!is_sync); + let rid = record.arg; + op_write(rid, zero_copy_buf) + } + _ => panic!("bad op {}", record.op_id), + }; + let mut record_a = record.clone(); + let mut record_b = record.clone(); + + let op = Box::new( + http_bench_op + .and_then(move |result| { + record_a.result = result; + Ok(record_a) + }).or_else(|err| -> Result<Record, ()> { + eprintln!("unexpected err {}", err); + record_b.result = -1; + Ok(record_b) + }), + ); + (is_sync, op) + } + + fn records_reset(&mut self) { + self.shared32[INDEX_START] = 0; + self.shared32[INDEX_END] = 0; + } + + fn records_push(&mut self, record: Record) -> bool { + debug!("push {:?}", record); + let i = self.shared32[INDEX_END] as usize; + if i >= NUM_RECORDS { + return false; + } + self.shared32[idx(i, 0)] = record.promise_id; + self.shared32[idx(i, 1)] = record.op_id; + self.shared32[idx(i, 2)] = record.arg; + self.shared32[idx(i, 3)] = record.result; + self.shared32[INDEX_END] += 1; + true + } + + fn records_shift(&mut self) -> Option<Record> { + let i = self.shared32[INDEX_START] as usize; + if i == self.shared32[INDEX_END] as usize { + return None; + } + let record = Record { + promise_id: self.shared32[idx(i, 0)], + op_id: self.shared32[idx(i, 1)], + arg: self.shared32[idx(i, 2)], + result: self.shared32[idx(i, 3)], + }; + self.shared32[INDEX_START] += 1; + Some(record) + } +} + fn main() { let js_source = include_str!("http_bench.js"); - let isolate = deno_core::Isolate::new(recv_cb); let main_future = lazy(move || { + let isolate = deno_core::Isolate::new(HttpBench::new()); + // TODO currently isolate.execute() must be run inside tokio, hence the // lazy(). It would be nice to not have that contraint. Probably requires // using v8::MicrotasksPolicy::kExplicit @@ -77,69 +207,7 @@ fn new_rid() -> i32 { rid as i32 } -fn recv_cb(isolate: &mut Isolate, zero_copy_buf: deno_buf) { - isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? - - let promise_id = isolate.shared.get_record(0, RECORD_OFFSET_PROMISE_ID); - let op_id = isolate.shared.get_record(0, RECORD_OFFSET_OP); - let arg = isolate.shared.get_record(0, RECORD_OFFSET_ARG); - - // dbg!(promise_id); - // dbg!(op_id); - // dbg!(arg); - - let is_sync = promise_id == 0; - - if is_sync { - // sync ops - match op_id { - OP_CLOSE => { - debug!("close"); - assert!(is_sync); - let mut table = RESOURCE_TABLE.lock().unwrap(); - let r = table.remove(&arg); - isolate.shared.set_record( - 0, - RECORD_OFFSET_RESULT, - if r.is_some() { 0 } else { -1 }, - ); - } - OP_LISTEN => { - debug!("listen"); - assert!(is_sync); - - let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).unwrap(); - let rid = new_rid(); - isolate.shared.set_record(0, RECORD_OFFSET_RESULT, rid); - let mut guard = RESOURCE_TABLE.lock().unwrap(); - guard.insert(rid, Repr::TcpListener(listener)); - } - _ => panic!("bad op"), - } - } else { - // async ops - let zero_copy_id = zero_copy_buf.zero_copy_id; - let op = match op_id { - OP_ACCEPT => { - let listener_rid = arg; - op_accept(listener_rid) - } - OP_READ => { - let rid = arg; - op_read(rid, zero_copy_buf) - } - OP_WRITE => { - let rid = arg; - op_write(rid, zero_copy_buf) - } - _ => panic!("bad op"), - }; - isolate.add_op(promise_id, op, zero_copy_id); - } -} - -fn op_accept(listener_rid: i32) -> Box<Op> { +fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> { debug!("accept {}", listener_rid); Box::new( futures::future::poll_fn(move || { @@ -147,7 +215,7 @@ fn op_accept(listener_rid: i32) -> Box<Op> { let maybe_repr = table.get_mut(&listener_rid); match maybe_repr { Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), - _ => panic!("bad rid"), + _ => panic!("bad rid {}", listener_rid), } }).and_then(move |(stream, addr)| { debug!("accept success {}", addr); @@ -156,12 +224,36 @@ fn op_accept(listener_rid: i32) -> Box<Op> { let mut guard = RESOURCE_TABLE.lock().unwrap(); guard.insert(rid, Repr::TcpStream(stream)); - Ok(AsyncResult { result: rid }) + Ok(rid as i32) }), ) } -fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<Op> { +fn op_listen() -> Box<HttpBenchOp> { + debug!("listen"); + + Box::new(lazy(move || { + let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).unwrap(); + let rid = new_rid(); + + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpListener(listener)); + futures::future::ok(rid) + })) +} + +fn op_close(rid: i32) -> Box<HttpBenchOp> { + debug!("close"); + Box::new(lazy(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let r = table.remove(&rid); + let result = if r.is_some() { 0 } else { -1 }; + futures::future::ok(result) + })) +} + +fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<HttpBenchOp> { debug!("read rid={}", rid); Box::new( futures::future::poll_fn(move || { @@ -175,14 +267,12 @@ fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<Op> { } }).and_then(move |nread| { debug!("read success {}", nread); - Ok(AsyncResult { - result: nread as i32, - }) + Ok(nread as i32) }), ) } -fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<Op> { +fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<HttpBenchOp> { debug!("write rid={}", rid); Box::new( futures::future::poll_fn(move || { @@ -196,9 +286,7 @@ fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<Op> { } }).and_then(move |nwritten| { debug!("write success {}", nwritten); - Ok(AsyncResult { - result: nwritten as i32, - }) + Ok(nwritten as i32) }), ) } |