Diffstat (limited to 'core/shared_queue.rs')
-rw-r--r--  core/shared_queue.rs  289
1 file changed, 289 insertions(+), 0 deletions(-)
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
new file mode 100644
index 000000000..dbb738f15
--- /dev/null
+++ b/core/shared_queue.rs
@@ -0,0 +1,289 @@
+// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+/*
+SharedQueue Binary Layout
++-------------------------------+-------------------------------+
+|                         NUM_RECORDS (32)                      |
++---------------------------------------------------------------+
+|                       NUM_SHIFTED_OFF (32)                    |
++---------------------------------------------------------------+
+|                            HEAD (32)                          |
++---------------------------------------------------------------+
+|                          OFFSETS (32)                         |
++---------------------------------------------------------------+
+|                     RECORD_ENDS (*MAX_RECORDS)             ...
++---------------------------------------------------------------+
+|                       RECORDS (*MAX_RECORDS)               ...
++---------------------------------------------------------------+
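+
+For example, with MAX_RECORDS = 100 the header and offsets table occupy
+4 * (INDEX_OFFSETS + 2 * MAX_RECORDS) = 4 * 203 = 812 bytes, so HEAD_INIT
+is 812 and the first record's payload begins at byte offset 812.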
+ */
+
+use crate::libdeno::deno_buf;
+use crate::libdeno::OpId;
+
+const MAX_RECORDS: usize = 100;
+/// Total number of records added.
+const INDEX_NUM_RECORDS: usize = 0;
+/// Number of records that have been shifted off.
+const INDEX_NUM_SHIFTED_OFF: usize = 1;
+/// The head is the number of initialized bytes in SharedQueue.
+/// It grows monotonically until the queue empties and is reset.
+const INDEX_HEAD: usize = 2;
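+/// Start of the offsets table: two u32 slots per record, holding the
+/// record's end offset and its op id.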
+const INDEX_OFFSETS: usize = 3;
+const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;
+/// Byte offset of where the records begin. Also where the head starts.
+const HEAD_INIT: usize = 4 * INDEX_RECORDS;
+/// A rough guess at how big we should make the shared buffer in bytes
+/// (128 bytes of payload per record on average).
+pub const RECOMMENDED_SIZE: usize = 128 * MAX_RECORDS;
+
+pub struct SharedQueue {
+  bytes: Vec<u8>,
+}
+
+impl SharedQueue {
+  pub fn new(len: usize) -> Self {
+    let bytes = vec![0; HEAD_INIT + len];
+    let mut q = Self { bytes };
+    q.reset();
+    q
+  }
+
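+  /// Returns a `deno_buf` view of the entire backing buffer so it can be
+  /// handed to the JS side of the isolate.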
+  pub fn as_deno_buf(&self) -> deno_buf {
+    let ptr = self.bytes.as_ptr();
+    let len = self.bytes.len();
+    unsafe { deno_buf::from_raw_parts(ptr, len) }
+  }
+
+  fn reset(&mut self) {
+    debug!("rust:shared_queue:reset");
+    let s: &mut [u32] = self.as_u32_slice_mut();
+    s[INDEX_NUM_RECORDS] = 0;
+    s[INDEX_NUM_SHIFTED_OFF] = 0;
+    s[INDEX_HEAD] = HEAD_INIT as u32;
+  }
+
+  fn as_u32_slice(&self) -> &[u32] {
+    let p = self.bytes.as_ptr();
+    // Assert pointer is 32 bit aligned before casting.
+    assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
+    #[allow(clippy::cast_ptr_alignment)]
+    let p32 = p as *const u32;
+    unsafe { std::slice::from_raw_parts(p32, self.bytes.len() / 4) }
+  }
+
+  fn as_u32_slice_mut(&mut self) -> &mut [u32] {
+    let p = self.bytes.as_mut_ptr();
+    // Assert pointer is 32 bit aligned before casting.
+    assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
+    #[allow(clippy::cast_ptr_alignment)]
+    let p32 = p as *mut u32;
+    unsafe { std::slice::from_raw_parts_mut(p32, self.bytes.len() / 4) }
+  }
+
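+  /// Number of records currently in the queue: total added minus shifted off.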
+  pub fn size(&self) -> usize {
+    let s = self.as_u32_slice();
+    (s[INDEX_NUM_RECORDS] - s[INDEX_NUM_SHIFTED_OFF]) as usize
+  }
+
+  fn num_records(&self) -> usize {
+    let s = self.as_u32_slice();
+    s[INDEX_NUM_RECORDS] as usize
+  }
+
+  fn head(&self) -> usize {
+    let s = self.as_u32_slice();
+    s[INDEX_HEAD] as usize
+  }
+
+  fn num_shifted_off(&self) -> usize {
+    let s = self.as_u32_slice();
+    s[INDEX_NUM_SHIFTED_OFF] as usize
+  }
+
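+  /// Writes the record's end offset and op id into its slot pair in the
+  /// offsets table (u32 slots INDEX_OFFSETS + 2 * index and the one after).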
+  fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
+    let s = self.as_u32_slice_mut();
+    s[INDEX_OFFSETS + 2 * index] = end as u32;
+    s[INDEX_OFFSETS + 2 * index + 1] = op_id;
+  }
+
+  #[cfg(test)]
+  fn get_meta(&self, index: usize) -> Option<(OpId, usize)> {
+    if index < self.num_records() {
+      let s = self.as_u32_slice();
+      let end = s[INDEX_OFFSETS + 2 * index] as usize;
+      let op_id = s[INDEX_OFFSETS + 2 * index + 1];
+      Some((op_id, end))
+    } else {
+      None
+    }
+  }
+
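+  /// Start offset of record `index`: HEAD_INIT for the first record,
+  /// otherwise the previous record's end offset.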
+  #[cfg(test)]
+  fn get_offset(&self, index: usize) -> Option<usize> {
+    if index < self.num_records() {
+      Some(if index == 0 {
+        HEAD_INIT
+      } else {
+        let s = self.as_u32_slice();
+        s[INDEX_OFFSETS + 2 * (index - 1)] as usize
+      })
+    } else {
+      None
+    }
+  }
+
+  /// Returns `None` if the queue is empty.
+  #[cfg(test)]
+  pub fn shift(&mut self) -> Option<(OpId, &[u8])> {
+    let u32_slice = self.as_u32_slice();
+    let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
+    if self.size() == 0 {
+      assert_eq!(i, 0);
+      return None;
+    }
+
+    let off = self.get_offset(i).unwrap();
+    let (op_id, end) = self.get_meta(i).unwrap();
+
+    if self.size() > 1 {
+      let u32_slice = self.as_u32_slice_mut();
+      u32_slice[INDEX_NUM_SHIFTED_OFF] += 1;
+    } else {
+      self.reset();
+    }
+    debug!(
+      "rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
+      self.num_records(),
+      self.num_shifted_off(),
+      self.head()
+    );
+    Some((op_id, &self.bytes[off..end]))
+  }
+
+  /// Because the JS side may cast `record` to an Int32Array, `record`'s
+  /// length must be divisible by 4.
+  pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
+    let off = self.head();
+    let end = off + record.len();
+    debug!(
+      "rust:shared_queue:pre-push: op={}, off={}, end={}, len={}",
+      op_id,
+      off,
+      end,
+      record.len()
+    );
+    assert_eq!(record.len() % 4, 0);
+    let index = self.num_records();
+    if end > self.bytes.len() || index >= MAX_RECORDS {
+      debug!("WARNING: the SharedQueue overflowed");
+      return false;
+    }
+    self.set_meta(index, end, op_id);
+    assert_eq!(end - off, record.len());
+    self.bytes[off..end].copy_from_slice(record);
+    let u32_slice = self.as_u32_slice_mut();
+    u32_slice[INDEX_NUM_RECORDS] += 1;
+    u32_slice[INDEX_HEAD] = end as u32;
+    debug!(
+      "rust:shared_queue:push: num_records={}, num_shifted_off={}, head={}",
+      self.num_records(),
+      self.num_shifted_off(),
+      self.head()
+    );
+    true
+  }
+}
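+
+// A minimal usage sketch (note `shift` is test-only here; in production the
+// JS side consumes records directly from the shared buffer):
+//
+//   let mut q = SharedQueue::new(RECOMMENDED_SIZE);
+//   assert!(q.push(1, &[0u8; 8]));           // op id 1, 8-byte record
+//   let (op_id, buf) = q.shift().unwrap();   // FIFO: op_id == 1, buf.len() == 8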
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+  use crate::ops::Buf;
+
+  #[test]
+  fn basic() {
+    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
+
+    let h = q.head();
+    assert!(h > 0);
+
+    let r = vec![1u8, 2, 3, 4].into_boxed_slice();
+    let len = r.len() + h;
+    assert!(q.push(0, &r));
+    assert_eq!(q.head(), len);
+
+    let r = vec![5, 6, 7, 8].into_boxed_slice();
+    assert!(q.push(0, &r));
+
+    let r = vec![9, 10, 11, 12].into_boxed_slice();
+    assert!(q.push(0, &r));
+    assert_eq!(q.num_records(), 3);
+    assert_eq!(q.size(), 3);
+
+    let (_op_id, r) = q.shift().unwrap();
+    assert_eq!(r, vec![1, 2, 3, 4].as_slice());
+    assert_eq!(q.num_records(), 3);
+    assert_eq!(q.size(), 2);
+
+    let (_op_id, r) = q.shift().unwrap();
+    assert_eq!(r, vec![5, 6, 7, 8].as_slice());
+    assert_eq!(q.num_records(), 3);
+    assert_eq!(q.size(), 1);
+
+    let (_op_id, r) = q.shift().unwrap();
+    assert_eq!(r, vec![9, 10, 11, 12].as_slice());
+    assert_eq!(q.num_records(), 0);
+    assert_eq!(q.size(), 0);
+
+    assert!(q.shift().is_none());
+    assert!(q.shift().is_none());
+
+    assert_eq!(q.num_records(), 0);
+    assert_eq!(q.size(), 0);
+  }
+
+  fn alloc_buf(byte_length: usize) -> Buf {
+    vec![0; byte_length].into_boxed_slice()
+  }
+
+  #[test]
+  fn overflow() {
+    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
+    assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 4)));
+    assert_eq!(q.size(), 1);
+    assert!(!q.push(0, &alloc_buf(8)));
+    assert_eq!(q.size(), 1);
+    assert!(q.push(0, &alloc_buf(4)));
+    assert_eq!(q.size(), 2);
+
+    let (_op_id, buf) = q.shift().unwrap();
+    assert_eq!(buf.len(), RECOMMENDED_SIZE - 4);
+    assert_eq!(q.size(), 1);
+
+    assert!(!q.push(0, &alloc_buf(4)));
+
+    let (_op_id, buf) = q.shift().unwrap();
+    assert_eq!(buf.len(), 4);
+    assert_eq!(q.size(), 0);
+  }
+
+  #[test]
+  fn full_records() {
+    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
+    for _ in 0..MAX_RECORDS {
+      assert!(q.push(0, &alloc_buf(4)))
+    }
+    assert!(!q.push(0, &alloc_buf(4)));
+    // Even if we shift one off, we still cannot push a new record.
+    let _ignored = q.shift().unwrap();
+    assert!(!q.push(0, &alloc_buf(4)));
+  }
+
+  #[test]
+  #[should_panic]
+  fn bad_buf_length() {
+    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
+    // Check that a `record` whose length is not a multiple of 4 causes a panic.
+    q.push(0, &alloc_buf(3));
+  }
+}