summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/core.js266
-rw-r--r--core/examples/http_bench_bin_ops.js92
-rw-r--r--core/examples/http_bench_bin_ops.rs150
-rw-r--r--core/lib.rs9
-rw-r--r--core/ops.rs122
-rw-r--r--core/ops_bin.rs377
-rw-r--r--core/ops_json.rs134
7 files changed, 743 insertions, 407 deletions
diff --git a/core/core.js b/core/core.js
index f44bf253e..2de8e1fff 100644
--- a/core/core.js
+++ b/core/core.js
@@ -30,13 +30,22 @@ SharedQueue Binary Layout
const core = window.Deno.core;
const { recv, send } = core;
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////////// Dispatch /////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const dispatch = send;
+ const dispatchByName = (opName, control, ...zeroCopy) =>
+ dispatch(opsCache[opName], control, ...zeroCopy);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////// Shared array buffer ///////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
let sharedBytes;
let shared32;
- let asyncHandlers;
-
let opsCache = {};
- const errorMap = {};
function init() {
const shared = core.shared;
@@ -45,6 +54,7 @@ SharedQueue Binary Layout
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
+
asyncHandlers = [];
// Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
@@ -150,15 +160,43 @@ SharedQueue Binary Layout
return [opId, buf];
}
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////// Error handling //////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const errorMap = {};
+
+ function registerErrorClass(errorName, className, args) {
+ if (typeof errorMap[errorName] !== "undefined") {
+ throw new TypeError(`Error class for "${errorName}" already registered`);
+ }
+ errorMap[errorName] = [className, args ?? []];
+ }
+
+ function handleError(className, message) {
+ if (typeof errorMap[className] === "undefined") {
+ return new Error(
+ `Unregistered error class: "${className}"\n` +
+ ` ${message}\n` +
+ ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
+ );
+ }
+
+ const [ErrorClass, args] = errorMap[className];
+ return new ErrorClass(message, ...args);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////// Async handling //////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ let asyncHandlers = [];
+
function setAsyncHandler(opId, cb) {
assert(opId != null);
asyncHandlers[opId] = cb;
}
- function setAsyncHandlerByName(opName, cb) {
- setAsyncHandler(opsCache[opName], cb);
- }
-
function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
@@ -166,108 +204,196 @@ SharedQueue Binary Layout
break;
}
assert(asyncHandlers[opIdBuf[0]] != null);
- asyncHandlers[opIdBuf[0]](opIdBuf[1]);
+ asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
}
for (let i = 0; i < arguments.length; i += 2) {
- asyncHandlers[arguments[i]](arguments[i + 1]);
+ asyncHandlers[arguments[i]](arguments[i + 1], false);
}
}
- function dispatch(opName, control, ...zeroCopy) {
- return send(opsCache[opName], control, ...zeroCopy);
- }
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////// General sync & async ops handling ////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
- function registerErrorClass(errorName, className, args) {
- if (typeof errorMap[errorName] !== "undefined") {
- throw new TypeError(`Error class for "${errorName}" already registered`);
+ let nextRequestId = 1;
+ const promiseTable = {};
+
+ function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
+ const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
+ if (error !== null) {
+ promiseTable[requestId][1](error);
+ } else {
+ promiseTable[requestId][0](result);
}
- errorMap[errorName] = [className, args ?? []];
+ delete promiseTable[requestId];
}
- function getErrorClassAndArgs(errorName) {
- return errorMap[errorName] ?? [undefined, []];
+ function opAsync(opName, opRequestBuilder, opResultParser) {
+ const opId = opsCache[opName];
+ // Make sure requests of this type are handled by the asyncHandler
+ // The asyncHandler's role is to call the "promiseTable[requestId]" function
+ if (typeof asyncHandlers[opId] === "undefined") {
+ asyncHandlers[opId] = (buffer, isCopyNeeded) =>
+ asyncHandle(buffer, isCopyNeeded, opResultParser);
+ }
+
+ const requestId = nextRequestId++;
+
+ // Create and store promise
+ const promise = new Promise((resolve, reject) => {
+ promiseTable[requestId] = [resolve, reject];
+ });
+
+ // Synchronously dispatch async request
+ core.dispatch(opId, ...opRequestBuilder(requestId));
+
+ // Wait for async response
+ return promise;
}
- // Returns Uint8Array
- function encodeJson(args) {
- const s = JSON.stringify(args);
- return core.encode(s);
+ function opSync(opName, opRequestBuilder, opResultParser) {
+ const opId = opsCache[opName];
+ const u8Array = core.dispatch(opId, ...opRequestBuilder());
+
+ const [_, result, error] = opResultParser(u8Array, false);
+ if (error !== null) throw error;
+ return result;
}
- function decodeJson(ui8) {
- const s = core.decode(ui8);
- return JSON.parse(s);
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////// Bin ops handling /////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const binRequestHeaderByteLength = 8 + 4;
+ const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
+ const scratchView = new DataView(scratchBuffer);
+
+ function binOpBuildRequest(requestId, argument, zeroCopy) {
+ scratchView.setBigUint64(0, BigInt(requestId), true);
+ scratchView.setUint32(8, argument, true);
+ return [scratchView, ...zeroCopy];
}
- let nextPromiseId = 1;
- const promiseTable = {};
+ function binOpParseResult(u8Array, isCopyNeeded) {
+ // Decode header value from u8Array
+ const headerByteLength = 8 + 2 * 4;
+ assert(u8Array.byteLength >= headerByteLength);
+ assert(u8Array.byteLength % 4 == 0);
+ const view = new DataView(
+ u8Array.buffer,
+ u8Array.byteOffset + u8Array.byteLength - headerByteLength,
+ headerByteLength,
+ );
+
+ const requestId = Number(view.getBigUint64(0, true));
+ const status = view.getUint32(8, true);
+ const result = view.getUint32(12, true);
+
+ // Error handling
+ if (status !== 0) {
+ const className = core.decode(u8Array.subarray(0, result));
+ const message = core.decode(u8Array.subarray(result, -headerByteLength))
+ .trim();
+
+ return [requestId, null, handleError(className, message)];
+ }
- function processResponse(res) {
- if ("ok" in res) {
- return res.ok;
+ if (u8Array.byteLength === headerByteLength) {
+ return [requestId, result, null];
}
- const [ErrorClass, args] = getErrorClassAndArgs(res.err.className);
- if (!ErrorClass) {
- throw new Error(
- `Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
- );
+
+ // Rest of response buffer is passed as reference or as a copy
+ let respBuffer = null;
+ if (isCopyNeeded) {
+ // Copy part of the response array (if sent through shared array buf)
+ respBuffer = u8Array.slice(0, result);
+ } else {
+ // Create view on existing array (if sent through overflow)
+ respBuffer = u8Array.subarray(0, result);
}
- throw new ErrorClass(res.err.message, ...args);
+
+ return [requestId, respBuffer, null];
}
- async function jsonOpAsync(opName, args = null, ...zeroCopy) {
- setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);
-
- const promiseId = nextPromiseId++;
- const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args));
- new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId));
- dispatch(opName, reqBuf, ...zeroCopy);
- let resolve, reject;
- const promise = new Promise((resolve_, reject_) => {
- resolve = resolve_;
- reject = reject_;
- });
- promise.resolve = resolve;
- promise.reject = reject;
- promiseTable[promiseId] = promise;
- return processResponse(await promise);
+ function binOpAsync(opName, argument = 0, ...zeroCopy) {
+ return opAsync(
+ opName,
+ (requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
+ binOpParseResult,
+ );
+ }
+
+ function binOpSync(opName, argument = 0, ...zeroCopy) {
+ return opSync(
+ opName,
+ () => binOpBuildRequest(0, argument, zeroCopy),
+ binOpParseResult,
+ );
}
- function jsonOpSync(opName, args = null, ...zeroCopy) {
- const argsBuf = encodeJson(args);
- const res = dispatch(opName, argsBuf, ...zeroCopy);
- return processResponse(decodeJson(res));
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////// Json ops handling ////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const jsonRequestHeaderLength = 8;
+
+ function jsonOpBuildRequest(requestId, argument, zeroCopy) {
+ const u8Array = core.encode(
+ "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
+ );
+ new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
+ return [u8Array, ...zeroCopy];
}
- function jsonOpAsyncHandler(buf) {
- // Json Op.
- const res = decodeJson(buf);
- const promise = promiseTable[res.promiseId];
- delete promiseTable[res.promiseId];
- promise.resolve(res);
+ function jsonOpParseResult(u8Array, _) {
+ const data = JSON.parse(core.decode(u8Array));
+
+ if ("err" in data) {
+ return [
+ data.requestId,
+ null,
+ handleError(data.err.className, data.err.message),
+ ];
+ }
+
+ return [data.requestId, data.ok, null];
+ }
+
+ function jsonOpAsync(opName, argument = null, ...zeroCopy) {
+ return opAsync(
+ opName,
+ (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
+ jsonOpParseResult,
+ );
+ }
+
+ function jsonOpSync(opName, argument = null, ...zeroCopy) {
+ return opSync(
+ opName,
+ () => [core.encode(JSON.stringify(argument)), ...zeroCopy],
+ jsonOpParseResult,
+ );
}
function resources() {
return jsonOpSync("op_resources");
}
-
function close(rid) {
- jsonOpSync("op_close", { rid });
+ return jsonOpSync("op_close", { rid });
}
Object.assign(window.Deno.core, {
jsonOpAsync,
jsonOpSync,
- setAsyncHandler,
- setAsyncHandlerByName,
- dispatch: send,
- dispatchByName: dispatch,
+ binOpAsync,
+ binOpSync,
+ dispatch,
+ dispatchByName,
ops,
close,
resources,
registerErrorClass,
- getErrorClassAndArgs,
sharedQueueInit: init,
// sharedQueue is private but exposed for testing.
sharedQueue: {
@@ -279,5 +405,7 @@ SharedQueue Binary Layout
reset,
shift,
},
+ // setAsyncHandler is private but exposed for testing.
+ setAsyncHandler,
});
})(this);
diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js
index f20366494..18f98419f 100644
--- a/core/examples/http_bench_bin_ops.js
+++ b/core/examples/http_bench_bin_ops.js
@@ -8,85 +8,15 @@ const responseBuf = new Uint8Array(
.split("")
.map((c) => c.charCodeAt(0)),
);
-const promiseMap = new Map();
-let nextPromiseId = 1;
-
-function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
-}
-
-function createResolvable() {
- let resolve;
- let reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- promise.resolve = resolve;
- promise.reject = reject;
- return promise;
-}
-
-const scratch32 = new Int32Array(3);
-const scratchBytes = new Uint8Array(
- scratch32.buffer,
- scratch32.byteOffset,
- scratch32.byteLength,
-);
-assert(scratchBytes.byteLength === 3 * 4);
-
-function send(promiseId, opId, rid, ...zeroCopy) {
- scratch32[0] = promiseId;
- scratch32[1] = rid;
- scratch32[2] = -1;
- return Deno.core.dispatch(opId, scratchBytes, ...zeroCopy);
-}
-
-/** Returns Promise<number> */
-function sendAsync(opId, rid, ...zeroCopy) {
- const promiseId = nextPromiseId++;
- const p = createResolvable();
- const buf = send(promiseId, opId, rid, ...zeroCopy);
- if (buf) {
- const record = recordFromBuf(buf);
- // Sync result.
- p.resolve(record.result);
- } else {
- // Async result.
- promiseMap.set(promiseId, p);
- }
- return p;
-}
-
-/** Returns i32 number */
-function sendSync(opId, rid) {
- const buf = send(0, opId, rid);
- const record = recordFromBuf(buf);
- return record[2];
-}
-
-function recordFromBuf(buf) {
- assert(buf.byteLength === 3 * 4);
- return new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
-}
-
-function handleAsyncMsgFromRust(buf) {
- const record = recordFromBuf(buf);
- const p = promiseMap.get(record[0]);
- promiseMap.delete(record[0]);
- p.resolve(record[2]);
-}
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
- return sendSync(ops["listen"], -1);
+ return Deno.core.binOpSync("listen");
}
/** Accepts a connection, returns rid. */
function accept(rid) {
- return sendAsync(ops["accept"], rid);
+ return Deno.core.binOpAsync("accept", rid);
}
/**
@@ -94,16 +24,16 @@ function accept(rid) {
* Returns bytes read.
*/
function read(rid, data) {
- return sendAsync(ops["read"], rid, data);
+ return Deno.core.binOpAsync("read", rid, data);
}
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
function write(rid, data) {
- return sendAsync(ops["write"], rid, data);
+ return Deno.core.binOpAsync("write", rid, data);
}
function close(rid) {
- return sendSync(ops["close"], rid);
+ Deno.core.binOpSync("close", rid);
}
async function serve(rid) {
@@ -121,16 +51,14 @@ async function serve(rid) {
close(rid);
}
-let ops;
-
async function main() {
- ops = Deno.core.ops();
- for (const opName in ops) {
- Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust);
- }
+ Deno.core.ops();
+ Deno.core.registerErrorClass("Error", Error);
const listenerRid = listen();
- Deno.core.print(`http_bench_bin_ops listening on http://127.0.0.1:4544/\n`);
+ Deno.core.print(
+ `http_bench_bin_ops listening on http://127.0.0.1:4544/\n`,
+ );
for (;;) {
const rid = await accept(listenerRid);
diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs
index bc4ca4dce..1f649b235 100644
--- a/core/examples/http_bench_bin_ops.rs
+++ b/core/examples/http_bench_bin_ops.rs
@@ -3,30 +3,23 @@
#[macro_use]
extern crate log;
+use deno_core::error::bad_resource_id;
+use deno_core::error::AnyError;
use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsRuntime;
-use deno_core::Op;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
-use futures::future::FutureExt;
-use futures::future::TryFuture;
-use futures::future::TryFutureExt;
use std::cell::RefCell;
use std::convert::TryFrom;
-use std::convert::TryInto;
use std::env;
-use std::fmt::Debug;
use std::io::Error;
-use std::io::ErrorKind;
-use std::mem::size_of;
use std::net::SocketAddr;
-use std::ptr;
use std::rc::Rc;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -120,52 +113,21 @@ impl From<tokio::net::TcpStream> for TcpStream {
}
}
-#[derive(Copy, Clone, Debug, PartialEq)]
-struct Record {
- promise_id: u32,
- rid: ResourceId,
- result: i32,
-}
-
-type RecordBuf = [u8; size_of::<Record>()];
-
-impl From<&[u8]> for Record {
- fn from(buf: &[u8]) -> Self {
- assert_eq!(buf.len(), size_of::<RecordBuf>());
- unsafe { *(buf as *const _ as *const RecordBuf) }.into()
- }
-}
-
-impl From<RecordBuf> for Record {
- fn from(buf: RecordBuf) -> Self {
- unsafe {
- #[allow(clippy::cast_ptr_alignment)]
- ptr::read_unaligned(&buf as *const _ as *const Self)
- }
- }
-}
-
-impl From<Record> for RecordBuf {
- fn from(record: Record) -> Self {
- unsafe { ptr::read(&record as *const _ as *const Self) }
- }
-}
-
fn create_js_runtime() -> JsRuntime {
- let mut js_runtime = JsRuntime::new(Default::default());
- register_op_bin_sync(&mut js_runtime, "listen", op_listen);
- register_op_bin_sync(&mut js_runtime, "close", op_close);
- register_op_bin_async(&mut js_runtime, "accept", op_accept);
- register_op_bin_async(&mut js_runtime, "read", op_read);
- register_op_bin_async(&mut js_runtime, "write", op_write);
- js_runtime
+ let mut runtime = JsRuntime::new(Default::default());
+ runtime.register_op("listen", deno_core::bin_op_sync(op_listen));
+ runtime.register_op("close", deno_core::bin_op_sync(op_close));
+ runtime.register_op("accept", deno_core::bin_op_async(op_accept));
+ runtime.register_op("read", deno_core::bin_op_async(op_read));
+ runtime.register_op("write", deno_core::bin_op_async(op_write));
+ runtime
}
fn op_listen(
state: &mut OpState,
_rid: ResourceId,
_bufs: &mut [ZeroCopyBuf],
-) -> Result<u32, Error> {
+) -> Result<u32, AnyError> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
@@ -179,7 +141,7 @@ fn op_close(
state: &mut OpState,
rid: ResourceId,
_bufs: &mut [ZeroCopyBuf],
-) -> Result<u32, Error> {
+) -> Result<u32, AnyError> {
debug!("close rid={}", rid);
state
.resource_table
@@ -192,7 +154,7 @@ async fn op_accept(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_bufs: BufVec,
-) -> Result<u32, Error> {
+) -> Result<u32, AnyError> {
debug!("accept rid={}", rid);
let listener = state
@@ -209,7 +171,7 @@ async fn op_read(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
mut bufs: BufVec,
-) -> Result<usize, Error> {
+) -> Result<u32, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
debug!("read rid={}", rid);
@@ -218,14 +180,15 @@ async fn op_read(
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- stream.read(&mut bufs[0]).await
+ let nread = stream.read(&mut bufs[0]).await?;
+ Ok(nread as u32)
}
async fn op_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
bufs: BufVec,
-) -> Result<usize, Error> {
+) -> Result<u32, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
debug!("write rid={}", rid);
@@ -234,70 +197,8 @@ async fn op_write(
.resource_table
.get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
- stream.write(&bufs[0]).await
-}
-
-fn register_op_bin_sync<F>(
- js_runtime: &mut JsRuntime,
- name: &'static str,
- op_fn: F,
-) where
- F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<u32, Error> + 'static,
-{
- let base_op_fn = move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
- let record = Record::from(bufs[0].as_ref());
- let is_sync = record.promise_id == 0;
- assert!(is_sync);
-
- let zero_copy_bufs = &mut bufs[1..];
- let result: i32 =
- match op_fn(&mut state.borrow_mut(), record.rid, zero_copy_bufs) {
- Ok(r) => r as i32,
- Err(_) => -1,
- };
- let buf = RecordBuf::from(Record { result, ..record })[..].into();
- Op::Sync(buf)
- };
-
- js_runtime.register_op(name, base_op_fn);
-}
-
-fn register_op_bin_async<F, R>(
- js_runtime: &mut JsRuntime,
- name: &'static str,
- op_fn: F,
-) where
- F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + Copy + 'static,
- R: TryFuture,
- R::Ok: TryInto<i32>,
- <R::Ok as TryInto<i32>>::Error: Debug,
-{
- let base_op_fn = move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
- let mut bufs_iter = bufs.into_iter();
- let record_buf = bufs_iter.next().unwrap();
- let zero_copy_bufs = bufs_iter.collect::<BufVec>();
-
- let record = Record::from(record_buf.as_ref());
- let is_sync = record.promise_id == 0;
- assert!(!is_sync);
-
- let fut = async move {
- let op = op_fn(state, record.rid, zero_copy_bufs);
- let result = op
- .map_ok(|r| r.try_into().expect("op result does not fit in i32"))
- .unwrap_or_else(|_| -1)
- .await;
- RecordBuf::from(Record { result, ..record })[..].into()
- };
-
- Op::Async(fut.boxed_local())
- };
-
- js_runtime.register_op(name, base_op_fn);
-}
-
-fn bad_resource_id() -> Error {
- Error::new(ErrorKind::NotFound, "bad resource id")
+ let nwritten = stream.write(&bufs[0]).await?;
+ Ok(nwritten as u32)
}
fn main() {
@@ -329,18 +230,3 @@ fn main() {
};
runtime.block_on(future).unwrap();
}
-
-#[test]
-fn test_record_from() {
- let expected = Record {
- promise_id: 1,
- rid: 3,
- result: 4,
- };
- let buf = RecordBuf::from(expected);
- if cfg!(target_endian = "little") {
- assert_eq!(buf, [1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0]);
- }
- let actual = Record::from(buf);
- assert_eq!(actual, expected);
-}
diff --git a/core/lib.rs b/core/lib.rs
index deea9d281..c65ed7aac 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -15,6 +15,8 @@ mod module_specifier;
mod modules;
mod normalize_path;
mod ops;
+mod ops_bin;
+mod ops_json;
pub mod plugin_api;
mod resources;
mod runtime;
@@ -58,8 +60,6 @@ pub use crate::modules::ModuleSourceFuture;
pub use crate::modules::NoopModuleLoader;
pub use crate::modules::RecursiveModuleLoad;
pub use crate::normalize_path::normalize_path;
-pub use crate::ops::json_op_async;
-pub use crate::ops::json_op_sync;
pub use crate::ops::op_close;
pub use crate::ops::op_resources;
pub use crate::ops::Op;
@@ -68,6 +68,11 @@ pub use crate::ops::OpFn;
pub use crate::ops::OpId;
pub use crate::ops::OpState;
pub use crate::ops::OpTable;
+pub use crate::ops_bin::bin_op_async;
+pub use crate::ops_bin::bin_op_sync;
+pub use crate::ops_bin::ValueOrVector;
+pub use crate::ops_json::json_op_async;
+pub use crate::ops_json::json_op_sync;
pub use crate::resources::Resource;
pub use crate::resources::ResourceId;
pub use crate::resources::ResourceTable;
diff --git a/core/ops.rs b/core/ops.rs
index eceab7feb..212a713ad 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -10,13 +10,10 @@ use crate::BufVec;
use crate::ZeroCopyBuf;
use futures::Future;
use indexmap::IndexMap;
-use serde::de::DeserializeOwned;
-use serde::Serialize;
use serde_json::json;
use serde_json::Value;
use std::cell::RefCell;
use std::collections::HashMap;
-use std::convert::TryInto;
use std::iter::once;
use std::ops::Deref;
use std::ops::DerefMut;
@@ -117,125 +114,6 @@ impl Default for OpTable {
}
}
-/// Creates an op that passes data synchronously using JSON.
-///
-/// The provided function `op_fn` has the following parameters:
-/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op.
-/// * `V`: the deserializable value that is passed to the Rust function.
-/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used.
-///
-/// `op_fn` returns a serializable value, which is directly returned to JavaScript.
-///
-/// When registering an op like this...
-/// ```ignore
-/// let mut runtime = JsRuntime::new(...);
-/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op));
-/// ```
-///
-/// ...it can be invoked from JS using the provided name, for example:
-/// ```js
-/// Deno.core.ops();
-/// let result = Deno.core.jsonOpSync("function_name", args);
-/// ```
-///
-/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
-/// A more complete example is available in the examples directory.
-pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
-where
- F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
- V: DeserializeOwned,
- R: Serialize,
-{
- Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
- let result = serde_json::from_slice(&bufs[0])
- .map_err(AnyError::from)
- .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..]));
- let buf =
- json_serialize_op_result(None, result, state.borrow().get_error_class_fn);
- Op::Sync(buf)
- })
-}
-
-/// Creates an op that passes data asynchronously using JSON.
-///
-/// The provided function `op_fn` has the following parameters:
-/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op.
-/// * `V`: the deserializable value that is passed to the Rust function.
-/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used.
-///
-/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously
-/// returned to JavaScript.
-///
-/// When registering an op like this...
-/// ```ignore
-/// let mut runtime = JsRuntime::new(...);
-/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op));
-/// ```
-///
-/// ...it can be invoked from JS using the provided name, for example:
-/// ```js
-/// Deno.core.ops();
-/// let future = Deno.core.jsonOpAsync("function_name", args);
-/// ```
-///
-/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
-/// A more complete example is available in the examples directory.
-pub fn json_op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn>
-where
- F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
- V: DeserializeOwned,
- R: Future<Output = Result<RV, AnyError>> + 'static,
- RV: Serialize,
-{
- let try_dispatch_op =
- move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
- let promise_id = bufs[0]
- .get(0..8)
- .map(|b| u64::from_be_bytes(b.try_into().unwrap()))
- .ok_or_else(|| type_error("missing or invalid `promiseId`"))?;
- let args = serde_json::from_slice(&bufs[0][8..])?;
- let bufs = bufs[1..].into();
- use crate::futures::FutureExt;
- let fut = op_fn(state.clone(), args, bufs).map(move |result| {
- json_serialize_op_result(
- Some(promise_id),
- result,
- state.borrow().get_error_class_fn,
- )
- });
- Ok(Op::Async(Box::pin(fut)))
- };
-
- Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
- match try_dispatch_op(state.clone(), bufs) {
- Ok(op) => op,
- Err(err) => Op::Sync(json_serialize_op_result(
- None,
- Err::<(), AnyError>(err),
- state.borrow().get_error_class_fn,
- )),
- }
- })
-}
-
-fn json_serialize_op_result<R: Serialize>(
- promise_id: Option<u64>,
- result: Result<R, AnyError>,
- get_error_class_fn: crate::runtime::GetErrorClassFn,
-) -> Box<[u8]> {
- let value = match result {
- Ok(v) => serde_json::json!({ "ok": v, "promiseId": promise_id }),
- Err(err) => serde_json::json!({
- "promiseId": promise_id ,
- "err": {
- "className": (get_error_class_fn)(&err),
- "message": err.to_string(),
- }
- }),
- };
- serde_json::to_vec(&value).unwrap().into_boxed_slice()
-}
-
/// Return map of resources with id as key
/// and string representation as value.
///
diff --git a/core/ops_bin.rs b/core/ops_bin.rs
new file mode 100644
index 000000000..053150bfd
--- /dev/null
+++ b/core/ops_bin.rs
@@ -0,0 +1,377 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::error::AnyError;
+use crate::futures::future::FutureExt;
+use crate::BufVec;
+use crate::Op;
+use crate::OpFn;
+use crate::OpState;
+use crate::ZeroCopyBuf;
+use std::boxed::Box;
+use std::cell::RefCell;
+use std::convert::TryInto;
+use std::future::Future;
+use std::rc::Rc;
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub struct RequestHeader {
+ pub request_id: u64,
+ pub argument: u32,
+}
+
+impl RequestHeader {
+ pub fn from_raw(bytes: &[u8]) -> Option<Self> {
+ if bytes.len() < 3 * 4 {
+ return None;
+ }
+
+ Some(Self {
+ request_id: u64::from_le_bytes(bytes[0..8].try_into().unwrap()),
+ argument: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
+ })
+ }
+}
+
+#[derive(Copy, Clone, Debug, PartialEq)]
+pub struct ResponseHeader {
+ pub request_id: u64,
+ pub status: u32,
+ pub result: u32,
+}
+
+impl Into<[u8; 16]> for ResponseHeader {
+ fn into(self) -> [u8; 16] {
+ let mut resp_header = [0u8; 16];
+ resp_header[0..8].copy_from_slice(&self.request_id.to_le_bytes());
+ resp_header[8..12].copy_from_slice(&self.status.to_le_bytes());
+ resp_header[12..16].copy_from_slice(&self.result.to_le_bytes());
+ resp_header
+ }
+}
+
+pub trait ValueOrVector {
+ fn value(&self) -> u32;
+ fn vector(self) -> Option<Vec<u8>>;
+}
+
+impl ValueOrVector for Vec<u8> {
+ fn value(&self) -> u32 {
+ self.len() as u32
+ }
+ fn vector(self) -> Option<Vec<u8>> {
+ Some(self)
+ }
+}
+
+impl ValueOrVector for u32 {
+ fn value(&self) -> u32 {
+ *self
+ }
+ fn vector(self) -> Option<Vec<u8>> {
+ None
+ }
+}
+
+fn gen_padding_32bit(len: usize) -> &'static [u8] {
+ &[b' ', b' ', b' '][0..(4 - (len & 3)) & 3]
+}
+
+/// Creates an op that passes data synchronously using raw ui8 buffer.
+///
+/// The provided function `op_fn` has the following parameters:
+/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op.
+/// * `argument`: the i32 value that is passed to the Rust function.
+/// * `&mut [ZeroCopyBuf]`: raw bytes passed along.
+///
+/// `op_fn` returns an array buffer value, which is directly returned to JavaScript.
+///
+/// When registering an op like this...
+/// ```ignore
+/// let mut runtime = JsRuntime::new(...);
+/// runtime.register_op("hello", deno_core::bin_op_sync(Self::hello_op));
+/// ```
+///
+/// ...it can be invoked from JS using the provided name, for example:
+/// ```js
+/// Deno.core.ops();
+/// let result = Deno.core.binOpSync("function_name", args);
+/// ```
+///
+/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
+/// A more complete example is available in the examples directory.
+pub fn bin_op_sync<F, R>(op_fn: F) -> Box<OpFn>
+where
+ F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
+ R: ValueOrVector,
+{
+ Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
+ let mut bufs_iter = bufs.into_iter();
+ let record_buf = bufs_iter.next().expect("Expected record at position 0");
+ let mut zero_copy = bufs_iter.collect::<BufVec>();
+
+ let req_header = match RequestHeader::from_raw(&record_buf) {
+ Some(r) => r,
+ None => {
+ let error_class = b"TypeError";
+ let error_message = b"Unparsable control buffer";
+ let len = error_class.len() + error_message.len();
+ let padding = gen_padding_32bit(len);
+ let resp_header = ResponseHeader {
+ request_id: 0,
+ status: 1,
+ result: error_class.len() as u32,
+ };
+ return Op::Sync(
+ error_class
+ .iter()
+ .chain(error_message.iter())
+ .chain(padding)
+ .chain(&Into::<[u8; 16]>::into(resp_header))
+ .cloned()
+ .collect(),
+ );
+ }
+ };
+
+ match op_fn(&mut state.borrow_mut(), req_header.argument, &mut zero_copy) {
+ Ok(possibly_vector) => {
+ let resp_header = ResponseHeader {
+ request_id: req_header.request_id,
+ status: 0,
+ result: possibly_vector.value(),
+ };
+ let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
+
+ let resp_vector = match possibly_vector.vector() {
+ Some(mut vector) => {
+ let padding = gen_padding_32bit(vector.len());
+ vector.extend(padding);
+ vector.extend(&resp_encoded_header);
+ vector
+ }
+ None => resp_encoded_header.to_vec(),
+ };
+ Op::Sync(resp_vector.into_boxed_slice())
+ }
+ Err(error) => {
+ let error_class =
+ (state.borrow().get_error_class_fn)(&error).as_bytes();
+ let error_message = error.to_string().as_bytes().to_owned();
+ let len = error_class.len() + error_message.len();
+ let padding = gen_padding_32bit(len);
+ let resp_header = ResponseHeader {
+ request_id: req_header.request_id,
+ status: 1,
+ result: error_class.len() as u32,
+ };
+ return Op::Sync(
+ error_class
+ .iter()
+ .chain(error_message.iter())
+ .chain(padding)
+ .chain(&Into::<[u8; 16]>::into(resp_header))
+ .cloned()
+ .collect(),
+ );
+ }
+ }
+ })
+}
+
+/// Creates an op that passes data asynchronously using raw ui8 buffer.
+///
+/// The provided function `op_fn` has the following parameters:
+/// * `Rc<RefCell<OpState>>`: the op state, can be used to read/write resources in the runtime from an op.
+/// * `argument`: the i32 value that is passed to the Rust function.
+/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used.
+///
+/// `op_fn` returns a future, whose output is a JSON value. This value will be asynchronously
+/// returned to JavaScript.
+///
+/// When registering an op like this...
+/// ```ignore
+/// let mut runtime = JsRuntime::new(...);
+/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op));
+/// ```
+///
+/// ...it can be invoked from JS using the provided name, for example:
+/// ```js
+/// Deno.core.ops();
+/// let future = Deno.core.jsonOpAsync("function_name", args);
+/// ```
+///
+/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
+/// A more complete example is available in the examples directory.
+pub fn bin_op_async<F, R, RV>(op_fn: F) -> Box<OpFn>
+where
+ F: Fn(Rc<RefCell<OpState>>, u32, BufVec) -> R + 'static,
+ R: Future<Output = Result<RV, AnyError>> + 'static,
+ RV: ValueOrVector,
+{
+ Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
+ let mut bufs_iter = bufs.into_iter();
+ let record_buf = bufs_iter.next().expect("Expected record at position 0");
+ let zero_copy = bufs_iter.collect::<BufVec>();
+
+ let req_header = match RequestHeader::from_raw(&record_buf) {
+ Some(r) => r,
+ None => {
+ let error_class = b"TypeError";
+ let error_message = b"Unparsable control buffer";
+ let len = error_class.len() + error_message.len();
+ let padding = gen_padding_32bit(len);
+ let resp_header = ResponseHeader {
+ request_id: 0,
+ status: 1,
+ result: error_class.len() as u32,
+ };
+ return Op::Sync(
+ error_class
+ .iter()
+ .chain(error_message.iter())
+ .chain(padding)
+ .chain(&Into::<[u8; 16]>::into(resp_header))
+ .cloned()
+ .collect(),
+ );
+ }
+ };
+
+ let fut =
+ op_fn(state.clone(), req_header.argument, zero_copy).map(move |result| {
+ match result {
+ Ok(possibly_vector) => {
+ let resp_header = ResponseHeader {
+ request_id: req_header.request_id,
+ status: 0,
+ result: possibly_vector.value(),
+ };
+ let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
+
+ let resp_vector = match possibly_vector.vector() {
+ Some(mut vector) => {
+ let padding = gen_padding_32bit(vector.len());
+ vector.extend(padding);
+ vector.extend(&resp_encoded_header);
+ vector
+ }
+ None => resp_encoded_header.to_vec(),
+ };
+ resp_vector.into_boxed_slice()
+ }
+ Err(error) => {
+ let error_class =
+ (state.borrow().get_error_class_fn)(&error).as_bytes();
+ let error_message = error.to_string().as_bytes().to_owned();
+ let len = error_class.len() + error_message.len();
+ let padding = gen_padding_32bit(len);
+ let resp_header = ResponseHeader {
+ request_id: req_header.request_id,
+ status: 1,
+ result: error_class.len() as u32,
+ };
+
+ error_class
+ .iter()
+ .chain(error_message.iter())
+ .chain(padding)
+ .chain(&Into::<[u8; 16]>::into(resp_header))
+ .cloned()
+ .collect()
+ }
+ }
+ });
+ let temp = Box::pin(fut);
+ Op::Async(temp)
+ })
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn padding() {
+ assert_eq!(gen_padding_32bit(0), &[] as &[u8]);
+ assert_eq!(gen_padding_32bit(1), &[b' ', b' ', b' ']);
+ assert_eq!(gen_padding_32bit(2), &[b' ', b' ']);
+ assert_eq!(gen_padding_32bit(3), &[b' ']);
+ assert_eq!(gen_padding_32bit(4), &[] as &[u8]);
+ assert_eq!(gen_padding_32bit(5), &[b' ', b' ', b' ']);
+ }
+
+ #[test]
+ fn response_header_to_bytes() {
+ // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
+ let resp_header = ResponseHeader {
+ request_id: 0x0102030405060708u64,
+ status: 0x090A0B0Cu32,
+ result: 0x0D0E0F10u32,
+ };
+
+ // All numbers are always little-endian encoded, as the js side also wants this to be fixed
+ assert_eq!(
+ &Into::<[u8; 16]>::into(resp_header),
+ &[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 16, 15, 14, 13]
+ );
+ }
+
+ #[test]
+ fn response_header_to_bytes_max_value() {
+ // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
+ let resp_header = ResponseHeader {
+ request_id: (1u64 << 53u64) - 1u64,
+ status: 0xFFFFFFFFu32,
+ result: 0xFFFFFFFFu32,
+ };
+
+ // All numbers are always little-endian encoded, as the js side also wants this to be fixed
+ assert_eq!(
+ &Into::<[u8; 16]>::into(resp_header),
+ &[
+ 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, 255, 255, 255,
+ 255
+ ]
+ );
+ }
+
+ #[test]
+ fn request_header_from_bytes() {
+ let req_header =
+ RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9])
+ .unwrap();
+
+ assert_eq!(req_header.request_id, 0x0102030405060708u64);
+ assert_eq!(req_header.argument, 0x090A0B0Cu32);
+ }
+
+ #[test]
+ fn request_header_from_bytes_max_value() {
+ let req_header = RequestHeader::from_raw(&[
+ 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255,
+ ])
+ .unwrap();
+
+ assert_eq!(req_header.request_id, (1u64 << 53u64) - 1u64);
+ assert_eq!(req_header.argument, 0xFFFFFFFFu32);
+ }
+
+ #[test]
+ fn request_header_from_bytes_too_short() {
+ let req_header =
+ RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10]);
+
+ assert_eq!(req_header, None);
+ }
+
+ #[test]
+ fn request_header_from_bytes_long() {
+ let req_header = RequestHeader::from_raw(&[
+ 8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 13, 14, 15, 16, 17, 18, 19, 20, 21,
+ ])
+ .unwrap();
+
+ assert_eq!(req_header.request_id, 0x0102030405060708u64);
+ assert_eq!(req_header.argument, 0x090A0B0Cu32);
+ }
+}
diff --git a/core/ops_json.rs b/core/ops_json.rs
new file mode 100644
index 000000000..0ef91ed33
--- /dev/null
+++ b/core/ops_json.rs
@@ -0,0 +1,134 @@
+// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+
+use crate::error::type_error;
+use crate::error::AnyError;
+use crate::BufVec;
+use crate::Op;
+use crate::OpFn;
+use crate::OpState;
+use crate::ZeroCopyBuf;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
+use std::cell::RefCell;
+use std::convert::TryInto;
+use std::future::Future;
+use std::rc::Rc;
+
+fn json_serialize_op_result<R: Serialize>(
+ request_id: Option<u64>,
+ result: Result<R, AnyError>,
+ get_error_class_fn: crate::runtime::GetErrorClassFn,
+) -> Box<[u8]> {
+ let value = match result {
+ Ok(v) => serde_json::json!({ "ok": v, "requestId": request_id }),
+ Err(err) => serde_json::json!({
+ "requestId": request_id,
+ "err": {
+ "className": (get_error_class_fn)(&err),
+ "message": err.to_string(),
+ }
+ }),
+ };
+ serde_json::to_vec(&value).unwrap().into_boxed_slice()
+}
+
+/// Creates an op that passes data synchronously using JSON.
+///
+/// The provided function `op_fn` has the following parameters:
+/// * `&mut OpState`: the op state, can be used to read/write resources in the runtime from an op.
+/// * `V`: the deserializable value that is passed to the Rust function.
+/// * `&mut [ZeroCopyBuf]`: raw bytes passed along, usually not needed if the JSON value is used.
+///
+/// `op_fn` returns a serializable value, which is directly returned to JavaScript.
+///
+/// When registering an op like this...
+/// ```ignore
+/// let mut runtime = JsRuntime::new(...);
+/// runtime.register_op("hello", deno_core::json_op_sync(Self::hello_op));
+/// ```
+///
+/// ...it can be invoked from JS using the provided name, for example:
+/// ```js
+/// Deno.core.ops();
+/// let result = Deno.core.jsonOpSync("function_name", args);
+/// ```
+///
+/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
+/// A more complete example is available in the examples directory.
+pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
+where
+ F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
+ V: DeserializeOwned,
+ R: Serialize,
+{
+ Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
+ let result = serde_json::from_slice(&bufs[0])
+ .map_err(AnyError::from)
+ .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..]));
+ let buf =
+ json_serialize_op_result(None, result, state.borrow().get_error_class_fn);
+ Op::Sync(buf)
+ })
+}
+
+/// Creates an op that passes data asynchronously using JSON.
+///
+/// The provided function `op_fn` has the following parameters:
+/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op.
+/// * `V`: the deserializable value that is passed to the Rust function.
+/// * `BufVec`: raw bytes passed along, usually not needed if the JSON value is used.
+///
+/// `op_fn` returns a future, whose output is a serializable value. This value will be asynchronously
+/// returned to JavaScript.
+///
+/// When registering an op like this...
+/// ```ignore
+/// let mut runtime = JsRuntime::new(...);
+/// runtime.register_op("hello", deno_core::json_op_async(Self::hello_op));
+/// ```
+///
+/// ...it can be invoked from JS using the provided name, for example:
+/// ```js
+/// Deno.core.ops();
+/// let future = Deno.core.jsonOpAsync("function_name", args);
+/// ```
+///
+/// The `Deno.core.ops()` statement is needed once before any op calls, for initialization.
+/// A more complete example is available in the examples directory.
+pub fn json_op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn>
+where
+ F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
+ V: DeserializeOwned,
+ R: Future<Output = Result<RV, AnyError>> + 'static,
+ RV: Serialize,
+{
+ let try_dispatch_op =
+ move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
+ let request_id = bufs[0]
+ .get(0..8)
+ .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
+ .ok_or_else(|| type_error("missing or invalid `requestId`"))?;
+ let args = serde_json::from_slice(&bufs[0][8..])?;
+ let bufs = bufs[1..].into();
+ use crate::futures::FutureExt;
+ let fut = op_fn(state.clone(), args, bufs).map(move |result| {
+ json_serialize_op_result(
+ Some(request_id),
+ result,
+ state.borrow().get_error_class_fn,
+ )
+ });
+ Ok(Op::Async(Box::pin(fut)))
+ };
+
+ Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
+ match try_dispatch_op(state.clone(), bufs) {
+ Ok(op) => op,
+ Err(err) => Op::Sync(json_serialize_op_result(
+ None,
+ Err::<(), AnyError>(err),
+ state.borrow().get_error_class_fn,
+ )),
+ }
+ })
+}