diff options
author | Bert Belder <bertbelder@gmail.com> | 2019-04-28 21:31:10 +0200 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2019-05-01 21:11:09 +0200 |
commit | 41c7e96f1a81ea416ebb3ba45f2815e0202d6b75 (patch) | |
tree | 5fcb15b8664d7579cf4db76d16754c8efa7c4667 /core/isolate.rs | |
parent | abdb98a2516a9d6ec313805dffbc2107d38f8ed4 (diff) |
Refactor zero-copy buffers for performance and to prevent memory leaks
* In order to prevent ArrayBuffers from getting garbage collected by V8,
we used to store a v8::Persistent<ArrayBuffer> in a map. This patch
introduces a custom ArrayBuffer allocator which doesn't use Persistent
handles, but instead stores a pointer to the actual ArrayBuffer data
alongside with a reference count. Since creating Persistent handles
has quite a bit of overhead, this change significantly increases
performance. Various HTTP server benchmarks report about 5-10% more
requests per second than before.
* Previously the Persistent handle that prevented garbage collection had
to be released manually, and this wasn't always done, which was
causing memory leaks. This has been resolved by introducing a new
`PinnedBuf` type in both Rust and C++ that automatically re-enables
garbage collection when it goes out of scope.
* Zero-copy buffers are now correctly wrapped in an Option if there is a
possibility that they're not present. This clears up a correctness
issue where we were creating zero-length slices from a null pointer,
which is against the rules.
Diffstat (limited to 'core/isolate.rs')
-rw-r--r-- | core/isolate.rs | 140 |
1 files changed, 51 insertions, 89 deletions
diff --git a/core/isolate.rs b/core/isolate.rs index 5488fab75..2cafb29b6 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -8,6 +8,8 @@ use crate::js_errors::JSError; use crate::libdeno; use crate::libdeno::deno_buf; use crate::libdeno::deno_mod; +use crate::libdeno::deno_pinned_buf; +use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; use crate::shared_queue::SharedQueue; @@ -26,33 +28,6 @@ use std::sync::{Arc, Mutex, Once, ONCE_INIT}; pub type Buf = Box<[u8]>; pub type Op = dyn Future<Item = Buf, Error = ()> + Send; -struct PendingOp { - op: Box<Op>, - zero_copy_id: usize, // non-zero if associated zero-copy buffer. -} - -struct OpResult { - buf: Buf, - zero_copy_id: usize, -} - -impl Future for PendingOp { - type Item = OpResult; - type Error = (); - - fn poll(&mut self) -> Poll<OpResult, ()> { - // Ops should not error. If an op experiences an error it needs to - // encode that error into a buf, so it can be returned to JS. - Ok(match self.op.poll().expect("ops should not error") { - NotReady => NotReady, - Ready(buf) => Ready(OpResult { - buf, - zero_copy_id: self.zero_copy_id, - }), - }) - } -} - /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -71,7 +46,8 @@ pub enum StartupData<'a> { #[derive(Default)] pub struct Config { - dispatch: Option<Arc<Fn(&[u8], deno_buf) -> (bool, Box<Op>) + Send + Sync>>, + dispatch: + Option<Arc<Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync>>, pub will_snapshot: bool, } @@ -81,7 +57,7 @@ impl Config { /// corresponds to the second argument of Deno.core.dispatch(). pub fn dispatch<F>(&mut self, f: F) where - F: Fn(&[u8], deno_buf) -> (bool, Box<Op>) + Send + Sync + 'static, + F: Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -101,7 +77,7 @@ pub struct Isolate { config: Config, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered<PendingOp>, + pending_ops: FuturesUnordered<Box<Op>>, have_unpolled_ops: bool, } @@ -194,24 +170,22 @@ impl Isolate { extern "C" fn pre_dispatch( user_data: *mut c_void, control_argv0: deno_buf, - zero_copy_buf: deno_buf, + zero_copy_buf: deno_pinned_buf, ) { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; - let zero_copy_id = zero_copy_buf.zero_copy_id; - let control_shared = isolate.shared.shift(); let (is_sync, op) = if control_argv0.len() > 0 { // The user called Deno.core.send(control) if let Some(ref f) = isolate.config.dispatch { - f(control_argv0.as_ref(), zero_copy_buf) + f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { panic!("isolate.config.dispatch not set") } } else if let Some(c) = control_shared { // The user called Deno.sharedQueue.push(control) if let Some(ref f) = isolate.config.dispatch { - f(&c, zero_copy_buf) + f(&c, PinnedBuf::new(zero_copy_buf)) } else { panic!("isolate.config.dispatch not set") } @@ -235,17 +209,11 @@ impl Isolate { // picked up. let _ = isolate.respond(Some(&res_record)); } else { - isolate.pending_ops.push(PendingOp { op, zero_copy_id }); + isolate.pending_ops.push(op); isolate.have_unpolled_ops = true; } } - fn zero_copy_release(&self, zero_copy_id: usize) { - unsafe { - libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id) - } - } - #[inline] unsafe fn from_raw_ptr<'a>(ptr: *const c_void) -> &'a mut Self { let ptr = ptr as *mut _; @@ -469,17 +437,13 @@ impl Future for Isolate { Err(_) => panic!("unexpected op error"), Ok(Ready(None)) => break, Ok(NotReady) => break, - Ok(Ready(Some(r))) => { - if r.zero_copy_id > 0 { - self.zero_copy_release(r.zero_copy_id); - } - - let successful_push = self.shared.push(&r.buf); + Ok(Ready(Some(buf))) => { + let successful_push = self.shared.push(&buf); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the // legacy route, using the argument of deno_respond. - overflow_response = Some(r.buf); + overflow_response = Some(buf); break; } } @@ -591,47 +555,45 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut config = Config::default(); - config.dispatch( - move |control: &[u8], _zero_copy_buf: deno_buf| -> (bool, Box<Op>) { - dispatch_count_.fetch_add(1, Ordering::Relaxed); - match mode { - Mode::AsyncImmediate => { - assert_eq!(control.len(), 1); - assert_eq!(control[0], 42); - let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) - } - Mode::OverflowReqSync => { - assert_eq!(control.len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) - } - Mode::OverflowResSync => { - assert_eq!(control.len(), 1); - assert_eq!(control[0], 42); - let mut vec = Vec::<u8>::new(); - vec.resize(100 * 1024 * 1024, 0); - vec[0] = 99; - let buf = vec.into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) - } - Mode::OverflowReqAsync => { - assert_eq!(control.len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) - } - Mode::OverflowResAsync => { - assert_eq!(control.len(), 1); - assert_eq!(control[0], 42); - let mut vec = Vec::<u8>::new(); - vec.resize(100 * 1024 * 1024, 0); - vec[0] = 4; - let buf = vec.into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) - } + config.dispatch(move |control, _| -> (bool, Box<Op>) { + dispatch_count_.fetch_add(1, Ordering::Relaxed); + match mode { + Mode::AsyncImmediate => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let buf = vec![43u8].into_boxed_slice(); + (false, Box::new(futures::future::ok(buf))) } - }, - ); + Mode::OverflowReqSync => { + assert_eq!(control.len(), 100 * 1024 * 1024); + let buf = vec![43u8].into_boxed_slice(); + (true, Box::new(futures::future::ok(buf))) + } + Mode::OverflowResSync => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let mut vec = Vec::<u8>::new(); + vec.resize(100 * 1024 * 1024, 0); + vec[0] = 99; + let buf = vec.into_boxed_slice(); + (true, Box::new(futures::future::ok(buf))) + } + Mode::OverflowReqAsync => { + assert_eq!(control.len(), 100 * 1024 * 1024); + let buf = vec![43u8].into_boxed_slice(); + (false, Box::new(futures::future::ok(buf))) + } + Mode::OverflowResAsync => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let mut vec = Vec::<u8>::new(); + vec.resize(100 * 1024 * 1024, 0); + vec[0] = 4; + let buf = vec.into_boxed_slice(); + (false, Box::new(futures::future::ok(buf))) + } + } + }); let mut isolate = Isolate::new(StartupData::None, config); js_check(isolate.execute( |