diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/BUILD.gn | 37 | ||||
-rw-r--r-- | core/http_bench.js | 150 | ||||
-rw-r--r-- | core/http_bench.rs | 210 | ||||
-rw-r--r-- | core/js_errors.rs | 416 | ||||
-rw-r--r-- | core/lib.rs | 364 | ||||
l--------- | core/libdeno.rs | 1 | ||||
-rw-r--r-- | core/shared.rs | 49 |
7 files changed, 1227 insertions, 0 deletions
diff --git a/core/BUILD.gn b/core/BUILD.gn new file mode 100644 index 000000000..be06a7f4b --- /dev/null +++ b/core/BUILD.gn @@ -0,0 +1,37 @@ +import("//build_extra/rust/rust.gni") + +# deno_core does not depend on flatbuffers nor tokio. +main_extern = [ + "$rust_build:futures", + "$rust_build:libc", + "$rust_build:serde_json", + "$rust_build:log", +] + +rust_crate("deno_core") { + source_root = "lib.rs" + extern = main_extern + deps = [ + "../libdeno:libdeno_static_lib", + ] +} + +rust_test("deno_core_test") { + source_root = "lib.rs" + extern = main_extern + deps = [ + "../libdeno:libdeno_static_lib", + ] +} + +rust_executable("deno_core_http_bench") { + source_root = "http_bench.rs" + extern = [ + "$rust_build:futures", + "$rust_build:lazy_static", + "$rust_build:libc", + "$rust_build:log", + "$rust_build:tokio", + ":deno_core" + ] +} diff --git a/core/http_bench.js b/core/http_bench.js new file mode 100644 index 000000000..b9615e689 --- /dev/null +++ b/core/http_bench.js @@ -0,0 +1,150 @@ +// This is not a real HTTP server. We read blindly one time into 'requestBuf', +// then write this fixed 'responseBuf'. The point of this benchmark is to +// exercise the event loop in a simple yet semi-realistic way. +const shared32 = new Int32Array(libdeno.shared); + +const INDEX_NUM_RECORDS = 0; +const INDEX_RECORDS = 1; +const RECORD_OFFSET_PROMISE_ID = 0; +const RECORD_OFFSET_OP = 1; +const RECORD_OFFSET_ARG = 2; +const RECORD_OFFSET_RESULT = 3; +const RECORD_SIZE = 4; +const OP_LISTEN = 1; +const OP_ACCEPT = 2; +const OP_READ = 3; +const OP_WRITE = 4; +const OP_CLOSE = 5; + +const NUM_RECORDS = (shared32.length - INDEX_RECORDS) / RECORD_SIZE; +if (NUM_RECORDS != 100) { + throw Error("expected 100 entries"); +} + +const requestBuf = new Uint8Array(64 * 1024); +const responseBuf = new Uint8Array( + "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" + .split("") + .map(c => c.charCodeAt(0)) +); + +const promiseMap = new Map(); +let nextPromiseId = 1; + +function createResolvable() { + let methods; + const promise = new Promise((resolve, reject) => { + methods = { resolve, reject }; + }); + return Object.assign(promise, methods); +} + +/** Returns Promise<number> */ +function sendAsync(op, arg, zeroCopyData) { + const id = nextPromiseId++; + const p = createResolvable(); + shared32[INDEX_NUM_RECORDS] = 1; + setRecord(0, RECORD_OFFSET_PROMISE_ID, id); + setRecord(0, RECORD_OFFSET_OP, op); + setRecord(0, RECORD_OFFSET_ARG, arg); + setRecord(0, RECORD_OFFSET_RESULT, -1); + promiseMap.set(id, p); + libdeno.send(null, zeroCopyData); + return p; +} + +/** Returns u32 number */ +function sendSync(op, arg) { + shared32[INDEX_NUM_RECORDS] = 1; + setRecord(0, RECORD_OFFSET_PROMISE_ID, 0); + setRecord(0, RECORD_OFFSET_OP, op); + setRecord(0, RECORD_OFFSET_ARG, arg); + setRecord(0, RECORD_OFFSET_RESULT, -1); + libdeno.send(); + return getRecord(0, RECORD_OFFSET_RESULT); +} + +function setRecord(i, off, value) { + if (i >= NUM_RECORDS) { + throw Error("out of range"); + } + shared32[INDEX_RECORDS + RECORD_SIZE * i + off] = value; +} + +function getRecord(i, off) { + if (i >= NUM_RECORDS) { + throw Error("out of range"); + } + return shared32[INDEX_RECORDS + RECORD_SIZE * i + off]; +} + +function handleAsyncMsgFromRust() { + for (let i = 0; i < shared32[INDEX_NUM_RECORDS]; i++) { + let id = getRecord(i, RECORD_OFFSET_PROMISE_ID); + const p = promiseMap.get(id); + promiseMap.delete(id); + p.resolve(getRecord(i, RECORD_OFFSET_RESULT)); + } +} + +/** Listens on 0.0.0.0:4500, returns rid. */ +function listen() { + return sendSync(OP_LISTEN, -1); +} + +/** Accepts a connection, returns rid. */ +async function accept(rid) { + return await sendAsync(OP_ACCEPT, rid); +} + +/** + * Reads a packet from the rid, presumably an http request. data is ignored. + * Returns bytes read. + */ +async function read(rid, data) { + return await sendAsync(OP_READ, rid, data); +} + +/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */ +async function write(rid, data) { + return await sendAsync(OP_WRITE, rid, data); +} + +function close(rid) { + return sendSync(OP_CLOSE, rid); +} + +async function serve(rid) { + while (true) { + const nread = await read(rid, requestBuf); + if (nread <= 0) { + break; + } + + const nwritten = await write(rid, responseBuf); + if (nwritten < 0) { + break; + } + } + close(rid); +} + +async function main() { + libdeno.recv(handleAsyncMsgFromRust); + + libdeno.print("http_bench.js start"); + + const listener_rid = listen(); + libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listener_rid}`); + while (true) { + const rid = await accept(listener_rid); + // libdeno.print(`accepted ${rid}`); + if (rid < 0) { + libdeno.print(`accept error ${rid}`); + return; + } + serve(rid); + } +} + +main(); diff --git a/core/http_bench.rs b/core/http_bench.rs new file mode 100644 index 000000000..3da30433a --- /dev/null +++ b/core/http_bench.rs @@ -0,0 +1,210 @@ +/// To run this benchmark: +/// +/// > DENO_BUILD_MODE=release ./tools/build.py && \ +/// ./target/release/deno_core_http_bench --multi-thread +extern crate deno_core; +extern crate futures; +extern crate libc; +extern crate tokio; + +#[macro_use] +extern crate log; +#[macro_use] +extern crate lazy_static; + +use deno_core::deno_buf; +use deno_core::AsyncResult; +use deno_core::Isolate; +use deno_core::JSError; +use deno_core::Op; +use deno_core::RECORD_OFFSET_ARG; +use deno_core::RECORD_OFFSET_OP; +use deno_core::RECORD_OFFSET_PROMISE_ID; +use deno_core::RECORD_OFFSET_RESULT; +use futures::future::lazy; +use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Mutex; +use tokio::prelude::*; + +const OP_LISTEN: i32 = 1; +const OP_ACCEPT: i32 = 2; +const OP_READ: i32 = 3; +const OP_WRITE: i32 = 4; +const OP_CLOSE: i32 = 5; + +fn main() { + let js_source = include_str!("http_bench.js"); + let isolate = deno_core::Isolate::new(recv_cb); + + let main_future = lazy(move || { + // TODO currently isolate.execute() must be run inside tokio, hence the + // lazy(). It would be nice to not have that contraint. Probably requires + // using v8::MicrotasksPolicy::kExplicit + js_check(isolate.execute("http_bench.js", js_source)); + isolate.then(|r| { + js_check(r); + Ok(()) + }) + }); + + let args: Vec<String> = env::args().collect(); + if args.len() > 1 && args[1] == "--multi-thread" { + println!("multi-thread"); + tokio::run(main_future); + } else { + println!("single-thread"); + tokio::runtime::current_thread::run(main_future); + } +} + +enum Repr { + TcpListener(tokio::net::TcpListener), + TcpStream(tokio::net::TcpStream), +} + +type ResourceTable = HashMap<i32, Repr>; +lazy_static! { + static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new(HashMap::new()); + static ref NEXT_RID: AtomicUsize = AtomicUsize::new(3); +} + +fn new_rid() -> i32 { + let rid = NEXT_RID.fetch_add(1, Ordering::SeqCst); + rid as i32 +} + +fn recv_cb(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + + let promise_id = isolate.shared.get_record(0, RECORD_OFFSET_PROMISE_ID); + let op_id = isolate.shared.get_record(0, RECORD_OFFSET_OP); + let arg = isolate.shared.get_record(0, RECORD_OFFSET_ARG); + + // dbg!(promise_id); + // dbg!(op_id); + // dbg!(arg); + + let is_sync = promise_id == 0; + + if is_sync { + // sync ops + match op_id { + OP_CLOSE => { + debug!("close"); + assert!(is_sync); + let mut table = RESOURCE_TABLE.lock().unwrap(); + let r = table.remove(&arg); + isolate.shared.set_record( + 0, + RECORD_OFFSET_RESULT, + if r.is_some() { 0 } else { -1 }, + ); + } + OP_LISTEN => { + debug!("listen"); + assert!(is_sync); + + let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap(); + let listener = tokio::net::TcpListener::bind(&addr).unwrap(); + let rid = new_rid(); + isolate.shared.set_record(0, RECORD_OFFSET_RESULT, rid); + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpListener(listener)); + } + _ => panic!("bad op"), + } + } else { + // async ops + let zero_copy_id = zero_copy_buf.zero_copy_id; + let op = match op_id { + OP_ACCEPT => { + let listener_rid = arg; + op_accept(listener_rid) + } + OP_READ => { + let rid = arg; + op_read(rid, zero_copy_buf) + } + OP_WRITE => { + let rid = arg; + op_write(rid, zero_copy_buf) + } + _ => panic!("bad op"), + }; + isolate.add_op(promise_id, op, zero_copy_id); + } +} + +fn op_accept(listener_rid: i32) -> Box<Op> { + debug!("accept {}", listener_rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&listener_rid); + match maybe_repr { + Some(Repr::TcpListener(ref mut listener)) => listener.poll_accept(), + _ => panic!("bad rid"), + } + }).and_then(move |(stream, addr)| { + debug!("accept success {}", addr); + let rid = new_rid(); + + let mut guard = RESOURCE_TABLE.lock().unwrap(); + guard.insert(rid, Repr::TcpStream(stream)); + + Ok(AsyncResult { result: rid }) + }), + ) +} + +fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<Op> { + debug!("read rid={}", rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_read(&mut zero_copy_buf) + } + _ => panic!("bad rid"), + } + }).and_then(move |nread| { + debug!("read success {}", nread); + Ok(AsyncResult { + result: nread as i32, + }) + }), + ) +} + +fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<Op> { + debug!("write rid={}", rid); + Box::new( + futures::future::poll_fn(move || { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::TcpStream(ref mut stream)) => { + stream.poll_write(&zero_copy_buf) + } + _ => panic!("bad rid"), + } + }).and_then(move |nwritten| { + debug!("write success {}", nwritten); + Ok(AsyncResult { + result: nwritten as i32, + }) + }), + ) +} + +fn js_check(r: Result<(), JSError>) { + if let Err(e) = r { + panic!(e.to_string()); + } +} diff --git a/core/js_errors.rs b/core/js_errors.rs new file mode 100644 index 000000000..c07af136f --- /dev/null +++ b/core/js_errors.rs @@ -0,0 +1,416 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +// Note that source_map_mappings requires 0-indexed line and column numbers but +// V8 Exceptions are 1-indexed. + +// TODO: This currently only applies to uncaught exceptions. It would be nice to +// also have source maps for situations like this: +// const err = new Error("Boo!"); +// console.log(err.stack); +// It would require calling into Rust from Error.prototype.prepareStackTrace. + +use serde_json; +use std::fmt; +use std::str; + +#[derive(Debug, PartialEq)] +pub struct StackFrame { + pub line: i64, // zero indexed + pub column: i64, // zero indexed + pub script_name: String, + pub function_name: String, + pub is_eval: bool, + pub is_constructor: bool, + pub is_wasm: bool, +} + +#[derive(Debug, PartialEq)] +pub struct JSError { + pub message: String, + + pub source_line: Option<String>, + pub script_resource_name: Option<String>, + pub line_number: Option<i64>, + pub start_position: Option<i64>, + pub end_position: Option<i64>, + pub error_level: Option<i64>, + pub start_column: Option<i64>, + pub end_column: Option<i64>, + + pub frames: Vec<StackFrame>, +} + +impl fmt::Display for StackFrame { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Note when we print to string, we change from 0-indexed to 1-indexed. + let function_name = self.function_name.clone(); + let script_line_column = + format_script_line_column(&self.script_name, self.line, self.column); + + if !self.function_name.is_empty() { + write!(f, " at {} ({})", function_name, script_line_column) + } else if self.is_eval { + write!(f, " at eval ({})", script_line_column) + } else { + write!(f, " at {}", script_line_column) + } + } +} + +fn format_script_line_column( + script_name: &str, + line: i64, + column: i64, +) -> String { + // TODO match this style with how typescript displays errors. + let line = (1 + line).to_string(); + let column = (1 + column).to_string(); + let script_name = script_name.to_string(); + format!("{}:{}:{}", script_name, line, column) +} + +impl fmt::Display for JSError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + if self.script_resource_name.is_some() { + let script_resource_name = self.script_resource_name.as_ref().unwrap(); + // Avoid showing internal code from gen/bundle/main.js + if script_resource_name != "gen/bundle/main.js" + && script_resource_name != "gen/bundle/compiler.js" + { + if self.line_number.is_some() && self.start_column.is_some() { + assert!(self.line_number.is_some()); + assert!(self.start_column.is_some()); + let script_line_column = format_script_line_column( + script_resource_name, + self.line_number.unwrap() - 1, + self.start_column.unwrap() - 1, + ); + write!(f, "{}", script_line_column)?; + } + if self.source_line.is_some() { + write!(f, "\n{}\n", self.source_line.as_ref().unwrap())?; + let mut s = String::new(); + for i in 0..self.end_column.unwrap() { + if i >= self.start_column.unwrap() { + s.push('^'); + } else { + s.push(' '); + } + } + writeln!(f, "{}", s)?; + } + } + } + + write!(f, "{}", self.message.clone())?; + + for frame in &self.frames { + write!(f, "\n{}", &frame.to_string())?; + } + Ok(()) + } +} + +impl StackFrame { + // TODO Maybe use serde_derive? + fn from_json_value(v: &serde_json::Value) -> Option<Self> { + if !v.is_object() { + return None; + } + let obj = v.as_object().unwrap(); + + let line_v = &obj["line"]; + if !line_v.is_u64() { + return None; + } + let line = line_v.as_u64().unwrap() as i64; + + let column_v = &obj["column"]; + if !column_v.is_u64() { + return None; + } + let column = column_v.as_u64().unwrap() as i64; + + let script_name_v = &obj["scriptName"]; + if !script_name_v.is_string() { + return None; + } + let script_name = String::from(script_name_v.as_str().unwrap()); + + // Optional fields. See EncodeExceptionAsJSON() in libdeno. + // Sometimes V8 doesn't provide all the frame information. + + let mut function_name = String::from(""); // default + if obj.contains_key("functionName") { + let function_name_v = &obj["functionName"]; + if function_name_v.is_string() { + function_name = String::from(function_name_v.as_str().unwrap()); + } + } + + let mut is_eval = false; // default + if obj.contains_key("isEval") { + let is_eval_v = &obj["isEval"]; + if is_eval_v.is_boolean() { + is_eval = is_eval_v.as_bool().unwrap(); + } + } + + let mut is_constructor = false; // default + if obj.contains_key("isConstructor") { + let is_constructor_v = &obj["isConstructor"]; + if is_constructor_v.is_boolean() { + is_constructor = is_constructor_v.as_bool().unwrap(); + } + } + + let mut is_wasm = false; // default + if obj.contains_key("isWasm") { + let is_wasm_v = &obj["isWasm"]; + if is_wasm_v.is_boolean() { + is_wasm = is_wasm_v.as_bool().unwrap(); + } + } + + Some(StackFrame { + line: line - 1, + column: column - 1, + script_name, + function_name, + is_eval, + is_constructor, + is_wasm, + }) + } +} + +impl JSError { + /// Creates a new JSError by parsing the raw exception JSON string from V8. + pub fn from_v8_exception(json_str: &str) -> Option<Self> { + let v = serde_json::from_str::<serde_json::Value>(json_str); + if v.is_err() { + return None; + } + let v = v.unwrap(); + + if !v.is_object() { + return None; + } + let obj = v.as_object().unwrap(); + + let message_v = &obj["message"]; + if !message_v.is_string() { + return None; + } + let message = String::from(message_v.as_str().unwrap()); + + let source_line = obj + .get("sourceLine") + .and_then(|v| v.as_str().map(String::from)); + let script_resource_name = obj + .get("scriptResourceName") + .and_then(|v| v.as_str().map(String::from)); + let line_number = obj.get("lineNumber").and_then(|v| v.as_i64()); + let start_position = obj.get("startPosition").and_then(|v| v.as_i64()); + let end_position = obj.get("endPosition").and_then(|v| v.as_i64()); + let error_level = obj.get("errorLevel").and_then(|v| v.as_i64()); + let start_column = obj.get("startColumn").and_then(|v| v.as_i64()); + let end_column = obj.get("endColumn").and_then(|v| v.as_i64()); + + let frames_v = &obj["frames"]; + if !frames_v.is_array() { + return None; + } + let frame_values = frames_v.as_array().unwrap(); + + let mut frames = Vec::<StackFrame>::new(); + for frame_v in frame_values { + match StackFrame::from_json_value(frame_v) { + None => return None, + Some(frame) => frames.push(frame), + } + } + + Some(JSError { + message, + source_line, + script_resource_name, + line_number, + start_position, + end_position, + error_level, + start_column, + end_column, + frames, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn error1() -> JSError { + JSError { + message: "Error: foo bar".to_string(), + source_line: None, + script_resource_name: None, + line_number: None, + start_position: None, + end_position: None, + error_level: None, + start_column: None, + end_column: None, + frames: vec![ + StackFrame { + line: 4, + column: 16, + script_name: "foo_bar.ts".to_string(), + function_name: "foo".to_string(), + is_eval: false, + is_constructor: false, + is_wasm: false, + }, + StackFrame { + line: 5, + column: 20, + script_name: "bar_baz.ts".to_string(), + function_name: "qat".to_string(), + is_eval: false, + is_constructor: false, + is_wasm: false, + }, + StackFrame { + line: 1, + column: 1, + script_name: "deno_main.js".to_string(), + function_name: "".to_string(), + is_eval: false, + is_constructor: false, + is_wasm: false, + }, + ], + } + } + + #[test] + fn stack_frame_from_json_value_1() { + let v = serde_json::from_str::<serde_json::Value>( + r#"{ + "line":2, + "column":11, + "functionName":"foo", + "scriptName":"/Users/rld/src/deno/tests/error_001.ts", + "isEval":true, + "isConstructor":false, + "isWasm":false + }"#, + ).unwrap(); + let r = StackFrame::from_json_value(&v); + assert_eq!( + r, + Some(StackFrame { + line: 1, + column: 10, + script_name: "/Users/rld/src/deno/tests/error_001.ts".to_string(), + function_name: "foo".to_string(), + is_eval: true, + is_constructor: false, + is_wasm: false, + }) + ); + } + + #[test] + fn stack_frame_from_json_value_2() { + let v = serde_json::from_str::<serde_json::Value>( + r#"{ + "scriptName": "/Users/rld/src/deno/tests/error_001.ts", + "line": 2, + "column": 11 + }"#, + ).unwrap(); + let r = StackFrame::from_json_value(&v); + assert!(r.is_some()); + let f = r.unwrap(); + assert_eq!(f.line, 1); + assert_eq!(f.column, 10); + assert_eq!(f.script_name, "/Users/rld/src/deno/tests/error_001.ts"); + } + + #[test] + fn js_error_from_v8_exception() { + let r = JSError::from_v8_exception( + r#"{ + "message":"Uncaught Error: bad", + "frames":[ + { + "line":2, + "column":11, + "functionName":"foo", + "scriptName":"/Users/rld/src/deno/tests/error_001.ts", + "isEval":true, + "isConstructor":false, + "isWasm":false + }, { + "line":5, + "column":5, + "functionName":"bar", + "scriptName":"/Users/rld/src/deno/tests/error_001.ts", + "isEval":true, + "isConstructor":false, + "isWasm":false + } + ]}"#, + ); + assert!(r.is_some()); + let e = r.unwrap(); + assert_eq!(e.message, "Uncaught Error: bad"); + assert_eq!(e.frames.len(), 2); + assert_eq!( + e.frames[0], + StackFrame { + line: 1, + column: 10, + script_name: "/Users/rld/src/deno/tests/error_001.ts".to_string(), + function_name: "foo".to_string(), + is_eval: true, + is_constructor: false, + is_wasm: false, + } + ) + } + + #[test] + fn js_error_from_v8_exception2() { + let r = JSError::from_v8_exception( + "{\"message\":\"Error: boo\",\"sourceLine\":\"throw Error('boo');\",\"scriptResourceName\":\"a.js\",\"lineNumber\":3,\"startPosition\":8,\"endPosition\":9,\"errorLevel\":8,\"startColumn\":6,\"endColumn\":7,\"isSharedCrossOrigin\":false,\"isOpaque\":false,\"frames\":[{\"line\":3,\"column\":7,\"functionName\":\"\",\"scriptName\":\"a.js\",\"isEval\":false,\"isConstructor\":false,\"isWasm\":false}]}" + ); + assert!(r.is_some()); + let e = r.unwrap(); + assert_eq!(e.message, "Error: boo"); + assert_eq!(e.source_line, Some("throw Error('boo');".to_string())); + assert_eq!(e.script_resource_name, Some("a.js".to_string())); + assert_eq!(e.line_number, Some(3)); + assert_eq!(e.start_position, Some(8)); + assert_eq!(e.end_position, Some(9)); + assert_eq!(e.error_level, Some(8)); + assert_eq!(e.start_column, Some(6)); + assert_eq!(e.end_column, Some(7)); + assert_eq!(e.frames.len(), 1); + } + + #[test] + fn stack_frame_to_string() { + let e = error1(); + assert_eq!(" at foo (foo_bar.ts:5:17)", &e.frames[0].to_string()); + assert_eq!(" at qat (bar_baz.ts:6:21)", &e.frames[1].to_string()); + } + + #[test] + fn js_error_to_string() { + let e = error1(); + let expected = "Error: foo bar\n at foo (foo_bar.ts:5:17)\n at qat (bar_baz.ts:6:21)\n at deno_main.js:2:2"; + assert_eq!(expected, &e.to_string()); + } +} diff --git a/core/lib.rs b/core/lib.rs new file mode 100644 index 000000000..d13339ee5 --- /dev/null +++ b/core/lib.rs @@ -0,0 +1,364 @@ +#[macro_use] +extern crate log; +extern crate futures; +extern crate libc; + +mod js_errors; +mod libdeno; +mod shared; + +pub use crate::js_errors::JSError; +pub use crate::libdeno::deno_buf; +pub use crate::shared::*; +use futures::Async; +use futures::Future; +use futures::Poll; +use libc::c_void; +use std::collections::HashMap; +use std::ffi::CStr; +use std::ffi::CString; +use std::sync::{Once, ONCE_INIT}; + +pub struct Isolate { + libdeno_isolate: *const libdeno::isolate, + pending_ops: HashMap<i32, PendingOp>, // promise_id -> op + polled_recently: bool, + recv_cb: RecvCallback, + + pub shared: Shared, + pub test_send_counter: u32, // TODO only used for testing- REMOVE. +} + +pub type RecvCallback = fn(isolate: &mut Isolate, zero_copy_buf: deno_buf); + +pub const NUM_RECORDS: usize = 100; + +// TODO rename to AsyncResult +pub struct AsyncResult { + pub result: i32, +} + +pub type Op = dyn Future<Item = AsyncResult, Error = std::io::Error> + Send; + +struct PendingOp { + op: Box<Op>, + polled_recently: bool, + zero_copy_id: usize, // non-zero if associated zero-copy buffer. +} + +static DENO_INIT: Once = ONCE_INIT; + +unsafe impl Send for Isolate {} + +impl Isolate { + pub fn new(recv_cb: RecvCallback) -> Self { + DENO_INIT.call_once(|| { + unsafe { libdeno::deno_init() }; + }); + + // Allocate unmanaged memory for the shared buffer by creating a Vec<u8>, + // grabbing the raw pointer, and then leaking the Vec so it is never freed. + let mut shared = Shared::new(); + let shared_deno_buf = shared.as_deno_buf(); + + let config = libdeno::deno_config { + will_snapshot: 0, + load_snapshot: deno_buf::empty(), // TODO + shared: shared_deno_buf, + recv_cb: pre_dispatch, + }; + let libdeno_isolate = unsafe { libdeno::deno_new(config) }; + + Self { + pending_ops: HashMap::new(), + polled_recently: false, + libdeno_isolate, + test_send_counter: 0, + recv_cb, + shared, + } + } + + fn zero_copy_release(&self, zero_copy_id: usize) { + unsafe { + libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id) + } + } + + pub fn add_op( + self: &mut Self, + promise_id: i32, + op: Box<Op>, + zero_copy_id: usize, + ) { + debug!("add_op {}", zero_copy_id); + self.pending_ops.insert( + promise_id, + PendingOp { + op, + polled_recently: false, + zero_copy_id, + }, + ); + self.polled_recently = false; + } + + #[inline] + pub unsafe fn from_raw_ptr<'a>(ptr: *const c_void) -> &'a mut Self { + let ptr = ptr as *mut _; + &mut *ptr + } + + #[inline] + pub fn as_raw_ptr(&self) -> *const c_void { + self as *const _ as *const c_void + } + + pub fn execute( + &self, + js_filename: &str, + js_source: &str, + ) -> Result<(), JSError> { + let filename = CString::new(js_filename).unwrap(); + let source = CString::new(js_source).unwrap(); + unsafe { + libdeno::deno_execute( + self.libdeno_isolate, + self.as_raw_ptr(), + filename.as_ptr(), + source.as_ptr(), + ) + }; + if let Some(err) = self.last_exception() { + return Err(err); + } + Ok(()) + } + + pub fn last_exception(&self) -> Option<JSError> { + let ptr = unsafe { libdeno::deno_last_exception(self.libdeno_isolate) }; + if ptr.is_null() { + None + } else { + let cstr = unsafe { CStr::from_ptr(ptr) }; + let v8_exception = cstr.to_str().unwrap(); + debug!("v8_exception\n{}\n", v8_exception); + let js_error = JSError::from_v8_exception(v8_exception).unwrap(); + Some(js_error) + } + } + + fn check_promise_errors(&self) { + unsafe { + libdeno::deno_check_promise_errors(self.libdeno_isolate); + } + } + + fn respond(&mut self) -> Result<(), JSError> { + let buf = deno_buf::empty(); + unsafe { + libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf) + } + if let Some(err) = self.last_exception() { + Err(err) + } else { + Ok(()) + } + } +} + +struct LockerScope { + libdeno_isolate: *const libdeno::isolate, +} + +impl LockerScope { + fn new(isolate: &Isolate) -> LockerScope { + let libdeno_isolate = isolate.libdeno_isolate; + unsafe { libdeno::deno_lock(libdeno_isolate) } + LockerScope { libdeno_isolate } + } +} + +impl Drop for LockerScope { + fn drop(&mut self) { + unsafe { libdeno::deno_unlock(self.libdeno_isolate) } + } +} + +impl Future for Isolate { + type Item = (); + type Error = JSError; + + fn poll(&mut self) -> Poll<(), JSError> { + // Lock the current thread for V8. + let _locker = LockerScope::new(self); + + // Clear + self.polled_recently = false; + for (_, pending) in self.pending_ops.iter_mut() { + pending.polled_recently = false; + } + + while !self.polled_recently { + let mut complete = HashMap::<i32, AsyncResult>::new(); + + self.polled_recently = true; + for (promise_id, pending) in self.pending_ops.iter_mut() { + // Do not call poll on futures we've already polled this turn. + if pending.polled_recently { + continue; + } + pending.polled_recently = true; + + let promise_id = *promise_id; + let op = &mut pending.op; + match op.poll() { + Err(op_err) => { + eprintln!("op err {:?}", op_err); + complete.insert(promise_id, AsyncResult { result: -1 }); + debug!("pending op {} complete err", promise_id); + } + Ok(Async::Ready(async_result)) => { + complete.insert(promise_id, async_result); + debug!("pending op {} complete ready", promise_id); + } + Ok(Async::NotReady) => { + debug!("pending op {} not ready", promise_id); + continue; + } + } + } + + self.shared.set_num_records(complete.len() as i32); + if complete.len() > 0 { + // self.zero_copy_release() and self.respond() need Locker. + let mut i = 0; + for (promise_id, async_result) in complete.iter_mut() { + let pending = self.pending_ops.remove(promise_id).unwrap(); + + if pending.zero_copy_id > 0 { + self.zero_copy_release(pending.zero_copy_id); + } + + self + .shared + .set_record(i, RECORD_OFFSET_PROMISE_ID, *promise_id); + self + .shared + .set_record(i, RECORD_OFFSET_RESULT, async_result.result); + i += 1; + } + self.respond()?; + } + } + + self.check_promise_errors(); + if let Some(err) = self.last_exception() { + return Err(err); + } + + // We're idle if pending_ops is empty. + if self.pending_ops.is_empty() { + Ok(futures::Async::Ready(())) + } else { + Ok(futures::Async::NotReady) + } + } +} + +extern "C" fn pre_dispatch( + user_data: *mut c_void, + control_buf: deno_buf, + zero_copy_buf: deno_buf, +) { + let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; + assert_eq!(control_buf.len(), 0); + (isolate.recv_cb)(isolate, zero_copy_buf); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn inc_counter(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + assert_eq!(zero_copy_buf.len(), 0); + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + } + + fn js_check(r: Result<(), JSError>) { + if let Err(e) = r { + panic!(e.to_string()); + } + } + + #[test] + fn test_execute() { + let isolate = Isolate::new(inc_counter); + js_check(isolate.execute( + "filename.js", + r#" + libdeno.send(); + async function main() { + libdeno.send(); + } + main(); + "#, + )); + // We expect that main is executed even tho we didn't poll. + assert_eq!(isolate.test_send_counter, 2); + } + + fn async_immediate(isolate: &mut Isolate, zero_copy_buf: deno_buf) { + assert_eq!(zero_copy_buf.len(), 0); + isolate.test_send_counter += 1; // TODO ideally store this in isolate.state? + + let promise_id = 0; + let op = Box::new(futures::future::ok(AsyncResult { result: 0 })); + isolate.add_op(promise_id, op, zero_copy_buf.zero_copy_id); + } + + #[test] + fn test_poll_async_immediate_ops() { + let mut isolate = Isolate::new(async_immediate); + js_check(isolate.execute( + "setup.js", + r#" + let nrecv = 0; + libdeno.recv(() => { + nrecv++; + }); + function assertEq(actual, expected) { + if (expected != actual) { + throw Error(`actual ${actual} expected ${expected} `); + } + } + "#, + )); + assert_eq!(isolate.test_send_counter, 0); + js_check(isolate.execute( + "check1.js", + r#" + assertEq(nrecv, 0); + libdeno.send(); + assertEq(nrecv, 0); + "#, + )); + assert_eq!(isolate.test_send_counter, 1); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + assert_eq!(isolate.test_send_counter, 1); + js_check(isolate.execute( + "check2.js", + r#" + assertEq(nrecv, 1); + libdeno.send(); + assertEq(nrecv, 1); + "#, + )); + assert_eq!(isolate.test_send_counter, 2); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + js_check(isolate.execute("check3.js", "assertEq(nrecv, 2)")); + assert_eq!(isolate.test_send_counter, 2); + // We are idle, so the next poll should be the last. + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + } +} diff --git a/core/libdeno.rs b/core/libdeno.rs new file mode 120000 index 000000000..32688906e --- /dev/null +++ b/core/libdeno.rs @@ -0,0 +1 @@ +../src/libdeno.rs
\ No newline at end of file diff --git a/core/shared.rs b/core/shared.rs new file mode 100644 index 000000000..40d83db73 --- /dev/null +++ b/core/shared.rs @@ -0,0 +1,49 @@ +use crate::libdeno::deno_buf; +use std::mem; + +// TODO this is where we abstract flatbuffers at. +// TODO make these constants private to this file. +const INDEX_NUM_RECORDS: usize = 0; +const INDEX_RECORDS: usize = 1; +pub const RECORD_OFFSET_PROMISE_ID: usize = 0; +pub const RECORD_OFFSET_OP: usize = 1; +pub const RECORD_OFFSET_ARG: usize = 2; +pub const RECORD_OFFSET_RESULT: usize = 3; +const RECORD_SIZE: usize = 4; +const NUM_RECORDS: usize = 100; + +/// Represents the shared buffer between JS and Rust. +/// Used for FFI. +pub struct Shared(Vec<i32>); + +impl Shared { + pub fn new() -> Shared { + let mut vec = Vec::<i32>::new(); + vec.resize(INDEX_RECORDS + RECORD_SIZE * NUM_RECORDS, 0); + Shared(vec) + } + + pub fn set_record(&mut self, i: usize, off: usize, value: i32) { + assert!(i < NUM_RECORDS); + self.0[INDEX_RECORDS + RECORD_SIZE * i + off] = value; + } + + pub fn get_record(&self, i: usize, off: usize) -> i32 { + assert!(i < NUM_RECORDS); + return self.0[INDEX_RECORDS + RECORD_SIZE * i + off]; + } + + pub fn set_num_records(&mut self, num_records: i32) { + self.0[INDEX_NUM_RECORDS] = num_records; + } + + pub fn get_num_records(&self) -> i32 { + return self.0[INDEX_NUM_RECORDS]; + } + + pub fn as_deno_buf(&mut self) -> deno_buf { + let ptr = self.0.as_mut_ptr() as *mut u8; + let len = mem::size_of::<i32>() * self.0.len(); + unsafe { deno_buf::from_raw_parts(ptr, len) } + } +} |