diff options
-rw-r--r-- | core/bindings.rs | 3 | ||||
-rw-r--r-- | core/core.js | 16 | ||||
-rw-r--r-- | core/examples/http_bench.js | 94 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 193 |
4 files changed, 59 insertions, 247 deletions
diff --git a/core/bindings.rs b/core/bindings.rs index 1c76cf56f..f0181a227 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -381,7 +381,8 @@ fn send( Err(err) => { let msg = format!("invalid op id: {}", err); let msg = v8::String::new(scope, &msg).unwrap(); - scope.throw_exception(msg.into()); + let exc = v8::Exception::type_error(scope, msg); + scope.throw_exception(exc); return; } }; diff --git a/core/core.js b/core/core.js index 099472614..82a812030 100644 --- a/core/core.js +++ b/core/core.js @@ -61,7 +61,7 @@ SharedQueue Binary Layout function ops() { // op id 0 is a special value to retrieve the map of registered ops. - const opsMapBytes = send(0, new Uint8Array([])); + const opsMapBytes = send(0); const opsMapJson = String.fromCharCode.apply(null, opsMapBytes); opsCache = JSON.parse(opsMapJson); return { ...opsCache }; @@ -215,12 +215,12 @@ SharedQueue Binary Layout let nextPromiseId = 1; const promiseTable = {}; - function jsonOpAsync(opName, args) { + function jsonOpAsync(opName, args, ...zeroCopy) { setAsyncHandler(opsCache[opName], jsonOpAsyncHandler); args.promiseId = nextPromiseId++; const argsBuf = encodeJson(args); - dispatch(opName, argsBuf); + dispatch(opName, argsBuf, ...zeroCopy); let resolve, reject; const promise = new Promise((resolve_, reject_) => { resolve = resolve_; @@ -232,14 +232,14 @@ SharedQueue Binary Layout return promise; } - function jsonOpSync(opName, args) { + function jsonOpSync(opName, args, ...zeroCopy) { const argsBuf = encodeJson(args); - const res = dispatch(opName, argsBuf); + const res = dispatch(opName, argsBuf, ...zeroCopy); const r = decodeJson(res); - if (r["ok"]) { - return r["ok"]; + if ("ok" in r) { + return r.ok; } else { - throw r["err"]; + throw r.err; } } diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js index ac97e0d88..de6b5a9a4 100644 --- a/core/examples/http_bench.js +++ b/core/examples/http_bench.js @@ -7,76 +7,6 @@ const responseBuf = new Uint8Array( .split("") .map((c) => c.charCodeAt(0)), ); -const promiseMap = new Map(); -let nextPromiseId = 1; - -function assert(cond) { - if (!cond) { - throw Error("assert"); - } -} - -function createResolvable() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - promise.resolve = resolve; - promise.reject = reject; - return promise; -} - -const scratch32 = new Int32Array(3); -const scratchBytes = new Uint8Array( - scratch32.buffer, - scratch32.byteOffset, - scratch32.byteLength, -); -assert(scratchBytes.byteLength === 3 * 4); - -function send(promiseId, opId, rid, ...zeroCopy) { - scratch32[0] = promiseId; - scratch32[1] = rid; - scratch32[2] = -1; - return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy); -} - -/** Returns Promise<number> */ -function sendAsync(opId, rid, ...zeroCopy) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - const buf = send(promiseId, opId, rid, ...zeroCopy); - if (buf) { - const record = recordFromBuf(buf); - // Sync result. - p.resolve(record.result); - } else { - // Async result. - promiseMap.set(promiseId, p); - } - return p; -} - -/** Returns i32 number */ -function sendSync(opId, rid) { - const buf = send(0, opId, rid); - const record = recordFromBuf(buf); - return record[2]; -} - -function recordFromBuf(buf) { - assert(buf.byteLength === 3 * 4); - return new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); -} - -function handleAsyncMsgFromRust(buf) { - const record = recordFromBuf(buf); - const p = promiseMap.get(record[0]); - promiseMap.delete(record[0]); - p.resolve(record[2]); -} /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { @@ -94,17 +24,19 @@ async function accept(serverRid) { * Reads a packet from the rid, presumably an http request. data is ignored. * Returns bytes read. */ -function read(rid, data) { - return sendAsync(ops["read"], rid, data); +async function read(rid, data) { + const { nread } = await Deno.core.jsonOpAsync("read", { rid }, data); + return nread; } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ -function write(rid, data) { - return sendAsync(ops["write"], rid, data); +async function write(rid, data) { + const { nwritten } = await Deno.core.jsonOpAsync("write", { rid }, data); + return nwritten; } function close(rid) { - return sendSync(ops["close"], rid); + Deno.core.jsonOpSync("close", { rid }); } async function serve(rid) { @@ -122,20 +54,14 @@ async function serve(rid) { close(rid); } -let ops; - async function main() { - ops = Deno.core.ops(); - Deno.core.setAsyncHandler(ops["read"], handleAsyncMsgFromRust); - Deno.core.setAsyncHandler(ops["write"], handleAsyncMsgFromRust); - - Deno.core.print("http_bench.js start\n"); + Deno.core.ops(); const listenerRid = listen(); Deno.core.print(`listening http://127.0.0.1:4544/ rid=${listenerRid}\n`); - while (true) { + + for (;;) { const rid = await accept(listenerRid); - // Deno.core.print(`accepted ${rid}`); if (rid < 0) { Deno.core.print(`accept error ${rid}`); return; diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 0dbb6f8e6..bf9d8d137 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -5,27 +5,17 @@ use deno_core::serde_json; use deno_core::CoreIsolate; use deno_core::CoreIsolateState; use deno_core::ErrBox; -use deno_core::Op; -use deno_core::ResourceTable; use deno_core::Script; use deno_core::StartupData; use deno_core::ZeroCopyBuf; use futures::future::poll_fn; use futures::future::Future; -use futures::future::FutureExt; -use futures::future::TryFuture; -use futures::future::TryFutureExt; -use std::cell::RefCell; -use std::convert::TryInto; use std::env; -use std::fmt::Debug; use std::io::Error; use std::io::ErrorKind; -use std::mem::size_of; use std::net::SocketAddr; use std::pin::Pin; -use std::ptr; -use std::rc::Rc; +use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::net::TcpListener; @@ -47,37 +37,6 @@ impl log::Log for Logger { fn flush(&self) {} } -#[derive(Copy, Clone, Debug, PartialEq)] -struct Record { - pub promise_id: u32, - pub rid: u32, - pub result: i32, -} - -type RecordBuf = [u8; size_of::<Record>()]; - -impl From<&[u8]> for Record { - fn from(buf: &[u8]) -> Self { - assert_eq!(buf.len(), size_of::<RecordBuf>()); - unsafe { *(buf as *const _ as *const RecordBuf) }.into() - } -} - -impl From<RecordBuf> for Record { - fn from(buf: RecordBuf) -> Self { - unsafe { - #[allow(clippy::cast_ptr_alignment)] - ptr::read_unaligned(&buf as *const _ as *const Self) - } - } -} - -impl From<Record> for RecordBuf { - fn from(record: Record) -> Self { - unsafe { ptr::read(&record as *const _ as *const Self) } - } -} - pub fn isolate_new() -> CoreIsolate { let startup_data = StartupData::Script(Script { source: include_str!("http_bench.js"), @@ -86,94 +45,27 @@ pub fn isolate_new() -> CoreIsolate { let mut isolate = CoreIsolate::new(startup_data, false); - fn register_sync_op<F>( - isolate: &mut CoreIsolate, - name: &'static str, - handler: F, - ) where - F: 'static - + Fn( - Rc<RefCell<ResourceTable>>, - u32, - &mut [ZeroCopyBuf], - ) -> Result<u32, Error>, - { - let core_handler = move |state: &mut CoreIsolateState, - zero_copy_bufs: &mut [ZeroCopyBuf]| - -> Op { - assert!(!zero_copy_bufs.is_empty()); - let record = Record::from(zero_copy_bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(is_sync); - - let resource_table = state.resource_table.clone(); - let result: i32 = - match handler(resource_table, record.rid, &mut zero_copy_bufs[1..]) { - Ok(r) => r as i32, - Err(_) => -1, - }; - let buf = RecordBuf::from(Record { result, ..record })[..].into(); - Op::Sync(buf) - }; - - isolate.register_op(name, core_handler); - } - - fn register_async_op<F>( - isolate: &mut CoreIsolate, - name: &'static str, - handler: impl Fn(Rc<RefCell<ResourceTable>>, u32, &mut [ZeroCopyBuf]) -> F - + Copy - + 'static, - ) where - F: TryFuture, - F::Ok: TryInto<i32>, - <F::Ok as TryInto<i32>>::Error: Debug, - { - let core_handler = move |state: &mut CoreIsolateState, - zero_copy_bufs: &mut [ZeroCopyBuf]| - -> Op { - assert!(!zero_copy_bufs.is_empty()); - let record = Record::from(zero_copy_bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(!is_sync); - - let mut zero_copy = zero_copy_bufs[1..].to_vec(); - let resource_table = state.resource_table.clone(); - let fut = async move { - let op = handler(resource_table, record.rid, &mut zero_copy); - let result = op - .map_ok(|r| r.try_into().expect("op result does not fit in i32")) - .unwrap_or_else(|_| -1) - .await; - RecordBuf::from(Record { result, ..record })[..].into() - }; - - Op::Async(fut.boxed_local()) - }; - - isolate.register_op(name, core_handler); - } - isolate.register_op_json_sync("listen", op_listen); isolate.register_op_json_async("accept", op_accept); - register_async_op(&mut isolate, "read", op_read); - register_async_op(&mut isolate, "write", op_write); - register_sync_op(&mut isolate, "close", op_close); + isolate.register_op_json_async("read", op_read); + isolate.register_op_json_async("write", op_write); + isolate.register_op_json_sync("close", op_close); isolate } fn op_close( - resource_table: Rc<RefCell<ResourceTable>>, - rid: u32, + state: &mut CoreIsolateState, + args: serde_json::Value, _buf: &mut [ZeroCopyBuf], -) -> Result<u32, Error> { +) -> Result<serde_json::Value, ErrBox> { + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; debug!("close rid={}", rid); - let resource_table = &mut resource_table.borrow_mut(); + + let resource_table = &mut state.resource_table.borrow_mut(); resource_table .close(rid) - .map(|_| 0) + .map(|_| serde_json::json!(())) .ok_or_else(bad_resource) } @@ -205,55 +97,63 @@ fn op_accept( let listener = resource_table .get_mut::<TcpListener>(rid) .ok_or_else(bad_resource)?; - listener - .poll_accept(cx) - .map_err(ErrBox::from) - .map_ok(|(stream, _addr)| { - let rid = resource_table.add("tcpStream", Box::new(stream)); - serde_json::json!({ "rid": rid }) - }) + listener.poll_accept(cx)?.map(|(stream, _addr)| { + let rid = resource_table.add("tcpStream", Box::new(stream)); + Ok(serde_json::json!({ "rid": rid })) + }) }) } fn op_read( - resource_table: Rc<RefCell<ResourceTable>>, - rid: u32, + state: &mut CoreIsolateState, + args: serde_json::Value, bufs: &mut [ZeroCopyBuf], -) -> impl TryFuture<Ok = usize, Error = Error> { +) -> impl Future<Output = Result<serde_json::Value, ErrBox>> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let mut buf = bufs[0].clone(); + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; debug!("read rid={}", rid); - poll_fn(move |cx| { + let mut buf = bufs[0].clone(); + let resource_table = state.resource_table.clone(); + + poll_fn(move |cx| -> Poll<Result<serde_json::Value, ErrBox>> { let resource_table = &mut resource_table.borrow_mut(); let stream = resource_table .get_mut::<TcpStream>(rid) .ok_or_else(bad_resource)?; - Pin::new(stream).poll_read(cx, &mut buf) + Pin::new(stream) + .poll_read(cx, &mut buf)? + .map(|nread| Ok(serde_json::json!({ "nread": nread }))) }) } fn op_write( - resource_table: Rc<RefCell<ResourceTable>>, - rid: u32, + state: &mut CoreIsolateState, + args: serde_json::Value, bufs: &mut [ZeroCopyBuf], -) -> impl TryFuture<Ok = usize, Error = Error> { +) -> impl Future<Output = Result<serde_json::Value, ErrBox>> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let buf = bufs[0].clone(); + + let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; debug!("write rid={}", rid); + let buf = bufs[0].clone(); + let resource_table = state.resource_table.clone(); + poll_fn(move |cx| { let resource_table = &mut resource_table.borrow_mut(); let stream = resource_table .get_mut::<TcpStream>(rid) .ok_or_else(bad_resource)?; - Pin::new(stream).poll_write(cx, &buf) + Pin::new(stream) + .poll_write(cx, &buf)? + .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) }) } -fn bad_resource() -> Error { - Error::new(ErrorKind::NotFound, "bad resource id") +fn bad_resource() -> ErrBox { + Error::new(ErrorKind::NotFound, "bad resource id").into() } fn main() { @@ -276,18 +176,3 @@ fn main() { .unwrap(); deno_core::js_check(runtime.block_on(isolate)); } - -#[test] -fn test_record_from() { - let expected = Record { - promise_id: 1, - rid: 3, - result: 4, - }; - let buf = RecordBuf::from(expected); - if cfg!(target_endian = "little") { - assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]); - } - let actual = Record::from(buf); - assert_eq!(actual, expected); -} |