diff options
Diffstat (limited to 'core/shared_queue.rs')
-rw-r--r-- | core/shared_queue.rs | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/core/shared_queue.rs b/core/shared_queue.rs index c33a37b90..1460fb172 100644 --- a/core/shared_queue.rs +++ b/core/shared_queue.rs @@ -17,6 +17,7 @@ SharedQueue Binary Layout */ use crate::libdeno::deno_buf; +use libc::c_int; const MAX_RECORDS: usize = 100; /// Total number of records added. @@ -152,17 +153,19 @@ impl SharedQueue { Some(&self.bytes[off..end]) } - pub fn push(&mut self, record: &[u8]) -> bool { + pub fn push(&mut self, promise_id: c_int, record: &[u8]) -> bool { let off = self.head(); - let end = off + record.len(); + let end = off + record.len() + 4; let index = self.num_records(); if end > self.bytes.len() || index >= MAX_RECORDS { debug!("WARNING the sharedQueue overflowed"); return false; } self.set_end(index, end); - assert_eq!(end - off, record.len()); - self.bytes[off..end].copy_from_slice(record); + assert_eq!(end - off, record.len() + 4); + let pid_bytes = promise_id.to_be_bytes(); + self.bytes[off..off + 4].copy_from_slice(&pid_bytes); + self.bytes[off + 4..end].copy_from_slice(record); let u32_slice = self.as_u32_slice_mut(); u32_slice[INDEX_NUM_RECORDS] += 1; u32_slice[INDEX_HEAD] = end as u32; @@ -189,30 +192,30 @@ mod tests { assert!(h > 0); let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice(); - let len = r.len() + h; - assert!(q.push(&r)); + let len = r.len() + h + 4; + assert!(q.push(1, &r)); assert_eq!(q.head(), len); let r = vec![6, 7].into_boxed_slice(); - assert!(q.push(&r)); + assert!(q.push(1, &r)); let r = vec![8, 9, 10, 11].into_boxed_slice(); - assert!(q.push(&r)); + assert!(q.push(1, &r)); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 3); let r = q.shift().unwrap(); - assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice()); + assert_eq!(&r[4..], vec![1, 2, 3, 4, 5].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 2); let r = q.shift().unwrap(); - assert_eq!(r, vec![6, 7].as_slice()); + assert_eq!(&r[4..], vec![6, 7].as_slice()); assert_eq!(q.num_records(), 3); assert_eq!(q.size(), 1); let r = q.shift().unwrap(); - assert_eq!(r, vec![8, 9, 10, 11].as_slice()); + assert_eq!(&r[4..], vec![8, 9, 10, 11].as_slice()); assert_eq!(q.num_records(), 0); assert_eq!(q.size(), 0); @@ -232,19 +235,19 @@ mod tests { #[test] fn overflow() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); - assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1))); + assert!(q.push(1, &alloc_buf(RECOMMENDED_SIZE - 1 - (4 * 2)))); assert_eq!(q.size(), 1); - assert!(!q.push(&alloc_buf(2))); + assert!(!q.push(1, &alloc_buf(2))); assert_eq!(q.size(), 1); - assert!(q.push(&alloc_buf(1))); + assert!(q.push(1, &alloc_buf(1))); assert_eq!(q.size(), 2); - assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1); + assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1 - 4); assert_eq!(q.size(), 1); - assert!(!q.push(&alloc_buf(1))); + assert!(!q.push(1, &alloc_buf(1))); - assert_eq!(q.shift().unwrap().len(), 1); + assert_eq!(q.shift().unwrap().len(), 1 + 4); assert_eq!(q.size(), 0); } @@ -252,11 +255,11 @@ mod tests { fn full_records() { let mut q = SharedQueue::new(RECOMMENDED_SIZE); for _ in 0..MAX_RECORDS { - assert!(q.push(&alloc_buf(1))) + assert!(q.push(1, &alloc_buf(1))) } - assert_eq!(q.push(&alloc_buf(1)), false); + assert_eq!(q.push(1, &alloc_buf(1)), false); // Even if we shift one off, we still cannot push a new record. - assert_eq!(q.shift().unwrap().len(), 1); - assert_eq!(q.push(&alloc_buf(1)), false); + assert_eq!(q.shift().unwrap().len(), 1 + 4); + assert_eq!(q.push(1, &alloc_buf(1)), false); } } |