diff options
Diffstat (limited to 'core/examples')
-rw-r--r-- | core/examples/http_bench_bin_ops.js | 92 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 150 |
2 files changed, 28 insertions, 214 deletions
diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js index f20366494..18f98419f 100644 --- a/core/examples/http_bench_bin_ops.js +++ b/core/examples/http_bench_bin_ops.js @@ -8,85 +8,15 @@ const responseBuf = new Uint8Array( .split("") .map((c) => c.charCodeAt(0)), ); -const promiseMap = new Map(); -let nextPromiseId = 1; - -function assert(cond) { - if (!cond) { - throw Error("assert"); - } -} - -function createResolvable() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - promise.resolve = resolve; - promise.reject = reject; - return promise; -} - -const scratch32 = new Int32Array(3); -const scratchBytes = new Uint8Array( - scratch32.buffer, - scratch32.byteOffset, - scratch32.byteLength, -); -assert(scratchBytes.byteLength === 3 * 4); - -function send(promiseId, opId, rid, ...zeroCopy) { - scratch32[0] = promiseId; - scratch32[1] = rid; - scratch32[2] = -1; - return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy); -} - -/** Returns Promise<number> */ -function sendAsync(opId, rid, ...zeroCopy) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - const buf = send(promiseId, opId, rid, ...zeroCopy); - if (buf) { - const record = recordFromBuf(buf); - // Sync result. - p.resolve(record.result); - } else { - // Async result. - promiseMap.set(promiseId, p); - } - return p; -} - -/** Returns i32 number */ -function sendSync(opId, rid) { - const buf = send(0, opId, rid); - const record = recordFromBuf(buf); - return record[2]; -} - -function recordFromBuf(buf) { - assert(buf.byteLength === 3 * 4); - return new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); -} - -function handleAsyncMsgFromRust(buf) { - const record = recordFromBuf(buf); - const p = promiseMap.get(record[0]); - promiseMap.delete(record[0]); - p.resolve(record[2]); -} /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(ops["listen"], -1); + return Deno.core.binOpSync("listen"); } /** Accepts a connection, returns rid. */ function accept(rid) { - return sendAsync(ops["accept"], rid); + return Deno.core.binOpAsync("accept", rid); } /** @@ -94,16 +24,16 @@ function accept(rid) { * Returns bytes read. */ function read(rid, data) { - return sendAsync(ops["read"], rid, data); + return Deno.core.binOpAsync("read", rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ function write(rid, data) { - return sendAsync(ops["write"], rid, data); + return Deno.core.binOpAsync("write", rid, data); } function close(rid) { - return sendSync(ops["close"], rid); + Deno.core.binOpSync("close", rid); } async function serve(rid) { @@ -121,16 +51,14 @@ async function serve(rid) { close(rid); } -let ops; - async function main() { - ops = Deno.core.ops(); - for (const opName in ops) { - Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust); - } + Deno.core.ops(); + Deno.core.registerErrorClass("Error", Error); const listenerRid = listen(); - Deno.core.print(`http_bench_bin_ops listening on http://127.0.0.1:4544/\n`); + Deno.core.print( + `http_bench_bin_ops listening on http://127.0.0.1:4544/\n`, + ); for (;;) { const rid = await accept(listenerRid); diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index bc4ca4dce..1f649b235 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,30 +3,23 @@ #[macro_use] extern crate log; +use deno_core::error::bad_resource_id; +use deno_core::error::AnyError; use deno_core::AsyncRefCell; use deno_core::BufVec; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::JsRuntime; -use deno_core::Op; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; -use futures::future::FutureExt; -use futures::future::TryFuture; -use futures::future::TryFutureExt; use std::cell::RefCell; use std::convert::TryFrom; -use std::convert::TryInto; use std::env; -use std::fmt::Debug; use std::io::Error; -use std::io::ErrorKind; -use std::mem::size_of; use std::net::SocketAddr; -use std::ptr; use std::rc::Rc; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -120,52 +113,21 @@ impl From<tokio::net::TcpStream> for TcpStream { } } -#[derive(Copy, Clone, Debug, PartialEq)] -struct Record { - promise_id: u32, - rid: ResourceId, - result: i32, -} - -type RecordBuf = [u8; size_of::<Record>()]; - -impl From<&[u8]> for Record { - fn from(buf: &[u8]) -> Self { - assert_eq!(buf.len(), size_of::<RecordBuf>()); - unsafe { *(buf as *const _ as *const RecordBuf) }.into() - } -} - -impl From<RecordBuf> for Record { - fn from(buf: RecordBuf) -> Self { - unsafe { - #[allow(clippy::cast_ptr_alignment)] - ptr::read_unaligned(&buf as *const _ as *const Self) - } - } -} - -impl From<Record> for RecordBuf { - fn from(record: Record) -> Self { - unsafe { ptr::read(&record as *const _ as *const Self) } - } -} - fn create_js_runtime() -> JsRuntime { - let mut js_runtime = JsRuntime::new(Default::default()); - register_op_bin_sync(&mut js_runtime, "listen", op_listen); - register_op_bin_sync(&mut js_runtime, "close", op_close); - register_op_bin_async(&mut js_runtime, "accept", op_accept); - register_op_bin_async(&mut js_runtime, "read", op_read); - register_op_bin_async(&mut js_runtime, "write", op_write); - js_runtime + let mut runtime = JsRuntime::new(Default::default()); + runtime.register_op("listen", deno_core::bin_op_sync(op_listen)); + runtime.register_op("close", deno_core::bin_op_sync(op_close)); + runtime.register_op("accept", deno_core::bin_op_async(op_accept)); + runtime.register_op("read", deno_core::bin_op_async(op_read)); + runtime.register_op("write", deno_core::bin_op_async(op_write)); + runtime } fn op_listen( state: &mut OpState, _rid: ResourceId, _bufs: &mut [ZeroCopyBuf], -) -> Result<u32, Error> { +) -> Result<u32, AnyError> { debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; @@ -179,7 +141,7 @@ fn op_close( state: &mut OpState, rid: ResourceId, _bufs: &mut [ZeroCopyBuf], -) -> Result<u32, Error> { +) -> Result<u32, AnyError> { debug!("close rid={}", rid); state .resource_table @@ -192,7 +154,7 @@ async fn op_accept( state: Rc<RefCell<OpState>>, rid: ResourceId, _bufs: BufVec, -) -> Result<u32, Error> { +) -> Result<u32, AnyError> { debug!("accept rid={}", rid); let listener = state @@ -209,7 +171,7 @@ async fn op_read( state: Rc<RefCell<OpState>>, rid: ResourceId, mut bufs: BufVec, -) -> Result<usize, Error> { +) -> Result<u32, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("read rid={}", rid); @@ -218,14 +180,15 @@ async fn op_read( .resource_table .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - stream.read(&mut bufs[0]).await + let nread = stream.read(&mut bufs[0]).await?; + Ok(nread as u32) } async fn op_write( state: Rc<RefCell<OpState>>, rid: ResourceId, bufs: BufVec, -) -> Result<usize, Error> { +) -> Result<u32, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("write rid={}", rid); @@ -234,70 +197,8 @@ async fn op_write( .resource_table .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - stream.write(&bufs[0]).await -} - -fn register_op_bin_sync<F>( - js_runtime: &mut JsRuntime, - name: &'static str, - op_fn: F, -) where - F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error> + 'static, -{ - let base_op_fn = move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op { - let record = Record::from(bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(is_sync); - - let zero_copy_bufs = &mut bufs[1..]; - let result: i32 = - match op_fn(&mut state.borrow_mut(), record.rid, zero_copy_bufs) { - Ok(r) => r as i32, - Err(_) => -1, - }; - let buf = RecordBuf::from(Record { result, ..record })[..].into(); - Op::Sync(buf) - }; - - js_runtime.register_op(name, base_op_fn); -} - -fn register_op_bin_async<F, R>( - js_runtime: &mut JsRuntime, - name: &'static str, - op_fn: F, -) where - F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + Copy + 'static, - R: TryFuture, - R::Ok: TryInto<i32>, - <R::Ok as TryInto<i32>>::Error: Debug, -{ - let base_op_fn = move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { - let mut bufs_iter = bufs.into_iter(); - let record_buf = bufs_iter.next().unwrap(); - let zero_copy_bufs = bufs_iter.collect::<BufVec>(); - - let record = Record::from(record_buf.as_ref()); - let is_sync = record.promise_id == 0; - assert!(!is_sync); - - let fut = async move { - let op = op_fn(state, record.rid, zero_copy_bufs); - let result = op - .map_ok(|r| r.try_into().expect("op result does not fit in i32")) - .unwrap_or_else(|_| -1) - .await; - RecordBuf::from(Record { result, ..record })[..].into() - }; - - Op::Async(fut.boxed_local()) - }; - - js_runtime.register_op(name, base_op_fn); -} - -fn bad_resource_id() -> Error { - Error::new(ErrorKind::NotFound, "bad resource id") + let nwritten = stream.write(&bufs[0]).await?; + Ok(nwritten as u32) } fn main() { @@ -329,18 +230,3 @@ fn main() { }; runtime.block_on(future).unwrap(); } - -#[test] -fn test_record_from() { - let expected = Record { - promise_id: 1, - rid: 3, - result: 4, - }; - let buf = RecordBuf::from(expected); - if cfg!(target_endian = "little") { - assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]); - } - let actual = Record::from(buf); - assert_eq!(actual, expected); -} |