Diffstat (limited to 'core/shared_queue.rs')
-rw-r--r--  core/shared_queue.rs  43
1 file changed, 23 insertions, 20 deletions
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index dad3e380d..0f35d1310 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -134,7 +134,8 @@ impl SharedQueue {
HEAD_INIT
} else {
let s = self.as_u32_slice();
- s[INDEX_OFFSETS + 2 * (index - 1)] as usize
+ let prev_end = s[INDEX_OFFSETS + 2 * (index - 1)] as usize;
+ (prev_end + 3) & !3
})
} else {
None
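
The new `(prev_end + 3) & !3` expression rounds the previous record's end offset up to the next multiple of 4, so every record now begins on a 4-byte boundary. A minimal standalone sketch of the idiom (the `align4` helper name is ours, for illustration only):

    /// Round `n` up to the nearest multiple of 4.
    /// Adding 3 pushes any value with non-zero low bits past the next
    /// boundary, and `& !3` clears the two low bits to land exactly on it.
    fn align4(n: usize) -> usize {
      (n + 3) & !3
    }

    fn main() {
      assert_eq!(align4(0), 0);
      assert_eq!(align4(1), 4);
      assert_eq!(align4(5), 8);
      assert_eq!(align4(8), 8); // already-aligned values are unchanged
    }
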
@@ -153,7 +154,6 @@ impl SharedQueue {
let off = self.get_offset(i).unwrap();
let (op_id, end) = self.get_meta(i).unwrap();
-
if self.size() > 1 {
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_SHIFTED_OFF] += 1;
@@ -169,29 +169,33 @@ impl SharedQueue {
Some((op_id, &self.bytes()[off..end]))
}
- /// Because JS-side may cast `record` to Int32Array it is required
- /// that `record`'s length is divisible by 4.
+ /// Because the JS side may cast a popped message to Int32Array, every
+ /// message must be aligned to 4 bytes.
pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
let off = self.head();
+ assert_eq!(off % 4, 0);
let end = off + record.len();
+ let aligned_end = (end + 3) & !3;
debug!(
- "rust:shared_queue:pre-push: op={}, off={}, end={}, len={}",
+ "rust:shared_queue:pre-push: op={}, off={}, end={}, len={}, aligned_end={}",
op_id,
off,
end,
- record.len()
+ record.len(),
+ aligned_end,
);
let index = self.num_records();
- if end > self.bytes().len() || index >= MAX_RECORDS {
+ if aligned_end > self.bytes().len() || index >= MAX_RECORDS {
debug!("WARNING the sharedQueue overflowed");
return false;
}
+ assert_eq!(aligned_end % 4, 0);
self.set_meta(index, end, op_id);
assert_eq!(end - off, record.len());
self.bytes_mut()[off..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
- u32_slice[INDEX_HEAD] = end as u32;
+ u32_slice[INDEX_HEAD] = aligned_end as u32;
debug!(
"rust:shared_queue:push: num_records={}, num_shifted_off={}, head={}",
self.num_records(),
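
With this change `push` still records the true, unaligned `end` in the metadata, so `shift` returns exactly `record.len()` bytes, while the head pointer advances to `aligned_end` so the next record starts on a 4-byte boundary. A sketch of the intended behaviour, written in the style of the tests below and assuming the test helper `alloc_buf` and the `head()` accessor are in scope:

    let mut q = SharedQueue::new(RECOMMENDED_SIZE);
    assert!(q.push(0, &alloc_buf(5)));
    // The head was rounded up past the 5-byte record to a 4-byte boundary.
    assert_eq!(q.head() % 4, 0);
    // The metadata keeps the unaligned end, so no padding leaks into the message.
    let (_op_id, buf) = q.shift().unwrap();
    assert_eq!(buf.len(), 5);
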
@@ -258,15 +262,15 @@ mod tests {
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 1)));
+ assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 5)));
assert_eq!(q.size(), 1);
- assert!(!q.push(0, &alloc_buf(2)));
+ assert!(!q.push(0, &alloc_buf(6)));
assert_eq!(q.size(), 1);
assert!(q.push(0, &alloc_buf(1)));
assert_eq!(q.size(), 2);
let (_op_id, buf) = q.shift().unwrap();
- assert_eq!(buf.len(), RECOMMENDED_SIZE - 1);
+ assert_eq!(buf.len(), RECOMMENDED_SIZE - 5);
assert_eq!(q.size(), 1);
assert!(!q.push(0, &alloc_buf(1)));
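
The adjusted buffer sizes account for the padding. Measuring offsets from the start of the record area and assuming RECOMMENDED_SIZE is a multiple of 4: the first record of RECOMMENDED_SIZE - 5 bytes ends at RECOMMENDED_SIZE - 5, which rounds the head up to RECOMMENDED_SIZE - 4. A 6-byte record would then end at RECOMMENDED_SIZE + 2 and is rejected, while a 1-byte record ends at RECOMMENDED_SIZE - 3, aligns to exactly RECOMMENDED_SIZE, and still fits. This mirrors the old -1 / 2 / 1 pattern from before alignment was introduced.
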
@@ -291,14 +295,13 @@ mod tests {
#[test]
fn allow_any_buf_length() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- // check that `record` that has length not a multiple of 4 will cause panic
- q.push(0, &alloc_buf(1));
- q.push(0, &alloc_buf(2));
- q.push(0, &alloc_buf(3));
- q.push(0, &alloc_buf(4));
- q.push(0, &alloc_buf(5));
- q.push(0, &alloc_buf(6));
- q.push(0, &alloc_buf(7));
- q.push(0, &alloc_buf(8));
+ // Check that a `record` whose length is not a multiple of 4 does
+ // not cause a panic, and that records are still always aligned to
+ // 4 bytes.
+ for i in 1..9 {
+ q.push(0, &alloc_buf(i));
+ assert_eq!(q.num_records(), i);
+ assert_eq!(q.head() % 4, 0);
+ }
}
}