diff options
author | Bert Belder <bertbelder@gmail.com> | 2019-04-28 21:31:10 +0200 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2019-05-01 21:11:09 +0200 |
commit | 41c7e96f1a81ea416ebb3ba45f2815e0202d6b75 (patch) | |
tree | 5fcb15b8664d7579cf4db76d16754c8efa7c4667 | |
parent | abdb98a2516a9d6ec313805dffbc2107d38f8ed4 (diff) |
Refactor zero-copy buffers for performance and to prevent memory leaks
* In order to prevent ArrayBuffers from getting garbage collected by V8,
we used to store a v8::Persistent<ArrayBuffer> in a map. This patch
introduces a custom ArrayBuffer allocator which doesn't use Persistent
handles, but instead stores a pointer to the actual ArrayBuffer data
alongside with a reference count. Since creating Persistent handles
has quite a bit of overhead, this change significantly increases
performance. Various HTTP server benchmarks report about 5-10% more
requests per second than before.
* Previously the Persistent handle that prevented garbage collection had
to be released manually, and this wasn't always done, which was
causing memory leaks. This has been resolved by introducing a new
`PinnedBuf` type in both Rust and C++ that automatically re-enables
garbage collection when it goes out of scope.
* Zero-copy buffers are now correctly wrapped in an Option if there is a
possibility that they're not present. This clears up a correctness
issue where we were creating zero-length slices from a null pointer,
which is against the rules.
-rw-r--r-- | cli/ops.rs | 213 | ||||
-rw-r--r-- | cli/state.rs | 4 | ||||
-rw-r--r-- | core/examples/http_bench.rs | 11 | ||||
-rw-r--r-- | core/isolate.rs | 140 | ||||
-rw-r--r-- | core/lib.rs | 1 | ||||
-rw-r--r-- | core/libdeno.rs | 84 | ||||
-rw-r--r-- | core/libdeno/BUILD.gn | 1 | ||||
-rw-r--r-- | core/libdeno/api.cc | 15 | ||||
-rw-r--r-- | core/libdeno/binding.cc | 27 | ||||
-rw-r--r-- | core/libdeno/buffer.h | 140 | ||||
-rw-r--r-- | core/libdeno/deno.h | 15 | ||||
-rw-r--r-- | core/libdeno/internal.h | 26 | ||||
-rw-r--r-- | core/libdeno/libdeno_test.cc | 33 | ||||
-rw-r--r-- | core/libdeno/modules_test.cc | 7 | ||||
-rw-r--r-- | core/libdeno/test.h | 2 |
15 files changed, 415 insertions, 304 deletions
diff --git a/cli/ops.rs b/cli/ops.rs index d8a0c5cfa..5463bac4d 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -23,11 +23,11 @@ use crate::tokio_write; use crate::version; use crate::worker::root_specifier_to_url; use crate::worker::Worker; -use deno::deno_buf; use deno::js_check; use deno::Buf; use deno::JSError; use deno::Op; +use deno::PinnedBuf; use flatbuffers::FlatBufferBuilder; use futures; use futures::Async; @@ -64,7 +64,7 @@ pub type OpWithError = dyn Future<Item = Buf, Error = DenoError> + Send; // TODO Ideally we wouldn't have to box the OpWithError being returned. // The box is just to make it easier to get a prototype refactor working. type OpCreator = - fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: deno_buf) + fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: Option<PinnedBuf>) -> Box<OpWithError>; pub type OpSelector = fn(inner_type: msg::Any) -> Option<OpCreator>; @@ -81,11 +81,11 @@ fn empty_buf() -> Buf { pub fn dispatch_all( state: &ThreadSafeState, control: &[u8], - zero_copy: deno_buf, + zero_copy: Option<PinnedBuf>, op_selector: OpSelector, ) -> (bool, Box<Op>) { let bytes_sent_control = control.len(); - let bytes_sent_zero_copy = zero_copy.len(); + let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); let base = msg::get_root_as_base(&control); let is_sync = base.sync(); let inner_type = base.inner_type(); @@ -226,9 +226,9 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option<OpCreator> { fn op_now( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let seconds = state.start_time.elapsed().as_secs(); let mut subsec_nanos = state.start_time.elapsed().subsec_nanos(); let reduced_time_precision = 2_000_000; // 2ms in nanoseconds @@ -262,7 +262,7 @@ fn op_now( fn op_is_tty( _state: &ThreadSafeState, base: &msg::Base<'_>, - _data: deno_buf, + _data: Option<PinnedBuf>, ) -> Box<OpWithError> { let builder = &mut FlatBufferBuilder::new(); let inner = msg::IsTTYRes::create( @@ -287,7 +287,7 @@ fn op_is_tty( fn op_exit( _state: &ThreadSafeState, base: &msg::Base<'_>, - _data: deno_buf, + _data: Option<PinnedBuf>, ) -> Box<OpWithError> { let inner = base.inner_as_exit().unwrap(); std::process::exit(inner.code()) @@ -296,9 +296,9 @@ fn op_exit( fn op_start( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let mut builder = FlatBufferBuilder::new(); let state = state; @@ -351,9 +351,9 @@ fn op_start( fn op_format_error( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_format_error().unwrap(); let orig_error = String::from(inner.error().unwrap()); @@ -410,9 +410,9 @@ pub fn odd_future(err: DenoError) -> Box<OpWithError> { fn op_fetch_module_meta_data( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_fetch_module_meta_data().unwrap(); let cmd_id = base.cmd_id(); let specifier = inner.specifier().unwrap(); @@ -452,9 +452,9 @@ fn op_fetch_module_meta_data( fn op_compiler_config( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_compiler_config().unwrap(); let cmd_id = base.cmd_id(); let compiler_type = inner.compiler_type().unwrap(); @@ -486,9 +486,9 @@ fn op_compiler_config( fn op_chdir( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_chdir().unwrap(); let directory = inner.directory().unwrap(); Box::new(futures::future::result(|| -> OpResult { @@ -500,10 +500,10 @@ fn op_chdir( fn op_global_timer_stop( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { assert!(base.sync()); - assert_eq!(data.len(), 0); + assert!(data.is_none()); let state = state; let mut t = state.global_timer.lock().unwrap(); t.cancel(); @@ -513,10 +513,10 @@ fn op_global_timer_stop( fn op_global_timer( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { assert!(!base.sync()); - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_global_timer().unwrap(); let val = inner.timeout(); @@ -546,9 +546,9 @@ fn op_global_timer( fn op_set_env( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_set_env().unwrap(); let key = inner.key().unwrap(); let value = inner.value().unwrap(); @@ -562,9 +562,9 @@ fn op_set_env( fn op_env( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); if let Err(e) = state.check_env() { @@ -594,9 +594,9 @@ fn op_env( fn op_permissions( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let builder = &mut FlatBufferBuilder::new(); let inner = msg::PermissionsRes::create( @@ -624,9 +624,9 @@ fn op_permissions( fn op_revoke_permission( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_permission_revoke().unwrap(); let permission = inner.permission().unwrap(); let result = match permission { @@ -647,7 +647,7 @@ fn op_revoke_permission( fn op_fetch( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { let inner = base.inner_as_fetch().unwrap(); let cmd_id = base.cmd_id(); @@ -656,10 +656,9 @@ fn op_fetch( assert!(header.is_request()); let url = header.url().unwrap(); - let body = if data.is_empty() { - hyper::Body::empty() - } else { - hyper::Body::from(Vec::from(&*data)) + let body = match data { + None => hyper::Body::empty(), + Some(buf) => hyper::Body::from(Vec::from(&*buf)), }; let maybe_req = msg_util::deserialize_request(header, body); @@ -734,9 +733,9 @@ where fn op_make_temp_dir( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let base = Box::new(*base); let inner = base.inner_as_make_temp_dir().unwrap(); let cmd_id = base.cmd_id(); @@ -783,9 +782,9 @@ fn op_make_temp_dir( fn op_mkdir( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_mkdir().unwrap(); let path = String::from(inner.path().unwrap()); let recursive = inner.recursive(); @@ -805,9 +804,9 @@ fn op_mkdir( fn op_chmod( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_chmod().unwrap(); let _mode = inner.mode(); let path = String::from(inner.path().unwrap()); @@ -844,9 +843,9 @@ fn op_chmod( fn op_open( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_open().unwrap(); let filename_str = inner.filename().unwrap(); @@ -934,9 +933,9 @@ fn op_open( fn op_close( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_close().unwrap(); let rid = inner.rid(); match resources::lookup(rid) { @@ -951,9 +950,9 @@ fn op_close( fn op_kill( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_kill().unwrap(); let pid = inner.pid(); let signo = inner.signo(); @@ -966,9 +965,9 @@ fn op_kill( fn op_shutdown( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_shutdown().unwrap(); let rid = inner.rid(); let how = inner.how(); @@ -992,7 +991,7 @@ fn op_shutdown( fn op_read( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { let cmd_id = base.cmd_id(); let inner = base.inner_as_read().unwrap(); @@ -1001,7 +1000,7 @@ fn op_read( match resources::lookup(rid) { None => odd_future(errors::bad_resource()), Some(resource) => { - let op = tokio::io::read(resource, data) + let op = tokio::io::read(resource, data.unwrap()) .map_err(DenoError::from) .and_then(move |(_resource, _buf, nread)| { let builder = &mut FlatBufferBuilder::new(); @@ -1030,7 +1029,7 @@ fn op_read( fn op_write( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { let cmd_id = base.cmd_id(); let inner = base.inner_as_write().unwrap(); @@ -1039,7 +1038,7 @@ fn op_write( match resources::lookup(rid) { None => odd_future(errors::bad_resource()), Some(resource) => { - let op = tokio_write::write(resource, data) + let op = tokio_write::write(resource, data.unwrap()) .map_err(DenoError::from) .and_then(move |(_resource, _buf, nwritten)| { let builder = &mut FlatBufferBuilder::new(); @@ -1067,9 +1066,9 @@ fn op_write( fn op_seek( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let _cmd_id = base.cmd_id(); let inner = base.inner_as_seek().unwrap(); let rid = inner.rid(); @@ -1089,9 +1088,9 @@ fn op_seek( fn op_remove( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_remove().unwrap(); let path_ = inner.path().unwrap(); let path = PathBuf::from(path_); @@ -1118,9 +1117,9 @@ fn op_remove( fn op_copy_file( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_copy_file().unwrap(); let from_ = inner.from().unwrap(); let from = PathBuf::from(from_); @@ -1174,9 +1173,9 @@ fn get_mode(_perm: &fs::Permissions) -> u32 { fn op_cwd( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); Box::new(futures::future::result(|| -> OpResult { let path = std::env::current_dir()?; @@ -1200,9 +1199,9 @@ fn op_cwd( fn op_stat( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_stat().unwrap(); let cmd_id = base.cmd_id(); let filename_ = inner.filename().unwrap(); @@ -1252,9 +1251,9 @@ fn op_stat( fn op_read_dir( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_read_dir().unwrap(); let cmd_id = base.cmd_id(); let path = String::from(inner.path().unwrap()); @@ -1313,9 +1312,9 @@ fn op_read_dir( fn op_rename( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_rename().unwrap(); let oldpath = PathBuf::from(inner.oldpath().unwrap()); let newpath_ = inner.newpath().unwrap(); @@ -1333,9 +1332,9 @@ fn op_rename( fn op_link( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_link().unwrap(); let oldname = PathBuf::from(inner.oldname().unwrap()); let newname_ = inner.newname().unwrap(); @@ -1355,9 +1354,9 @@ fn op_link( fn op_symlink( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_symlink().unwrap(); let oldname = PathBuf::from(inner.oldname().unwrap()); let newname_ = inner.newname().unwrap(); @@ -1384,9 +1383,9 @@ fn op_symlink( fn op_read_link( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_readlink().unwrap(); let cmd_id = base.cmd_id(); let name_ = inner.name().unwrap(); @@ -1422,9 +1421,9 @@ fn op_read_link( fn op_repl_start( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_repl_start().unwrap(); let cmd_id = base.cmd_id(); let history_file = String::from(inner.history_file().unwrap()); @@ -1453,9 +1452,9 @@ fn op_repl_start( fn op_repl_readline( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_repl_readline().unwrap(); let cmd_id = base.cmd_id(); let rid = inner.rid(); @@ -1489,9 +1488,9 @@ fn op_repl_readline( fn op_truncate( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_truncate().unwrap(); let filename = String::from(inner.name().unwrap()); @@ -1512,9 +1511,9 @@ fn op_truncate( fn op_utime( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_utime().unwrap(); let filename = String::from(inner.filename().unwrap()); @@ -1535,9 +1534,9 @@ fn op_utime( fn op_listen( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); if let Err(e) = state.check_net("listen") { return odd_future(e); } @@ -1597,9 +1596,9 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult { fn op_accept( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); if let Err(e) = state.check_net("accept") { return odd_future(e); } @@ -1623,9 +1622,9 @@ fn op_accept( fn op_dial( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); if let Err(e) = state.check_net("dial") { return odd_future(e); } @@ -1649,9 +1648,9 @@ fn op_dial( fn op_metrics( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let builder = &mut FlatBufferBuilder::new(); @@ -1673,9 +1672,9 @@ fn op_metrics( fn op_resources( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let builder = &mut FlatBufferBuilder::new(); @@ -1725,7 +1724,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio { fn op_run( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { assert!(base.sync()); let cmd_id = base.cmd_id(); @@ -1734,7 +1733,7 @@ fn op_run( return odd_future(e); } - assert_eq!(data.len(), 0); + assert!(data.is_none()); let inner = base.inner_as_run().unwrap(); let args = inner.args().unwrap(); let env = inner.env().unwrap(); @@ -1798,9 +1797,9 @@ fn op_run( fn op_run_status( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_run_status().unwrap(); let rid = inner.rid(); @@ -1871,9 +1870,9 @@ impl Future for GetMessageFuture { fn op_worker_get_message( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let op = GetMessageFuture { @@ -1906,11 +1905,11 @@ fn op_worker_get_message( fn op_worker_post_message( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { let cmd_id = base.cmd_id(); - let d = Vec::from(data.as_ref()).into_boxed_slice(); + let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let tx = { let wc = state.worker_channels.lock().unwrap(); @@ -1936,9 +1935,9 @@ fn op_worker_post_message( fn op_create_worker( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_create_worker().unwrap(); let specifier = inner.specifier().unwrap(); @@ -1995,9 +1994,9 @@ fn op_create_worker( fn op_host_get_worker_closed( state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_host_get_worker_closed().unwrap(); let rid = inner.rid(); @@ -2026,9 +2025,9 @@ fn op_host_get_worker_closed( fn op_host_get_message( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { - assert_eq!(data.len(), 0); + assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_host_get_message().unwrap(); let rid = inner.rid(); @@ -2060,13 +2059,13 @@ fn op_host_get_message( fn op_host_post_message( _state: &ThreadSafeState, base: &msg::Base<'_>, - data: deno_buf, + data: Option<PinnedBuf>, ) -> Box<OpWithError> { let cmd_id = base.cmd_id(); let inner = base.inner_as_host_post_message().unwrap(); let rid = inner.rid(); - let d = Vec::from(data.as_ref()).into_boxed_slice(); + let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let op = resources::post_message_to_worker(rid, d); let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); diff --git a/cli/state.rs b/cli/state.rs index 5aefe7d90..2bfc641d5 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -8,9 +8,9 @@ use crate::permissions::DenoPermissions; use crate::resources; use crate::resources::ResourceId; use crate::worker::Worker; -use deno::deno_buf; use deno::Buf; use deno::Op; +use deno::PinnedBuf; use futures::future::Shared; use std; use std::collections::HashMap; @@ -84,7 +84,7 @@ impl ThreadSafeState { pub fn dispatch( &self, control: &[u8], - zero_copy: deno_buf, + zero_copy: Option<PinnedBuf>, ) -> (bool, Box<Op>) { ops::dispatch_all(self, control, zero_copy, self.dispatch_selector) } diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 3e02cdd46..b355f5568 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -111,7 +111,10 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send; -fn dispatch(control: &[u8], zero_copy_buf: deno_buf) -> (bool, Box<Op>) { +fn dispatch( + control: &[u8], + zero_copy_buf: Option<PinnedBuf>, +) -> (bool, Box<Op>) { let record = Record::from(control); let is_sync = record.promise_id == 0; let http_bench_op = match record.op_id { @@ -266,8 +269,9 @@ fn op_close(rid: i32) -> Box<HttpBenchOp> { })) } -fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<HttpBenchOp> { +fn op_read(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> { debug!("read rid={}", rid); + let mut zero_copy_buf = zero_copy_buf.unwrap(); Box::new( futures::future::poll_fn(move || { let mut table = RESOURCE_TABLE.lock().unwrap(); @@ -285,8 +289,9 @@ fn op_read(rid: i32, mut zero_copy_buf: deno_buf) -> Box<HttpBenchOp> { ) } -fn op_write(rid: i32, zero_copy_buf: deno_buf) -> Box<HttpBenchOp> { +fn op_write(rid: i32, zero_copy_buf: Option<PinnedBuf>) -> Box<HttpBenchOp> { debug!("write rid={}", rid); + let zero_copy_buf = zero_copy_buf.unwrap(); Box::new( futures::future::poll_fn(move || { let mut table = RESOURCE_TABLE.lock().unwrap(); diff --git a/core/isolate.rs b/core/isolate.rs index 5488fab75..2cafb29b6 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -8,6 +8,8 @@ use crate::js_errors::JSError; use crate::libdeno; use crate::libdeno::deno_buf; use crate::libdeno::deno_mod; +use crate::libdeno::deno_pinned_buf; +use crate::libdeno::PinnedBuf; use crate::libdeno::Snapshot1; use crate::libdeno::Snapshot2; use crate::shared_queue::SharedQueue; @@ -26,33 +28,6 @@ use std::sync::{Arc, Mutex, Once, ONCE_INIT}; pub type Buf = Box<[u8]>; pub type Op = dyn Future<Item = Buf, Error = ()> + Send; -struct PendingOp { - op: Box<Op>, - zero_copy_id: usize, // non-zero if associated zero-copy buffer. -} - -struct OpResult { - buf: Buf, - zero_copy_id: usize, -} - -impl Future for PendingOp { - type Item = OpResult; - type Error = (); - - fn poll(&mut self) -> Poll<OpResult, ()> { - // Ops should not error. If an op experiences an error it needs to - // encode that error into a buf, so it can be returned to JS. - Ok(match self.op.poll().expect("ops should not error") { - NotReady => NotReady, - Ready(buf) => Ready(OpResult { - buf, - zero_copy_id: self.zero_copy_id, - }), - }) - } -} - /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -71,7 +46,8 @@ pub enum StartupData<'a> { #[derive(Default)] pub struct Config { - dispatch: Option<Arc<Fn(&[u8], deno_buf) -> (bool, Box<Op>) + Send + Sync>>, + dispatch: + Option<Arc<Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync>>, pub will_snapshot: bool, } @@ -81,7 +57,7 @@ impl Config { /// corresponds to the second argument of Deno.core.dispatch(). pub fn dispatch<F>(&mut self, f: F) where - F: Fn(&[u8], deno_buf) -> (bool, Box<Op>) + Send + Sync + 'static, + F: Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -101,7 +77,7 @@ pub struct Isolate { config: Config, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered<PendingOp>, + pending_ops: FuturesUnordered<Box<Op>>, have_unpolled_ops: bool, } @@ -194,24 +170,22 @@ impl Isolate { extern "C" fn pre_dispatch( user_data: *mut c_void, control_argv0: deno_buf, - zero_copy_buf: deno_buf, + zero_copy_buf: deno_pinned_buf, ) { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; - let zero_copy_id = zero_copy_buf.zero_copy_id; - let control_shared = isolate.shared.shift(); let (is_sync, op) = if control_argv0.len() > 0 { // The user called Deno.core.send(control) if let Some(ref f) = isolate.config.dispatch { - f(control_argv0.as_ref(), zero_copy_buf) + f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf)) } else { panic!("isolate.config.dispatch not set") } } else if let Some(c) = control_shared { // The user called Deno.sharedQueue.push(control) if let Some(ref f) = isolate.config.dispatch { - f(&c, zero_copy_buf) + f(&c, PinnedBuf::new(zero_copy_buf)) } else { panic!("isolate.config.dispatch not set") } @@ -235,17 +209,11 @@ impl Isolate { // picked up. let _ = isolate.respond(Some(&res_record)); } else { - isolate.pending_ops.push(PendingOp { op, zero_copy_id }); + isolate.pending_ops.push(op); isolate.have_unpolled_ops = true; } } - fn zero_copy_release(&self, zero_copy_id: usize) { - unsafe { - libdeno::deno_zero_copy_release(self.libdeno_isolate, zero_copy_id) - } - } - #[inline] unsafe fn from_raw_ptr<'a>(ptr: *const c_void) -> &'a mut Self { let ptr = ptr as *mut _; @@ -469,17 +437,13 @@ impl Future for Isolate { Err(_) => panic!("unexpected op error"), Ok(Ready(None)) => break, Ok(NotReady) => break, - Ok(Ready(Some(r))) => { - if r.zero_copy_id > 0 { - self.zero_copy_release(r.zero_copy_id); - } - - let successful_push = self.shared.push(&r.buf); + Ok(Ready(Some(buf))) => { + let successful_push = self.shared.push(&buf); if !successful_push { // If we couldn't push the response to the shared queue, because // there wasn't enough size, we will return the buffer via the // legacy route, using the argument of deno_respond. - overflow_response = Some(r.buf); + overflow_response = Some(buf); break; } } @@ -591,47 +555,45 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut config = Config::default(); - config.dispatch( - move |control: &[u8], _zero_copy_buf: deno_buf| -> (bool, Box<Op>) { - dispatch_count_.fetch_add(1, Ordering::Relaxed); - match mode { - Mode::AsyncImmediate => { - assert_eq!(control.len(), 1); - assert_eq!(control[0], 42); - let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) - } - Mode::OverflowReqSync => { - assert_eq!(control.len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) - } - Mode::OverflowResSync => { - assert_eq!(control.len(), 1); - assert_eq!(control[0], 42); - let mut vec = Vec::<u8>::new(); - vec.resize(100 * 1024 * 1024, 0); - vec[0] = 99; - let buf = vec.into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) - } - Mode::OverflowReqAsync => { - assert_eq!(control.len(), 100 * 1024 * 1024); - let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) - } - Mode::OverflowResAsync => { - assert_eq!(control.len(), 1); - assert_eq!(control[0], 42); - let mut vec = Vec::<u8>::new(); - vec.resize(100 * 1024 * 1024, 0); - vec[0] = 4; - let buf = vec.into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) - } + config.dispatch(move |control, _| -> (bool, Box<Op>) { + dispatch_count_.fetch_add(1, Ordering::Relaxed); + match mode { + Mode::AsyncImmediate => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let buf = vec![43u8].into_boxed_slice(); + (false, Box::new(futures::future::ok(buf))) } - }, - ); + Mode::OverflowReqSync => { + assert_eq!(control.len(), 100 * 1024 * 1024); + let buf = vec![43u8].into_boxed_slice(); + (true, Box::new(futures::future::ok(buf))) + } + Mode::OverflowResSync => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let mut vec = Vec::<u8>::new(); + vec.resize(100 * 1024 * 1024, 0); + vec[0] = 99; + let buf = vec.into_boxed_slice(); + (true, Box::new(futures::future::ok(buf))) + } + Mode::OverflowReqAsync => { + assert_eq!(control.len(), 100 * 1024 * 1024); + let buf = vec![43u8].into_boxed_slice(); + (false, Box::new(futures::future::ok(buf))) + } + Mode::OverflowResAsync => { + assert_eq!(control.len(), 1); + assert_eq!(control[0], 42); + let mut vec = Vec::<u8>::new(); + vec.resize(100 * 1024 * 1024, 0); + vec[0] = 4; + let buf = vec.into_boxed_slice(); + (false, Box::new(futures::future::ok(buf))) + } + } + }); let mut isolate = Isolate::new(StartupData::None, config); js_check(isolate.execute( diff --git a/core/lib.rs b/core/lib.rs index 56cd69a68..9533bce32 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -16,6 +16,7 @@ pub use crate::isolate::*; pub use crate::js_errors::*; pub use crate::libdeno::deno_buf; pub use crate::libdeno::deno_mod; +pub use crate::libdeno::PinnedBuf; pub use crate::modules::*; pub fn v8_version() -> &'static str { diff --git a/core/libdeno.rs b/core/libdeno.rs index 048db1311..4853c318e 100644 --- a/core/libdeno.rs +++ b/core/libdeno.rs @@ -4,9 +4,13 @@ use libc::c_char; use libc::c_int; use libc::c_void; use libc::size_t; +use std::convert::From; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; +use std::option::Option; use std::ptr::null; +use std::ptr::NonNull; +use std::slice; // TODO(F001): change this definition to `extern { pub type isolate; }` // After RFC 1861 is stablized. See https://github.com/rust-lang/rust/issues/43467. @@ -27,7 +31,6 @@ pub struct deno_buf { alloc_len: usize, data_ptr: *const u8, data_len: usize, - pub zero_copy_id: usize, } /// `deno_buf` can not clone, and there is no interior mutability. @@ -42,7 +45,6 @@ impl deno_buf { alloc_len: 0, data_ptr: null(), data_len: 0, - zero_copy_id: 0, } } @@ -53,7 +55,6 @@ impl deno_buf { alloc_len: 0, data_ptr: ptr, data_len: len, - zero_copy_id: 0, } } } @@ -67,7 +68,6 @@ impl<'a> From<&'a [u8]> for deno_buf { alloc_len: 0, data_ptr: x.as_ref().as_ptr(), data_len: x.len(), - zero_copy_id: 0, } } } @@ -80,35 +80,81 @@ impl Deref for deno_buf { } } -impl DerefMut for deno_buf { +impl AsRef<[u8]> for deno_buf { #[inline] - fn deref_mut(&mut self) -> &mut [u8] { + fn as_ref(&self) -> &[u8] { + &*self + } +} + +/// A PinnedBuf encapsulates a slice that's been borrowed from a JavaScript +/// ArrayBuffer object. JavaScript objects can normally be garbage collected, +/// but the existence of a PinnedBuf inhibits this until it is dropped. It +/// behaves much like an Arc<[u8]>, although a PinnedBuf currently can't be +/// cloned. +#[repr(C)] +pub struct PinnedBuf { + data_ptr: NonNull<u8>, + data_len: usize, + pin: NonNull<c_void>, +} + +#[repr(C)] +pub struct PinnedBufRaw { + data_ptr: *mut u8, + data_len: usize, + pin: *mut c_void, +} + +unsafe impl Send for PinnedBuf {} +unsafe impl Send for PinnedBufRaw {} + +impl PinnedBuf { + pub fn new(raw: PinnedBufRaw) -> Option<Self> { + NonNull::new(raw.data_ptr).map(|data_ptr| PinnedBuf { + data_ptr, + data_len: raw.data_len, + pin: NonNull::new(raw.pin).unwrap(), + }) + } +} + +impl Drop for PinnedBuf { + fn drop(&mut self) { unsafe { - if self.alloc_ptr.is_null() { - panic!("Can't modify the buf"); - } - std::slice::from_raw_parts_mut(self.data_ptr as *mut u8, self.data_len) + let raw = &mut *(self as *mut PinnedBuf as *mut PinnedBufRaw); + deno_pinned_buf_delete(raw); } } } -impl AsRef<[u8]> for deno_buf { - #[inline] +impl Deref for PinnedBuf { + type Target = [u8]; + fn deref(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.data_ptr.as_ptr(), self.data_len) } + } +} + +impl DerefMut for PinnedBuf { + fn deref_mut(&mut self) -> &mut [u8] { + unsafe { slice::from_raw_parts_mut(self.data_ptr.as_ptr(), self.data_len) } + } +} + +impl AsRef<[u8]> for PinnedBuf { fn as_ref(&self) -> &[u8] { &*self } } -impl AsMut<[u8]> for deno_buf { - #[inline] +impl AsMut<[u8]> for PinnedBuf { fn as_mut(&mut self) -> &mut [u8] { - if self.alloc_ptr.is_null() { - panic!("Can't modify the buf"); - } &mut *self } } +pub use PinnedBufRaw as deno_pinned_buf; + #[repr(C)] pub struct deno_snapshot<'a> { pub data_ptr: *const u8, @@ -156,7 +202,7 @@ impl Snapshot2<'_> { type deno_recv_cb = unsafe extern "C" fn( user_data: *mut c_void, control_buf: deno_buf, // deprecated - zero_copy_buf: deno_buf, + zero_copy_buf: deno_pinned_buf, ); #[allow(non_camel_case_types)] @@ -220,7 +266,7 @@ extern "C" { user_data: *const c_void, buf: deno_buf, ); - pub fn deno_zero_copy_release(i: *const isolate, zero_copy_id: usize); + pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf); pub fn deno_execute( i: *const isolate, user_data: *const c_void, diff --git a/core/libdeno/BUILD.gn b/core/libdeno/BUILD.gn index 8ad3dccc3..31a5be640 100644 --- a/core/libdeno/BUILD.gn +++ b/core/libdeno/BUILD.gn @@ -46,6 +46,7 @@ v8_source_set("libdeno") { sources = [ "api.cc", "binding.cc", + "buffer.h", "deno.h", "exceptions.cc", "exceptions.h", diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc index 53dad58e6..1110b1d34 100644 --- a/core/libdeno/api.cc +++ b/core/libdeno/api.cc @@ -46,7 +46,7 @@ Deno* deno_new(deno_config config) { } deno::DenoIsolate* d = new deno::DenoIsolate(config); v8::Isolate::CreateParams params; - params.array_buffer_allocator = d->array_buffer_allocator_; + params.array_buffer_allocator = &deno::ArrayBufferAllocator::global(); params.external_references = deno::external_references; if (config.load_snapshot.data_ptr) { @@ -148,12 +148,9 @@ void deno_execute(Deno* d_, void* user_data, const char* js_filename, deno::Execute(context, js_filename, js_source); } -void deno_zero_copy_release(Deno* d_, size_t zero_copy_id) { - auto* d = unwrap(d_); - v8::Isolate::Scope isolate_scope(d->isolate_); - v8::Locker locker(d->isolate_); - v8::HandleScope handle_scope(d->isolate_); - d->DeleteZeroCopyRef(zero_copy_id); +void deno_pinned_buf_delete(deno_pinned_buf* buf) { + // The PinnedBuf destructor implicitly releases the ArrayBuffer reference. + auto _ = deno::PinnedBuf(*buf); } void deno_respond(Deno* d_, void* user_data, deno_buf buf) { @@ -161,7 +158,6 @@ void deno_respond(Deno* d_, void* user_data, deno_buf buf) { if (d->current_args_ != nullptr) { // Synchronous response. if (buf.data_ptr != nullptr) { - DCHECK_EQ(buf.zero_copy_id, 0); auto ab = deno::ImportBuf(d, buf); d->current_args_->GetReturnValue().Set(ab); } @@ -189,9 +185,6 @@ void deno_respond(Deno* d_, void* user_data, deno_buf buf) { v8::Local<v8::Value> args[1]; int argc = 0; - // You cannot use zero_copy_buf with deno_respond(). Use - // deno_zero_copy_release() instead. - DCHECK_EQ(buf.zero_copy_id, 0); if (buf.data_ptr != nullptr) { args[0] = deno::ImportBuf(d, buf); argc = 1; diff --git a/core/libdeno/binding.cc b/core/libdeno/binding.cc index ab633f46d..80f733b4f 100644 --- a/core/libdeno/binding.cc +++ b/core/libdeno/binding.cc @@ -163,9 +163,6 @@ void ErrorToJSON(const v8::FunctionCallbackInfo<v8::Value>& args) { } v8::Local<v8::Uint8Array> ImportBuf(DenoIsolate* d, deno_buf buf) { - // Do not use ImportBuf with zero_copy buffers. - DCHECK_EQ(buf.zero_copy_id, 0); - if (buf.data_ptr == nullptr) { return v8::Local<v8::Uint8Array>(); } @@ -248,11 +245,9 @@ void Send(const v8::FunctionCallbackInfo<v8::Value>& args) { DenoIsolate* d = DenoIsolate::FromIsolate(isolate); DCHECK_EQ(d->isolate_, isolate); - deno_buf control = {nullptr, 0u, nullptr, 0u, 0u}; - deno_buf zero_copy = {nullptr, 0u, nullptr, 0u, 0u}; - v8::HandleScope handle_scope(isolate); + deno_buf control = {nullptr, 0u, nullptr, 0u}; if (args.Length() > 0) { v8::Local<v8::Value> control_v = args[0]; if (control_v->IsArrayBufferView()) { @@ -261,25 +256,15 @@ void Send(const v8::FunctionCallbackInfo<v8::Value>& args) { } } - v8::Local<v8::Value> zero_copy_v; - if (args.Length() == 2) { - if (args[1]->IsArrayBufferView()) { - zero_copy_v = args[1]; - zero_copy = GetContents( - isolate, v8::Local<v8::ArrayBufferView>::Cast(zero_copy_v)); - size_t zero_copy_id = d->next_zero_copy_id_++; - DCHECK_GT(zero_copy_id, 0); - zero_copy.zero_copy_id = zero_copy_id; - // If the zero_copy ArrayBuffer was given, we must maintain a strong - // reference to it until deno_zero_copy_release is called. - d->AddZeroCopyRef(zero_copy_id, zero_copy_v); - } - } + PinnedBuf zero_copy = + args[1]->IsArrayBufferView() + ? PinnedBuf(v8::Local<v8::ArrayBufferView>::Cast(args[1])) + : PinnedBuf(); DCHECK_NULL(d->current_args_); d->current_args_ = &args; - d->recv_cb_(d->user_data_, control, zero_copy); + d->recv_cb_(d->user_data_, control, zero_copy.IntoRaw()); if (d->current_args_ == nullptr) { // This indicates that deno_repond() was called already. diff --git a/core/libdeno/buffer.h b/core/libdeno/buffer.h new file mode 100644 index 000000000..4b3587d2d --- /dev/null +++ b/core/libdeno/buffer.h @@ -0,0 +1,140 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +#ifndef BUFFER_H_ +#define BUFFER_H_ + +// Cpplint bans the use of <mutex> because it duplicates functionality in +// chromium //base. However Deno doensn't use that, so suppress that lint. +#include <memory> +#include <mutex> // NOLINT +#include <string> +#include <unordered_map> +#include <utility> + +#include "third_party/v8/include/v8.h" +#include "third_party/v8/src/base/logging.h" + +namespace deno { + +class ArrayBufferAllocator : public v8::ArrayBuffer::Allocator { + public: + static ArrayBufferAllocator& global() { + static ArrayBufferAllocator global_instance; + return global_instance; + } + + void* Allocate(size_t length) override { return new uint8_t[length](); } + + void* AllocateUninitialized(size_t length) override { + return new uint8_t[length]; + } + + void Free(void* data, size_t length) override { Unref(data); } + + private: + friend class PinnedBuf; + + void Ref(void* data) { + std::lock_guard<std::mutex> lock(ref_count_map_mutex_); + // Note: + // - `unordered_map::insert(make_pair(key, value))` returns the existing + // item if the key, already exists in the map, otherwise it creates an + // new entry with `value`. + // - Buffers not in the map have an implicit reference count of one. + auto entry = ref_count_map_.insert(std::make_pair(data, 1)).first; + ++entry->second; + } + + void Unref(void* data) { + { + std::lock_guard<std::mutex> lock(ref_count_map_mutex_); + auto entry = ref_count_map_.find(data); + if (entry == ref_count_map_.end()) { + // Buffers not in the map have an implicit ref count of one. After + // dereferencing there are no references left, so we delete the buffer. + } else if (--entry->second == 0) { + // The reference count went to zero, so erase the map entry and free the + // buffer. + ref_count_map_.erase(entry); + } else { + // After decreasing the reference count the buffer still has references + // left, so we leave the pin in place. + return; + } + delete[] reinterpret_cast<uint8_t*>(data); + } + } + + private: + ArrayBufferAllocator() {} + + ~ArrayBufferAllocator() { + // TODO(pisciaureus): Enable this check. It currently fails sometimes + // because the compiler worker isolate never actually exits, so when the + // process exits this isolate still holds on to some buffers. + // CHECK(ref_count_map_.empty()); + } + + std::unordered_map<void*, size_t> ref_count_map_; + std::mutex ref_count_map_mutex_; +}; + +class PinnedBuf { + struct Unref { + // This callback gets called from the Pin destructor. + void operator()(void* ptr) { ArrayBufferAllocator::global().Unref(ptr); } + }; + // The Pin is a unique (non-copyable) smart pointer which automatically + // unrefs the referenced ArrayBuffer in its destructor. + using Pin = std::unique_ptr<void, Unref>; + + uint8_t* data_ptr_; + size_t data_len_; + Pin pin_; + + public: + // PinnedBuf::Raw is a POD struct with the same memory layout as the PinBuf + // itself. It is used to move a PinnedBuf between C and Rust. + struct Raw { + uint8_t* data_ptr; + size_t data_len; + void* pin; + }; + + PinnedBuf() : data_ptr_(nullptr), data_len_(0), pin_() {} + + explicit PinnedBuf(v8::Local<v8::ArrayBufferView> view) { + auto buf = view->Buffer()->GetContents().Data(); + ArrayBufferAllocator::global().Ref(buf); + + data_ptr_ = reinterpret_cast<uint8_t*>(buf) + view->ByteOffset(); + data_len_ = view->ByteLength(); + pin_ = Pin(buf); + } + + // This constructor recreates a PinnedBuf that has previously been converted + // to a PinnedBuf::Raw using the IntoRaw() method. This is a move operation; + // the Raw struct is emptied in the process. + explicit PinnedBuf(Raw raw) + : data_ptr_(raw.data_ptr), data_len_(raw.data_len), pin_(raw.pin) { + raw.data_ptr = nullptr; + raw.data_len = 0; + raw.pin = nullptr; + } + + // The IntoRaw() method converts the PinnedBuf to a PinnedBuf::Raw so it's + // ownership can be moved to Rust. The source PinnedBuf is emptied in the + // process, but the pinned ArrayBuffer is not dereferenced. In order to not + // leak it, the raw struct must eventually be turned back into a PinnedBuf + // using the constructor above. + Raw IntoRaw() { + Raw raw{ + .data_ptr = data_ptr_, .data_len = data_len_, .pin = pin_.release()}; + data_ptr_ = nullptr; + data_len_ = 0; + return raw; + } +}; + +} // namespace deno + +#endif // BUFFER_H_ diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h index 8054090ed..f83f00834 100644 --- a/core/libdeno/deno.h +++ b/core/libdeno/deno.h @@ -1,21 +1,26 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. #ifndef DENO_H_ #define DENO_H_ + #include <stddef.h> #include <stdint.h> + +#include "buffer.h" + // Neither Rust nor Go support calling directly into C++ functions, therefore // the public interface to libdeno is done in C. #ifdef __cplusplus extern "C" { #endif +typedef deno::PinnedBuf::Raw deno_pinned_buf; + // Data that gets transmitted. typedef struct { - uint8_t* alloc_ptr; // Start of memory allocation (returned from `malloc()`). + uint8_t* alloc_ptr; // Start of memory allocation (from `new uint8_t[len]`). size_t alloc_len; // Length of the memory allocation. uint8_t* data_ptr; // Start of logical contents (within the allocation). size_t data_len; // Length of logical contents. - size_t zero_copy_id; // 0 = normal, 1 = must call deno_zero_copy_release. } deno_buf; typedef struct { @@ -29,7 +34,7 @@ typedef struct deno_s Deno; // control_buf is valid for only for the lifetime of this callback. // data_buf is valid until deno_respond() is called. typedef void (*deno_recv_cb)(void* user_data, deno_buf control_buf, - deno_buf zerop_copy_buf); + deno_pinned_buf zero_copy_buf); void deno_init(); const char* deno_v8_version(); @@ -84,9 +89,7 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename, void deno_respond(Deno* d, void* user_data, deno_buf buf); // consumes zero_copy -// Calling this function more than once with the same zero_copy_id will result -// in an error. -void deno_zero_copy_release(Deno* d, size_t zero_copy_id); +void deno_pinned_buf_delete(deno_pinned_buf* buf); void deno_check_promise_errors(Deno* d); diff --git a/core/libdeno/internal.h b/core/libdeno/internal.h index 0f4df9908..b75cc9717 100644 --- a/core/libdeno/internal.h +++ b/core/libdeno/internal.h @@ -6,6 +6,8 @@ #include <string> #include <utility> #include <vector> + +#include "buffer.h" #include "deno.h" #include "third_party/v8/include/v8.h" #include "third_party/v8/src/base/logging.h" @@ -36,11 +38,9 @@ class DenoIsolate { snapshot_creator_(nullptr), global_import_buf_ptr_(nullptr), recv_cb_(config.recv_cb), - next_zero_copy_id_(1), // zero_copy_id must not be zero. user_data_(nullptr), resolve_cb_(nullptr), has_snapshotted_(false) { - array_buffer_allocator_ = v8::ArrayBuffer::Allocator::NewDefaultAllocator(); if (config.load_snapshot.data_ptr) { snapshot_.data = reinterpret_cast<const char*>(config.load_snapshot.data_ptr); @@ -65,7 +65,6 @@ class DenoIsolate { } else { isolate_->Dispose(); } - delete array_buffer_allocator_; } static inline DenoIsolate* FromIsolate(v8::Isolate* isolate) { @@ -89,31 +88,13 @@ class DenoIsolate { } } - void DeleteZeroCopyRef(size_t zero_copy_id) { - DCHECK_NE(zero_copy_id, 0); - // Delete persistent reference to data ArrayBuffer. - auto it = zero_copy_map_.find(zero_copy_id); - if (it != zero_copy_map_.end()) { - it->second.Reset(); - zero_copy_map_.erase(it); - } - } - - void AddZeroCopyRef(size_t zero_copy_id, v8::Local<v8::Value> zero_copy_v) { - zero_copy_map_.emplace(std::piecewise_construct, - std::make_tuple(zero_copy_id), - std::make_tuple(isolate_, zero_copy_v)); - } - v8::Isolate* isolate_; v8::Locker* locker_; - v8::ArrayBuffer::Allocator* array_buffer_allocator_; deno_buf shared_; const v8::FunctionCallbackInfo<v8::Value>* current_args_; v8::SnapshotCreator* snapshot_creator_; void* global_import_buf_ptr_; deno_recv_cb recv_cb_; - size_t next_zero_copy_id_; void* user_data_; std::map<deno_mod, ModuleInfo> mods_; @@ -121,7 +102,6 @@ class DenoIsolate { deno_resolve_cb resolve_cb_; v8::Persistent<v8::Context> context_; - std::map<size_t, v8::Persistent<v8::Value>> zero_copy_map_; std::map<int, v8::Persistent<v8::Value>> pending_promise_map_; std::string last_exception_; v8::Persistent<v8::Function> recv_; @@ -177,7 +157,7 @@ static intptr_t external_references[] = { reinterpret_cast<intptr_t>(MessageCallback), 0}; -static const deno_buf empty_buf = {nullptr, 0, nullptr, 0, 0}; +static const deno_buf empty_buf = {nullptr, 0, nullptr, 0}; static const deno_snapshot empty_snapshot = {nullptr, 0}; Deno* NewFromSnapshot(void* user_data, deno_recv_cb cb); diff --git a/core/libdeno/libdeno_test.cc b/core/libdeno/libdeno_test.cc index 2b254b32f..70ea5581c 100644 --- a/core/libdeno/libdeno_test.cc +++ b/core/libdeno/libdeno_test.cc @@ -43,22 +43,19 @@ TEST(LibDenoTest, ErrorsCorrectly) { deno_buf strbuf(const char* str) { auto len = strlen(str); - deno_buf buf; - buf.alloc_ptr = reinterpret_cast<uint8_t*>(strdup(str)); - buf.alloc_len = len + 1; + buf.alloc_ptr = new uint8_t[len]; + buf.alloc_len = len; buf.data_ptr = buf.alloc_ptr; buf.data_len = len; - buf.zero_copy_id = 0; - + memcpy(buf.data_ptr, str, len); return buf; } -void assert_null(deno_buf b) { - EXPECT_EQ(b.alloc_ptr, nullptr); - EXPECT_EQ(b.alloc_len, 0u); +void assert_null(deno_pinned_buf b) { EXPECT_EQ(b.data_ptr, nullptr); EXPECT_EQ(b.data_len, 0u); + EXPECT_EQ(b.pin, nullptr); } TEST(LibDenoTest, RecvReturnEmpty) { @@ -88,8 +85,6 @@ TEST(LibDenoTest, RecvReturnBar) { EXPECT_EQ(buf.data_ptr[0], 'a'); EXPECT_EQ(buf.data_ptr[1], 'b'); EXPECT_EQ(buf.data_ptr[2], 'c'); - EXPECT_EQ(zero_copy_buf.zero_copy_id, 0u); - EXPECT_EQ(zero_copy_buf.data_ptr, nullptr); deno_respond(d, user_data, strbuf("bar")); }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb}); @@ -124,11 +119,11 @@ TEST(LibDenoTest, SendRecvSlice) { // Make copy of the backing buffer -- this is currently necessary // because deno_respond() takes ownership over the buffer, but we are // not given ownership of `buf` by our caller. - uint8_t* alloc_ptr = reinterpret_cast<uint8_t*>(malloc(alloc_len)); + uint8_t* alloc_ptr = new uint8_t[alloc_len]; memcpy(alloc_ptr, buf.alloc_ptr, alloc_len); // Make a slice that is a bit shorter than the original. deno_buf buf2{alloc_ptr, alloc_len, alloc_ptr + data_offset, - buf.data_len - 19, 0}; + buf.data_len - 19}; // Place some values into the buffer for the JS side to verify. buf2.data_ptr[0] = 200 + i; buf2.data_ptr[buf2.data_len - 1] = 200 - i; @@ -193,10 +188,11 @@ TEST(LibDenoTest, GlobalErrorHandling) { TEST(LibDenoTest, ZeroCopyBuf) { static int count = 0; - static deno_buf zero_copy_buf2; - auto recv_cb = [](auto user_data, deno_buf buf, deno_buf zero_copy_buf) { + static deno_pinned_buf zero_copy_buf2; + auto recv_cb = [](auto user_data, deno_buf buf, + deno_pinned_buf zero_copy_buf) { count++; - EXPECT_GT(zero_copy_buf.zero_copy_id, 0u); + EXPECT_NE(zero_copy_buf.pin, nullptr); zero_copy_buf.data_ptr[0] = 4; zero_copy_buf.data_ptr[1] = 2; zero_copy_buf2 = zero_copy_buf; @@ -207,8 +203,7 @@ TEST(LibDenoTest, ZeroCopyBuf) { // Note zero_copy_buf won't actually be freed here because in // libdeno_test.js zeroCopyBuf is a rooted global. We just want to exercise // the API here. - auto d = reinterpret_cast<Deno*>(user_data); - deno_zero_copy_release(d, zero_copy_buf.zero_copy_id); + deno_pinned_buf_delete(&zero_copy_buf); }; Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb}); deno_execute(d, d, "a.js", "ZeroCopyBuf()"); @@ -271,7 +266,7 @@ TEST(LibDenoTest, EncodeErrorBug) { TEST(LibDenoTest, Shared) { uint8_t s[] = {0, 1, 2}; - deno_buf shared = {nullptr, 0, s, 3, 0}; + deno_buf shared = {nullptr, 0, s, 3}; Deno* d = deno_new(deno_config{0, snapshot, shared, nullptr}); deno_execute(d, nullptr, "a.js", "Shared()"); EXPECT_EQ(nullptr, deno_last_exception(d)); @@ -306,7 +301,7 @@ TEST(LibDenoTest, LibDenoEvalContextError) { TEST(LibDenoTest, SharedAtomics) { int32_t s[] = {0, 1, 2}; - deno_buf shared = {nullptr, 0, reinterpret_cast<uint8_t*>(s), sizeof s, 0}; + deno_buf shared = {nullptr, 0, reinterpret_cast<uint8_t*>(s), sizeof s}; Deno* d = deno_new(deno_config{0, empty_snapshot, shared, nullptr}); deno_execute(d, nullptr, "a.js", "Atomics.add(new Int32Array(Deno.core.shared), 0, 1)"); diff --git a/core/libdeno/modules_test.cc b/core/libdeno/modules_test.cc index 0eaa9e8eb..f3a97396f 100644 --- a/core/libdeno/modules_test.cc +++ b/core/libdeno/modules_test.cc @@ -1,13 +1,14 @@ -// Copyright 2018 the Deno authors. All rights reserved. MIT license. +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. #include "test.h" static int exec_count = 0; -void recv_cb(void* user_data, deno_buf buf, deno_buf zero_copy_buf) { +void recv_cb(void* user_data, deno_buf buf, deno_pinned_buf zero_copy_buf) { // We use this to check that scripts have executed. EXPECT_EQ(1u, buf.data_len); EXPECT_EQ(buf.data_ptr[0], 4); - EXPECT_EQ(zero_copy_buf.zero_copy_id, 0u); EXPECT_EQ(zero_copy_buf.data_ptr, nullptr); + EXPECT_EQ(zero_copy_buf.data_len, 0u); + EXPECT_EQ(zero_copy_buf.pin, nullptr); exec_count++; } diff --git a/core/libdeno/test.h b/core/libdeno/test.h index dd5dc99b2..d7e6a3f80 100644 --- a/core/libdeno/test.h +++ b/core/libdeno/test.h @@ -6,7 +6,7 @@ #include "testing/gtest/include/gtest/gtest.h" extern deno_snapshot snapshot; // Loaded in libdeno/test.cc -const deno_buf empty = {nullptr, 0, nullptr, 0, 0}; +const deno_buf empty = {nullptr, 0, nullptr, 0}; const deno_snapshot empty_snapshot = {nullptr, 0}; #endif // TEST_H_ |