summaryrefslogtreecommitdiff
path: root/core/shared_queue.rs
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2019-08-07 14:02:29 -0400
committerGitHub <noreply@github.com>2019-08-07 14:02:29 -0400
commite438ac2c74c823882dc9c9ecde2a9e9ed7bcfb4b (patch)
tree480b2daa83e31b26c5d611feff021674d289e492 /core/shared_queue.rs
parent5350abbc7ffdba6d17166fa00ad89e86979a43f7 (diff)
Add op_id throughout op API (#2734)
Removes the magic number hack to switch between flatbuffers and the minimal dispatcher. Adds machinery to pass the op_id through the shared_queue.
Diffstat (limited to 'core/shared_queue.rs')
-rw-r--r--core/shared_queue.rs62
1 files changed, 34 insertions, 28 deletions
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index 616272f8d..11c8e2127 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -17,6 +17,7 @@ SharedQueue Binary Layout
*/
use crate::libdeno::deno_buf;
+use crate::libdeno::OpId;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
@@ -27,7 +28,7 @@ const INDEX_NUM_SHIFTED_OFF: usize = 1;
/// It grows monotonically.
const INDEX_HEAD: usize = 2;
const INDEX_OFFSETS: usize = 3;
-const INDEX_RECORDS: usize = 3 + MAX_RECORDS;
+const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;
/// Byte offset of where the records begin. Also where the head starts.
const HEAD_INIT: usize = 4 * INDEX_RECORDS;
/// A rough guess at how big we should make the shared buffer in bytes.
@@ -98,16 +99,19 @@ impl SharedQueue {
s[INDEX_NUM_SHIFTED_OFF] as usize
}
- fn set_end(&mut self, index: usize, end: usize) {
+ fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
let s = self.as_u32_slice_mut();
- s[INDEX_OFFSETS + index] = end as u32;
+ s[INDEX_OFFSETS + 2 * index] = end as u32;
+ s[INDEX_OFFSETS + 2 * index + 1] = op_id;
}
#[cfg(test)]
- fn get_end(&self, index: usize) -> Option<usize> {
+ fn get_meta(&self, index: usize) -> Option<(OpId, usize)> {
if index < self.num_records() {
let s = self.as_u32_slice();
- Some(s[INDEX_OFFSETS + index] as usize)
+ let end = s[INDEX_OFFSETS + 2 * index] as usize;
+ let op_id = s[INDEX_OFFSETS + 2 * index + 1];
+ Some((op_id, end))
} else {
None
}
@@ -120,7 +124,7 @@ impl SharedQueue {
HEAD_INIT
} else {
let s = self.as_u32_slice();
- s[INDEX_OFFSETS + index - 1] as usize
+ s[INDEX_OFFSETS + 2 * (index - 1)] as usize
})
} else {
None
@@ -129,7 +133,7 @@ impl SharedQueue {
/// Returns none if empty.
#[cfg(test)]
- pub fn shift(&mut self) -> Option<&[u8]> {
+ pub fn shift(&mut self) -> Option<(OpId, &[u8])> {
let u32_slice = self.as_u32_slice();
let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
if self.size() == 0 {
@@ -138,7 +142,7 @@ impl SharedQueue {
}
let off = self.get_offset(i).unwrap();
- let end = self.get_end(i).unwrap();
+ let (op_id, end) = self.get_meta(i).unwrap();
if self.size() > 1 {
let u32_slice = self.as_u32_slice_mut();
@@ -146,16 +150,16 @@ impl SharedQueue {
} else {
self.reset();
}
- debug!(
+ println!(
"rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
self.num_records(),
self.num_shifted_off(),
self.head()
);
- Some(&self.bytes[off..end])
+ Some((op_id, &self.bytes[off..end]))
}
- pub fn push(&mut self, record: &[u8]) -> bool {
+ pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
let off = self.head();
let end = off + record.len();
let index = self.num_records();
@@ -163,7 +167,7 @@ impl SharedQueue {
debug!("WARNING the sharedQueue overflowed");
return false;
}
- self.set_end(index, end);
+ self.set_meta(index, end, op_id);
assert_eq!(end - off, record.len());
self.bytes[off..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
@@ -193,28 +197,28 @@ mod tests {
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
let len = r.len() + h;
- assert!(q.push(&r));
+ assert!(q.push(0, &r));
assert_eq!(q.head(), len);
let r = vec![6, 7].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(0, &r));
let r = vec![8, 9, 10, 11].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(0, &r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
- let r = q.shift().unwrap();
+ let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
- let r = q.shift().unwrap();
+ let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![6, 7].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
- let r = q.shift().unwrap();
+ let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![8, 9, 10, 11].as_slice());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
@@ -235,19 +239,21 @@ mod tests {
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
+ assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 1)));
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(2)));
+ assert!(!q.push(0, &alloc_buf(2)));
assert_eq!(q.size(), 1);
- assert!(q.push(&alloc_buf(1)));
+ assert!(q.push(0, &alloc_buf(1)));
assert_eq!(q.size(), 2);
- assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
+ let (_op_id, buf) = q.shift().unwrap();
+ assert_eq!(buf.len(), RECOMMENDED_SIZE - 1);
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(1)));
+ assert!(!q.push(0, &alloc_buf(1)));
- assert_eq!(q.shift().unwrap().len(), 1);
+ let (_op_id, buf) = q.shift().unwrap();
+ assert_eq!(buf.len(), 1);
assert_eq!(q.size(), 0);
}
@@ -255,11 +261,11 @@ mod tests {
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
- assert!(q.push(&alloc_buf(1)))
+ assert!(q.push(0, &alloc_buf(1)))
}
- assert_eq!(q.push(&alloc_buf(1)), false);
+ assert_eq!(q.push(0, &alloc_buf(1)), false);
// Even if we shift one off, we still cannot push a new record.
- assert_eq!(q.shift().unwrap().len(), 1);
- assert_eq!(q.push(&alloc_buf(1)), false);
+ let _ignored = q.shift().unwrap();
+ assert_eq!(q.push(0, &alloc_buf(1)), false);
}
}