summaryrefslogtreecommitdiff
path: root/core/shared_queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'core/shared_queue.rs')
-rw-r--r--core/shared_queue.rs45
1 files changed, 24 insertions, 21 deletions
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index c33a37b90..1460fb172 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -17,6 +17,7 @@ SharedQueue Binary Layout
*/
use crate::libdeno::deno_buf;
+use libc::c_int;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
@@ -152,17 +153,19 @@ impl SharedQueue {
Some(&self.bytes[off..end])
}
- pub fn push(&mut self, record: &[u8]) -> bool {
+ pub fn push(&mut self, promise_id: c_int, record: &[u8]) -> bool {
let off = self.head();
- let end = off + record.len();
+ let end = off + record.len() + 4;
let index = self.num_records();
if end > self.bytes.len() || index >= MAX_RECORDS {
debug!("WARNING the sharedQueue overflowed");
return false;
}
self.set_end(index, end);
- assert_eq!(end - off, record.len());
- self.bytes[off..end].copy_from_slice(record);
+ assert_eq!(end - off, record.len() + 4);
+ let pid_bytes = promise_id.to_be_bytes();
+ self.bytes[off..off + 4].copy_from_slice(&pid_bytes);
+ self.bytes[off + 4..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
u32_slice[INDEX_HEAD] = end as u32;
@@ -189,30 +192,30 @@ mod tests {
assert!(h > 0);
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
- let len = r.len() + h;
- assert!(q.push(&r));
+ let len = r.len() + h + 4;
+ assert!(q.push(1, &r));
assert_eq!(q.head(), len);
let r = vec![6, 7].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(1, &r));
let r = vec![8, 9, 10, 11].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(1, &r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
let r = q.shift().unwrap();
- assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
+ assert_eq!(&r[4..], vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
let r = q.shift().unwrap();
- assert_eq!(r, vec![6, 7].as_slice());
+ assert_eq!(&r[4..], vec![6, 7].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
let r = q.shift().unwrap();
- assert_eq!(r, vec![8, 9, 10, 11].as_slice());
+ assert_eq!(&r[4..], vec![8, 9, 10, 11].as_slice());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
@@ -232,19 +235,19 @@ mod tests {
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
+ assert!(q.push(1, &alloc_buf(RECOMMENDED_SIZE - 1 - (4 * 2))));
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(2)));
+ assert!(!q.push(1, &alloc_buf(2)));
assert_eq!(q.size(), 1);
- assert!(q.push(&alloc_buf(1)));
+ assert!(q.push(1, &alloc_buf(1)));
assert_eq!(q.size(), 2);
- assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
+ assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1 - 4);
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(1)));
+ assert!(!q.push(1, &alloc_buf(1)));
- assert_eq!(q.shift().unwrap().len(), 1);
+ assert_eq!(q.shift().unwrap().len(), 1 + 4);
assert_eq!(q.size(), 0);
}
@@ -252,11 +255,11 @@ mod tests {
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
- assert!(q.push(&alloc_buf(1)))
+ assert!(q.push(1, &alloc_buf(1)))
}
- assert_eq!(q.push(&alloc_buf(1)), false);
+ assert_eq!(q.push(1, &alloc_buf(1)), false);
// Even if we shift one off, we still cannot push a new record.
- assert_eq!(q.shift().unwrap().len(), 1);
- assert_eq!(q.push(&alloc_buf(1)), false);
+ assert_eq!(q.shift().unwrap().len(), 1 + 4);
+ assert_eq!(q.push(1, &alloc_buf(1)), false);
}
}