diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/core.js | 266 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.js | 92 | ||||
-rw-r--r-- | core/examples/http_bench_bin_ops.rs | 150 | ||||
-rw-r--r-- | core/lib.rs | 9 | ||||
-rw-r--r-- | core/ops.rs | 122 | ||||
-rw-r--r-- | core/ops_bin.rs | 377 | ||||
-rw-r--r-- | core/ops_json.rs | 134 |
7 files changed, 743 insertions, 407 deletions
diff --git a/core/core.js b/core/core.js index f44bf253e..2de8e1fff 100644 --- a/core/core.js +++ b/core/core.js @@ -30,13 +30,22 @@ SharedQueue Binary Layout const core = window.Deno.core; const { recv, send } = core; + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////////// Dispatch ///////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const dispatch = send; + const dispatchByName = (opName, control, ...zeroCopy) => + dispatch(opsCache[opName], control, ...zeroCopy); + + //////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////// Shared array buffer /////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + let sharedBytes; let shared32; - let asyncHandlers; - let opsCache = {}; - const errorMap = {}; function init() { const shared = core.shared; @@ -45,6 +54,7 @@ SharedQueue Binary Layout assert(shared32 == null); sharedBytes = new Uint8Array(shared); shared32 = new Int32Array(shared); + asyncHandlers = []; // Callers should not call core.recv, use setAsyncHandler. recv(handleAsyncMsgFromRust); @@ -150,15 +160,43 @@ SharedQueue Binary Layout return [opId, buf]; } + //////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////// Error handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const errorMap = {}; + + function registerErrorClass(errorName, className, args) { + if (typeof errorMap[errorName] !== "undefined") { + throw new TypeError(`Error class for "${errorName}" already registered`); + } + errorMap[errorName] = [className, args ?? []]; + } + + function handleError(className, message) { + if (typeof errorMap[className] === "undefined") { + return new Error( + `Unregistered error class: "${className}"\n` + + ` ${message}\n` + + ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, + ); + } + + const [ErrorClass, args] = errorMap[className]; + return new ErrorClass(message, ...args); + } + + //////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////// Async handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + let asyncHandlers = []; + function setAsyncHandler(opId, cb) { assert(opId != null); asyncHandlers[opId] = cb; } - function setAsyncHandlerByName(opName, cb) { - setAsyncHandler(opsCache[opName], cb); - } - function handleAsyncMsgFromRust() { while (true) { const opIdBuf = shift(); @@ -166,108 +204,196 @@ SharedQueue Binary Layout break; } assert(asyncHandlers[opIdBuf[0]] != null); - asyncHandlers[opIdBuf[0]](opIdBuf[1]); + asyncHandlers[opIdBuf[0]](opIdBuf[1], true); } for (let i = 0; i < arguments.length; i += 2) { - asyncHandlers[arguments[i]](arguments[i + 1]); + asyncHandlers[arguments[i]](arguments[i + 1], false); } } - function dispatch(opName, control, ...zeroCopy) { - return send(opsCache[opName], control, ...zeroCopy); - } + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////// General sync & async ops handling //////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// - function registerErrorClass(errorName, className, args) { - if (typeof errorMap[errorName] !== "undefined") { - throw new TypeError(`Error class for "${errorName}" already registered`); + let nextRequestId = 1; + const promiseTable = {}; + + function asyncHandle(u8Array, isCopyNeeded, opResultParser) { + const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded); + if (error !== null) { + promiseTable[requestId][1](error); + } else { + promiseTable[requestId][0](result); } - errorMap[errorName] = [className, args ?? []]; + delete promiseTable[requestId]; } - function getErrorClassAndArgs(errorName) { - return errorMap[errorName] ?? [undefined, []]; + function opAsync(opName, opRequestBuilder, opResultParser) { + const opId = opsCache[opName]; + // Make sure requests of this type are handled by the asyncHandler + // The asyncHandler's role is to call the "promiseTable[requestId]" function + if (typeof asyncHandlers[opId] === "undefined") { + asyncHandlers[opId] = (buffer, isCopyNeeded) => + asyncHandle(buffer, isCopyNeeded, opResultParser); + } + + const requestId = nextRequestId++; + + // Create and store promise + const promise = new Promise((resolve, reject) => { + promiseTable[requestId] = [resolve, reject]; + }); + + // Synchronously dispatch async request + core.dispatch(opId, ...opRequestBuilder(requestId)); + + // Wait for async response + return promise; } - // Returns Uint8Array - function encodeJson(args) { - const s = JSON.stringify(args); - return core.encode(s); + function opSync(opName, opRequestBuilder, opResultParser) { + const opId = opsCache[opName]; + const u8Array = core.dispatch(opId, ...opRequestBuilder()); + + const [_, result, error] = opResultParser(u8Array, false); + if (error !== null) throw error; + return result; } - function decodeJson(ui8) { - const s = core.decode(ui8); - return JSON.parse(s); + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////// Bin ops handling ///////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const binRequestHeaderByteLength = 8 + 4; + const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength); + const scratchView = new DataView(scratchBuffer); + + function binOpBuildRequest(requestId, argument, zeroCopy) { + scratchView.setBigUint64(0, BigInt(requestId), true); + scratchView.setUint32(8, argument, true); + return [scratchView, ...zeroCopy]; } - let nextPromiseId = 1; - const promiseTable = {}; + function binOpParseResult(u8Array, isCopyNeeded) { + // Decode header value from u8Array + const headerByteLength = 8 + 2 * 4; + assert(u8Array.byteLength >= headerByteLength); + assert(u8Array.byteLength % 4 == 0); + const view = new DataView( + u8Array.buffer, + u8Array.byteOffset + u8Array.byteLength - headerByteLength, + headerByteLength, + ); + + const requestId = Number(view.getBigUint64(0, true)); + const status = view.getUint32(8, true); + const result = view.getUint32(12, true); + + // Error handling + if (status !== 0) { + const className = core.decode(u8Array.subarray(0, result)); + const message = core.decode(u8Array.subarray(result, -headerByteLength)) + .trim(); + + return [requestId, null, handleError(className, message)]; + } - function processResponse(res) { - if ("ok" in res) { - return res.ok; + if (u8Array.byteLength === headerByteLength) { + return [requestId, result, null]; } - const [ErrorClass, args] = getErrorClassAndArgs(res.err.className); - if (!ErrorClass) { - throw new Error( - `Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, - ); + + // Rest of response buffer is passed as reference or as a copy + let respBuffer = null; + if (isCopyNeeded) { + // Copy part of the response array (if sent through shared array buf) + respBuffer = u8Array.slice(0, result); + } else { + // Create view on existing array (if sent through overflow) + respBuffer = u8Array.subarray(0, result); } - throw new ErrorClass(res.err.message, ...args); + + return [requestId, respBuffer, null]; } - async function jsonOpAsync(opName, args = null, ...zeroCopy) { - setAsyncHandler(opsCache[opName], jsonOpAsyncHandler); - - const promiseId = nextPromiseId++; - const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args)); - new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId)); - dispatch(opName, reqBuf, ...zeroCopy); - let resolve, reject; - const promise = new Promise((resolve_, reject_) => { - resolve = resolve_; - reject = reject_; - }); - promise.resolve = resolve; - promise.reject = reject; - promiseTable[promiseId] = promise; - return processResponse(await promise); + function binOpAsync(opName, argument = 0, ...zeroCopy) { + return opAsync( + opName, + (requestId) => binOpBuildRequest(requestId, argument, zeroCopy), + binOpParseResult, + ); + } + + function binOpSync(opName, argument = 0, ...zeroCopy) { + return opSync( + opName, + () => binOpBuildRequest(0, argument, zeroCopy), + binOpParseResult, + ); } - function jsonOpSync(opName, args = null, ...zeroCopy) { - const argsBuf = encodeJson(args); - const res = dispatch(opName, argsBuf, ...zeroCopy); - return processResponse(decodeJson(res)); + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////// Json ops handling //////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const jsonRequestHeaderLength = 8; + + function jsonOpBuildRequest(requestId, argument, zeroCopy) { + const u8Array = core.encode( + "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument), + ); + new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true); + return [u8Array, ...zeroCopy]; } - function jsonOpAsyncHandler(buf) { - // Json Op. - const res = decodeJson(buf); - const promise = promiseTable[res.promiseId]; - delete promiseTable[res.promiseId]; - promise.resolve(res); + function jsonOpParseResult(u8Array, _) { + const data = JSON.parse(core.decode(u8Array)); + + if ("err" in data) { + return [ + data.requestId, + null, + handleError(data.err.className, data.err.message), + ]; + } + + return [data.requestId, data.ok, null]; + } + + function jsonOpAsync(opName, argument = null, ...zeroCopy) { + return opAsync( + opName, + (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy), + jsonOpParseResult, + ); + } + + function jsonOpSync(opName, argument = null, ...zeroCopy) { + return opSync( + opName, + () => [core.encode(JSON.stringify(argument)), ...zeroCopy], + jsonOpParseResult, + ); } function resources() { return jsonOpSync("op_resources"); } - function close(rid) { - jsonOpSync("op_close", { rid }); + return jsonOpSync("op_close", { rid }); } Object.assign(window.Deno.core, { jsonOpAsync, jsonOpSync, - setAsyncHandler, - setAsyncHandlerByName, - dispatch: send, - dispatchByName: dispatch, + binOpAsync, + binOpSync, + dispatch, + dispatchByName, ops, close, resources, registerErrorClass, - getErrorClassAndArgs, sharedQueueInit: init, // sharedQueue is private but exposed for testing. sharedQueue: { @@ -279,5 +405,7 @@ SharedQueue Binary Layout reset, shift, }, + // setAsyncHandler is private but exposed for testing. + setAsyncHandler, }); })(this); diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js index f20366494..18f98419f 100644 --- a/core/examples/http_bench_bin_ops.js +++ b/core/examples/http_bench_bin_ops.js @@ -8,85 +8,15 @@ const responseBuf = new Uint8Array( .split("") .map((c) => c.charCodeAt(0)), ); -const promiseMap = new Map(); -let nextPromiseId = 1; - -function assert(cond) { - if (!cond) { - throw Error("assert"); - } -} - -function createResolvable() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - promise.resolve = resolve; - promise.reject = reject; - return promise; -} - -const scratch32 = new Int32Array(3); -const scratchBytes = new Uint8Array( - scratch32.buffer, - scratch32.byteOffset, - scratch32.byteLength, -); -assert(scratchBytes.byteLength === 3 * 4); - -function send(promiseId, opId, rid, ...zeroCopy) { - scratch32[0] = promiseId; - scratch32[1] = rid; - scratch32[2] = -1; - return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy); -} - -/** Returns Promise<number> */ -function sendAsync(opId, rid, ...zeroCopy) { - const promiseId = nextPromiseId++; - const p = createResolvable(); - const buf = send(promiseId, opId, rid, ...zeroCopy); - if (buf) { - const record = recordFromBuf(buf); - // Sync result. - p.resolve(record.result); - } else { - // Async result. - promiseMap.set(promiseId, p); - } - return p; -} - -/** Returns i32 number */ -function sendSync(opId, rid) { - const buf = send(0, opId, rid); - const record = recordFromBuf(buf); - return record[2]; -} - -function recordFromBuf(buf) { - assert(buf.byteLength === 3 * 4); - return new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); -} - -function handleAsyncMsgFromRust(buf) { - const record = recordFromBuf(buf); - const p = promiseMap.get(record[0]); - promiseMap.delete(record[0]); - p.resolve(record[2]); -} /** Listens on 0.0.0.0:4500, returns rid. */ function listen() { - return sendSync(ops["listen"], -1); + return Deno.core.binOpSync("listen"); } /** Accepts a connection, returns rid. */ function accept(rid) { - return sendAsync(ops["accept"], rid); + return Deno.core.binOpAsync("accept", rid); } /** @@ -94,16 +24,16 @@ function accept(rid) { * Returns bytes read. */ function read(rid, data) { - return sendAsync(ops["read"], rid, data); + return Deno.core.binOpAsync("read", rid, data); } /** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ function write(rid, data) { - return sendAsync(ops["write"], rid, data); + return Deno.core.binOpAsync("write", rid, data); } function close(rid) { - return sendSync(ops["close"], rid); + Deno.core.binOpSync("close", rid); } async function serve(rid) { @@ -121,16 +51,14 @@ async function serve(rid) { close(rid); } -let ops; - async function main() { - ops = Deno.core.ops(); - for (const opName in ops) { - Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust); - } + Deno.core.ops(); + Deno.core.registerErrorClass("Error", Error); const listenerRid = listen(); - Deno.core.print(`http_bench_bin_ops listening on http://127.0.0.1:4544/\n`); + Deno.core.print( + `http_bench_bin_ops listening on http://127.0.0.1:4544/\n`, + ); for (;;) { const rid = await accept(listenerRid); diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index bc4ca4dce..1f649b235 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -3,30 +3,23 @@ #[macro_use] extern crate log; +use deno_core::error::bad_resource_id; +use deno_core::error::AnyError; use deno_core::AsyncRefCell; use deno_core::BufVec; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::JsRuntime; -use deno_core::Op; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; -use futures::future::FutureExt; -use futures::future::TryFuture; -use futures::future::TryFutureExt; use std::cell::RefCell; use std::convert::TryFrom; -use std::convert::TryInto; use std::env; -use std::fmt::Debug; use std::io::Error; -use std::io::ErrorKind; -use std::mem::size_of; use std::net::SocketAddr; -use std::ptr; use std::rc::Rc; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -120,52 +113,21 @@ impl From<tokio::net::TcpStream> for TcpStream { } } -#[derive(Copy, Clone, Debug, PartialEq)] -struct Record { - promise_id: u32, - rid: ResourceId, - result: i32, -} - -type RecordBuf = [u8; size_of::<Record>()]; - -impl From<&[u8]> for Record { - fn from(buf: &[u8]) -> Self { - assert_eq!(buf.len(), size_of::<RecordBuf>()); - unsafe { *(buf as *const _ as *const RecordBuf) }.into() - } -} - -impl From<RecordBuf> for Record { - fn from(buf: RecordBuf) -> Self { - unsafe { - #[allow(clippy::cast_ptr_alignment)] - ptr::read_unaligned(&buf as *const _ as *const Self) - } - } -} - -impl From<Record> for RecordBuf { - fn from(record: Record) -> Self { - unsafe { ptr::read(&record as *const _ as *const Self) } - } -} - fn create_js_runtime() -> JsRuntime { - let mut js_runtime = JsRuntime::new(Default::default()); - register_op_bin_sync(&mut js_runtime, "listen", op_listen); - register_op_bin_sync(&mut js_runtime, "close", op_close); - register_op_bin_async(&mut js_runtime, "accept", op_accept); - register_op_bin_async(&mut js_runtime, "read", op_read); - register_op_bin_async(&mut js_runtime, "write", op_write); - js_runtime + let mut runtime = JsRuntime::new(Default::default()); + runtime.register_op("listen", deno_core::bin_op_sync(op_listen)); + runtime.register_op("close", deno_core::bin_op_sync(op_close)); + runtime.register_op("accept", deno_core::bin_op_async(op_accept)); + runtime.register_op("read", deno_core::bin_op_async(op_read)); + runtime.register_op("write", deno_core::bin_op_async(op_write)); + runtime } fn op_listen( state: &mut OpState, _rid: ResourceId, _bufs: &mut [ZeroCopyBuf], -) -> Result<u32, Error> { +) -> Result<u32, AnyError> { debug!("listen"); let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; @@ -179,7 +141,7 @@ fn op_close( state: &mut OpState, rid: ResourceId, _bufs: &mut [ZeroCopyBuf], -) -> Result<u32, Error> { +) -> Result<u32, AnyError> { debug!("close rid={}", rid); state .resource_table @@ -192,7 +154,7 @@ async fn op_accept( state: Rc<RefCell<OpState>>, rid: ResourceId, _bufs: BufVec, -) -> Result<u32, Error> { +) -> Result<u32, AnyError> { debug!("accept rid={}", rid); let listener = state @@ -209,7 +171,7 @@ async fn op_read( state: Rc<RefCell<OpState>>, rid: ResourceId, mut bufs: BufVec, -) -> Result<usize, Error> { +) -> Result<u32, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("read rid={}", rid); @@ -218,14 +180,15 @@ async fn op_read( .resource_table .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - stream.read(&mut bufs[0]).await + let nread = stream.read(&mut bufs[0]).await?; + Ok(nread as u32) } async fn op_write( state: Rc<RefCell<OpState>>, rid: ResourceId, bufs: BufVec, -) -> Result<usize, Error> { +) -> Result<u32, AnyError> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); debug!("write rid={}", rid); @@ -234,70 +197,8 @@ async fn op_write( .resource_table .get::<TcpStream>(rid) .ok_or_else(bad_resource_id)?; - stream.write(&bufs[0]).await -} - -fn register_op_bin_sync<F>( - js_runtime: &mut JsRuntime, - name: &'static str, - op_fn: F, -) where - F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error> + 'static, -{ - let base_op_fn = move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op { - let record = Record::from(bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(is_sync); - - let zero_copy_bufs = &mut bufs[1..]; - let result: i32 = - match op_fn(&mut state.borrow_mut(), record.rid, zero_copy_bufs) { - Ok(r) => r as i32, - Err(_) => -1, - }; - let buf = RecordBuf::from(Record { result, ..record })[..].into(); - Op::Sync(buf) - }; - - js_runtime.register_op(name, base_op_fn); -} - -fn register_op_bin_async<F, R>( - js_runtime: &mut JsRuntime, - name: &'static str, - op_fn: F, -) where - F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + Copy + 'static, - R: TryFuture, - R::Ok: TryInto<i32>, - <R::Ok as TryInto<i32>>::Error: Debug, -{ - let base_op_fn = move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { - let mut bufs_iter = bufs.into_iter(); - let record_buf = bufs_iter.next().unwrap(); - let zero_copy_bufs = bufs_iter.collect::<BufVec>(); - - let record = Record::from(record_buf.as_ref()); - let is_sync = record.promise_id == 0; - assert!(!is_sync); - - let fut = async move { - let op = op_fn(state, record.rid, zero_copy_bufs); - let result = op - .map_ok(|r| r.try_into().expect("op result does not fit in i32")) - .unwrap_or_else(|_| -1) - .await; - RecordBuf::from(Record { result, ..record })[..].into() - }; - - Op::Async(fut.boxed_local()) - }; - - js_runtime.register_op(name, base_op_fn); -} - -fn bad_resource_id() -> Error { - Error::new(ErrorKind::NotFound, "bad resource id") + let nwritten = stream.write(&bufs[0]).await?; + Ok(nwritten as u32) } fn main() { @@ -329,18 +230,3 @@ fn main() { }; runtime.block_on(future).unwrap(); } - -#[test] -fn test_record_from() { - let expected = Record { - promise_id: 1, - rid: 3, - result: 4, - }; - let buf = RecordBuf::from(expected); - if cfg!(target_endian = "little") { - assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]); - } - let actual = Record::from(buf); - assert_eq!(actual, expected); -} diff --git a/core/lib.rs b/core/lib.rs index deea9d281..c65ed7aac 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -15,6 +15,8 @@ mod module_specifier; mod modules; mod normalize_path; mod ops; +mod ops_bin; +mod ops_json; pub mod plugin_api; mod resources; mod runtime; @@ -58,8 +60,6 @@ pub use crate::modules::ModuleSourceFuture; pub use crate::modules::NoopModuleLoader; pub use crate::modules::RecursiveModuleLoad; pub use crate::normalize_path::normalize_path; -pub use crate::ops::json_op_async; -pub use crate::ops::json_op_sync; pub use crate::ops::op_close; pub use crate::ops::op_resources; pub use crate::ops::Op; @@ -68,6 +68,11 @@ pub use crate::ops::OpFn; pub use crate::ops::OpId; pub use crate::ops::OpState; pub use crate::ops::OpTable; +pub use crate::ops_bin::bin_op_async; +pub use crate::ops_bin::bin_op_sync; +pub use crate::ops_bin::ValueOrVector; +pub use crate::ops_json::json_op_async; +pub use crate::ops_json::json_op_sync; pub use crate::resources::Resource; pub use crate::resources::ResourceId; pub use crate::resources::ResourceTable; diff --git a/core/ops.rs b/core/ops.rs index eceab7feb..212a713ad 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -10,13 +10,10 @@ use crate::BufVec; use crate::ZeroCopyBuf; use futures::Future; use indexmap::IndexMap; -use serde::de::DeserializeOwned; -use serde::Serialize; use serde_json::json; use serde_json::Value; use std::cell::RefCell; use std::collections::HashMap; -use std::convert::TryInto; use std::iter::once; use std::ops::Deref; use std::ops::DerefMut; @@ -117,125 +114,6 @@ impl Default for OpTable { } } -/// Creates an op that passes data synchronously using JSON. -/// -/// The provided function `op_fn` has the following parameters: -/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op. -/// * `V`: the deserializable value that is passed to the Rust function. -/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used. -/// -/// `op_fn` returns a serializable value, which is directly returned to JavaScript. -/// -/// When registering an op like this... -/// ```ignore -/// let mut runtime = JsRuntime::new(...); -/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op)); -/// ``` -/// -/// ...it can be invoked from JS using the provided name, for example: -/// ```js -/// Deno.core.ops(); -/// let result = Deno.core.jsonOpSync("function_name", args); -/// ``` -/// -/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. -/// A more complete example is available in the examples directory. -pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn> -where - F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static, - V: DeserializeOwned, - R: Serialize, -{ - Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op { - let result = serde_json::from_slice(&bufs[0]) - .map_err(AnyError::from) - .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..])); - let buf = - json_serialize_op_result(None, result, state.borrow().get_error_class_fn); - Op::Sync(buf) - }) -} - -/// Creates an op that passes data asynchronously using JSON. -/// -/// The provided function `op_fn` has the following parameters: -/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op. -/// * `V`: the deserializable value that is passed to the Rust function. -/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used. -/// -/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously -/// returned to JavaScript. -/// -/// When registering an op like this... -/// ```ignore -/// let mut runtime = JsRuntime::new(...); -/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op)); -/// ``` -/// -/// ...it can be invoked from JS using the provided name, for example: -/// ```js -/// Deno.core.ops(); -/// let future = Deno.core.jsonOpAsync("function_name", args); -/// ``` -/// -/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. -/// A more complete example is available in the examples directory. -pub fn json_op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn> -where - F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static, - V: DeserializeOwned, - R: Future<Output = Result<RV, AnyError>> + 'static, - RV: Serialize, -{ - let try_dispatch_op = - move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> { - let promise_id = bufs[0] - .get(0..8) - .map(|b| u64::from_be_bytes(b.try_into().unwrap())) - .ok_or_else(|| type_error("missing or invalid `promiseId`"))?; - let args = serde_json::from_slice(&bufs[0][8..])?; - let bufs = bufs[1..].into(); - use crate::futures::FutureExt; - let fut = op_fn(state.clone(), args, bufs).map(move |result| { - json_serialize_op_result( - Some(promise_id), - result, - state.borrow().get_error_class_fn, - ) - }); - Ok(Op::Async(Box::pin(fut))) - }; - - Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { - match try_dispatch_op(state.clone(), bufs) { - Ok(op) => op, - Err(err) => Op::Sync(json_serialize_op_result( - None, - Err::<(), AnyError>(err), - state.borrow().get_error_class_fn, - )), - } - }) -} - -fn json_serialize_op_result<R: Serialize>( - promise_id: Option<u64>, - result: Result<R, AnyError>, - get_error_class_fn: crate::runtime::GetErrorClassFn, -) -> Box<[u8]> { - let value = match result { - Ok(v) => serde_json::json!({ "ok": v, "promiseId": promise_id }), - Err(err) => serde_json::json!({ - "promiseId": promise_id , - "err": { - "className": (get_error_class_fn)(&err), - "message": err.to_string(), - } - }), - }; - serde_json::to_vec(&value).unwrap().into_boxed_slice() -} - /// Return map of resources with id as key /// and string representation as value. /// diff --git a/core/ops_bin.rs b/core/ops_bin.rs new file mode 100644 index 000000000..053150bfd --- /dev/null +++ b/core/ops_bin.rs @@ -0,0 +1,377 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::error::AnyError; +use crate::futures::future::FutureExt; +use crate::BufVec; +use crate::Op; +use crate::OpFn; +use crate::OpState; +use crate::ZeroCopyBuf; +use std::boxed::Box; +use std::cell::RefCell; +use std::convert::TryInto; +use std::future::Future; +use std::rc::Rc; + +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct RequestHeader { + pub request_id: u64, + pub argument: u32, +} + +impl RequestHeader { + pub fn from_raw(bytes: &[u8]) -> Option<Self> { + if bytes.len() < 3 * 4 { + return None; + } + + Some(Self { + request_id: u64::from_le_bytes(bytes[0..8].try_into().unwrap()), + argument: u32::from_le_bytes(bytes[8..12].try_into().unwrap()), + }) + } +} + +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct ResponseHeader { + pub request_id: u64, + pub status: u32, + pub result: u32, +} + +impl Into<[u8; 16]> for ResponseHeader { + fn into(self) -> [u8; 16] { + let mut resp_header = [0u8; 16]; + resp_header[0..8].copy_from_slice(&self.request_id.to_le_bytes()); + resp_header[8..12].copy_from_slice(&self.status.to_le_bytes()); + resp_header[12..16].copy_from_slice(&self.result.to_le_bytes()); + resp_header + } +} + +pub trait ValueOrVector { + fn value(&self) -> u32; + fn vector(self) -> Option<Vec<u8>>; +} + +impl ValueOrVector for Vec<u8> { + fn value(&self) -> u32 { + self.len() as u32 + } + fn vector(self) -> Option<Vec<u8>> { + Some(self) + } +} + +impl ValueOrVector for u32 { + fn value(&self) -> u32 { + *self + } + fn vector(self) -> Option<Vec<u8>> { + None + } +} + +fn gen_padding_32bit(len: usize) -> &'static [u8] { + &[b' ', b' ', b' '][0..(4 - (len & 3)) & 3] +} + +/// Creates an op that passes data synchronously using raw ui8 buffer. +/// +/// The provided function `op_fn` has the following parameters: +/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op. +/// * `argument`: the i32 value that is passed to the Rust function. +/// * `&mut [ZeroCopyBuf]`: raw bytes passed along. +/// +/// `op_fn` returns an array buffer value, which is directly returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::bin_op_sync(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let result = Deno.core.binOpSync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn bin_op_sync<F, R>(op_fn: F) -> Box<OpFn> +where + F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static, + R: ValueOrVector, +{ + Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { + let mut bufs_iter = bufs.into_iter(); + let record_buf = bufs_iter.next().expect("Expected record at position 0"); + let mut zero_copy = bufs_iter.collect::<BufVec>(); + + let req_header = match RequestHeader::from_raw(&record_buf) { + Some(r) => r, + None => { + let error_class = b"TypeError"; + let error_message = b"Unparsable control buffer"; + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: 0, + status: 1, + result: error_class.len() as u32, + }; + return Op::Sync( + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect(), + ); + } + }; + + match op_fn(&mut state.borrow_mut(), req_header.argument, &mut zero_copy) { + Ok(possibly_vector) => { + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 0, + result: possibly_vector.value(), + }; + let resp_encoded_header = Into::<[u8; 16]>::into(resp_header); + + let resp_vector = match possibly_vector.vector() { + Some(mut vector) => { + let padding = gen_padding_32bit(vector.len()); + vector.extend(padding); + vector.extend(&resp_encoded_header); + vector + } + None => resp_encoded_header.to_vec(), + }; + Op::Sync(resp_vector.into_boxed_slice()) + } + Err(error) => { + let error_class = + (state.borrow().get_error_class_fn)(&error).as_bytes(); + let error_message = error.to_string().as_bytes().to_owned(); + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 1, + result: error_class.len() as u32, + }; + return Op::Sync( + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect(), + ); + } + } + }) +} + +/// Creates an op that passes data asynchronously using raw ui8 buffer. +/// +/// The provided function `op_fn` has the following parameters: +/// * `Rc<RefCell<OpState>>`: the op state, can be used to read/write resources in the runtime from an op. +/// * `argument`: the i32 value that is passed to the Rust function. +/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used. +/// +/// `op_fn` returns a future, whose output is a JSON value. This value will be asynchronously +/// returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let future = Deno.core.jsonOpAsync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn bin_op_async<F, R, RV>(op_fn: F) -> Box<OpFn> +where + F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + 'static, + R: Future<Output = Result<RV, AnyError>> + 'static, + RV: ValueOrVector, +{ + Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { + let mut bufs_iter = bufs.into_iter(); + let record_buf = bufs_iter.next().expect("Expected record at position 0"); + let zero_copy = bufs_iter.collect::<BufVec>(); + + let req_header = match RequestHeader::from_raw(&record_buf) { + Some(r) => r, + None => { + let error_class = b"TypeError"; + let error_message = b"Unparsable control buffer"; + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: 0, + status: 1, + result: error_class.len() as u32, + }; + return Op::Sync( + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect(), + ); + } + }; + + let fut = + op_fn(state.clone(), req_header.argument, zero_copy).map(move |result| { + match result { + Ok(possibly_vector) => { + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 0, + result: possibly_vector.value(), + }; + let resp_encoded_header = Into::<[u8; 16]>::into(resp_header); + + let resp_vector = match possibly_vector.vector() { + Some(mut vector) => { + let padding = gen_padding_32bit(vector.len()); + vector.extend(padding); + vector.extend(&resp_encoded_header); + vector + } + None => resp_encoded_header.to_vec(), + }; + resp_vector.into_boxed_slice() + } + Err(error) => { + let error_class = + (state.borrow().get_error_class_fn)(&error).as_bytes(); + let error_message = error.to_string().as_bytes().to_owned(); + let len = error_class.len() + error_message.len(); + let padding = gen_padding_32bit(len); + let resp_header = ResponseHeader { + request_id: req_header.request_id, + status: 1, + result: error_class.len() as u32, + }; + + error_class + .iter() + .chain(error_message.iter()) + .chain(padding) + .chain(&Into::<[u8; 16]>::into(resp_header)) + .cloned() + .collect() + } + } + }); + let temp = Box::pin(fut); + Op::Async(temp) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn padding() { + assert_eq!(gen_padding_32bit(0), &[] as &[u8]); + assert_eq!(gen_padding_32bit(1), &[b' ', b' ', b' ']); + assert_eq!(gen_padding_32bit(2), &[b' ', b' ']); + assert_eq!(gen_padding_32bit(3), &[b' ']); + assert_eq!(gen_padding_32bit(4), &[] as &[u8]); + assert_eq!(gen_padding_32bit(5), &[b' ', b' ', b' ']); + } + + #[test] + fn response_header_to_bytes() { + // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´ + let resp_header = ResponseHeader { + request_id: 0x0102030405060708u64, + status: 0x090A0B0Cu32, + result: 0x0D0E0F10u32, + }; + + // All numbers are always little-endian encoded, as the js side also wants this to be fixed + assert_eq!( + &Into::<[u8; 16]>::into(resp_header), + &[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 16, 15, 14, 13] + ); + } + + #[test] + fn response_header_to_bytes_max_value() { + // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´ + let resp_header = ResponseHeader { + request_id: (1u64 << 53u64) - 1u64, + status: 0xFFFFFFFFu32, + result: 0xFFFFFFFFu32, + }; + + // All numbers are always little-endian encoded, as the js side also wants this to be fixed + assert_eq!( + &Into::<[u8; 16]>::into(resp_header), + &[ + 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, 255, 255, 255, + 255 + ] + ); + } + + #[test] + fn request_header_from_bytes() { + let req_header = + RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9]) + .unwrap(); + + assert_eq!(req_header.request_id, 0x0102030405060708u64); + assert_eq!(req_header.argument, 0x090A0B0Cu32); + } + + #[test] + fn request_header_from_bytes_max_value() { + let req_header = RequestHeader::from_raw(&[ + 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, + ]) + .unwrap(); + + assert_eq!(req_header.request_id, (1u64 << 53u64) - 1u64); + assert_eq!(req_header.argument, 0xFFFFFFFFu32); + } + + #[test] + fn request_header_from_bytes_too_short() { + let req_header = + RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10]); + + assert_eq!(req_header, None); + } + + #[test] + fn request_header_from_bytes_long() { + let req_header = RequestHeader::from_raw(&[ + 8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 13, 14, 15, 16, 17, 18, 19, 20, 21, + ]) + .unwrap(); + + assert_eq!(req_header.request_id, 0x0102030405060708u64); + assert_eq!(req_header.argument, 0x090A0B0Cu32); + } +} diff --git a/core/ops_json.rs b/core/ops_json.rs new file mode 100644 index 000000000..0ef91ed33 --- /dev/null +++ b/core/ops_json.rs @@ -0,0 +1,134 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use crate::error::type_error; +use crate::error::AnyError; +use crate::BufVec; +use crate::Op; +use crate::OpFn; +use crate::OpState; +use crate::ZeroCopyBuf; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::cell::RefCell; +use std::convert::TryInto; +use std::future::Future; +use std::rc::Rc; + +fn json_serialize_op_result<R: Serialize>( + request_id: Option<u64>, + result: Result<R, AnyError>, + get_error_class_fn: crate::runtime::GetErrorClassFn, +) -> Box<[u8]> { + let value = match result { + Ok(v) => serde_json::json!({ "ok": v, "requestId": request_id }), + Err(err) => serde_json::json!({ + "requestId": request_id, + "err": { + "className": (get_error_class_fn)(&err), + "message": err.to_string(), + } + }), + }; + serde_json::to_vec(&value).unwrap().into_boxed_slice() +} + +/// Creates an op that passes data synchronously using JSON. +/// +/// The provided function `op_fn` has the following parameters: +/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op. +/// * `V`: the deserializable value that is passed to the Rust function. +/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used. +/// +/// `op_fn` returns a serializable value, which is directly returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let result = Deno.core.jsonOpSync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn> +where + F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static, + V: DeserializeOwned, + R: Serialize, +{ + Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op { + let result = serde_json::from_slice(&bufs[0]) + .map_err(AnyError::from) + .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..])); + let buf = + json_serialize_op_result(None, result, state.borrow().get_error_class_fn); + Op::Sync(buf) + }) +} + +/// Creates an op that passes data asynchronously using JSON. +/// +/// The provided function `op_fn` has the following parameters: +/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op. +/// * `V`: the deserializable value that is passed to the Rust function. +/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used. +/// +/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously +/// returned to JavaScript. +/// +/// When registering an op like this... +/// ```ignore +/// let mut runtime = JsRuntime::new(...); +/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op)); +/// ``` +/// +/// ...it can be invoked from JS using the provided name, for example: +/// ```js +/// Deno.core.ops(); +/// let future = Deno.core.jsonOpAsync("function_name", args); +/// ``` +/// +/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization. +/// A more complete example is available in the examples directory. +pub fn json_op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn> +where + F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static, + V: DeserializeOwned, + R: Future<Output = Result<RV, AnyError>> + 'static, + RV: Serialize, +{ + let try_dispatch_op = + move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> { + let request_id = bufs[0] + .get(0..8) + .map(|b| u64::from_le_bytes(b.try_into().unwrap())) + .ok_or_else(|| type_error("missing or invalid `requestId`"))?; + let args = serde_json::from_slice(&bufs[0][8..])?; + let bufs = bufs[1..].into(); + use crate::futures::FutureExt; + let fut = op_fn(state.clone(), args, bufs).map(move |result| { + json_serialize_op_result( + Some(request_id), + result, + state.borrow().get_error_class_fn, + ) + }); + Ok(Op::Async(Box::pin(fut))) + }; + + Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op { + match try_dispatch_op(state.clone(), bufs) { + Ok(op) => op, + Err(err) => Op::Sync(json_serialize_op_result( + None, + Err::<(), AnyError>(err), + state.borrow().get_error_class_fn, + )), + } + }) +} |