summaryrefslogtreecommitdiff
path: root/core/http_bench.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/http_bench.rs')
-rw-r--r--core/http_bench.rs254
1 files changed, 171 insertions, 83 deletions
diff --git a/core/http_bench.rs b/core/http_bench.rs
index 3da30433a..8e5a0e10c 100644
--- a/core/http_bench.rs
+++ b/core/http_bench.rs
@@ -12,18 +12,11 @@ extern crate log;
#[macro_use]
extern crate lazy_static;
-use deno_core::deno_buf;
-use deno_core::AsyncResult;
-use deno_core::Isolate;
-use deno_core::JSError;
-use deno_core::Op;
-use deno_core::RECORD_OFFSET_ARG;
-use deno_core::RECORD_OFFSET_OP;
-use deno_core::RECORD_OFFSET_PROMISE_ID;
-use deno_core::RECORD_OFFSET_RESULT;
+use deno_core::*;
use futures::future::lazy;
use std::collections::HashMap;
use std::env;
+use std::mem;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -36,11 +29,148 @@ const OP_READ: i32 = 3;
const OP_WRITE: i32 = 4;
const OP_CLOSE: i32 = 5;
+const INDEX_START: usize = 0;
+const INDEX_END: usize = 1;
+
+const NUM_RECORDS: usize = 128;
+const RECORD_SIZE: usize = 4;
+
+#[derive(Clone, Debug)]
+pub struct Record {
+ pub promise_id: i32,
+ pub op_id: i32,
+ pub arg: i32,
+ pub result: i32,
+}
+
+pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
+
+struct HttpBench {
+ shared32: Vec<i32>,
+}
+
+impl HttpBench {
+ fn new() -> Self {
+ let mut shared32 = Vec::<i32>::new();
+ let n = 2 + 4 * NUM_RECORDS;
+ shared32.resize(n, 0);
+ shared32[INDEX_START] = 0;
+ shared32[INDEX_END] = 0;
+ Self { shared32 }
+ }
+}
+
+fn idx(i: usize, off: usize) -> usize {
+ 2 + i * RECORD_SIZE + off
+}
+
+impl Behavior<Record> for HttpBench {
+ fn startup_snapshot(&mut self) -> Option<deno_buf> {
+ None
+ }
+
+ fn startup_shared(&mut self) -> Option<deno_buf> {
+ let ptr = self.shared32.as_ptr() as *const u8;
+ let len = mem::size_of::<i32>() * self.shared32.len();
+ Some(unsafe { deno_buf::from_raw_parts(ptr, len) })
+ }
+
+ fn resolve(&mut self, _specifier: &str, _referrer: deno_mod) -> deno_mod {
+ // HttpBench doesn't do ES modules.
+ unimplemented!()
+ }
+
+ fn recv(
+ &mut self,
+ record: Record,
+ zero_copy_buf: deno_buf,
+ ) -> (bool, Box<Op<Record>>) {
+ let is_sync = record.promise_id == 0;
+ let http_bench_op = match record.op_id {
+ OP_LISTEN => {
+ assert!(is_sync);
+ op_listen()
+ }
+ OP_CLOSE => {
+ assert!(is_sync);
+ let rid = record.arg;
+ op_close(rid)
+ }
+ OP_ACCEPT => {
+ assert!(!is_sync);
+ let listener_rid = record.arg;
+ op_accept(listener_rid)
+ }
+ OP_READ => {
+ assert!(!is_sync);
+ let rid = record.arg;
+ op_read(rid, zero_copy_buf)
+ }
+ OP_WRITE => {
+ assert!(!is_sync);
+ let rid = record.arg;
+ op_write(rid, zero_copy_buf)
+ }
+ _ => panic!("bad op {}", record.op_id),
+ };
+ let mut record_a = record.clone();
+ let mut record_b = record.clone();
+
+ let op = Box::new(
+ http_bench_op
+ .and_then(move |result| {
+ record_a.result = result;
+ Ok(record_a)
+ }).or_else(|err| -> Result<Record, ()> {
+ eprintln!("unexpected err {}", err);
+ record_b.result = -1;
+ Ok(record_b)
+ }),
+ );
+ (is_sync, op)
+ }
+
+ fn records_reset(&mut self) {
+ self.shared32[INDEX_START] = 0;
+ self.shared32[INDEX_END] = 0;
+ }
+
+ fn records_push(&mut self, record: Record) -> bool {
+ debug!("push {:?}", record);
+ let i = self.shared32[INDEX_END] as usize;
+ if i >= NUM_RECORDS {
+ return false;
+ }
+ self.shared32[idx(i, 0)] = record.promise_id;
+ self.shared32[idx(i, 1)] = record.op_id;
+ self.shared32[idx(i, 2)] = record.arg;
+ self.shared32[idx(i, 3)] = record.result;
+ self.shared32[INDEX_END] += 1;
+ true
+ }
+
+ fn records_shift(&mut self) -> Option<Record> {
+ let i = self.shared32[INDEX_START] as usize;
+ if i == self.shared32[INDEX_END] as usize {
+ return None;
+ }
+ let record = Record {
+ promise_id: self.shared32[idx(i, 0)],
+ op_id: self.shared32[idx(i, 1)],
+ arg: self.shared32[idx(i, 2)],
+ result: self.shared32[idx(i, 3)],
+ };
+ self.shared32[INDEX_START] += 1;
+ Some(record)
+ }
+}
+
fn main() {
let js_source = include_str!("http_bench.js");
- let isolate = deno_core::Isolate::new(recv_cb);
let main_future = lazy(move || {
+ let isolate = deno_core::Isolate::new(HttpBench::new());
+
// TODO currently isolate.execute() must be run inside tokio, hence the
// lazy(). It would be nice to not have that contraint. Probably requires
// using v8::MicrotasksPolicy::kExplicit
@@ -77,69 +207,7 @@ fn new_rid() -> i32 {
rid as i32
}
-fn recv_cb(isolate: &mut Isolate, zero_copy_buf: deno_buf) {
- isolate.test_send_counter += 1; // TODO ideally store this in isolate.state?
-
- let promise_id = isolate.shared.get_record(0, RECORD_OFFSET_PROMISE_ID);
- let op_id = isolate.shared.get_record(0, RECORD_OFFSET_OP);
- let arg = isolate.shared.get_record(0, RECORD_OFFSET_ARG);
-
- // dbg!(promise_id);
- // dbg!(op_id);
- // dbg!(arg);
-
- let is_sync = promise_id == 0;
-
- if is_sync {
- // sync ops
- match op_id {
- OP_CLOSE => {
- debug!("close");
- assert!(is_sync);
- let mut table = RESOURCE_TABLE.lock().unwrap();
- let r = table.remove(&arg);
- isolate.shared.set_record(
- 0,
- RECORD_OFFSET_RESULT,
- if r.is_some() { 0 } else { -1 },
- );
- }
- OP_LISTEN => {
- debug!("listen");
- assert!(is_sync);
-
- let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
- let listener = tokio::net::TcpListener::bind(&addr).unwrap();
- let rid = new_rid();
- isolate.shared.set_record(0, RECORD_OFFSET_RESULT, rid);
- let mut guard = RESOURCE_TABLE.lock().unwrap();
- guard.insert(rid, Repr::TcpListener(listener));
- }
- _ => panic!("bad op"),
- }
- } else {
- // async ops
- let zero_copy_id = zero_copy_buf.zero_copy_id;
- let op = match op_id {
- OP_ACCEPT => {
- let listener_rid = arg;
- op_accept(listener_rid)
- }
- OP_READ => {
- let rid = arg;
- op_read(rid, zero_copy_buf)
- }
- OP_WRITE => {
- let rid = arg;
- op_write(rid, zero_copy_buf)
- }
- _ => panic!("bad op"),
- };
- isolate.add_op(promise_id, op, zero_copy_id);
- }
-}
-
-fn op_accept(listener_rid: i32) -> Box<Op> {
+fn op_accept(listener_rid: i32) -> Box<HttpBenchOp> {
debug!("accept {}", listener_rid);
Box::new(
futures::future::poll_fn(move || {
@@ -147,7 +215,7 @@ fn op_accept(listener_rid: i32) -> Box<Op> {
let maybe_repr = table.get_mut(&listener_rid);
match maybe_repr {
Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(),
- _ => panic!("bad rid"),
+ _ => panic!("bad rid {}", listener_rid),
}
}).and_then(move |(stream, addr)| {
debug!("accept success {}", addr);
@@ -156,12 +224,36 @@ fn op_accept(listener_rid: i32) -> Box<Op> {
let mut guard = RESOURCE_TABLE.lock().unwrap();
guard.insert(rid, Repr::TcpStream(stream));
- Ok(AsyncResult { result: rid })
+ Ok(rid as i32)
}),
)
}
-fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<Op> {
+fn op_listen() -> Box<HttpBenchOp> {
+ debug!("listen");
+
+ Box::new(lazy(move || {
+ let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
+ let listener = tokio::net::TcpListener::bind(&addr).unwrap();
+ let rid = new_rid();
+
+ let mut guard = RESOURCE_TABLE.lock().unwrap();
+ guard.insert(rid, Repr::TcpListener(listener));
+ futures::future::ok(rid)
+ }))
+}
+
+fn op_close(rid: i32) -> Box<HttpBenchOp> {
+ debug!("close");
+ Box::new(lazy(move || {
+ let mut table = RESOURCE_TABLE.lock().unwrap();
+ let r = table.remove(&rid);
+ let result = if r.is_some() { 0 } else { -1 };
+ futures::future::ok(result)
+ }))
+}
+
+fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<HttpBenchOp> {
debug!("read rid={}", rid);
Box::new(
futures::future::poll_fn(move || {
@@ -175,14 +267,12 @@ fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<Op> {
}
}).and_then(move |nread| {
debug!("read success {}", nread);
- Ok(AsyncResult {
- result: nread as i32,
- })
+ Ok(nread as i32)
}),
)
}
-fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<Op> {
+fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<HttpBenchOp> {
debug!("write rid={}", rid);
Box::new(
futures::future::poll_fn(move || {
@@ -196,9 +286,7 @@ fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<Op> {
}
}).and_then(move |nwritten| {
debug!("write success {}", nwritten);
- Ok(AsyncResult {
- result: nwritten as i32,
- })
+ Ok(nwritten as i32)
}),
)
}