author     Ryan Dahl <ry@tinyclouds.org>            2019-08-07 14:02:29 -0400
committer  GitHub <noreply@github.com>              2019-08-07 14:02:29 -0400
commit     e438ac2c74c823882dc9c9ecde2a9e9ed7bcfb4b (patch)
tree       480b2daa83e31b26c5d611feff021674d289e492 /core/shared_queue.rs
parent     5350abbc7ffdba6d17166fa00ad89e86979a43f7 (diff)
Add op_id throughout op API (#2734)
Removes the magic-number hack used to switch between the flatbuffers
dispatcher and the minimal dispatcher.
Adds machinery to pass the op_id through the shared_queue.
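To make the layout change concrete, here is a minimal standalone sketch of the header arithmetic after this patch. The constants and the indexing mirror the diff below; the free functions, the `OpId = u32` alias, and the demo `main` are illustrative stand-ins, not the patched code itself.

    // Sketch of the post-patch header layout. Constants mirror
    // core/shared_queue.rs; everything else here is illustrative.
    const MAX_RECORDS: usize = 100;
    const INDEX_OFFSETS: usize = 3;
    // Before: 3 + MAX_RECORDS (one u32 per record: the end offset).
    // After: two u32 slots per record: (end offset, op_id).
    const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;

    type OpId = u32; // assumed to match crate::libdeno::OpId

    fn set_meta(s: &mut [u32], index: usize, end: usize, op_id: OpId) {
        s[INDEX_OFFSETS + 2 * index] = end as u32;
        s[INDEX_OFFSETS + 2 * index + 1] = op_id;
    }

    fn get_meta(s: &[u32], index: usize) -> (OpId, usize) {
        let end = s[INDEX_OFFSETS + 2 * index] as usize;
        let op_id = s[INDEX_OFFSETS + 2 * index + 1];
        (op_id, end)
    }

    fn main() {
        let mut header = vec![0u32; INDEX_RECORDS];
        set_meta(&mut header, 0, 413, 7); // record 0 ends at byte 413, op_id 7
        assert_eq!(get_meta(&header, 0), (7, 413));
    }

Because each record now needs two header slots instead of one, the records area starts twice as far in; HEAD_INIT = 4 * INDEX_RECORDS still converts that u32 slot count into the byte offset where record data begins.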
Diffstat (limited to 'core/shared_queue.rs')
-rw-r--r--   core/shared_queue.rs | 62
1 file changed, 34 insertions(+), 28 deletions(-)
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index 616272f8d..11c8e2127 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -17,6 +17,7 @@ SharedQueue Binary Layout
 */
 
 use crate::libdeno::deno_buf;
+use crate::libdeno::OpId;
 
 const MAX_RECORDS: usize = 100;
 /// Total number of records added.
@@ -27,7 +28,7 @@ const INDEX_NUM_SHIFTED_OFF: usize = 1;
 /// It grows monotonically.
 const INDEX_HEAD: usize = 2;
 const INDEX_OFFSETS: usize = 3;
-const INDEX_RECORDS: usize = 3 + MAX_RECORDS;
+const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;
 /// Byte offset of where the records begin. Also where the head starts.
 const HEAD_INIT: usize = 4 * INDEX_RECORDS;
 /// A rough guess at how big we should make the shared buffer in bytes.
@@ -98,16 +99,19 @@ impl SharedQueue {
     s[INDEX_NUM_SHIFTED_OFF] as usize
   }
 
-  fn set_end(&mut self, index: usize, end: usize) {
+  fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
     let s = self.as_u32_slice_mut();
-    s[INDEX_OFFSETS + index] = end as u32;
+    s[INDEX_OFFSETS + 2 * index] = end as u32;
+    s[INDEX_OFFSETS + 2 * index + 1] = op_id;
   }
 
   #[cfg(test)]
-  fn get_end(&self, index: usize) -> Option<usize> {
+  fn get_meta(&self, index: usize) -> Option<(OpId, usize)> {
     if index < self.num_records() {
       let s = self.as_u32_slice();
-      Some(s[INDEX_OFFSETS + index] as usize)
+      let end = s[INDEX_OFFSETS + 2 * index] as usize;
+      let op_id = s[INDEX_OFFSETS + 2 * index + 1];
+      Some((op_id, end))
     } else {
       None
     }
@@ -120,7 +124,7 @@
         HEAD_INIT
       } else {
         let s = self.as_u32_slice();
-        s[INDEX_OFFSETS + index - 1] as usize
+        s[INDEX_OFFSETS + 2 * (index - 1)] as usize
       })
     } else {
       None
@@ -129,7 +133,7 @@
 
   /// Returns none if empty.
   #[cfg(test)]
-  pub fn shift(&mut self) -> Option<&[u8]> {
+  pub fn shift(&mut self) -> Option<(OpId, &[u8])> {
     let u32_slice = self.as_u32_slice();
     let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
     if self.size() == 0 {
@@ -138,7 +142,7 @@
     }
 
     let off = self.get_offset(i).unwrap();
-    let end = self.get_end(i).unwrap();
+    let (op_id, end) = self.get_meta(i).unwrap();
 
     if self.size() > 1 {
       let u32_slice = self.as_u32_slice_mut();
@@ -146,16 +150,16 @@
     } else {
       self.reset();
     }
-    debug!(
+    println!(
      "rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
       self.num_records(),
       self.num_shifted_off(),
       self.head()
     );
-    Some(&self.bytes[off..end])
+    Some((op_id, &self.bytes[off..end]))
   }
 
-  pub fn push(&mut self, record: &[u8]) -> bool {
+  pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
     let off = self.head();
     let end = off + record.len();
     let index = self.num_records();
@@ -163,7 +167,7 @@
       debug!("WARNING the sharedQueue overflowed");
       return false;
     }
-    self.set_end(index, end);
+    self.set_meta(index, end, op_id);
     assert_eq!(end - off, record.len());
     self.bytes[off..end].copy_from_slice(record);
     let u32_slice = self.as_u32_slice_mut();
@@ -193,28 +197,28 @@ mod tests {
     let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
     let len = r.len() + h;
-    assert!(q.push(&r));
+    assert!(q.push(0, &r));
     assert_eq!(q.head(), len);
 
     let r = vec![6, 7].into_boxed_slice();
-    assert!(q.push(&r));
+    assert!(q.push(0, &r));
 
     let r = vec![8, 9, 10, 11].into_boxed_slice();
-    assert!(q.push(&r));
+    assert!(q.push(0, &r));
     assert_eq!(q.num_records(), 3);
     assert_eq!(q.size(), 3);
 
-    let r = q.shift().unwrap();
+    let (_op_id, r) = q.shift().unwrap();
     assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
     assert_eq!(q.num_records(), 3);
     assert_eq!(q.size(), 2);
 
-    let r = q.shift().unwrap();
+    let (_op_id, r) = q.shift().unwrap();
     assert_eq!(r, vec![6, 7].as_slice());
     assert_eq!(q.num_records(), 3);
     assert_eq!(q.size(), 1);
 
-    let r = q.shift().unwrap();
+    let (_op_id, r) = q.shift().unwrap();
     assert_eq!(r, vec![8, 9, 10, 11].as_slice());
     assert_eq!(q.num_records(), 0);
     assert_eq!(q.size(), 0);
@@ -235,19 +239,21 @@ mod tests {
   #[test]
   fn overflow() {
     let mut q = SharedQueue::new(RECOMMENDED_SIZE);
-    assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
+    assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 1)));
     assert_eq!(q.size(), 1);
-    assert!(!q.push(&alloc_buf(2)));
+    assert!(!q.push(0, &alloc_buf(2)));
     assert_eq!(q.size(), 1);
-    assert!(q.push(&alloc_buf(1)));
+    assert!(q.push(0, &alloc_buf(1)));
     assert_eq!(q.size(), 2);
 
-    assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
+    let (_op_id, buf) = q.shift().unwrap();
+    assert_eq!(buf.len(), RECOMMENDED_SIZE - 1);
     assert_eq!(q.size(), 1);
-    assert!(!q.push(&alloc_buf(1)));
+    assert!(!q.push(0, &alloc_buf(1)));
 
-    assert_eq!(q.shift().unwrap().len(), 1);
+    let (_op_id, buf) = q.shift().unwrap();
+    assert_eq!(buf.len(), 1);
     assert_eq!(q.size(), 0);
   }
 
@@ -255,11 +261,11 @@
   fn full_records() {
     let mut q = SharedQueue::new(RECOMMENDED_SIZE);
     for _ in 0..MAX_RECORDS {
-      assert!(q.push(&alloc_buf(1)))
+      assert!(q.push(0, &alloc_buf(1)))
     }
-    assert_eq!(q.push(&alloc_buf(1)), false);
+    assert_eq!(q.push(0, &alloc_buf(1)), false);
     // Even if we shift one off, we still cannot push a new record.
-    assert_eq!(q.shift().unwrap().len(), 1);
-    assert_eq!(q.push(&alloc_buf(1)), false);
+    let _ignored = q.shift().unwrap();
+    assert_eq!(q.push(0, &alloc_buf(1)), false);
   }
 }
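For reference, a minimal sketch of how the patched API is exercised, assuming the SharedQueue, OpId, and RECOMMENDED_SIZE from this diff are in scope. Note that in this patch shift() is compiled only under #[cfg(test)], so the consumer side below stands in for the JavaScript half of the queue; the test name and the op_id value are hypothetical, not part of the commit.

    #[test]
    fn op_id_round_trip() {
      // Sketch only: assumes the patched SharedQueue from this diff.
      let mut q = SharedQueue::new(RECOMMENDED_SIZE);

      // Records are now tagged with the op that produced them, replacing
      // the magic number that previously had to live inside the payload.
      let op_id: OpId = 42; // arbitrary example value
      assert!(q.push(op_id, &[1, 2, 3]));

      // shift() returns the tag alongside the payload, so the dispatcher
      // can route a response without inspecting the record bytes.
      let (id, record) = q.shift().unwrap();
      assert_eq!(id, 42);
      assert_eq!(record, vec![1u8, 2, 3].as_slice());
    }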