summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/core.d.ts6
-rw-r--r--core/examples/http_bench.js16
-rw-r--r--core/examples/http_bench.rs16
-rw-r--r--core/isolate.rs118
-rw-r--r--core/libdeno.rs1
-rw-r--r--core/libdeno/api.cc8
-rw-r--r--core/libdeno/deno.h8
-rw-r--r--core/libdeno/libdeno.d.ts7
-rw-r--r--core/libdeno/libdeno_test.cc2
-rw-r--r--core/shared_queue.js17
-rw-r--r--core/shared_queue.rs45
11 files changed, 157 insertions, 87 deletions
diff --git a/core/core.d.ts b/core/core.d.ts
index b1d1ac57f..0c530e343 100644
--- a/core/core.d.ts
+++ b/core/core.d.ts
@@ -4,15 +4,13 @@
// Deno core. These are not intended to be used directly by runtime users of
// Deno and therefore do not flow through to the runtime type library.
-declare interface MessageCallback {
- (msg: Uint8Array): void;
-}
+declare type MessageCallback = (promiseId: number, msg: Uint8Array) => void;
declare interface DenoCore {
dispatch(
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
- ): Uint8Array | null;
+ ): Uint8Array | null | number;
setAsyncHandler(cb: MessageCallback): void;
sharedQueue: {
head(): number;
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index 8eb764b55..7e678aecd 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -13,7 +13,6 @@ const responseBuf = new Uint8Array(
.map(c => c.charCodeAt(0))
);
const promiseMap = new Map();
-let nextPromiseId = 1;
function assert(cond) {
if (!cond) {
@@ -37,8 +36,8 @@ const scratchBytes = new Uint8Array(
);
assert(scratchBytes.byteLength === 4 * 4);
-function send(promiseId, opId, arg, zeroCopy = null) {
- scratch32[0] = promiseId;
+function send(isSync, opId, arg, zeroCopy = null) {
+ scratch32[0] = isSync;
scratch32[1] = opId;
scratch32[2] = arg;
scratch32[3] = -1;
@@ -47,10 +46,9 @@ function send(promiseId, opId, arg, zeroCopy = null) {
/** Returns Promise<number> */
function sendAsync(opId, arg, zeroCopy = null) {
- const promiseId = nextPromiseId++;
+ const promiseId = send(false, opId, arg, zeroCopy);
const p = createResolvable();
promiseMap.set(promiseId, p);
- send(promiseId, opId, arg, zeroCopy);
return p;
}
@@ -58,7 +56,7 @@ function recordFromBuf(buf) {
assert(buf.byteLength === 16);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
- promiseId: buf32[0],
+ isSync: !!buf32[0],
opId: buf32[1],
arg: buf32[2],
result: buf32[3]
@@ -67,14 +65,14 @@ function recordFromBuf(buf) {
/** Returns i32 number */
function sendSync(opId, arg) {
- const buf = send(0, opId, arg);
+ const buf = send(true, opId, arg);
const record = recordFromBuf(buf);
return record.result;
}
-function handleAsyncMsgFromRust(buf) {
+function handleAsyncMsgFromRust(promiseId, buf) {
const record = recordFromBuf(buf);
- const { promiseId, result } = record;
+ const { result } = record;
const p = promiseMap.get(promiseId);
promiseMap.delete(promiseId);
p.resolve(result);
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index e8c5ec1b7..0f0cd6a4b 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -44,7 +44,7 @@ const OP_CLOSE: i32 = 5;
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
- pub promise_id: i32,
+ pub is_sync: i32,
pub op_id: i32,
pub arg: i32,
pub result: i32,
@@ -52,8 +52,8 @@ pub struct Record {
impl Into<Buf> for Record {
fn into(self) -> Buf {
- let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
- .into_boxed_slice();
+ let buf32 =
+ vec![self.is_sync, self.op_id, self.arg, self.result].into_boxed_slice();
let ptr = Box::into_raw(buf32) as *mut [u8; 16];
unsafe { Box::from_raw(ptr) }
}
@@ -65,7 +65,7 @@ impl From<&[u8]> for Record {
let ptr = s.as_ptr() as *const i32;
let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
Record {
- promise_id: ints[0],
+ is_sync: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
@@ -81,7 +81,7 @@ impl From<Buf> for Record {
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
assert_eq!(ints.len(), 4);
Record {
- promise_id: ints[0],
+ is_sync: ints[0],
op_id: ints[1],
arg: ints[2],
result: ints[3],
@@ -92,7 +92,7 @@ impl From<Buf> for Record {
#[test]
fn test_record_from() {
let r = Record {
- promise_id: 1,
+ is_sync: 1,
op_id: 2,
arg: 3,
result: 4,
@@ -111,9 +111,9 @@ fn test_record_from() {
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
-fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> Op {
+fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
let record = Record::from(control);
- let is_sync = record.promise_id == 0;
+ let is_sync = record.is_sync == 1;
let http_bench_op = match record.op_id {
OP_LISTEN => {
assert!(is_sync);
diff --git a/core/isolate.rs b/core/isolate.rs
index 14e1b88aa..ae14c0040 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -21,21 +21,29 @@ use futures::Async::*;
use futures::Future;
use futures::Poll;
use libc::c_char;
+use libc::c_int;
use libc::c_void;
use std::ffi::CStr;
use std::ffi::CString;
use std::ptr::null;
+use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
pub type Buf = Box<[u8]>;
-pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>;
+pub type OpAsyncFuture<I, E> = Box<dyn Future<Item = I, Error = E> + Send>;
-pub enum Op {
+pub enum Op<E> {
Sync(Buf),
- Async(OpAsyncFuture),
+ Async(OpAsyncFuture<Buf, E>),
}
+pub type CoreError = ();
+
+type CoreOpAsyncFuture = OpAsyncFuture<(c_int, Buf), CoreError>;
+
+pub type CoreOp = Op<CoreError>;
+
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
pub source: &'a str,
@@ -68,7 +76,9 @@ pub enum StartupData<'a> {
None,
}
-type DispatchFn = Fn(&[u8], Option<PinnedBuf>) -> Op;
+pub type OpResult<E> = Result<Op<E>, E>;
+
+type CoreDispatchFn = Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ()> + Send>;
type DynImportFn = Fn(&str, &str) -> DynImportFuture;
@@ -93,6 +103,12 @@ impl Future for DynImport {
}
}
+enum ResponseData {
+ None,
+ Buffer(deno_buf),
+ PromiseId(c_int),
+}
+
/// A single execution context of JavaScript. Corresponds roughly to the "Web
/// Worker" concept in the DOM. An Isolate is a Future that can be used with
/// Tokio. The Isolate future complete when there is an error or when all
@@ -104,14 +120,15 @@ impl Future for DynImport {
pub struct Isolate {
libdeno_isolate: *const libdeno::isolate,
shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>,
- dispatch: Option<Arc<DispatchFn>>,
+ dispatch: Option<Arc<CoreDispatchFn>>,
dyn_import: Option<Arc<DynImportFn>>,
needs_init: bool,
shared: SharedQueue,
- pending_ops: FuturesUnordered<OpAsyncFuture>,
+ pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
pending_dyn_imports: FuturesUnordered<DynImport>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
+ next_promise_id: AtomicI32,
}
unsafe impl Send for Isolate {}
@@ -176,6 +193,7 @@ impl Isolate {
have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(),
startup_script,
+ next_promise_id: AtomicI32::new(1),
}
}
@@ -184,7 +202,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn set_dispatch<F>(&mut self, f: F)
where
- F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static,
+ F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@@ -239,6 +257,10 @@ impl Isolate {
}
}
+ pub fn get_next_promise_id(&self) -> i32 {
+ self.next_promise_id.fetch_add(1, Ordering::SeqCst)
+ }
+
extern "C" fn pre_dispatch(
user_data: *mut c_void,
control_argv0: deno_buf,
@@ -279,9 +301,17 @@ impl Isolate {
// return value.
// TODO(ry) check that if JSError thrown during respond(), that it will be
// picked up.
- let _ = isolate.respond(Some(&buf));
+ let _ =
+ isolate.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())));
}
Op::Async(fut) => {
+ let promise_id = isolate.get_next_promise_id();
+ let _ = isolate.respond(ResponseData::PromiseId(promise_id));
+ let fut = Box::new(fut.and_then(
+ move |buf| -> Result<(c_int, Buf), CoreError> {
+ Ok((promise_id, buf))
+ },
+ ));
isolate.pending_ops.push(fut);
isolate.have_unpolled_ops = true;
}
@@ -340,14 +370,34 @@ impl Isolate {
}
}
- fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), JSError> {
- let buf = match maybe_buf {
- None => deno_buf::empty(),
- Some(r) => deno_buf::from(r),
+ // the result type is a placeholder for a more specific enum type
+ fn respond(&mut self, data: ResponseData) -> Result<(), JSError> {
+ match data {
+ ResponseData::PromiseId(pid) => unsafe {
+ libdeno::deno_respond(
+ self.libdeno_isolate,
+ self.as_raw_ptr(),
+ deno_buf::empty(),
+ &pid,
+ )
+ },
+ ResponseData::Buffer(r) => unsafe {
+ libdeno::deno_respond(
+ self.libdeno_isolate,
+ self.as_raw_ptr(),
+ r,
+ null(),
+ )
+ },
+ ResponseData::None => unsafe {
+ libdeno::deno_respond(
+ self.libdeno_isolate,
+ self.as_raw_ptr(),
+ deno_buf::empty(),
+ null(),
+ )
+ },
};
- unsafe {
- libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
- }
if let Some(err) = self.last_exception() {
Err(err)
} else {
@@ -525,7 +575,7 @@ impl Future for Isolate {
self.shared_init();
- let mut overflow_response: Option<Buf> = None;
+ let mut overflow_response: Option<(c_int, Buf)> = None;
loop {
// If there are any pending dyn_import futures, do those first.
@@ -546,13 +596,13 @@ impl Future for Isolate {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
- Ok(Ready(Some(buf))) => {
- let successful_push = self.shared.push(&buf);
+ Ok(Ready(Some(op))) => {
+ let successful_push = self.shared.push(op.0, &op.1);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
- overflow_response = Some(buf);
+ overflow_response = Some(op);
break;
}
}
@@ -560,14 +610,16 @@ impl Future for Isolate {
}
if self.shared.size() > 0 {
- self.respond(None)?;
+ self.respond(ResponseData::None)?;
// The other side should have shifted off all the messages.
assert_eq!(self.shared.size(), 0);
}
if overflow_response.is_some() {
- let buf = overflow_response.take().unwrap();
- self.respond(Some(&buf))?;
+ let op = overflow_response.take().unwrap();
+ let promise_id_bytes = op.0.to_be_bytes();
+ let buf: Buf = [&promise_id_bytes, &op.1[..]].concat().into();
+ self.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())))?;
}
self.check_promise_errors();
@@ -664,7 +716,7 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
- isolate.set_dispatch(move |control, _| -> Op {
+ isolate.set_dispatch(move |control, _| -> CoreOp {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
@@ -834,7 +886,7 @@ pub mod tests {
"setup2.js",
r#"
let nrecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((promiseId, buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
nrecv++;
@@ -1025,7 +1077,7 @@ pub mod tests {
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
@@ -1047,7 +1099,7 @@ pub mod tests {
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
@@ -1068,7 +1120,7 @@ pub mod tests {
"overflow_req_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 1);
assert(buf[0] === 43);
asyncRecv++;
@@ -1076,8 +1128,8 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(control);
- // Async messages always have null response.
- assert(response == null);
+ // Async messages always have number type response.
+ assert(typeof response == "number");
assert(asyncRecv == 0);
"#,
));
@@ -1097,7 +1149,7 @@ pub mod tests {
"overflow_res_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@@ -1105,7 +1157,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
- assert(response == null);
+ assert(typeof response == "number");
assert(asyncRecv == 0);
"#,
));
@@ -1125,7 +1177,7 @@ pub mod tests {
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((cmdId, buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@@ -1133,7 +1185,7 @@ pub mod tests {
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(control);
- assert(response == null);
+ assert(typeof response == "number");
assert(asyncRecv == 0);
// Dispatch another message to verify that pending ops
// are done even if shared space overflows
diff --git a/core/libdeno.rs b/core/libdeno.rs
index 84f21e89e..a17a8e521 100644
--- a/core/libdeno.rs
+++ b/core/libdeno.rs
@@ -267,6 +267,7 @@ extern "C" {
i: *const isolate,
user_data: *const c_void,
buf: deno_buf,
+ promise_id: *const c_int,
);
pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf);
pub fn deno_execute(
diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc
index 8a3a56156..30f82b6cc 100644
--- a/core/libdeno/api.cc
+++ b/core/libdeno/api.cc
@@ -153,11 +153,15 @@ void deno_pinned_buf_delete(deno_pinned_buf* buf) {
auto _ = deno::PinnedBuf(buf);
}
-void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
+void deno_respond(Deno* d_, void* user_data, deno_buf buf, int* promise_id) {
auto* d = unwrap(d_);
if (d->current_args_ != nullptr) {
// Synchronous response.
- if (buf.data_ptr != nullptr) {
+ if (promise_id != nullptr) {
+ auto number = v8::Number::New(d->isolate_, *promise_id);
+ d->current_args_->GetReturnValue().Set(number);
+ } else {
+ CHECK_NOT_NULL(buf.data_ptr);
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
}
diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h
index 745285554..4f29f2c7a 100644
--- a/core/libdeno/deno.h
+++ b/core/libdeno/deno.h
@@ -81,8 +81,10 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
// deno_respond sends up to one message back for every deno_recv_cb made.
//
// If this is called during deno_recv_cb, the issuing libdeno.send() in
-// javascript will synchronously return the specified buf as an ArrayBuffer (or
-// null if buf is empty).
+// javascript will synchronously return the specified promise_id(number)
+// or buf(Uint8Array) (or null if buf and promise_id are both null/empty).
+// Calling with non-null for both buf and promise_id will result in the
+// promise_id being returned.
//
// If this is called after deno_recv_cb has returned, the deno_respond
// will call into the JS callback specified by libdeno.recv().
@@ -92,7 +94,7 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
// releasing its memory.)
//
// If a JS exception was encountered, deno_last_exception() will be non-NULL.
-void deno_respond(Deno* d, void* user_data, deno_buf buf);
+void deno_respond(Deno* d, void* user_data, deno_buf buf, int* promise_id);
// consumes zero_copy
void deno_pinned_buf_delete(deno_pinned_buf* buf);
diff --git a/core/libdeno/libdeno.d.ts b/core/libdeno/libdeno.d.ts
index 1bc7367d9..093e846ab 100644
--- a/core/libdeno/libdeno.d.ts
+++ b/core/libdeno/libdeno.d.ts
@@ -12,14 +12,13 @@ interface EvalErrorInfo {
thrown: any;
}
-declare interface MessageCallback {
- (msg: Uint8Array): void;
-}
+declare type MessageCallbackInternal = (msg: Uint8Array) => void;
declare interface DenoCore {
- recv(cb: MessageCallback): void;
+ recv(cb: MessageCallbackInternal): void;
send(
+ cmdId: number,
control: null | ArrayBufferView,
data?: ArrayBufferView
): null | Uint8Array;
diff --git a/core/libdeno/libdeno_test.cc b/core/libdeno/libdeno_test.cc
index 485c95bff..b72a8e098 100644
--- a/core/libdeno/libdeno_test.cc
+++ b/core/libdeno/libdeno_test.cc
@@ -75,7 +75,7 @@ TEST(LibDenoTest, RecvReturnBar) {
EXPECT_EQ(buf.data_ptr[1], 'b');
EXPECT_EQ(buf.data_ptr[2], 'c');
uint8_t response[] = {'b', 'a', 'r'};
- deno_respond(d, user_data, {response, sizeof response});
+ deno_respond(d, user_data, {response, sizeof response}, nullptr);
};
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr});
deno_execute(d, d, "a.js", "RecvReturnBar()");
diff --git a/core/shared_queue.js b/core/shared_queue.js
index 75f370ce4..b413f011e 100644
--- a/core/shared_queue.js
+++ b/core/shared_queue.js
@@ -151,14 +151,27 @@ SharedQueue Binary Layout
function handleAsyncMsgFromRust(buf) {
if (buf) {
- asyncHandler(buf);
+ handleAsyncMsgFromRustInner(buf);
} else {
while ((buf = shift()) != null) {
- asyncHandler(buf);
+ handleAsyncMsgFromRustInner(buf);
}
}
}
+ function handleAsyncMsgFromRustInner(buf) {
+ // DataView to extract cmdId value.
+ const dataView = new DataView(buf.buffer, buf.byteOffset, 4);
+ const promiseId = dataView.getInt32(0);
+ // Uint8 buffer view shifted right and shortened 4 bytes to remove cmdId from view window.
+ const bufViewFinal = new Uint8Array(
+ buf.buffer,
+ buf.byteOffset + 4,
+ buf.byteLength - 4
+ );
+ asyncHandler(promiseId, bufViewFinal);
+ }
+
function dispatch(control, zeroCopy = null) {
maybeInit();
// First try to push control to shared.
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index c33a37b90..1460fb172 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -17,6 +17,7 @@ SharedQueue Binary Layout
*/
use crate::libdeno::deno_buf;
+use libc::c_int;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
@@ -152,17 +153,19 @@ impl SharedQueue {
Some(&self.bytes[off..end])
}
- pub fn push(&mut self, record: &[u8]) -> bool {
+ pub fn push(&mut self, promise_id: c_int, record: &[u8]) -> bool {
let off = self.head();
- let end = off + record.len();
+ let end = off + record.len() + 4;
let index = self.num_records();
if end > self.bytes.len() || index >= MAX_RECORDS {
debug!("WARNING the sharedQueue overflowed");
return false;
}
self.set_end(index, end);
- assert_eq!(end - off, record.len());
- self.bytes[off..end].copy_from_slice(record);
+ assert_eq!(end - off, record.len() + 4);
+ let pid_bytes = promise_id.to_be_bytes();
+ self.bytes[off..off + 4].copy_from_slice(&pid_bytes);
+ self.bytes[off + 4..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
u32_slice[INDEX_HEAD] = end as u32;
@@ -189,30 +192,30 @@ mod tests {
assert!(h > 0);
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
- let len = r.len() + h;
- assert!(q.push(&r));
+ let len = r.len() + h + 4;
+ assert!(q.push(1, &r));
assert_eq!(q.head(), len);
let r = vec![6, 7].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(1, &r));
let r = vec![8, 9, 10, 11].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(1, &r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
let r = q.shift().unwrap();
- assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
+ assert_eq!(&r[4..], vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
let r = q.shift().unwrap();
- assert_eq!(r, vec![6, 7].as_slice());
+ assert_eq!(&r[4..], vec![6, 7].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
let r = q.shift().unwrap();
- assert_eq!(r, vec![8, 9, 10, 11].as_slice());
+ assert_eq!(&r[4..], vec![8, 9, 10, 11].as_slice());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
@@ -232,19 +235,19 @@ mod tests {
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
+ assert!(q.push(1, &alloc_buf(RECOMMENDED_SIZE - 1 - (4 * 2))));
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(2)));
+ assert!(!q.push(1, &alloc_buf(2)));
assert_eq!(q.size(), 1);
- assert!(q.push(&alloc_buf(1)));
+ assert!(q.push(1, &alloc_buf(1)));
assert_eq!(q.size(), 2);
- assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
+ assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1 - 4);
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(1)));
+ assert!(!q.push(1, &alloc_buf(1)));
- assert_eq!(q.shift().unwrap().len(), 1);
+ assert_eq!(q.shift().unwrap().len(), 1 + 4);
assert_eq!(q.size(), 0);
}
@@ -252,11 +255,11 @@ mod tests {
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
- assert!(q.push(&alloc_buf(1)))
+ assert!(q.push(1, &alloc_buf(1)))
}
- assert_eq!(q.push(&alloc_buf(1)), false);
+ assert_eq!(q.push(1, &alloc_buf(1)), false);
// Even if we shift one off, we still cannot push a new record.
- assert_eq!(q.shift().unwrap().len(), 1);
- assert_eq!(q.push(&alloc_buf(1)), false);
+ assert_eq!(q.shift().unwrap().len(), 1 + 4);
+ assert_eq!(q.push(1, &alloc_buf(1)), false);
}
}