From c821e8f2f1fb8ad5e9eb00854277cafc8c80b2f5 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Sun, 6 Sep 2020 02:34:02 +0200 Subject: Move JSON ops to deno_core (#7336) --- core/examples/http_bench_bin_ops.rs | 223 +++++++++++++++++------------------ core/examples/http_bench_json_ops.rs | 163 ++++++++++++++----------- 2 files changed, 196 insertions(+), 190 deletions(-) (limited to 'core/examples') diff --git a/core/examples/http_bench_bin_ops.rs b/core/examples/http_bench_bin_ops.rs index 366779e8c..f93b8b079 100644 --- a/core/examples/http_bench_bin_ops.rs +++ b/core/examples/http_bench_bin_ops.rs @@ -1,10 +1,12 @@ #[macro_use] extern crate log; +use deno_core::js_check; +use deno_core::BasicState; +use deno_core::BufVec; use deno_core::CoreIsolate; -use deno_core::CoreIsolateState; use deno_core::Op; -use deno_core::ResourceTable; +use deno_core::OpRegistry; use deno_core::Script; use deno_core::StartupData; use deno_core::ZeroCopyBuf; @@ -12,7 +14,6 @@ use futures::future::poll_fn; use futures::future::FutureExt; use futures::future::TryFuture; use futures::future::TryFutureExt; -use std::cell::RefCell; use std::convert::TryInto; use std::env; use std::fmt::Debug; @@ -27,6 +28,7 @@ use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::runtime; struct Logger; @@ -46,9 +48,9 @@ impl log::Log for Logger { #[derive(Copy, Clone, Debug, PartialEq)] struct Record { - pub promise_id: u32, - pub rid: u32, - pub result: i32, + promise_id: u32, + rid: u32, + result: i32, } type RecordBuf = [u8; size_of::()]; @@ -75,131 +77,64 @@ impl From for RecordBuf { } } -pub fn isolate_new() -> CoreIsolate { +fn create_isolate() -> CoreIsolate { + let state = BasicState::new(); + register_op_bin_sync(&state, "listen", op_listen); + register_op_bin_sync(&state, "close", op_close); + register_op_bin_async(&state, "accept", op_accept); + register_op_bin_async(&state, "read", op_read); + register_op_bin_async(&state, "write", op_write); + let startup_data = StartupData::Script(Script { source: include_str!("http_bench_bin_ops.js"), filename: "http_bench_bin_ops.js", }); - let mut isolate = CoreIsolate::new(startup_data, false); - - fn register_sync_op( - isolate: &mut CoreIsolate, - name: &'static str, - handler: F, - ) where - F: 'static - + Fn( - Rc>, - u32, - &mut [ZeroCopyBuf], - ) -> Result, - { - let core_handler = move |state: &mut CoreIsolateState, - zero_copy_bufs: &mut [ZeroCopyBuf]| - -> Op { - assert!(!zero_copy_bufs.is_empty()); - let record = Record::from(zero_copy_bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(is_sync); - - let resource_table = state.resource_table.clone(); - let result: i32 = - match handler(resource_table, record.rid, &mut zero_copy_bufs[1..]) { - Ok(r) => r as i32, - Err(_) => -1, - }; - let buf = RecordBuf::from(Record { result, ..record })[..].into(); - Op::Sync(buf) - }; - - isolate.register_op(name, core_handler); - } - - fn register_async_op( - isolate: &mut CoreIsolate, - name: &'static str, - handler: impl Fn(Rc>, u32, &mut [ZeroCopyBuf]) -> F - + Copy - + 'static, - ) where - F: TryFuture, - F::Ok: TryInto, - >::Error: Debug, - { - let core_handler = move |state: &mut CoreIsolateState, - zero_copy_bufs: &mut [ZeroCopyBuf]| - -> Op { - assert!(!zero_copy_bufs.is_empty()); - let record = Record::from(zero_copy_bufs[0].as_ref()); - let is_sync = record.promise_id == 0; - assert!(!is_sync); - - let mut zero_copy = zero_copy_bufs[1..].to_vec(); - let resource_table = state.resource_table.clone(); - let fut = async move { - let op = handler(resource_table, record.rid, &mut zero_copy); - let result = op - .map_ok(|r| r.try_into().expect("op result does not fit in i32")) - .unwrap_or_else(|_| -1) - .await; - RecordBuf::from(Record { result, ..record })[..].into() - }; - - Op::Async(fut.boxed_local()) - }; - - isolate.register_op(name, core_handler); - } - - register_sync_op(&mut isolate, "listen", op_listen); - register_async_op(&mut isolate, "accept", op_accept); - register_async_op(&mut isolate, "read", op_read); - register_async_op(&mut isolate, "write", op_write); - register_sync_op(&mut isolate, "close", op_close); - - isolate -} - -fn op_close( - resource_table: Rc>, - rid: u32, - _buf: &mut [ZeroCopyBuf], -) -> Result { - debug!("close rid={}", rid); - let resource_table = &mut resource_table.borrow_mut(); - resource_table - .close(rid) - .map(|_| 0) - .ok_or_else(bad_resource) + CoreIsolate::new(state, startup_data, false) } fn op_listen( - resource_table: Rc>, + state: &BasicState, _rid: u32, - _buf: &mut [ZeroCopyBuf], + _bufs: &mut [ZeroCopyBuf], ) -> Result { debug!("listen"); let addr = "127.0.0.1:4544".parse::().unwrap(); let std_listener = std::net::TcpListener::bind(&addr)?; let listener = TcpListener::from_std(std_listener)?; - let resource_table = &mut resource_table.borrow_mut(); - let rid = resource_table.add("tcpListener", Box::new(listener)); + let rid = state + .resource_table + .borrow_mut() + .add("tcpListener", Box::new(listener)); Ok(rid) } +fn op_close( + state: &BasicState, + rid: u32, + _bufs: &mut [ZeroCopyBuf], +) -> Result { + debug!("close rid={}", rid); + state + .resource_table + .borrow_mut() + .close(rid) + .map(|_| 0) + .ok_or_else(bad_resource_id) +} + fn op_accept( - resource_table: Rc>, + state: Rc, rid: u32, - _buf: &mut [ZeroCopyBuf], + _bufs: BufVec, ) -> impl TryFuture { debug!("accept rid={}", rid); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let listener = resource_table .get_mut::(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(bad_resource_id)?; listener.poll_accept(cx).map_ok(|(stream, _addr)| { resource_table.add("tcpStream", Box::new(stream)) }) @@ -207,9 +142,9 @@ fn op_accept( } fn op_read( - resource_table: Rc>, + state: Rc, rid: u32, - bufs: &mut [ZeroCopyBuf], + bufs: BufVec, ) -> impl TryFuture { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); let mut buf = bufs[0].clone(); @@ -217,33 +152,85 @@ fn op_read( debug!("read rid={}", rid); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(bad_resource_id)?; Pin::new(stream).poll_read(cx, &mut buf) }) } fn op_write( - resource_table: Rc>, + state: Rc, rid: u32, - bufs: &mut [ZeroCopyBuf], + bufs: BufVec, ) -> impl TryFuture { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); let buf = bufs[0].clone(); debug!("write rid={}", rid); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(bad_resource_id)?; Pin::new(stream).poll_write(cx, &buf) }) } -fn bad_resource() -> Error { +fn register_op_bin_sync(state: &BasicState, name: &'static str, op_fn: F) +where + F: Fn(&BasicState, u32, &mut [ZeroCopyBuf]) -> Result + 'static, +{ + let base_op_fn = move |state: Rc, mut bufs: BufVec| -> Op { + let record = Record::from(bufs[0].as_ref()); + let is_sync = record.promise_id == 0; + assert!(is_sync); + + let zero_copy_bufs = &mut bufs[1..]; + let result: i32 = match op_fn(&state, record.rid, zero_copy_bufs) { + Ok(r) => r as i32, + Err(_) => -1, + }; + let buf = RecordBuf::from(Record { result, ..record })[..].into(); + Op::Sync(buf) + }; + + state.register_op(name, base_op_fn); +} + +fn register_op_bin_async(state: &BasicState, name: &'static str, op_fn: F) +where + F: Fn(Rc, u32, BufVec) -> R + Copy + 'static, + R: TryFuture, + R::Ok: TryInto, + >::Error: Debug, +{ + let base_op_fn = move |state: Rc, bufs: BufVec| -> Op { + let mut bufs_iter = bufs.into_iter(); + let record_buf = bufs_iter.next().unwrap(); + let zero_copy_bufs = bufs_iter.collect::(); + + let record = Record::from(record_buf.as_ref()); + let is_sync = record.promise_id == 0; + assert!(!is_sync); + + let fut = async move { + let op = op_fn(state, record.rid, zero_copy_bufs); + let result = op + .map_ok(|r| r.try_into().expect("op result does not fit in i32")) + .unwrap_or_else(|_| -1) + .await; + RecordBuf::from(Record { result, ..record })[..].into() + }; + + Op::Async(fut.boxed_local()) + }; + + state.register_op(name, base_op_fn); +} + +fn bad_resource_id() -> Error { Error::new(ErrorKind::NotFound, "bad resource id") } @@ -259,13 +246,13 @@ fn main() { // NOTE: `--help` arg will display V8 help and exit deno_core::v8_set_flags(env::args().collect()); - let isolate = isolate_new(); - let mut runtime = tokio::runtime::Builder::new() + let isolate = create_isolate(); + let mut runtime = runtime::Builder::new() .basic_scheduler() .enable_all() .build() .unwrap(); - runtime.block_on(isolate).expect("unexpected isolate error"); + js_check(runtime.block_on(isolate)); } #[test] diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index f0fc5f94c..6e3063a0f 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -1,25 +1,29 @@ #[macro_use] extern crate log; -use deno_core::serde_json; +use deno_core::js_check; +use deno_core::BasicState; +use deno_core::BufVec; use deno_core::CoreIsolate; -use deno_core::CoreIsolateState; use deno_core::ErrBox; +use deno_core::OpRegistry; use deno_core::Script; use deno_core::StartupData; use deno_core::ZeroCopyBuf; use futures::future::poll_fn; use futures::future::Future; +use serde_json::Value; +use std::convert::TryInto; use std::env; -use std::io::Error; -use std::io::ErrorKind; use std::net::SocketAddr; use std::pin::Pin; +use std::rc::Rc; use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::net::TcpListener; use tokio::net::TcpStream; +use tokio::runtime; struct Logger; @@ -37,66 +41,79 @@ impl log::Log for Logger { fn flush(&self) {} } -pub fn isolate_new() -> CoreIsolate { +fn create_isolate() -> CoreIsolate { + let state = BasicState::new(); + state.register_op_json_sync("listen", op_listen); + state.register_op_json_sync("close", op_close); + state.register_op_json_async("accept", op_accept); + state.register_op_json_async("read", op_read); + state.register_op_json_async("write", op_write); + let startup_data = StartupData::Script(Script { source: include_str!("http_bench_json_ops.js"), filename: "http_bench_json_ops.js", }); - let mut isolate = CoreIsolate::new(startup_data, false); - - isolate.register_op_json_sync("listen", op_listen); - isolate.register_op_json_async("accept", op_accept); - isolate.register_op_json_async("read", op_read); - isolate.register_op_json_async("write", op_write); - isolate.register_op_json_sync("close", op_close); + CoreIsolate::new(state, startup_data, false) +} - isolate +fn op_listen( + state: &BasicState, + _args: Value, + _bufs: &mut [ZeroCopyBuf], +) -> Result { + debug!("listen"); + let addr = "127.0.0.1:4544".parse::().unwrap(); + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let rid = state + .resource_table + .borrow_mut() + .add("tcpListener", Box::new(listener)); + Ok(serde_json::json!({ "rid": rid })) } fn op_close( - state: &mut CoreIsolateState, - args: serde_json::Value, + state: &BasicState, + args: Value, _buf: &mut [ZeroCopyBuf], -) -> Result { - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; +) -> Result { + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("close rid={}", rid); - let resource_table = &mut state.resource_table.borrow_mut(); - resource_table + state + .resource_table + .borrow_mut() .close(rid) .map(|_| serde_json::json!(())) - .ok_or_else(bad_resource) -} - -fn op_listen( - state: &mut CoreIsolateState, - _args: serde_json::Value, - _buf: &mut [ZeroCopyBuf], -) -> Result { - debug!("listen"); - let addr = "127.0.0.1:4544".parse::().unwrap(); - let std_listener = std::net::TcpListener::bind(&addr)?; - let listener = TcpListener::from_std(std_listener)?; - let resource_table = &mut state.resource_table.borrow_mut(); - let rid = resource_table.add("tcpListener", Box::new(listener)); - Ok(serde_json::json!({ "rid": rid })) + .ok_or_else(ErrBox::bad_resource_id) } fn op_accept( - state: &mut CoreIsolateState, - args: serde_json::Value, - _buf: &mut [ZeroCopyBuf], -) -> impl Future> { - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + state: Rc, + args: Value, + _bufs: BufVec, +) -> impl Future> { + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("accept rid={}", rid); - let resource_table = state.resource_table.clone(); poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let listener = resource_table .get_mut::(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(ErrBox::bad_resource_id)?; listener.poll_accept(cx)?.map(|(stream, _addr)| { let rid = resource_table.add("tcpStream", Box::new(stream)); Ok(serde_json::json!({ "rid": rid })) @@ -105,57 +122,59 @@ fn op_accept( } fn op_read( - state: &mut CoreIsolateState, - args: serde_json::Value, - bufs: &mut [ZeroCopyBuf], -) -> impl Future> { + state: Rc, + args: Value, + mut bufs: BufVec, +) -> impl Future> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("read rid={}", rid); - let mut buf = bufs[0].clone(); - let resource_table = state.resource_table.clone(); - - poll_fn(move |cx| -> Poll> { - let resource_table = &mut resource_table.borrow_mut(); + poll_fn(move |cx| -> Poll> { + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(ErrBox::bad_resource_id)?; Pin::new(stream) - .poll_read(cx, &mut buf)? + .poll_read(cx, &mut bufs[0])? .map(|nread| Ok(serde_json::json!({ "nread": nread }))) }) } fn op_write( - state: &mut CoreIsolateState, - args: serde_json::Value, - bufs: &mut [ZeroCopyBuf], -) -> impl Future> { + state: Rc, + args: Value, + bufs: BufVec, +) -> impl Future> { assert_eq!(bufs.len(), 1, "Invalid number of arguments"); - let rid = args.get("rid").unwrap().as_u64().unwrap() as u32; + let rid: u32 = args + .get("rid") + .unwrap() + .as_u64() + .unwrap() + .try_into() + .unwrap(); debug!("write rid={}", rid); - let buf = bufs[0].clone(); - let resource_table = state.resource_table.clone(); - poll_fn(move |cx| { - let resource_table = &mut resource_table.borrow_mut(); + let resource_table = &mut state.resource_table.borrow_mut(); let stream = resource_table .get_mut::(rid) - .ok_or_else(bad_resource)?; + .ok_or_else(ErrBox::bad_resource_id)?; Pin::new(stream) - .poll_write(cx, &buf)? + .poll_write(cx, &bufs[0])? .map(|nwritten| Ok(serde_json::json!({ "nwritten": nwritten }))) }) } -fn bad_resource() -> ErrBox { - Error::new(ErrorKind::NotFound, "bad resource id").into() -} - fn main() { log::set_logger(&Logger).unwrap(); log::set_max_level( @@ -168,11 +187,11 @@ fn main() { // NOTE: `--help` arg will display V8 help and exit deno_core::v8_set_flags(env::args().collect()); - let isolate = isolate_new(); - let mut runtime = tokio::runtime::Builder::new() + let isolate = create_isolate(); + let mut runtime = runtime::Builder::new() .basic_scheduler() .enable_all() .build() .unwrap(); - deno_core::js_check(runtime.block_on(isolate)); + js_check(runtime.block_on(isolate)); } -- cgit v1.2.3