diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/BUILD.gn | 50 | ||||
-rw-r--r-- | core/http_bench.js | 136 | ||||
-rw-r--r-- | core/http_bench.rs | 132 | ||||
-rw-r--r-- | core/isolate.rs | 286 | ||||
-rw-r--r-- | core/lib.rs | 3 | ||||
-rw-r--r-- | core/shared.rs | 49 | ||||
-rw-r--r-- | core/shared_queue.js | 127 | ||||
-rw-r--r-- | core/shared_queue.rs | 225 | ||||
-rw-r--r-- | core/shared_queue_test.js | 65 | ||||
-rw-r--r-- | core/test_util.rs | 51 |
10 files changed, 776 insertions, 348 deletions
diff --git a/core/BUILD.gn b/core/BUILD.gn index be06a7f4b..c14820637 100644 --- a/core/BUILD.gn +++ b/core/BUILD.gn @@ -1,5 +1,19 @@ import("//build_extra/rust/rust.gni") +group("default") { + testonly = true + deps = [ + ":deno_core_http_bench", + ":deno_core_http_bench_test", + ":deno_core_test", + ] +} + +deno_core_deps = [ + "../libdeno:libdeno_static_lib", + "../libdeno:v8", +] + # deno_core does not depend on flatbuffers nor tokio. main_extern = [ "$rust_build:futures", @@ -10,28 +24,36 @@ main_extern = [ rust_crate("deno_core") { source_root = "lib.rs" + deps = deno_core_deps extern = main_extern - deps = [ - "../libdeno:libdeno_static_lib", - ] } rust_test("deno_core_test") { source_root = "lib.rs" + deps = deno_core_deps extern = main_extern - deps = [ - "../libdeno:libdeno_static_lib", - ] +} + +http_bench_extern = [ + "$rust_build:futures", + "$rust_build:lazy_static", + "$rust_build:libc", + "$rust_build:log", + "$rust_build:tokio", + ":deno_core", +] +if (is_win) { + http_bench_extern += [ "$rust_build:winapi" ] } rust_executable("deno_core_http_bench") { source_root = "http_bench.rs" - extern = [ - "$rust_build:futures", - "$rust_build:lazy_static", - "$rust_build:libc", - "$rust_build:log", - "$rust_build:tokio", - ":deno_core" - ] + deps = deno_core_deps + extern = http_bench_extern +} + +rust_test("deno_core_http_bench_test") { + source_root = "http_bench.rs" + deps = deno_core_deps + extern = http_bench_extern } diff --git a/core/http_bench.js b/core/http_bench.js index aae15a72c..e5598c06f 100644 --- a/core/http_bench.js +++ b/core/http_bench.js @@ -6,64 +6,21 @@ const OP_ACCEPT = 2; const OP_READ = 3; const OP_WRITE = 4; const OP_CLOSE = 5; -const INDEX_START = 0; -const INDEX_END = 1; -const NUM_RECORDS = 128; -const RECORD_SIZE = 4; - -const shared32 = new Int32Array(libdeno.shared); - -function idx(i, off) { - return 2 + i * RECORD_SIZE + off; -} - -function recordsPush(promiseId, opId, arg, result) { - let i = shared32[INDEX_END]; - if (i >= NUM_RECORDS) { - return false; - } - shared32[idx(i, 0)] = promiseId; - shared32[idx(i, 1)] = opId; - shared32[idx(i, 2)] = arg; - shared32[idx(i, 3)] = result; - shared32[INDEX_END]++; - return true; -} - -function recordsShift() { - if (shared32[INDEX_START] == shared32[INDEX_END]) { - return null; - } - const i = shared32[INDEX_START]; - const record = { - promiseId: shared32[idx(i, 0)], - opId: shared32[idx(i, 1)], - arg: shared32[idx(i, 2)], - result: shared32[idx(i, 3)] - }; - shared32[INDEX_START]++; - return record; -} - -function recordsReset() { - shared32[INDEX_START] = 0; - shared32[INDEX_END] = 0; -} - -function recordsSize() { - return shared32[INDEX_END] - shared32[INDEX_START]; -} - const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" .split("") .map(c => c.charCodeAt(0)) ); - const promiseMap = new Map(); let nextPromiseId = 1; +function assert(cond) { + if (!cond) { + throw Error("assert"); + } +} + function createResolvable() { let methods; const promise = new Promise((resolve, reject) => { @@ -72,36 +29,73 @@ function createResolvable() { return Object.assign(promise, methods); } +const scratch32 = new Int32Array(4); +const scratchBytes = new Uint8Array( + scratch32.buffer, + scratch32.byteOffset, + scratch32.byteLength +); +assert(scratchBytes.byteLength === 4 * 4); + +// Toggle what method we send with. false = legacy. +// AFAICT This has no effect on performance. +const sendWithShared = true; + +function send(promiseId, opId, arg, zeroCopy = null) { + scratch32[0] = promiseId; + scratch32[1] = opId; + scratch32[2] = arg; + scratch32[3] = -1; + if (sendWithShared) { + Deno._sharedQueue.push(scratchBytes); + libdeno.send(null, zeroCopy); + } else { + libdeno.send(scratchBytes, zeroCopy); + } +} + /** Returns Promise<number> */ -function sendAsync(opId, arg, zeroCopyData) { +function sendAsync(opId, arg, zeroCopy = null) { const promiseId = nextPromiseId++; const p = createResolvable(); - recordsReset(); - recordsPush(promiseId, opId, arg, -1); promiseMap.set(promiseId, p); - libdeno.send(null, zeroCopyData); + send(promiseId, opId, arg, zeroCopy); return p; } -/** Returns u32 number */ -function sendSync(opId, arg) { - recordsReset(); - recordsPush(0, opId, arg, -1); - libdeno.send(); - if (recordsSize() != 1) { - throw Error("Expected sharedSimple to have size 1"); - } - let { result } = recordsShift(); - return result; +function recordFromBuf(buf) { + assert(buf.byteLength === 16); + const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4); + return { + promiseId: buf32[0], + opId: buf32[1], + arg: buf32[2], + result: buf32[3] + }; } -function handleAsyncMsgFromRust() { - while (recordsSize() > 0) { - const { promiseId, result } = recordsShift(); - const p = promiseMap.get(promiseId); - promiseMap.delete(promiseId); - p.resolve(result); +function recv() { + const buf = Deno._sharedQueue.shift(); + if (!buf) { + return null; } + return recordFromBuf(buf); +} + +/** Returns i32 number */ +function sendSync(opId, arg) { + send(0, opId, arg); + const record = recv(); + assert(recv() == null); + return record.result; +} + +function handleAsyncMsgFromRust(buf) { + const record = recordFromBuf(buf); + const { promiseId, result } = record; + const p = promiseMap.get(promiseId); + promiseMap.delete(promiseId); + p.resolve(result); } /** Listens on 0.0.0.0:4500, returns rid. */ @@ -147,12 +141,12 @@ async function serve(rid) { } async function main() { - libdeno.recv(handleAsyncMsgFromRust); + Deno._setAsyncHandler(handleAsyncMsgFromRust); libdeno.print("http_bench.js start\n"); const listenerRid = listen(); - libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}`); + libdeno.print(`listening http://127.0.0.1:4544/ rid = ${listenerRid}\n`); while (true) { const rid = await accept(listenerRid); // libdeno.print(`accepted ${rid}`); diff --git a/core/http_bench.rs b/core/http_bench.rs index b3699dc15..ed3792d6a 100644 --- a/core/http_bench.rs +++ b/core/http_bench.rs @@ -16,7 +16,6 @@ use deno_core::*; use futures::future::lazy; use std::collections::HashMap; use std::env; -use std::mem; use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -29,13 +28,7 @@ const OP_READ: i32 = 3; const OP_WRITE: i32 = 4; const OP_CLOSE: i32 = 5; -const INDEX_START: usize = 0; -const INDEX_END: usize = 1; - -const NUM_RECORDS: usize = 128; -const RECORD_SIZE: usize = 4; - -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Record { pub promise_id: i32, pub op_id: i32, @@ -43,48 +36,84 @@ pub struct Record { pub result: i32, } -pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send; +impl Into<Buf> for Record { + fn into(self) -> Buf { + let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result] + .into_boxed_slice(); + let ptr = Box::into_raw(buf32) as *mut [u8; 16]; + unsafe { Box::from_raw(ptr) } + } +} -struct HttpBench { - shared32: Vec<i32>, +impl From<&[u8]> for Record { + fn from(s: &[u8]) -> Record { + let ptr = s.as_ptr() as *const i32; + let ints = unsafe { std::slice::from_raw_parts(ptr, 4) }; + Record { + promise_id: ints[0], + op_id: ints[1], + arg: ints[2], + result: ints[3], + } + } } -impl HttpBench { - fn new() -> Self { - let mut shared32 = Vec::<i32>::new(); - let n = 2 + 4 * NUM_RECORDS; - shared32.resize(n, 0); - shared32[INDEX_START] = 0; - shared32[INDEX_END] = 0; - Self { shared32 } +impl From<Buf> for Record { + fn from(buf: Buf) -> Record { + assert_eq!(buf.len(), 4 * 4); + //let byte_len = buf.len(); + let ptr = Box::into_raw(buf) as *mut [i32; 4]; + let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) }; + assert_eq!(ints.len(), 4); + Record { + promise_id: ints[0], + op_id: ints[1], + arg: ints[2], + result: ints[3], + } } } -fn idx(i: usize, off: usize) -> usize { - 2 + i * RECORD_SIZE + off +#[test] +fn test_record_from() { + let r = Record { + promise_id: 1, + op_id: 2, + arg: 3, + result: 4, + }; + let expected = r.clone(); + let buf: Buf = r.into(); + #[cfg(target_endian = "little")] + assert_eq!( + buf, + vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice() + ); + let actual = Record::from(buf); + assert_eq!(actual, expected); + // TODO test From<&[u8]> for Record } -impl Behavior<Record> for HttpBench { +pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send; + +struct HttpBench(); + +impl Behavior for HttpBench { fn startup_snapshot(&mut self) -> Option<deno_buf> { None } - fn startup_shared(&mut self) -> Option<deno_buf> { - let ptr = self.shared32.as_ptr() as *const u8; - let len = mem::size_of::<i32>() * self.shared32.len(); - Some(unsafe { deno_buf::from_raw_parts(ptr, len) }) - } - fn resolve(&mut self, _specifier: &str, _referrer: deno_mod) -> deno_mod { // HttpBench doesn't do ES modules. unimplemented!() } - fn recv( + fn dispatch( &mut self, - record: Record, + control: &[u8], zero_copy_buf: deno_buf, - ) -> (bool, Box<Op<Record>>) { + ) -> (bool, Box<Op>) { + let record = Record::from(control); let is_sync = record.promise_id == 0; let http_bench_op = match record.op_id { OP_LISTEN => { @@ -125,51 +154,22 @@ impl Behavior<Record> for HttpBench { eprintln!("unexpected err {}", err); record_b.result = -1; Ok(record_b) + }).then(|result| -> Result<Buf, ()> { + let record = result.unwrap(); + Ok(record.into()) }), ); (is_sync, op) } - - fn records_reset(&mut self) { - self.shared32[INDEX_START] = 0; - self.shared32[INDEX_END] = 0; - } - - fn records_push(&mut self, record: Record) -> bool { - debug!("push {:?}", record); - let i = self.shared32[INDEX_END] as usize; - if i >= NUM_RECORDS { - return false; - } - self.shared32[idx(i, 0)] = record.promise_id; - self.shared32[idx(i, 1)] = record.op_id; - self.shared32[idx(i, 2)] = record.arg; - self.shared32[idx(i, 3)] = record.result; - self.shared32[INDEX_END] += 1; - true - } - - fn records_shift(&mut self) -> Option<Record> { - let i = self.shared32[INDEX_START] as usize; - if i == self.shared32[INDEX_END] as usize { - return None; - } - let record = Record { - promise_id: self.shared32[idx(i, 0)], - op_id: self.shared32[idx(i, 1)], - arg: self.shared32[idx(i, 2)], - result: self.shared32[idx(i, 3)], - }; - self.shared32[INDEX_START] += 1; - Some(record) - } } fn main() { let js_source = include_str!("http_bench.js"); let main_future = lazy(move || { - let isolate = deno_core::Isolate::new(HttpBench::new()); + let isolate = deno_core::Isolate::new(HttpBench()); + + isolate.shared_init(); // TODO currently isolate.execute() must be run inside tokio, hence the // lazy(). It would be nice to not have that contraint. Probably requires diff --git a/core/isolate.rs b/core/isolate.rs index f7aa431aa..33b52da75 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -3,6 +3,8 @@ use crate::js_errors::JSError; use crate::libdeno; use crate::libdeno::deno_buf; use crate::libdeno::deno_mod; +use crate::shared_queue::SharedQueue; +use crate::shared_queue::RECOMMENDED_SIZE; use futures::Async; use futures::Future; use futures::Poll; @@ -11,19 +13,20 @@ use std::ffi::CStr; use std::ffi::CString; use std::sync::{Once, ONCE_INIT}; -pub type Op<R> = dyn Future<Item = R, Error = ()> + Send; +pub type Buf = Box<[u8]>; +pub type Op = dyn Future<Item = Buf, Error = ()> + Send; -struct PendingOp<R> { - op: Box<Op<R>>, +struct PendingOp { + op: Box<Op>, polled_recently: bool, zero_copy_id: usize, // non-zero if associated zero-copy buffer. } -impl<R> Future for PendingOp<R> { - type Item = R; +impl Future for PendingOp { + type Item = Buf; type Error = (); - fn poll(&mut self) -> Poll<R, ()> { + fn poll(&mut self) -> Poll<Buf, ()> { // Do not call poll on ops we've already polled this turn. if self.polled_recently { Ok(Async::NotReady) @@ -32,7 +35,7 @@ impl<R> Future for PendingOp<R> { let op = &mut self.op; op.poll().map_err(|()| { // Ops should not error. If an op experiences an error it needs to - // encode that error into the record R, so it can be returned to JS. + // encode that error into a buf, so it can be returned to JS. panic!("ops should not error") }) } @@ -40,35 +43,21 @@ impl<R> Future for PendingOp<R> { } /// Defines the behavior of an Isolate. -pub trait Behavior<R> { +pub trait Behavior { /// Called exactly once when an Isolate is created to retrieve the startup /// snapshot. fn startup_snapshot(&mut self) -> Option<deno_buf>; - /// Called exactly once when an Isolate is created to provide the - /// backing memory for the libdeno.shared SharedArrayBuffer. - fn startup_shared(&mut self) -> Option<deno_buf>; - /// Called during mod_instantiate() to resolve imports. fn resolve(&mut self, specifier: &str, referrer: deno_mod) -> deno_mod; /// Called whenever libdeno.send() is called in JavaScript. zero_copy_buf /// corresponds to the second argument of libdeno.send(). - fn recv(&mut self, record: R, zero_copy_buf: deno_buf) -> (bool, Box<Op<R>>); - - // TODO(ry) Remove records_reset(). - // TODO(ry) Abstract records_* and startup_shared() methods into standalone - // trait called Shared. It should, however, wait until integration with - // existing Deno codebase is complete. - - /// Clears the shared buffer. - fn records_reset(&mut self); - - /// Returns false if not enough room. - fn records_push(&mut self, record: R) -> bool; - - /// Returns none if empty. - fn records_shift(&mut self) -> Option<R>; + fn dispatch( + &mut self, + control: &[u8], + zero_copy_buf: deno_buf, + ) -> (bool, Box<Op>); } /// A single execution context of JavaScript. Corresponds roughly to the "Web @@ -77,18 +66,19 @@ pub trait Behavior<R> { /// pending ops have completed. /// /// Ops are created in JavaScript by calling libdeno.send(), and in Rust by -/// implementing Behavior::recv. An Op corresponds exactly to a Promise in +/// implementing Behavior::dispatch. An Op corresponds exactly to a Promise in /// JavaScript. -pub struct Isolate<R, B: Behavior<R>> { +pub struct Isolate<B: Behavior> { libdeno_isolate: *const libdeno::isolate, behavior: B, - pending_ops: Vec<PendingOp<R>>, + shared: SharedQueue, + pending_ops: Vec<PendingOp>, polled_recently: bool, } -unsafe impl<R, B: Behavior<R>> Send for Isolate<R, B> {} +unsafe impl<B: Behavior> Send for Isolate<B> {} -impl<R, B: Behavior<R>> Drop for Isolate<R, B> { +impl<B: Behavior> Drop for Isolate<B> { fn drop(&mut self) { unsafe { libdeno::deno_delete(self.libdeno_isolate) } } @@ -96,22 +86,21 @@ impl<R, B: Behavior<R>> Drop for Isolate<R, B> { static DENO_INIT: Once = ONCE_INIT; -impl<R, B: Behavior<R>> Isolate<R, B> { +impl<B: Behavior> Isolate<B> { pub fn new(mut behavior: B) -> Self { DENO_INIT.call_once(|| { unsafe { libdeno::deno_init() }; }); + let shared = SharedQueue::new(RECOMMENDED_SIZE); + let config = libdeno::deno_config { will_snapshot: 0, load_snapshot: match behavior.startup_snapshot() { Some(s) => s, None => libdeno::deno_buf::empty(), }, - shared: match behavior.startup_shared() { - Some(s) => s, - None => libdeno::deno_buf::empty(), - }, + shared: shared.as_deno_buf(), recv_cb: Self::pre_dispatch, }; let libdeno_isolate = unsafe { libdeno::deno_new(config) }; @@ -119,28 +108,50 @@ impl<R, B: Behavior<R>> Isolate<R, B> { Self { libdeno_isolate, behavior, + shared, pending_ops: Vec::new(), polled_recently: false, } } + /// Executes a bit of built-in JavaScript to provide Deno._sharedQueue. + pub fn shared_init(&self) { + js_check(self.execute("shared_queue.js", include_str!("shared_queue.js"))); + } + extern "C" fn pre_dispatch( user_data: *mut c_void, - control_buf: deno_buf, + control_argv0: deno_buf, zero_copy_buf: deno_buf, ) { - let isolate = unsafe { Isolate::<R, B>::from_raw_ptr(user_data) }; - assert_eq!(control_buf.len(), 0); + let isolate = unsafe { Isolate::<B>::from_raw_ptr(user_data) }; let zero_copy_id = zero_copy_buf.zero_copy_id; - let req_record = isolate.behavior.records_shift().unwrap(); + let control_shared = isolate.shared.shift(); - isolate.behavior.records_reset(); + let (is_sync, op) = if control_argv0.len() > 0 { + // The user called libdeno.send(control) + isolate + .behavior + .dispatch(control_argv0.as_ref(), zero_copy_buf) + } else if let Some(c) = control_shared { + // The user called Deno._sharedQueue.push(control) + isolate.behavior.dispatch(&c, zero_copy_buf) + } else { + // The sharedQueue is empty. The shouldn't happen usually, but it's also + // not technically a failure. + #[cfg(test)] + unreachable!(); + #[cfg(not(test))] + return; + }; + + // At this point the SharedQueue should be empty. + assert_eq!(isolate.shared.size(), 0); - let (is_sync, op) = isolate.behavior.recv(req_record, zero_copy_buf); if is_sync { let res_record = op.wait().unwrap(); - let push_success = isolate.behavior.records_push(res_record); + let push_success = isolate.shared.push(res_record); assert!(push_success); // TODO check that if JSError thrown during respond(), that it will be // picked up. @@ -295,7 +306,7 @@ impl<R, B: Behavior<R>> Isolate<R, B> { specifier_ptr: *const libc::c_char, referrer: deno_mod, ) -> deno_mod { - let isolate = unsafe { Isolate::<R, B>::from_raw_ptr(user_data) }; + let isolate = unsafe { Isolate::<B>::from_raw_ptr(user_data) }; let specifier_c: &CStr = unsafe { CStr::from_ptr(specifier_ptr) }; let specifier: &str = specifier_c.to_str().unwrap(); isolate.behavior.resolve(specifier, referrer) @@ -319,7 +330,7 @@ impl Drop for LockerScope { } } -impl<R, B: Behavior<R>> Future for Isolate<R, B> { +impl<B: Behavior> Future for Isolate<B> { type Item = (); type Error = JSError; @@ -336,22 +347,18 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> { while !self.polled_recently { let mut completed_count = 0; - - debug!("poll loop"); - self.polled_recently = true; - - self.behavior.records_reset(); + assert_eq!(self.shared.size(), 0); let mut i = 0; - while i != self.pending_ops.len() { + while i < self.pending_ops.len() { let pending = &mut self.pending_ops[i]; match pending.poll() { - Err(()) => panic!("unexpectd error"), + Err(()) => panic!("unexpected error"), Ok(Async::NotReady) => { i += 1; } - Ok(Async::Ready(record)) => { + Ok(Async::Ready(buf)) => { let completed = self.pending_ops.remove(i); completed_count += 1; @@ -359,15 +366,16 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> { self.zero_copy_release(completed.zero_copy_id); } - self.behavior.records_push(record); + self.shared.push(buf); } } } if completed_count > 0 { - debug!("respond"); self.respond()?; - debug!("after respond"); + // The other side should have shifted off all the messages. + assert_eq!(self.shared.size(), 0); + self.shared.reset(); } } @@ -385,99 +393,33 @@ impl<R, B: Behavior<R>> Future for Isolate<R, B> { } } +pub fn js_check(r: Result<(), JSError>) { + if let Err(e) = r { + panic!(e.to_string()); + } +} + #[cfg(test)] mod tests { use super::*; - use std::collections::HashMap; - - fn js_check(r: Result<(), JSError>) { - if let Err(e) = r { - panic!(e.to_string()); - } - } - - struct TestBehavior { - recv_count: usize, - resolve_count: usize, - push_count: usize, - shift_count: usize, - reset_count: usize, - mod_map: HashMap<String, deno_mod>, - } - - impl TestBehavior { - fn new() -> Self { - Self { - recv_count: 0, - resolve_count: 0, - push_count: 0, - shift_count: 0, - reset_count: 0, - mod_map: HashMap::new(), - } - } - - fn register(&mut self, name: &str, id: deno_mod) { - self.mod_map.insert(name.to_string(), id); - } - } - - impl Behavior<()> for TestBehavior { - fn startup_snapshot(&mut self) -> Option<deno_buf> { - None - } - - fn startup_shared(&mut self) -> Option<deno_buf> { - None - } - - fn recv( - &mut self, - _record: (), - _zero_copy_buf: deno_buf, - ) -> (bool, Box<Op<()>>) { - self.recv_count += 1; - (false, Box::new(futures::future::ok(()))) - } - - fn resolve(&mut self, specifier: &str, _referrer: deno_mod) -> deno_mod { - self.resolve_count += 1; - match self.mod_map.get(specifier) { - Some(id) => *id, - None => 0, - } - } - - fn records_reset(&mut self) { - self.reset_count += 1; - } - - fn records_push(&mut self, _record: ()) -> bool { - self.push_count += 1; - true - } - - fn records_shift(&mut self) -> Option<()> { - self.shift_count += 1; - Some(()) - } - } + use crate::test_util::*; #[test] - fn test_recv() { + fn test_dispatch() { let behavior = TestBehavior::new(); let isolate = Isolate::new(behavior); js_check(isolate.execute( "filename.js", r#" - libdeno.send(); + let control = new Uint8Array([42]); + libdeno.send(control); async function main() { - libdeno.send(); + libdeno.send(control); } main(); "#, )); - assert_eq!(isolate.behavior.recv_count, 2); + assert_eq!(isolate.behavior.dispatch_count, 2); } #[test] @@ -491,10 +433,11 @@ mod tests { r#" import { b } from 'b.js' if (b() != 'b') throw Error(); - libdeno.send(); + let control = new Uint8Array([42]); + libdeno.send(control); "#, ).unwrap(); - assert_eq!(isolate.behavior.recv_count, 0); + assert_eq!(isolate.behavior.dispatch_count, 0); assert_eq!(isolate.behavior.resolve_count, 0); let imports = isolate.mod_get_imports(mod_a); @@ -507,16 +450,16 @@ mod tests { assert_eq!(imports.len(), 0); js_check(isolate.mod_instantiate(mod_b)); - assert_eq!(isolate.behavior.recv_count, 0); + assert_eq!(isolate.behavior.dispatch_count, 0); assert_eq!(isolate.behavior.resolve_count, 0); isolate.behavior.register("b.js", mod_b); js_check(isolate.mod_instantiate(mod_a)); - assert_eq!(isolate.behavior.recv_count, 0); + assert_eq!(isolate.behavior.dispatch_count, 0); assert_eq!(isolate.behavior.resolve_count, 1); js_check(isolate.mod_evaluate(mod_a)); - assert_eq!(isolate.behavior.recv_count, 1); + assert_eq!(isolate.behavior.dispatch_count, 1); assert_eq!(isolate.behavior.resolve_count, 1); } @@ -525,11 +468,13 @@ mod tests { let behavior = TestBehavior::new(); let mut isolate = Isolate::new(behavior); + isolate.shared_init(); + js_check(isolate.execute( "setup.js", r#" let nrecv = 0; - libdeno.recv(() => { + Deno._setAsyncHandler((buf) => { nrecv++; }); function assertEq(actual, expected) { @@ -539,32 +484,77 @@ mod tests { } "#, )); - assert_eq!(isolate.behavior.recv_count, 0); + assert_eq!(isolate.behavior.dispatch_count, 0); js_check(isolate.execute( "check1.js", r#" assertEq(nrecv, 0); - libdeno.send(); + let control = new Uint8Array([42]); + libdeno.send(control); assertEq(nrecv, 0); "#, )); - assert_eq!(isolate.behavior.recv_count, 1); + assert_eq!(isolate.behavior.dispatch_count, 1); assert_eq!(Ok(Async::Ready(())), isolate.poll()); - assert_eq!(isolate.behavior.recv_count, 1); + assert_eq!(isolate.behavior.dispatch_count, 1); js_check(isolate.execute( "check2.js", r#" assertEq(nrecv, 1); - libdeno.send(); + libdeno.send(control); assertEq(nrecv, 1); "#, )); - assert_eq!(isolate.behavior.recv_count, 2); + assert_eq!(isolate.behavior.dispatch_count, 2); assert_eq!(Ok(Async::Ready(())), isolate.poll()); js_check(isolate.execute("check3.js", "assertEq(nrecv, 2)")); - assert_eq!(isolate.behavior.recv_count, 2); + assert_eq!(isolate.behavior.dispatch_count, 2); // We are idle, so the next poll should be the last. assert_eq!(Ok(Async::Ready(())), isolate.poll()); } + #[test] + fn test_shared() { + let behavior = TestBehavior::new(); + let mut isolate = Isolate::new(behavior); + + isolate.shared_init(); + + js_check(isolate.execute( + "setup.js", + r#" + let nrecv = 0; + Deno._setAsyncHandler((buf) => { + assert(buf.byteLength === 1); + assert(buf[0] === 43); + nrecv++; + }); + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + "#, + )); + assert_eq!(isolate.behavior.dispatch_count, 0); + + js_check(isolate.execute( + "send1.js", + r#" + let control = new Uint8Array([42]); + Deno._sharedQueue.push(control); + libdeno.send(); + assert(nrecv === 0); + + Deno._sharedQueue.push(control); + libdeno.send(); + assert(nrecv === 0); + "#, + )); + assert_eq!(isolate.behavior.dispatch_count, 2); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + + js_check(isolate.execute("send1.js", "assert(nrecv === 2);")); + } + } diff --git a/core/lib.rs b/core/lib.rs index 66233ddfc..d044f60b5 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -8,6 +8,9 @@ mod flags; mod isolate; mod js_errors; mod libdeno; +mod shared_queue; +#[cfg(test)] +mod test_util; pub use crate::flags::v8_set_flags; pub use crate::isolate::*; diff --git a/core/shared.rs b/core/shared.rs deleted file mode 100644 index 40d83db73..000000000 --- a/core/shared.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::libdeno::deno_buf; -use std::mem; - -// TODO this is where we abstract flatbuffers at. -// TODO make these constants private to this file. -const INDEX_NUM_RECORDS: usize = 0; -const INDEX_RECORDS: usize = 1; -pub const RECORD_OFFSET_PROMISE_ID: usize = 0; -pub const RECORD_OFFSET_OP: usize = 1; -pub const RECORD_OFFSET_ARG: usize = 2; -pub const RECORD_OFFSET_RESULT: usize = 3; -const RECORD_SIZE: usize = 4; -const NUM_RECORDS: usize = 100; - -/// Represents the shared buffer between JS and Rust. -/// Used for FFI. -pub struct Shared(Vec<i32>); - -impl Shared { - pub fn new() -> Shared { - let mut vec = Vec::<i32>::new(); - vec.resize(INDEX_RECORDS + RECORD_SIZE * NUM_RECORDS, 0); - Shared(vec) - } - - pub fn set_record(&mut self, i: usize, off: usize, value: i32) { - assert!(i < NUM_RECORDS); - self.0[INDEX_RECORDS + RECORD_SIZE * i + off] = value; - } - - pub fn get_record(&self, i: usize, off: usize) -> i32 { - assert!(i < NUM_RECORDS); - return self.0[INDEX_RECORDS + RECORD_SIZE * i + off]; - } - - pub fn set_num_records(&mut self, num_records: i32) { - self.0[INDEX_NUM_RECORDS] = num_records; - } - - pub fn get_num_records(&self) -> i32 { - return self.0[INDEX_NUM_RECORDS]; - } - - pub fn as_deno_buf(&mut self) -> deno_buf { - let ptr = self.0.as_mut_ptr() as *mut u8; - let len = mem::size_of::<i32>() * self.0.len(); - unsafe { deno_buf::from_raw_parts(ptr, len) } - } -} diff --git a/core/shared_queue.js b/core/shared_queue.js new file mode 100644 index 000000000..36f5dc91b --- /dev/null +++ b/core/shared_queue.js @@ -0,0 +1,127 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +(window => { + const MAX_RECORDS = 100; + const INDEX_NUM_RECORDS = 0; + const INDEX_NUM_SHIFTED_OFF = 1; + const INDEX_HEAD = 2; + const INDEX_OFFSETS = 3; + const INDEX_RECORDS = 3 + MAX_RECORDS; + const HEAD_INIT = 4 * INDEX_RECORDS; + + let sharedBytes = null; + let shared32 = null; + + if (!window["Deno"]) { + window["Deno"] = {}; + } + + function assert(cond) { + if (!cond) { + throw Error("assert"); + } + } + + function reset() { + shared32.fill(0, 0, INDEX_RECORDS); + shared32[INDEX_HEAD] = HEAD_INIT; + } + + function head() { + return shared32[INDEX_HEAD]; + } + + function numRecords() { + return shared32[INDEX_NUM_RECORDS]; + } + + function setEnd(index, end) { + shared32[INDEX_OFFSETS + index] = end; + } + + function getEnd(index) { + if (index < numRecords()) { + return shared32[INDEX_OFFSETS + index]; + } else { + return null; + } + } + + function getOffset(index) { + if (index < numRecords()) { + if (index == 0) { + return HEAD_INIT; + } else { + return shared32[INDEX_OFFSETS + index - 1]; + } + } else { + return null; + } + } + + function push(buf) { + let off = head(); + let end = off + buf.byteLength; + let index = numRecords(); + if (end > shared32.byteLength) { + console.log("shared_queue.ts push fail"); + return false; + } + setEnd(index, end); + assert(end - off == buf.byteLength); + sharedBytes.set(buf, off); + shared32[INDEX_NUM_RECORDS] += 1; + shared32[INDEX_HEAD] = end; + return true; + } + + /// Returns null if empty. + function shift() { + let i = shared32[INDEX_NUM_SHIFTED_OFF]; + if (i >= numRecords()) { + return null; + } + let off = getOffset(i); + let end = getEnd(i); + shared32[INDEX_NUM_SHIFTED_OFF] += 1; + return sharedBytes.subarray(off, end); + } + + function size() { + return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF]; + } + + let asyncHandler = null; + function setAsyncHandler(cb) { + assert(asyncHandler == null); + asyncHandler = cb; + } + + function handleAsyncMsgFromRust() { + let buf; + while ((buf = shift()) != null) { + asyncHandler(buf); + } + } + + function init(shared) { + assert(shared.byteLength > 0); + assert(sharedBytes == null); + assert(shared32 == null); + sharedBytes = new Uint8Array(shared); + shared32 = new Int32Array(shared); + // Callers should not call libdeno.recv, use setAsyncHandler. + libdeno.recv(handleAsyncMsgFromRust); + } + + window.Deno._setAsyncHandler = setAsyncHandler; + window.Deno._sharedQueue = { + head, + numRecords, + size, + push, + reset, + shift + }; + + init(libdeno.shared); +})(this); diff --git a/core/shared_queue.rs b/core/shared_queue.rs new file mode 100644 index 000000000..8e3199504 --- /dev/null +++ b/core/shared_queue.rs @@ -0,0 +1,225 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. +use crate::isolate::Buf; +use crate::libdeno::deno_buf; + +const MAX_RECORDS: usize = 100; +/// Total number of records added. +const INDEX_NUM_RECORDS: usize = 0; +/// Number of records that have been shifted off. +const INDEX_NUM_SHIFTED_OFF: usize = 1; +/// The head is the number of initialized bytes in SharedQueue. +/// It grows monotonically. +const INDEX_HEAD: usize = 2; +const INDEX_OFFSETS: usize = 3; +const INDEX_RECORDS: usize = 3 + MAX_RECORDS; +/// Byte offset of where the records begin. Also where the head starts. +const HEAD_INIT: usize = 4 * INDEX_RECORDS; +/// A rough guess at how big we should make the shared buffer in bytes. +pub const RECOMMENDED_SIZE: usize = 128 * MAX_RECORDS; + +pub struct SharedQueue { + bytes: Vec<u8>, +} + +impl SharedQueue { + pub fn new(len: usize) -> Self { + let mut bytes = Vec::new(); + bytes.resize(HEAD_INIT + len, 0); + let mut q = Self { bytes }; + q.reset(); + q + } + + pub fn as_deno_buf(&self) -> deno_buf { + let ptr = self.bytes.as_ptr(); + let len = self.bytes.len(); + unsafe { deno_buf::from_raw_parts(ptr, len) } + } + + /// Clears the shared buffer. + pub fn reset(&mut self) { + let s: &mut [u32] = self.as_u32_slice_mut(); + for i in 0..INDEX_RECORDS { + s[i] = 0; + } + s[INDEX_HEAD] = HEAD_INIT as u32; + } + + fn as_u32_slice<'a>(&'a self) -> &'a [u32] { + let p = self.bytes.as_ptr() as *const u32; + unsafe { std::slice::from_raw_parts(p, self.bytes.len() / 4) } + } + + fn as_u32_slice_mut<'a>(&'a mut self) -> &'a mut [u32] { + let p = self.bytes.as_mut_ptr() as *mut u32; + unsafe { std::slice::from_raw_parts_mut(p, self.bytes.len() / 4) } + } + + pub fn size(&self) -> usize { + let s = self.as_u32_slice(); + (s[INDEX_NUM_RECORDS] - s[INDEX_NUM_SHIFTED_OFF]) as usize + } + + fn num_records(&self) -> usize { + let s = self.as_u32_slice(); + s[INDEX_NUM_RECORDS] as usize + } + + fn head(&self) -> usize { + let s = self.as_u32_slice(); + s[INDEX_HEAD] as usize + } + + fn set_end(&mut self, index: usize, end: usize) { + let s = self.as_u32_slice_mut(); + s[INDEX_OFFSETS + index] = end as u32; + } + + fn get_end(&self, index: usize) -> Option<usize> { + if index < self.num_records() { + let s = self.as_u32_slice(); + Some(s[INDEX_OFFSETS + index] as usize) + } else { + None + } + } + + fn get_offset(&self, index: usize) -> Option<usize> { + if index < self.num_records() { + Some(if index == 0 { + HEAD_INIT + } else { + let s = self.as_u32_slice(); + s[INDEX_OFFSETS + index - 1] as usize + }) + } else { + None + } + } + + /// Returns none if empty. + pub fn shift<'a>(&'a mut self) -> Option<&'a [u8]> { + let u32_slice = self.as_u32_slice(); + let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize; + if i >= self.num_records() { + assert_eq!(self.size(), 0); + return None; + } + let off = self.get_offset(i).unwrap(); + let end = self.get_end(i).unwrap(); + + let u32_slice = self.as_u32_slice_mut(); + u32_slice[INDEX_NUM_SHIFTED_OFF] += 1; + + Some(&self.bytes[off..end]) + } + + pub fn push(&mut self, record: Buf) -> bool { + let off = self.head(); + let end = off + record.len(); + let index = self.num_records(); + if end > self.bytes.len() { + eprintln!("WARNING the sharedQueue overflowed"); + return false; + } + self.set_end(index, end); + assert_eq!(end - off, record.len()); + self.bytes[off..end].copy_from_slice(&record); + let u32_slice = self.as_u32_slice_mut(); + u32_slice[INDEX_NUM_RECORDS] += 1; + u32_slice[INDEX_HEAD] = end as u32; + true + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::isolate::js_check; + use crate::isolate::Isolate; + use crate::test_util::*; + use futures::Async; + use futures::Future; + + #[test] + fn basic() { + let mut q = SharedQueue::new(RECOMMENDED_SIZE); + + let h = q.head(); + assert!(h > 0); + + let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice(); + let len = r.len() + h; + assert!(q.push(r)); + assert_eq!(q.head(), len); + + let r = vec![6, 7].into_boxed_slice(); + assert!(q.push(r)); + + let r = vec![8, 9, 10, 11].into_boxed_slice(); + assert!(q.push(r)); + assert_eq!(q.num_records(), 3); + assert_eq!(q.size(), 3); + + let r = q.shift().unwrap(); + assert_eq!(r.as_ref(), vec![1, 2, 3, 4, 5].as_slice()); + assert_eq!(q.num_records(), 3); + assert_eq!(q.size(), 2); + + let r = q.shift().unwrap(); + assert_eq!(r.as_ref(), vec![6, 7].as_slice()); + assert_eq!(q.num_records(), 3); + assert_eq!(q.size(), 1); + + let r = q.shift().unwrap(); + assert_eq!(r.as_ref(), vec![8, 9, 10, 11].as_slice()); + assert_eq!(q.num_records(), 3); + assert_eq!(q.size(), 0); + + assert!(q.shift().is_none()); + assert!(q.shift().is_none()); + + assert_eq!(q.num_records(), 3); + assert_eq!(q.size(), 0); + + q.reset(); + assert_eq!(q.num_records(), 0); + assert_eq!(q.size(), 0); + } + + fn alloc_buf(byte_length: usize) -> Buf { + let mut v = Vec::new(); + v.resize(byte_length, 0); + v.into_boxed_slice() + } + + #[test] + fn overflow() { + let mut q = SharedQueue::new(RECOMMENDED_SIZE); + assert!(q.push(alloc_buf(RECOMMENDED_SIZE - 1))); + assert_eq!(q.size(), 1); + assert!(!q.push(alloc_buf(2))); + assert_eq!(q.size(), 1); + assert!(q.push(alloc_buf(1))); + assert_eq!(q.size(), 2); + + assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1); + assert_eq!(q.size(), 1); + assert_eq!(q.shift().unwrap().len(), 1); + assert_eq!(q.size(), 0); + + assert!(!q.push(alloc_buf(1))); + } + + #[test] + fn test_js() { + let behavior = TestBehavior::new(); + let mut isolate = Isolate::new(behavior); + isolate.shared_init(); + js_check( + isolate + .execute("shared_queue_test.js", include_str!("shared_queue_test.js")), + ); + assert_eq!(Ok(Async::Ready(())), isolate.poll()); + } +} diff --git a/core/shared_queue_test.js b/core/shared_queue_test.js new file mode 100644 index 000000000..33000fd45 --- /dev/null +++ b/core/shared_queue_test.js @@ -0,0 +1,65 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +function assert(cond) { + if (!cond) { + throw Error("assert"); + } +} + +function main() { + const q = Deno._sharedQueue; + + let h = q.head(); + assert(h > 0); + + let r = new Uint8Array([1, 2, 3, 4, 5]); + let len = r.byteLength + h; + assert(q.push(r)); + assert(q.head() == len); + + r = new Uint8Array([6, 7]); + assert(q.push(r)); + + r = new Uint8Array([8, 9, 10, 11]); + assert(q.push(r)); + assert(q.numRecords() == 3); + assert(q.size() == 3); + + r = q.shift(); + assert(r.byteLength == 5); + assert(r[0] == 1); + assert(r[1] == 2); + assert(r[2] == 3); + assert(r[3] == 4); + assert(r[4] == 5); + assert(q.numRecords() == 3); + assert(q.size() == 2); + + r = q.shift(); + assert(r.byteLength == 2); + assert(r[0] == 6); + assert(r[1] == 7); + assert(q.numRecords() == 3); + assert(q.size() == 1); + + r = q.shift(); + assert(r.byteLength == 4); + assert(r[0] == 8); + assert(r[1] == 9); + assert(r[2] == 10); + assert(r[3] == 11); + assert(q.numRecords() == 3); + assert(q.size() == 0); + + assert(q.shift() == null); + assert(q.shift() == null); + assert(q.numRecords() == 3); + assert(q.size() == 0); + + q.reset(); + assert(q.numRecords() == 0); + assert(q.size() == 0); + libdeno.print("shared_queue_test.js ok\n"); +} + +main(); diff --git a/core/test_util.rs b/core/test_util.rs new file mode 100644 index 000000000..80025c2be --- /dev/null +++ b/core/test_util.rs @@ -0,0 +1,51 @@ +use crate::isolate::Behavior; +use crate::isolate::Op; +use crate::libdeno::deno_buf; +use crate::libdeno::deno_mod; +use std::collections::HashMap; + +pub struct TestBehavior { + pub dispatch_count: usize, + pub resolve_count: usize, + pub mod_map: HashMap<String, deno_mod>, +} + +impl TestBehavior { + pub fn new() -> Self { + Self { + dispatch_count: 0, + resolve_count: 0, + mod_map: HashMap::new(), + } + } + + pub fn register(&mut self, name: &str, id: deno_mod) { + self.mod_map.insert(name.to_string(), id); + } +} + +impl Behavior for TestBehavior { + fn startup_snapshot(&mut self) -> Option<deno_buf> { + None + } + + fn dispatch( + &mut self, + control: &[u8], + _zero_copy_buf: deno_buf, + ) -> (bool, Box<Op>) { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + self.dispatch_count += 1; + let buf = vec![43u8].into_boxed_slice(); + (false, Box::new(futures::future::ok(buf))) + } + + fn resolve(&mut self, specifier: &str, _referrer: deno_mod) -> deno_mod { + self.resolve_count += 1; + match self.mod_map.get(specifier) { + Some(id) => *id, + None => 0, + } + } +} |