summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2020-08-20 09:45:59 -0400
committerGitHub <noreply@github.com>2020-08-20 09:45:59 -0400
commit0095611af98d3039e30ff44444ab83f65bcec554 (patch)
tree2c87ec9c7fc39f5a0af83c179b7f14493a8b6b95
parentbe1e7ab5320c0a110998818c3916c79b39710613 (diff)
First pass at json ops in core (#7033)
Adds Deno.core.jsonOpSync and Deno.core.jsonOpAsync
-rw-r--r--core/core.js57
-rw-r--r--core/core_isolate.rs63
-rw-r--r--core/examples/http_bench.js13
-rw-r--r--core/examples/http_bench.rs37
-rw-r--r--core/lib.rs1
5 files changed, 151 insertions, 20 deletions
diff --git a/core/core.js b/core/core.js
index 5f9d6f981..099472614 100644
--- a/core/core.js
+++ b/core/core.js
@@ -201,7 +201,64 @@ SharedQueue Binary Layout
return errorClass;
}
+ // Returns Uint8Array
+ function encodeJson(args) {
+ const s = JSON.stringify(args);
+ return core.encode(s);
+ }
+
+ function decodeJson(ui8) {
+ const s = Deno.core.decode(ui8);
+ return JSON.parse(s);
+ }
+
+ let nextPromiseId = 1;
+ const promiseTable = {};
+
+ function jsonOpAsync(opName, args) {
+ setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);
+
+ args.promiseId = nextPromiseId++;
+ const argsBuf = encodeJson(args);
+ dispatch(opName, argsBuf);
+ let resolve, reject;
+ const promise = new Promise((resolve_, reject_) => {
+ resolve = resolve_;
+ reject = reject_;
+ });
+ promise.resolve = resolve;
+ promise.reject = reject;
+ promiseTable[args.promiseId] = promise;
+ return promise;
+ }
+
+ function jsonOpSync(opName, args) {
+ const argsBuf = encodeJson(args);
+ const res = dispatch(opName, argsBuf);
+ const r = decodeJson(res);
+ if (r["ok"]) {
+ return r["ok"];
+ } else {
+ throw r["err"];
+ }
+ }
+
+ function jsonOpAsyncHandler(buf) {
+ // Json Op.
+ const msg = decodeJson(buf);
+ const { ok, err, promiseId } = msg;
+ const promise = promiseTable[promiseId];
+ delete promiseTable[promiseId];
+ if (ok) {
+ promise.resolve(ok);
+ } else {
+ promise.reject(err);
+ }
+ }
+
Object.assign(window.Deno.core, {
+ jsonOpAsync,
+ jsonOpSync,
setAsyncHandler,
dispatch: send,
dispatchByName: dispatch,
diff --git a/core/core_isolate.rs b/core/core_isolate.rs
index 18539395f..160e37f1d 100644
--- a/core/core_isolate.rs
+++ b/core/core_isolate.rs
@@ -20,6 +20,8 @@ use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::AtomicWaker;
use futures::Future;
+use serde_json::json;
+use serde_json::Value;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
@@ -429,6 +431,50 @@ impl CoreIsolate {
state.op_registry.register(name, op)
}
+ pub fn register_op_json_sync<F>(&mut self, name: &str, op: F) -> OpId
+ where
+ F: 'static
+ + Fn(
+ &mut CoreIsolateState,
+ serde_json::Value,
+ &mut [ZeroCopyBuf],
+ ) -> Result<serde_json::Value, ErrBox>,
+ {
+ let core_op =
+ move |state: &mut CoreIsolateState, bufs: &mut [ZeroCopyBuf]| -> Op {
+ let value = serde_json::from_slice(&bufs[0]).unwrap();
+ let result = op(state, value, &mut bufs[1..]);
+ let buf = serialize_result(None, result);
+ Op::Sync(buf)
+ };
+
+ let state_rc = Self::state(self);
+ let mut state = state_rc.borrow_mut();
+ state.op_registry.register(name, core_op)
+ }
+
+ pub fn register_op_json_async<F, Fut>(&mut self, name: &str, op: F) -> OpId
+ where
+ Fut: 'static + Future<Output = Result<serde_json::Value, ErrBox>>,
+ F: 'static
+ + Fn(&mut CoreIsolateState, serde_json::Value, &mut [ZeroCopyBuf]) -> Fut,
+ {
+ let core_op = move |state: &mut CoreIsolateState,
+ bufs: &mut [ZeroCopyBuf]|
+ -> Op {
+ let value: serde_json::Value = serde_json::from_slice(&bufs[0]).unwrap();
+ let promise_id = value.get("promiseId").unwrap().as_u64().unwrap();
+ let fut = op(state, value, &mut bufs[1..]);
+ let fut2 =
+ fut.map(move |result| serialize_result(Some(promise_id), result));
+ Op::Async(Box::pin(fut2))
+ };
+
+ let state_rc = Self::state(self);
+ let mut state = state_rc.borrow_mut();
+ state.op_registry.register(name, core_op)
+ }
+
/// Registers a callback on the isolate when the memory limits are approached.
/// Use this to prevent V8 from crashing the process when reaching the limit.
///
@@ -484,6 +530,23 @@ where
callback(current_heap_limit, initial_heap_limit)
}
+fn serialize_result(
+ promise_id: Option<u64>,
+ result: Result<Value, ErrBox>,
+) -> Buf {
+ let value = match result {
+ Ok(v) => json!({ "ok": v, "promiseId": promise_id }),
+ Err(err) => json!({
+ "promiseId": promise_id ,
+ "err": {
+ "message": err.to_string(),
+ "kind": "Other", // TODO(ry) Figure out how to propagate errors.
+ }
+ }),
+ };
+ serde_json::to_vec(&value).unwrap().into_boxed_slice()
+}
+
impl Future for CoreIsolate {
type Output = Result<(), ErrBox>;
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index eba9bb677..ac97e0d88 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -80,12 +80,14 @@ function handleAsyncMsgFromRust(buf) {
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
- return sendSync(ops["listen"], -1);
+ const { rid } = Deno.core.jsonOpSync("listen", {});
+ return rid;
}
/** Accepts a connection, returns rid. */
-function accept(rid) {
- return sendAsync(ops["accept"], rid);
+async function accept(serverRid) {
+ const { rid } = await Deno.core.jsonOpAsync("accept", { rid: serverRid });
+ return rid;
}
/**
@@ -124,9 +126,8 @@ let ops;
async function main() {
ops = Deno.core.ops();
- for (const opName in ops) {
- Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust);
- }
+ Deno.core.setAsyncHandler(ops["read"], handleAsyncMsgFromRust);
+ Deno.core.setAsyncHandler(ops["write"], handleAsyncMsgFromRust);
Deno.core.print("http_bench.js start\n");
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 3a11a3507..0dbb6f8e6 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -1,14 +1,17 @@
#[macro_use]
extern crate log;
+use deno_core::serde_json;
use deno_core::CoreIsolate;
use deno_core::CoreIsolateState;
+use deno_core::ErrBox;
use deno_core::Op;
use deno_core::ResourceTable;
use deno_core::Script;
use deno_core::StartupData;
use deno_core::ZeroCopyBuf;
use futures::future::poll_fn;
+use futures::future::Future;
use futures::future::FutureExt;
use futures::future::TryFuture;
use futures::future::TryFutureExt;
@@ -152,8 +155,8 @@ pub fn isolate_new() -> CoreIsolate {
isolate.register_op(name, core_handler);
}
- register_sync_op(&mut isolate, "listen", op_listen);
- register_async_op(&mut isolate, "accept", op_accept);
+ isolate.register_op_json_sync("listen", op_listen);
+ isolate.register_op_json_async("accept", op_accept);
register_async_op(&mut isolate, "read", op_read);
register_async_op(&mut isolate, "write", op_write);
register_sync_op(&mut isolate, "close", op_close);
@@ -175,34 +178,40 @@ fn op_close(
}
fn op_listen(
- resource_table: Rc<RefCell<ResourceTable>>,
- _rid: u32,
+ state: &mut CoreIsolateState,
+ _args: serde_json::Value,
_buf: &mut [ZeroCopyBuf],
-) -> Result<u32, Error> {
+) -> Result<serde_json::Value, ErrBox> {
debug!("listen");
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
let listener = TcpListener::from_std(std_listener)?;
- let resource_table = &mut resource_table.borrow_mut();
+ let resource_table = &mut state.resource_table.borrow_mut();
let rid = resource_table.add("tcpListener", Box::new(listener));
- Ok(rid)
+ Ok(serde_json::json!({ "rid": rid }))
}
fn op_accept(
- resource_table: Rc<RefCell<ResourceTable>>,
- rid: u32,
+ state: &mut CoreIsolateState,
+ args: serde_json::Value,
_buf: &mut [ZeroCopyBuf],
-) -> impl TryFuture<Ok = u32, Error = Error> {
+) -> impl Future<Output = Result<serde_json::Value, ErrBox>> {
+ let rid = args.get("rid").unwrap().as_u64().unwrap() as u32;
debug!("accept rid={}", rid);
+ let resource_table = state.resource_table.clone();
poll_fn(move |cx| {
let resource_table = &mut resource_table.borrow_mut();
let listener = resource_table
.get_mut::<TcpListener>(rid)
.ok_or_else(bad_resource)?;
- listener.poll_accept(cx).map_ok(|(stream, _addr)| {
- resource_table.add("tcpStream", Box::new(stream))
- })
+ listener
+ .poll_accept(cx)
+ .map_err(ErrBox::from)
+ .map_ok(|(stream, _addr)| {
+ let rid = resource_table.add("tcpStream", Box::new(stream));
+ serde_json::json!({ "rid": rid })
+ })
})
}
@@ -265,7 +274,7 @@ fn main() {
.enable_all()
.build()
.unwrap();
- runtime.block_on(isolate).expect("unexpected isolate error");
+ deno_core::js_check(runtime.block_on(isolate));
}
#[test]
diff --git a/core/lib.rs b/core/lib.rs
index cd4a4eeee..940e0c026 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -53,6 +53,7 @@ pub use crate::ops::OpId;
pub use crate::resources::ResourceTable;
pub use crate::zero_copy_buf::BufVec;
pub use crate::zero_copy_buf::ZeroCopyBuf;
+pub use serde_json;
pub fn v8_version() -> &'static str {
v8::V8::get_version()