diff options
author | Bert Belder <bertbelder@gmail.com> | 2020-02-06 15:34:40 -0800 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2020-02-07 18:17:03 -0800 |
commit | f650c3edb3081ac0433b338c0c99418171656dd1 (patch) | |
tree | c97529873ec2f4c494a8efb1638886527a31c4f9 /core/examples | |
parent | 25467aa7c7852c3695091a547ffac4857478c037 (diff) |
Refactor deno_core_http_bench and make it single-threaded (#3903)
Diffstat (limited to 'core/examples')
-rw-r--r-- | core/examples/http_bench.js | 14 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 472 |
2 files changed, 201 insertions, 285 deletions
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index 9473b6880..abe81e41e 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -36,18 +36,18 @@ const scratchBytes = new Uint8Array( ); assert(scratchBytes.byteLength === 3 * 4); -function send(promiseId, opId, arg, zeroCopy = null) { +function send(promiseId, opId, rid, zeroCopy = null) { scratch32[0] = promiseId; - scratch32[1] = arg; + scratch32[1] = rid; scratch32[2] = -1; return Deno.core.dispatch(opId, scratchBytes, zeroCopy); } /** Returns Promise<number> */ -function sendAsync(opId, arg, zeroCopy = null) { +function sendAsync(opId, rid, zeroCopy = null) { const promiseId = nextPromiseId++; const p = createResolvable(); - const buf = send(promiseId, opId, arg, zeroCopy); + const buf = send(promiseId, opId, rid, zeroCopy); if (buf) { const record = recordFromBuf(buf); // Sync result. @@ -60,8 +60,8 @@ function sendAsync(opId, arg, zeroCopy = null) { } /** Returns i32 number */ -function sendSync(opId, arg) { - const buf = send(0, opId, arg); +function sendSync(opId, rid) { + const buf = send(0, opId, rid); const record = recordFromBuf(buf); return record[2]; } @@ -131,7 +131,7 @@ async function main() { Deno.core.print("http_bench.js start\n"); const listenerRid = listen(); - Deno.core.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`); + Deno.core.print(`listening http://127.0.0.1:4544/ rid=${listenerRid}\n`); while (true) { const rid = await accept(listenerRid); // Deno.core.print(`accepted ${rid}`); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index fa570acfb..9b1edd9bf 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -2,32 +2,33 @@ /// /// > DENO_BUILD_MODE=release ./tools/build.py && \ /// ./target/release/deno_core_http_bench --multi-thread -extern crate deno_core; -extern crate futures; -extern crate libc; -extern crate num_cpus; -extern crate tokio; #[macro_use] -extern crate log; +extern crate derive_deref; #[macro_use] -extern crate lazy_static; +extern crate log; +use deno_core::Isolate as CoreIsolate; use deno_core::*; -use futures::future::Future; -use futures::future::FutureExt; -use futures::task::{Context, Poll}; +use futures::future::poll_fn; +use futures::prelude::*; +use futures::task::Context; +use futures::task::Poll; +use std::cell::RefCell; +use std::convert::TryInto; use std::env; +use std::fmt::Debug; use std::io::Error; use std::io::ErrorKind; +use std::mem::size_of; use std::net::SocketAddr; use std::pin::Pin; -use std::sync::Mutex; -use std::sync::MutexGuard; +use std::ptr; +use std::rc::Rc; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; - -static LOGGER: Logger = Logger; +use tokio::net::TcpListener; +use tokio::net::TcpStream; struct Logger; @@ -35,330 +36,245 @@ impl log::Log for Logger { fn enabled(&self, metadata: &log::Metadata) -> bool { metadata.level() <= log::max_level() } + fn log(&self, record: &log::Record) { if self.enabled(record.metadata()) { println!("{} - {}", record.level(), record.args()); } } + fn flush(&self) {} } -#[derive(Clone, Debug, PartialEq)] -pub struct Record { - pub promise_id: i32, - pub arg: i32, +#[derive(Copy, Clone, Debug, PartialEq)] +struct Record { + pub promise_id: u32, + pub rid: u32, pub result: i32, } -impl Into<Buf> for Record { - fn into(self) -> Buf { - let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice(); - let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4]; - unsafe { Box::from_raw(ptr) } - } -} +type RecordBuf = [u8; size_of::<Record>()]; impl From<&[u8]> for Record { - fn from(s: &[u8]) -> Record { - #[allow(clippy::cast_ptr_alignment)] - let ptr = s.as_ptr() as *const i32; - let ints = unsafe { std::slice::from_raw_parts(ptr, 3) }; - Record { - promise_id: ints[0], - arg: ints[1], - result: ints[2], - } + fn from(buf: &[u8]) -> Self { + assert_eq!(buf.len(), size_of::<RecordBuf>()); + unsafe { *(buf as *const _ as *const RecordBuf) }.into() } } -impl From<Buf> for Record { - fn from(buf: Buf) -> Record { - assert_eq!(buf.len(), 3 * 4); - #[allow(clippy::cast_ptr_alignment)] - let ptr = Box::into_raw(buf) as *mut [i32; 3]; - let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) }; - assert_eq!(ints.len(), 3); - Record { - promise_id: ints[0], - arg: ints[1], - result: ints[2], +impl From<RecordBuf> for Record { + fn from(buf: RecordBuf) -> Self { + unsafe { + #[allow(clippy::cast_ptr_alignment)] + ptr::read_unaligned(&buf as *const _ as *const Self) } } } -#[test] -fn test_record_from() { - let r = Record { - promise_id: 1, - arg: 3, - result: 4, - }; - let expected = r.clone(); - let buf: Buf = r.into(); - #[cfg(target_endian = "little")] - assert_eq!( - buf, - vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice() - ); - let actual = Record::from(buf); - assert_eq!(actual, expected); - // TODO test From<&[u8]> for Record -} - -pub type HttpOp = dyn Future<Output = Result<i32, std::io::Error>> + Send; - -pub type HttpOpHandler = - fn(record: Record, zero_copy_buf: Option<ZeroCopyBuf>) -> Pin<Box<HttpOp>>; - -fn http_op( - handler: HttpOpHandler, -) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp { - move |control: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp { - let record = Record::from(control); - let is_sync = record.promise_id == 0; - let op = handler(record.clone(), zero_copy_buf); - let mut record_a = record; - - let fut = async move { - match op.await { - Ok(result) => record_a.result = result, - Err(err) => { - eprintln!("unexpected err {}", err); - record_a.result = -1; - } - }; - Ok(record_a.into()) - }; - - if is_sync { - Op::Sync(futures::executor::block_on(fut).unwrap()) - } else { - Op::Async(fut.boxed()) - } +impl From<Record> for RecordBuf { + fn from(record: Record) -> Self { + unsafe { ptr::read(&record as *const _ as *const Self) } } } -fn main() { - let args: Vec<String> = env::args().collect(); - // NOTE: `--help` arg will display V8 help and exit - let args = deno_core::v8_set_flags(args); - - log::set_logger(&LOGGER).unwrap(); - log::set_max_level(if args.iter().any(|a| a == "-D") { - log::LevelFilter::Debug - } else { - log::LevelFilter::Warn - }); - - let js_source = include_str!("http_bench.js"); - - let startup_data = StartupData::Script(Script { - source: js_source, - filename: "http_bench.js", - }); - - let isolate = deno_core::Isolate::new(startup_data, false); - isolate.register_op("listen", http_op(op_listen)); - isolate.register_op("accept", http_op(op_accept)); - isolate.register_op("read", http_op(op_read)); - isolate.register_op("write", http_op(op_write)); - isolate.register_op("close", http_op(op_close)); - - println!( - "num cpus; logical: {}; physical: {}", - num_cpus::get(), - num_cpus::get_physical() - ); - - let mut runtime = tokio::runtime::Builder::new() - .basic_scheduler() - .enable_all() - .build() - .unwrap(); - let result = runtime.block_on(isolate.boxed_local()); - js_check(result); +struct Isolate { + core_isolate: Box<CoreIsolate>, // Unclear why CoreIsolate::new() returns a box. + state: State, } -pub fn bad_resource() -> Error { - Error::new(ErrorKind::NotFound, "bad resource id") -} +#[derive(Clone, Default, Deref)] +struct State(Rc<RefCell<StateInner>>); -struct TcpListener(tokio::net::TcpListener); +#[derive(Default)] +struct StateInner { + resource_table: ResourceTable, +} -impl Resource for TcpListener {} +impl Isolate { + pub fn new() -> Self { + let startup_data = StartupData::Script(Script { + source: include_str!("http_bench.js"), + filename: "http_bench.js", + }); -struct TcpStream(tokio::net::TcpStream); + let mut isolate = Self { + core_isolate: CoreIsolate::new(startup_data, false), + state: Default::default(), + }; -impl Resource for TcpStream {} + isolate.register_op("listen", op_listen); + isolate.register_op("accept", op_accept); + isolate.register_op("read", op_read); + isolate.register_op("write", op_write); + isolate.register_op("close", op_close); -lazy_static! { - static ref RESOURCE_TABLE: Mutex<ResourceTable> = - Mutex::new(ResourceTable::default()); -} + isolate + } -fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> { - RESOURCE_TABLE.lock().unwrap() -} + fn register_op<F>( + &mut self, + name: &'static str, + handler: impl Fn(State, u32, Option<ZeroCopyBuf>) -> F + Copy + 'static, + ) where + F: TryFuture, + F::Ok: TryInto<i32>, + <F::Ok as TryInto<i32>>::Error: Debug, + { + let state = self.state.clone(); + let core_handler = + move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> CoreOp { + let state = state.clone(); + let record = Record::from(control_buf); + let is_sync = record.promise_id == 0; + + let fut = async move { + let op = handler(state, record.rid, zero_copy_buf); + let result = op + .map_ok(|r| r.try_into().expect("op result does not fit in i32")) + .unwrap_or_else(|_| -1) + .await; + Ok(RecordBuf::from(Record { result, ..record })[..].into()) + }; + + if is_sync { + Op::Sync(futures::executor::block_on(fut).unwrap()) + } else { + Op::Async(fut.boxed_local()) + } + }; -struct Accept { - rid: ResourceId, + self.core_isolate.register_op(name, core_handler); + } } -impl Future for Accept { - type Output = Result<(tokio::net::TcpStream, SocketAddr), std::io::Error>; +impl Future for Isolate { + type Output = <CoreIsolate as Future>::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - - let mut table = lock_resource_table(); - match table.get_mut::<TcpListener>(inner.rid) { - None => Poll::Ready(Err(bad_resource())), - Some(listener) => { - let listener = &mut listener.0; - listener.poll_accept(cx) - } - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + self.core_isolate.poll_unpin(cx) } } -fn op_accept( - record: Record, - _zero_copy_buf: Option<ZeroCopyBuf>, -) -> Pin<Box<HttpOp>> { - let rid = record.arg as u32; - debug!("accept {}", rid); - - let fut = async move { - let (stream, addr) = Accept { rid }.await?; - debug!("accept success {}", addr); - let mut table = lock_resource_table(); - let rid = table.add("tcpStream", Box::new(TcpStream(stream))); - Ok(rid as i32) - }; - - fut.boxed() +fn op_close( + state: State, + rid: u32, + _buf: Option<ZeroCopyBuf>, +) -> impl TryFuture<Ok = u32, Error = Error> { + debug!("close rid={}", rid); + + async move { + let resource_table = &mut state.borrow_mut().resource_table; + resource_table + .close(rid) + .map(|_| 0) + .ok_or_else(bad_resource) + } } fn op_listen( - _record: Record, - _zero_copy_buf: Option<ZeroCopyBuf>, -) -> Pin<Box<HttpOp>> { + state: State, + _rid: u32, + _buf: Option<ZeroCopyBuf>, +) -> impl TryFuture<Ok = u32, Error = Error> { debug!("listen"); - let fut = async { + + async move { let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let listener = tokio::net::TcpListener::bind(&addr).await?; - let mut table = lock_resource_table(); - let rid = table.add("tcpListener", Box::new(TcpListener(listener))); - Ok(rid as i32) - }; - - fut.boxed() -} - -fn op_close( - record: Record, - _zero_copy_buf: Option<ZeroCopyBuf>, -) -> Pin<Box<HttpOp>> { - debug!("close"); - let fut = async move { - let rid = record.arg as u32; - let mut table = lock_resource_table(); - match table.close(rid) { - Some(_) => Ok(0), - None => Err(bad_resource()), - } - }; - fut.boxed() -} - -struct Read { - rid: ResourceId, - buf: ZeroCopyBuf, + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add("tcpListener", Box::new(listener)); + Ok(rid) + } } -impl Future for Read { - type Output = Result<usize, std::io::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut table = lock_resource_table(); - - match table.get_mut::<TcpStream>(inner.rid) { - None => Poll::Ready(Err(bad_resource())), - Some(stream) => { - let pinned_stream = Pin::new(&mut stream.0); - pinned_stream.poll_read(cx, &mut inner.buf) - } - } - } +fn op_accept( + state: State, + rid: u32, + _buf: Option<ZeroCopyBuf>, +) -> impl TryFuture<Ok = u32, Error = Error> { + debug!("accept rid={}", rid); + + poll_fn(move |cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let listener = resource_table + .get_mut::<TcpListener>(rid) + .ok_or_else(bad_resource)?; + listener.poll_accept(cx).map_ok(|(stream, _addr)| { + resource_table.add("tcpStream", Box::new(stream)) + }) + }) } fn op_read( - record: Record, - zero_copy_buf: Option<ZeroCopyBuf>, -) -> Pin<Box<HttpOp>> { - let rid = record.arg as u32; + state: State, + rid: u32, + buf: Option<ZeroCopyBuf>, +) -> impl TryFuture<Ok = usize, Error = Error> { + let mut buf = buf.unwrap(); debug!("read rid={}", rid); - let zero_copy_buf = zero_copy_buf.unwrap(); - - let fut = async move { - let nread = Read { - rid, - buf: zero_copy_buf, - } - .await?; - debug!("read success {}", nread); - Ok(nread as i32) - }; - - fut.boxed() -} -struct Write { - rid: ResourceId, - buf: ZeroCopyBuf, + poll_fn(move |cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let stream = resource_table + .get_mut::<TcpStream>(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream).poll_read(cx, &mut buf) + }) } -impl Future for Write { - type Output = Result<usize, std::io::Error>; +fn op_write( + state: State, + rid: u32, + buf: Option<ZeroCopyBuf>, +) -> impl TryFuture<Ok = usize, Error = Error> { + let buf = buf.unwrap(); + debug!("write rid={}", rid); - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { - let inner = self.get_mut(); - let mut table = lock_resource_table(); + poll_fn(move |cx| { + let resource_table = &mut state.borrow_mut().resource_table; + let stream = resource_table + .get_mut::<TcpStream>(rid) + .ok_or_else(bad_resource)?; + Pin::new(stream).poll_write(cx, &buf) + }) +} - match table.get_mut::<TcpStream>(inner.rid) { - None => Poll::Ready(Err(bad_resource())), - Some(stream) => { - let pinned_stream = Pin::new(&mut stream.0); - pinned_stream.poll_write(cx, &inner.buf) - } - } - } +fn bad_resource() -> Error { + Error::new(ErrorKind::NotFound, "bad resource id") } -fn op_write( - record: Record, - zero_copy_buf: Option<ZeroCopyBuf>, -) -> Pin<Box<HttpOp>> { - let rid = record.arg as u32; - debug!("write rid={}", rid); - let zero_copy_buf = zero_copy_buf.unwrap(); +fn main() { + log::set_logger(&Logger).unwrap(); + log::set_max_level( + env::args() + .find(|a| a == "-D") + .map(|_| log::LevelFilter::Debug) + .unwrap_or(log::LevelFilter::Warn), + ); - let fut = async move { - let nwritten = Write { - rid, - buf: zero_copy_buf, - } - .await?; - debug!("write success {}", nwritten); - Ok(nwritten as i32) - }; + // NOTE: `--help` arg will display V8 help and exit + deno_core::v8_set_flags(env::args().collect()); - fut.boxed() + let isolate = Isolate::new(); + let mut runtime = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + runtime.block_on(isolate).expect("unexpected isolate error"); } -fn js_check(r: Result<(), ErrBox>) { - if let Err(e) = r { - panic!(e.to_string()); +#[test] +fn test_record_from() { + let expected = Record { + promise_id: 1, + rid: 3, + result: 4, + }; + let buf = RecordBuf::from(expected); + if cfg!(target_endian = "little") { + assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]); } + let actual = Record::from(buf); + assert_eq!(actual, expected); } |