diff options
Diffstat (limited to 'core/shared_queue.rs')
-rw-r--r-- | core/shared_queue.rs | 313 |
1 files changed, 0 insertions, 313 deletions
diff --git a/core/shared_queue.rs b/core/shared_queue.rs deleted file mode 100644 index dda54a4df..000000000 --- a/core/shared_queue.rs +++ /dev/null @@ -1,313 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -/* -SharedQueue Binary Layout -+-------------------------------+-------------------------------+ -| NUM_RECORDS (32) | -+---------------------------------------------------------------+ -| NUM_SHIFTED_OFF (32) | -+---------------------------------------------------------------+ -| HEAD (32) | -+---------------------------------------------------------------+ -| OFFSETS (32) | -+---------------------------------------------------------------+ -| RECORD_ENDS (*MAX_RECORDS) ... -+---------------------------------------------------------------+ -| RECORDS (*MAX_RECORDS) ... -+---------------------------------------------------------------+ - */ - -use crate::bindings; -use crate::ops::OpId; -use log::debug; -use rusty_v8 as v8; -use std::convert::TryInto; - -const MAX_RECORDS: usize = 100; -/// Total number of records added. -const INDEX_NUM_RECORDS: usize = 0; -/// Number of records that have been shifted off. -const INDEX_NUM_SHIFTED_OFF: usize = 1; -/// The head is the number of initialized bytes in SharedQueue. -/// It grows monotonically. -const INDEX_HEAD: usize = 2; -const INDEX_OFFSETS: usize = 3; -const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS; -/// Byte offset of where the records begin. Also where the head starts. -const HEAD_INIT: usize = 4 * INDEX_RECORDS; -/// A rough guess at how big we should make the shared buffer in bytes. -pub const RECOMMENDED_SIZE: usize = 128 * MAX_RECORDS; - -pub struct SharedQueue { - buf: v8::SharedRef<v8::BackingStore>, -} - -impl SharedQueue { - pub fn new(len: usize) -> Self { - let buf = vec![0; HEAD_INIT + len].into_boxed_slice(); - let buf = v8::SharedArrayBuffer::new_backing_store_from_boxed_slice(buf); - let mut q = Self { - buf: buf.make_shared(), - }; - q.reset(); - q - } - - pub fn get_backing_store(&mut self) -> &mut v8::SharedRef<v8::BackingStore> { - &mut self.buf - } - - pub fn bytes(&self) -> &[u8] { - unsafe { - bindings::get_backing_store_slice(&self.buf, 0, self.buf.byte_length()) - } - } - - pub fn bytes_mut(&mut self) -> &mut [u8] { - unsafe { - bindings::get_backing_store_slice_mut( - &self.buf, - 0, - self.buf.byte_length(), - ) - } - } - - fn reset(&mut self) { - debug!("rust:shared_queue:reset"); - let s: &mut [u32] = self.as_u32_slice_mut(); - s[INDEX_NUM_RECORDS] = 0; - s[INDEX_NUM_SHIFTED_OFF] = 0; - s[INDEX_HEAD] = HEAD_INIT as u32; - } - - fn as_u32_slice(&self) -> &[u32] { - let p = self.bytes().as_ptr(); - // Assert pointer is 32 bit aligned before casting. - assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0); - #[allow(clippy::cast_ptr_alignment)] - let p32 = p as *const u32; - unsafe { std::slice::from_raw_parts(p32, self.bytes().len() / 4) } - } - - fn as_u32_slice_mut(&mut self) -> &mut [u32] { - let p = self.bytes_mut().as_mut_ptr(); - // Assert pointer is 32 bit aligned before casting. - assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0); - #[allow(clippy::cast_ptr_alignment)] - let p32 = p as *mut u32; - unsafe { std::slice::from_raw_parts_mut(p32, self.bytes().len() / 4) } - } - - pub fn size(&self) -> usize { - let s = self.as_u32_slice(); - (s[INDEX_NUM_RECORDS] - s[INDEX_NUM_SHIFTED_OFF]) as usize - } - - fn num_records(&self) -> usize { - let s = self.as_u32_slice(); - s[INDEX_NUM_RECORDS] as usize - } - - fn head(&self) -> usize { - let s = self.as_u32_slice(); - s[INDEX_HEAD] as usize - } - - fn num_shifted_off(&self) -> usize { - let s = self.as_u32_slice(); - s[INDEX_NUM_SHIFTED_OFF] as usize - } - - fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) { - let s = self.as_u32_slice_mut(); - s[INDEX_OFFSETS + 2 * index] = end as u32; - s[INDEX_OFFSETS + 2 * index + 1] = op_id.try_into().unwrap(); - } - - #[cfg(test)] - fn get_meta(&self, index: usize) -> Option<(OpId, usize)> { - if index < self.num_records() { - let s = self.as_u32_slice(); - let end = s[INDEX_OFFSETS + 2 * index] as usize; - let op_id = s[INDEX_OFFSETS + 2 * index + 1] as OpId; - Some((op_id, end)) - } else { - None - } - } - - #[cfg(test)] - fn get_offset(&self, index: usize) -> Option<usize> { - if index < self.num_records() { - Some(if index == 0 { - HEAD_INIT - } else { - let s = self.as_u32_slice(); - let prev_end = s[INDEX_OFFSETS + 2 * (index - 1)] as usize; - (prev_end + 3) & !3 - }) - } else { - None - } - } - - /// Returns none if empty. - #[cfg(test)] - pub fn shift(&mut self) -> Option<(OpId, &[u8])> { - let u32_slice = self.as_u32_slice(); - let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize; - if self.size() == 0 { - assert_eq!(i, 0); - return None; - } - - let off = self.get_offset(i).unwrap(); - let (op_id, end) = self.get_meta(i).unwrap(); - if self.size() > 1 { - let u32_slice = self.as_u32_slice_mut(); - u32_slice[INDEX_NUM_SHIFTED_OFF] += 1; - } else { - self.reset(); - } - println!( - "rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}", - self.num_records(), - self.num_shifted_off(), - self.head() - ); - Some((op_id, &self.bytes()[off..end])) - } - - /// Because JS-side may cast popped message to Int32Array it is required - /// that every message is aligned to 4-bytes. - pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool { - let off = self.head(); - assert_eq!(off % 4, 0); - let end = off + record.len(); - let aligned_end = (end + 3) & !3; - debug!( - "rust:shared_queue:pre-push: op={}, off={}, end={}, len={}, aligned_end={}", - op_id, - off, - end, - record.len(), - aligned_end, - ); - let index = self.num_records(); - if aligned_end > self.bytes().len() || index >= MAX_RECORDS { - debug!("WARNING the sharedQueue overflowed"); - return false; - } - assert_eq!(aligned_end % 4, 0); - self.set_meta(index, end, op_id); - assert_eq!(end - off, record.len()); - self.bytes_mut()[off..end].copy_from_slice(record); - let u32_slice = self.as_u32_slice_mut(); - u32_slice[INDEX_NUM_RECORDS] += 1; - u32_slice[INDEX_HEAD] = aligned_end as u32; - debug!( - "rust:shared_queue:push: num_records={}, num_shifted_off={}, head={}", - self.num_records(), - self.num_shifted_off(), - self.head() - ); - true - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn basic() { - let mut q = SharedQueue::new(RECOMMENDED_SIZE); - - let h = q.head(); - assert!(h > 0); - - let r = vec![1u8, 2, 3, 4].into_boxed_slice(); - let len = r.len() + h; - assert!(q.push(0, &r)); - assert_eq!(q.head(), len); - - let r = vec![5, 6, 7, 8].into_boxed_slice(); - assert!(q.push(0, &r)); - - let r = vec![9, 10, 11, 12].into_boxed_slice(); - assert!(q.push(0, &r)); - assert_eq!(q.num_records(), 3); - assert_eq!(q.size(), 3); - - let (_op_id, r) = q.shift().unwrap(); - assert_eq!(r, vec![1, 2, 3, 4].as_slice()); - assert_eq!(q.num_records(), 3); - assert_eq!(q.size(), 2); - - let (_op_id, r) = q.shift().unwrap(); - assert_eq!(r, vec![5, 6, 7, 8].as_slice()); - assert_eq!(q.num_records(), 3); - assert_eq!(q.size(), 1); - - let (_op_id, r) = q.shift().unwrap(); - assert_eq!(r, vec![9, 10, 11, 12].as_slice()); - assert_eq!(q.num_records(), 0); - assert_eq!(q.size(), 0); - - assert!(q.shift().is_none()); - assert!(q.shift().is_none()); - - assert_eq!(q.num_records(), 0); - assert_eq!(q.size(), 0); - } - - fn alloc_buf(byte_length: usize) -> Box<[u8]> { - vec![0; byte_length].into_boxed_slice() - } - - #[test] - fn overflow() { - let mut q = SharedQueue::new(RECOMMENDED_SIZE); - assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 5))); - assert_eq!(q.size(), 1); - assert!(!q.push(0, &alloc_buf(6))); - assert_eq!(q.size(), 1); - assert!(q.push(0, &alloc_buf(1))); - assert_eq!(q.size(), 2); - - let (_op_id, buf) = q.shift().unwrap(); - assert_eq!(buf.len(), RECOMMENDED_SIZE - 5); - assert_eq!(q.size(), 1); - - assert!(!q.push(0, &alloc_buf(1))); - - let (_op_id, buf) = q.shift().unwrap(); - assert_eq!(buf.len(), 1); - assert_eq!(q.size(), 0); - } - - #[test] - fn full_records() { - let mut q = SharedQueue::new(RECOMMENDED_SIZE); - for _ in 0..MAX_RECORDS { - assert!(q.push(0, &alloc_buf(1))) - } - assert_eq!(q.push(0, &alloc_buf(1)), false); - // Even if we shift one off, we still cannot push a new record. - let _ignored = q.shift().unwrap(); - assert_eq!(q.push(0, &alloc_buf(1)), false); - } - - #[test] - fn allow_any_buf_length() { - let mut q = SharedQueue::new(RECOMMENDED_SIZE); - // Check that `record` that has length not a multiple of 4 will - // not cause panic. Still make sure that records are always - // aligned to 4 bytes. - for i in 1..9 { - q.push(0, &alloc_buf(i)); - assert_eq!(q.num_records(), i); - assert_eq!(q.head() % 4, 0); - } - } -} |