Diffstat (limited to 'core')
-rw-r--r--  core/core.d.ts               |   6
-rw-r--r--  core/examples/http_bench.js  |  16
-rw-r--r--  core/examples/http_bench.rs  |  16
-rw-r--r--  core/isolate.rs              | 118
-rw-r--r--  core/libdeno.rs              |   1
-rw-r--r--  core/libdeno/api.cc          |   8
-rw-r--r--  core/libdeno/deno.h          |   8
-rw-r--r--  core/libdeno/libdeno.d.ts    |   7
-rw-r--r--  core/libdeno/libdeno_test.cc |   2
-rw-r--r--  core/shared_queue.js         |  17
-rw-r--r--  core/shared_queue.rs         |  45
11 files changed, 157 insertions, 87 deletions
diff --git a/core/core.d.ts b/core/core.d.ts
index b1d1ac57f..0c530e343 100644
--- a/core/core.d.ts
+++ b/core/core.d.ts
@@ -4,15 +4,13 @@
 // Deno core. These are not intended to be used directly by runtime users of
 // Deno and therefore do not flow through to the runtime type library.
 
-declare interface MessageCallback {
-  (msg: Uint8Array): void;
-}
+declare type MessageCallback = (promiseId: number, msg: Uint8Array) => void;
 
 declare interface DenoCore {
   dispatch(
     control: Uint8Array,
     zeroCopy?: ArrayBufferView | null
-  ): Uint8Array | null;
+  ): Uint8Array | null | number;
   setAsyncHandler(cb: MessageCallback): void;
   sharedQueue: {
     head(): number;
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index 8eb764b55..7e678aecd 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -13,7 +13,6 @@ const responseBuf = new Uint8Array(
     .map(c => c.charCodeAt(0))
 );
 const promiseMap = new Map();
-let nextPromiseId = 1;
 
 function assert(cond) {
   if (!cond) {
@@ -37,8 +36,8 @@ const scratchBytes = new Uint8Array(
 );
 assert(scratchBytes.byteLength === 4 * 4);
 
-function send(promiseId, opId, arg, zeroCopy = null) {
-  scratch32[0] = promiseId;
+function send(isSync, opId, arg, zeroCopy = null) {
+  scratch32[0] = isSync;
   scratch32[1] = opId;
   scratch32[2] = arg;
   scratch32[3] = -1;
@@ -47,10 +46,9 @@ function send(promiseId, opId, arg, zeroCopy = null) {
 
 /** Returns Promise<number> */
 function sendAsync(opId, arg, zeroCopy = null) {
-  const promiseId = nextPromiseId++;
+  const promiseId = send(false, opId, arg, zeroCopy);
   const p = createResolvable();
   promiseMap.set(promiseId, p);
-  send(promiseId, opId, arg, zeroCopy);
   return p;
 }
 
@@ -58,7 +56,7 @@ function recordFromBuf(buf) {
   assert(buf.byteLength === 16);
   const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
   return {
-    promiseId: buf32[0],
+    isSync: !!buf32[0],
     opId: buf32[1],
     arg: buf32[2],
     result: buf32[3]
@@ -67,14 +65,14 @@ function recordFromBuf(buf) {
 
 /** Returns i32 number */
 function sendSync(opId, arg) {
-  const buf = send(0, opId, arg);
+  const buf = send(true, opId, arg);
   const record = recordFromBuf(buf);
   return record.result;
 }
 
-function handleAsyncMsgFromRust(buf) {
+function handleAsyncMsgFromRust(promiseId, buf) {
   const record = recordFromBuf(buf);
-  const { promiseId, result } = record;
+  const { result } = record;
   const p = promiseMap.get(promiseId);
   promiseMap.delete(promiseId);
   p.resolve(result);
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index e8c5ec1b7..0f0cd6a4b 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -44,7 +44,7 @@ const OP_CLOSE: i32 = 5;
 
 #[derive(Clone, Debug, PartialEq)]
 pub struct Record {
-  pub promise_id: i32,
+  pub is_sync: i32,
   pub op_id: i32,
   pub arg: i32,
   pub result: i32,
@@ -52,8 +52,8 @@ pub struct Record {
 
 impl Into<Buf> for Record {
   fn into(self) -> Buf {
-    let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
-      .into_boxed_slice();
+    let buf32 =
+      vec![self.is_sync, self.op_id, self.arg, self.result].into_boxed_slice();
     let ptr = Box::into_raw(buf32) as *mut [u8; 16];
     unsafe { Box::from_raw(ptr) }
   }
@@ -65,7 +65,7 @@ impl From<&[u8]> for Record {
     let ptr = s.as_ptr() as *const i32;
     let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
     Record {
-      promise_id: ints[0],
+      is_sync: ints[0],
       op_id: ints[1],
       arg: ints[2],
       result: ints[3],
@@ -81,7 +81,7 @@ impl From<Buf> for Record {
     let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
     assert_eq!(ints.len(), 4);
     Record {
-      promise_id: ints[0],
+      is_sync: ints[0],
       op_id: ints[1],
       arg: ints[2],
       result: ints[3],
@@ -92,7 +92,7 @@ impl From<Buf> for Record {
 #[test]
 fn test_record_from() {
   let r = Record {
-    promise_id: 1,
+    is_sync: 1,
     op_id: 2,
     arg: 3,
     result: 4,
@@ -111,9 +111,9 @@ fn test_record_from() {
 
 pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
 
-fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> Op {
+fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
   let record = Record::from(control);
-  let is_sync = record.promise_id == 0;
+  let is_sync = record.is_sync == 1;
   let http_bench_op = match record.op_id {
     OP_LISTEN => {
       assert!(is_sync);
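Taken together, the http_bench changes show the new embedder contract: promise ids are no longer chosen in JS, Deno.core.dispatch() returns the core-allocated id for an async op, and the async handler receives that id alongside the response record. Below is a minimal TypeScript sketch of that round trip, assuming only the Deno.core global declared by core.d.ts above; the 16-byte record layout and the helper names are illustrative, borrowed from http_bench.js rather than part of the core API.

// Sketch only. Assumed record layout: [isSync, opId, arg, result] as four i32 values.
const scratch32 = new Int32Array(4);
const scratchBytes = new Uint8Array(
  scratch32.buffer,
  scratch32.byteOffset,
  scratch32.byteLength
);
const promiseMap = new Map<number, (result: number) => void>();

// Core invokes this with the promise id it allocated plus the response record.
Deno.core.setAsyncHandler((promiseId: number, buf: Uint8Array): void => {
  const record = new Int32Array(buf.buffer, buf.byteOffset, 4);
  const resolve = promiseMap.get(promiseId)!;
  promiseMap.delete(promiseId);
  resolve(record[3]); // result field
});

function sendSync(opId: number, arg: number): number {
  scratch32.set([1 /* isSync */, opId, arg, -1]);
  // Sync ops still get the response record back directly.
  const buf = Deno.core.dispatch(scratchBytes) as Uint8Array;
  return new Int32Array(buf.buffer, buf.byteOffset, 4)[3];
}

function sendAsync(opId: number, arg: number): Promise<number> {
  scratch32.set([0 /* isSync */, opId, arg, -1]);
  // Async ops now return the core-allocated promise id as a number.
  const promiseId = Deno.core.dispatch(scratchBytes) as number;
  return new Promise(resolve => promiseMap.set(promiseId, resolve));
}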
diff --git a/core/isolate.rs b/core/isolate.rs
index 14e1b88aa..ae14c0040 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -21,21 +21,29 @@ use futures::Async::*;
 use futures::Future;
 use futures::Poll;
 use libc::c_char;
+use libc::c_int;
 use libc::c_void;
 use std::ffi::CStr;
 use std::ffi::CString;
 use std::ptr::null;
+use std::sync::atomic::{AtomicI32, Ordering};
 use std::sync::{Arc, Mutex, Once, ONCE_INIT};
 
 pub type Buf = Box<[u8]>;
-pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>;
+pub type OpAsyncFuture<I, E> = Box<dyn Future<Item = I, Error = E> + Send>;
 
-pub enum Op {
+pub enum Op<E> {
   Sync(Buf),
-  Async(OpAsyncFuture),
+  Async(OpAsyncFuture<Buf, E>),
 }
 
+pub type CoreError = ();
+
+type CoreOpAsyncFuture = OpAsyncFuture<(c_int, Buf), CoreError>;
+
+pub type CoreOp = Op<CoreError>;
+
 /// Stores a script used to initalize a Isolate
 pub struct Script<'a> {
   pub source: &'a str,
@@ -68,7 +76,9 @@ pub enum StartupData<'a> {
   None,
 }
 
-type DispatchFn = Fn(&[u8], Option<PinnedBuf>) -> Op;
+pub type OpResult<E> = Result<Op<E>, E>;
+
+type CoreDispatchFn = Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
 
 pub type DynImportFuture = Box<dyn Future<Item = deno_mod, Error = ()> + Send>;
 type DynImportFn = Fn(&str, &str) -> DynImportFuture;
@@ -93,6 +103,12 @@ impl Future for DynImport {
   }
 }
 
+enum ResponseData {
+  None,
+  Buffer(deno_buf),
+  PromiseId(c_int),
+}
+
 /// A single execution context of JavaScript. Corresponds roughly to the "Web
 /// Worker" concept in the DOM. An Isolate is a Future that can be used with
 /// Tokio. The Isolate future complete when there is an error or when all
@@ -104,14 +120,15 @@ impl Future for DynImport {
 pub struct Isolate {
   libdeno_isolate: *const libdeno::isolate,
   shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>,
-  dispatch: Option<Arc<DispatchFn>>,
+  dispatch: Option<Arc<CoreDispatchFn>>,
   dyn_import: Option<Arc<DynImportFn>>,
   needs_init: bool,
   shared: SharedQueue,
-  pending_ops: FuturesUnordered<OpAsyncFuture>,
+  pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
   pending_dyn_imports: FuturesUnordered<DynImport>,
   have_unpolled_ops: bool,
   startup_script: Option<OwnedScript>,
+  next_promise_id: AtomicI32,
 }
 
 unsafe impl Send for Isolate {}
@@ -176,6 +193,7 @@ impl Isolate {
       have_unpolled_ops: false,
       pending_dyn_imports: FuturesUnordered::new(),
       startup_script,
+      next_promise_id: AtomicI32::new(1),
     }
   }
 
@@ -184,7 +202,7 @@ impl Isolate {
   /// corresponds to the second argument of Deno.core.dispatch().
   pub fn set_dispatch<F>(&mut self, f: F)
   where
-    F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static,
+    F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
   {
     self.dispatch = Some(Arc::new(f));
   }
@@ -239,6 +257,10 @@ impl Isolate {
     }
   }
 
+  pub fn get_next_promise_id(&self) -> i32 {
+    self.next_promise_id.fetch_add(1, Ordering::SeqCst)
+  }
+
   extern "C" fn pre_dispatch(
     user_data: *mut c_void,
     control_argv0: deno_buf,
@@ -279,9 +301,17 @@ impl Isolate {
         // return value.
         // TODO(ry) check that if JSError thrown during respond(), that it will be
         // picked up.
-        let _ = isolate.respond(Some(&buf));
+        let _ =
+          isolate.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())));
       }
       Op::Async(fut) => {
+        let promise_id = isolate.get_next_promise_id();
+        let _ = isolate.respond(ResponseData::PromiseId(promise_id));
+        let fut = Box::new(fut.and_then(
+          move |buf| -> Result<(c_int, Buf), CoreError> {
+            Ok((promise_id, buf))
+          },
+        ));
         isolate.pending_ops.push(fut);
         isolate.have_unpolled_ops = true;
       }
@@ -340,14 +370,34 @@ impl Isolate {
     }
   }
 
-  fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), JSError> {
-    let buf = match maybe_buf {
-      None => deno_buf::empty(),
-      Some(r) => deno_buf::from(r),
+  // the result type is a placeholder for a more specific enum type
+  fn respond(&mut self, data: ResponseData) -> Result<(), JSError> {
+    match data {
+      ResponseData::PromiseId(pid) => unsafe {
+        libdeno::deno_respond(
+          self.libdeno_isolate,
+          self.as_raw_ptr(),
+          deno_buf::empty(),
+          &pid,
+        )
+      },
+      ResponseData::Buffer(r) => unsafe {
+        libdeno::deno_respond(
+          self.libdeno_isolate,
+          self.as_raw_ptr(),
+          r,
+          null(),
+        )
+      },
+      ResponseData::None => unsafe {
+        libdeno::deno_respond(
+          self.libdeno_isolate,
+          self.as_raw_ptr(),
+          deno_buf::empty(),
+          null(),
+        )
+      },
     };
-    unsafe {
-      libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
-    }
     if let Some(err) = self.last_exception() {
       Err(err)
     } else {
@@ -525,7 +575,7 @@ impl Future for Isolate {
 
     self.shared_init();
 
-    let mut overflow_response: Option<Buf> = None;
+    let mut overflow_response: Option<(c_int, Buf)> = None;
 
     loop {
       // If there are any pending dyn_import futures, do those first.
@@ -546,13 +596,13 @@ impl Future for Isolate {
         Err(_) => panic!("unexpected op error"),
         Ok(Ready(None)) => break,
         Ok(NotReady) => break,
-        Ok(Ready(Some(buf))) => {
-          let successful_push = self.shared.push(&buf);
+        Ok(Ready(Some(op))) => {
+          let successful_push = self.shared.push(op.0, &op.1);
          if !successful_push {
            // If we couldn't push the response to the shared queue, because
            // there wasn't enough size, we will return the buffer via the
            // legacy route, using the argument of deno_respond.
-            overflow_response = Some(buf);
+            overflow_response = Some(op);
            break;
          }
        }
@@ -560,14 +610,16 @@ impl Future for Isolate {
     }
 
     if self.shared.size() > 0 {
-      self.respond(None)?;
+      self.respond(ResponseData::None)?;
       // The other side should have shifted off all the messages.
       assert_eq!(self.shared.size(), 0);
     }
 
     if overflow_response.is_some() {
-      let buf = overflow_response.take().unwrap();
-      self.respond(Some(&buf))?;
+      let op = overflow_response.take().unwrap();
+      let promise_id_bytes = op.0.to_be_bytes();
+      let buf: Buf = [&promise_id_bytes, &op.1[..]].concat().into();
+      self.respond(ResponseData::Buffer(deno_buf::from(buf.as_ref())))?;
     }
 
     self.check_promise_errors();
@@ -664,7 +716,7 @@ pub mod tests {
     let dispatch_count_ = dispatch_count.clone();
 
     let mut isolate = Isolate::new(StartupData::None, false);
-    isolate.set_dispatch(move |control, _| -> Op {
+    isolate.set_dispatch(move |control, _| -> CoreOp {
       dispatch_count_.fetch_add(1, Ordering::Relaxed);
       match mode {
         Mode::AsyncImmediate => {
@@ -834,7 +886,7 @@ pub mod tests {
      "setup2.js",
      r#"
        let nrecv = 0;
-        Deno.core.setAsyncHandler((buf) => {
+        Deno.core.setAsyncHandler((promiseId, buf) => {
          assert(buf.byteLength === 1);
          assert(buf[0] === 43);
          nrecv++;
@@ -1025,7 +1077,7 @@ pub mod tests {
      "overflow_req_sync.js",
      r#"
        let asyncRecv = 0;
-        Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+        Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
        // Large message that will overflow the shared space.
        let control = new Uint8Array(100 * 1024 * 1024);
        let response = Deno.core.dispatch(control);
@@ -1047,7 +1099,7 @@ pub mod tests {
      "overflow_res_sync.js",
      r#"
        let asyncRecv = 0;
-        Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+        Deno.core.setAsyncHandler((promiseId, buf) => { asyncRecv++ });
        // Large message that will overflow the shared space.
        let control = new Uint8Array([42]);
        let response = Deno.core.dispatch(control);
@@ -1068,7 +1120,7 @@ pub mod tests {
      "overflow_req_async.js",
      r#"
        let asyncRecv = 0;
-        Deno.core.setAsyncHandler((buf) => {
+        Deno.core.setAsyncHandler((cmdId, buf) => {
          assert(buf.byteLength === 1);
          assert(buf[0] === 43);
          asyncRecv++;
@@ -1076,8 +1128,8 @@ pub mod tests {
        // Large message that will overflow the shared space.
        let control = new Uint8Array(100 * 1024 * 1024);
        let response = Deno.core.dispatch(control);
-        // Async messages always have null response.
-        assert(response == null);
+        // Async messages always have number type response.
+        assert(typeof response == "number");
        assert(asyncRecv == 0);
        "#,
    ));
@@ -1097,7 +1149,7 @@ pub mod tests {
      "overflow_res_async.js",
      r#"
        let asyncRecv = 0;
-        Deno.core.setAsyncHandler((buf) => {
+        Deno.core.setAsyncHandler((cmdId, buf) => {
          assert(buf.byteLength === 100 * 1024 * 1024);
          assert(buf[0] === 4);
          asyncRecv++;
@@ -1105,7 +1157,7 @@ pub mod tests {
        // Large message that will overflow the shared space.
        let control = new Uint8Array([42]);
        let response = Deno.core.dispatch(control);
-        assert(response == null);
+        assert(typeof response == "number");
        assert(asyncRecv == 0);
        "#,
    ));
@@ -1125,7 +1177,7 @@ pub mod tests {
      "overflow_res_multiple_dispatch_async.js",
      r#"
        let asyncRecv = 0;
-        Deno.core.setAsyncHandler((buf) => {
+        Deno.core.setAsyncHandler((cmdId, buf) => {
          assert(buf.byteLength === 100 * 1024 * 1024);
          assert(buf[0] === 4);
          asyncRecv++;
@@ -1133,7 +1185,7 @@ pub mod tests {
        // Large message that will overflow the shared space.
        let control = new Uint8Array([42]);
        let response = Deno.core.dispatch(control);
-        assert(response == null);
+        assert(typeof response == "number");
        assert(asyncRecv == 0);
        // Dispatch another message to verify that pending ops
        // are done even if shared space overflows
diff --git a/core/libdeno.rs b/core/libdeno.rs
index 84f21e89e..a17a8e521 100644
--- a/core/libdeno.rs
+++ b/core/libdeno.rs
@@ -267,6 +267,7 @@ extern "C" {
     i: *const isolate,
     user_data: *const c_void,
     buf: deno_buf,
+    promise_id: *const c_int,
   );
   pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf);
   pub fn deno_execute(
diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc
index 8a3a56156..30f82b6cc 100644
--- a/core/libdeno/api.cc
+++ b/core/libdeno/api.cc
@@ -153,11 +153,15 @@ void deno_pinned_buf_delete(deno_pinned_buf* buf) {
   auto _ = deno::PinnedBuf(buf);
 }
 
-void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
+void deno_respond(Deno* d_, void* user_data, deno_buf buf, int* promise_id) {
   auto* d = unwrap(d_);
   if (d->current_args_ != nullptr) {
     // Synchronous response.
-    if (buf.data_ptr != nullptr) {
+    if (promise_id != nullptr) {
+      auto number = v8::Number::New(d->isolate_, *promise_id);
+      d->current_args_->GetReturnValue().Set(number);
+    } else {
+      CHECK_NOT_NULL(buf.data_ptr);
       auto ab = deno::ImportBuf(d, buf);
       d->current_args_->GetReturnValue().Set(ab);
     }
diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h
index 745285554..4f29f2c7a 100644
--- a/core/libdeno/deno.h
+++ b/core/libdeno/deno.h
@@ -81,8 +81,10 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
 // deno_respond sends up to one message back for every deno_recv_cb made.
 //
 // If this is called during deno_recv_cb, the issuing libdeno.send() in
-// javascript will synchronously return the specified buf as an ArrayBuffer (or
-// null if buf is empty).
+// javascript will synchronously return the specified promise_id(number)
+// or buf(Uint8Array) (or null if buf and promise_id are both null/empty).
+// Calling with non-null for both buf and promise_id will result in the
+// promise_id being returned.
 //
 // If this is called after deno_recv_cb has returned, the deno_respond
 // will call into the JS callback specified by libdeno.recv().
@@ -92,7 +94,7 @@ void deno_execute(Deno* d, void* user_data, const char* js_filename,
 // releasing its memory.)
 //
 // If a JS exception was encountered, deno_last_exception() will be non-NULL.
-void deno_respond(Deno* d, void* user_data, deno_buf buf);
+void deno_respond(Deno* d, void* user_data, deno_buf buf, int* promise_id);
 
 // consumes zero_copy
 void deno_pinned_buf_delete(deno_pinned_buf* buf);
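The comment added to deno.h above enumerates three synchronous outcomes for a dispatch: a promise id when Rust passed a non-null promise_id, a buffer when it passed one, and null when neither was supplied. The following is a hedged TypeScript sketch of how calling code might branch on the value returned by Deno.core.dispatch(); handleRecord and noteAsyncStarted are hypothetical callbacks, not part of libdeno.

// Sketch only: classifies the three documented return shapes of dispatch().
function dispatchAndClassify(
  control: Uint8Array,
  handleRecord: (buf: Uint8Array) => void, // hypothetical embedder callback
  noteAsyncStarted: (promiseId: number) => void // hypothetical embedder callback
): void {
  const response = Deno.core.dispatch(control);
  if (typeof response === "number") {
    // deno_respond() was called with a non-null promise_id: the op is async
    // and its result will arrive later through the async handler.
    noteAsyncStarted(response);
  } else if (response instanceof Uint8Array) {
    // deno_respond() was called with a buffer: synchronous result record.
    handleRecord(response);
  } else {
    // Per the comment above, null means buf and promise_id were both
    // null/empty, so nothing was returned synchronously.
  }
}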
diff --git a/core/libdeno/libdeno.d.ts b/core/libdeno/libdeno.d.ts
index 1bc7367d9..093e846ab 100644
--- a/core/libdeno/libdeno.d.ts
+++ b/core/libdeno/libdeno.d.ts
@@ -12,14 +12,13 @@ interface EvalErrorInfo {
   thrown: any;
 }
 
-declare interface MessageCallback {
-  (msg: Uint8Array): void;
-}
+declare type MessageCallbackInternal = (msg: Uint8Array) => void;
 
 declare interface DenoCore {
-  recv(cb: MessageCallback): void;
+  recv(cb: MessageCallbackInternal): void;
 
   send(
+    cmdId: number,
     control: null | ArrayBufferView,
     data?: ArrayBufferView
   ): null | Uint8Array;
diff --git a/core/libdeno/libdeno_test.cc b/core/libdeno/libdeno_test.cc
index 485c95bff..b72a8e098 100644
--- a/core/libdeno/libdeno_test.cc
+++ b/core/libdeno/libdeno_test.cc
@@ -75,7 +75,7 @@ TEST(LibDenoTest, RecvReturnBar) {
     EXPECT_EQ(buf.data_ptr[1], 'b');
     EXPECT_EQ(buf.data_ptr[2], 'c');
     uint8_t response[] = {'b', 'a', 'r'};
-    deno_respond(d, user_data, {response, sizeof response});
+    deno_respond(d, user_data, {response, sizeof response}, nullptr);
   };
   Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr});
   deno_execute(d, d, "a.js", "RecvReturnBar()");
diff --git a/core/shared_queue.js b/core/shared_queue.js
index 75f370ce4..b413f011e 100644
--- a/core/shared_queue.js
+++ b/core/shared_queue.js
@@ -151,14 +151,27 @@ SharedQueue Binary Layout
 
   function handleAsyncMsgFromRust(buf) {
     if (buf) {
-      asyncHandler(buf);
+      handleAsyncMsgFromRustInner(buf);
     } else {
       while ((buf = shift()) != null) {
-        asyncHandler(buf);
+        handleAsyncMsgFromRustInner(buf);
       }
     }
   }
 
+  function handleAsyncMsgFromRustInner(buf) {
+    // DataView to extract cmdId value.
+    const dataView = new DataView(buf.buffer, buf.byteOffset, 4);
+    const promiseId = dataView.getInt32(0);
+    // Uint8 buffer view shifted right and shortened 4 bytes to remove cmdId from view window.
+    const bufViewFinal = new Uint8Array(
+      buf.buffer,
+      buf.byteOffset + 4,
+      buf.byteLength - 4
+    );
+    asyncHandler(promiseId, bufViewFinal);
+  }
+
   function dispatch(control, zeroCopy = null) {
     maybeInit();
     // First try to push control to shared.
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index c33a37b90..1460fb172 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -17,6 +17,7 @@ SharedQueue Binary Layout
 */
 
 use crate::libdeno::deno_buf;
+use libc::c_int;
 
 const MAX_RECORDS: usize = 100;
 /// Total number of records added.
@@ -152,17 +153,19 @@ impl SharedQueue {
     Some(&self.bytes[off..end])
   }
 
-  pub fn push(&mut self, record: &[u8]) -> bool {
+  pub fn push(&mut self, promise_id: c_int, record: &[u8]) -> bool {
     let off = self.head();
-    let end = off + record.len();
+    let end = off + record.len() + 4;
     let index = self.num_records();
     if end > self.bytes.len() || index >= MAX_RECORDS {
       debug!("WARNING the sharedQueue overflowed");
       return false;
     }
     self.set_end(index, end);
-    assert_eq!(end - off, record.len());
-    self.bytes[off..end].copy_from_slice(record);
+    assert_eq!(end - off, record.len() + 4);
+    let pid_bytes = promise_id.to_be_bytes();
+    self.bytes[off..off + 4].copy_from_slice(&pid_bytes);
+    self.bytes[off + 4..end].copy_from_slice(record);
     let u32_slice = self.as_u32_slice_mut();
     u32_slice[INDEX_NUM_RECORDS] += 1;
     u32_slice[INDEX_HEAD] = end as u32;
@@ -189,30 +192,30 @@ mod tests {
     assert!(h > 0);
 
     let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
-    let len = r.len() + h;
-    assert!(q.push(&r));
+    let len = r.len() + h + 4;
+    assert!(q.push(1, &r));
     assert_eq!(q.head(), len);
 
     let r = vec![6, 7].into_boxed_slice();
-    assert!(q.push(&r));
+    assert!(q.push(1, &r));
 
     let r = vec![8, 9, 10, 11].into_boxed_slice();
-    assert!(q.push(&r));
+    assert!(q.push(1, &r));
     assert_eq!(q.num_records(), 3);
     assert_eq!(q.size(), 3);
 
     let r = q.shift().unwrap();
-    assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
+    assert_eq!(&r[4..], vec![1, 2, 3, 4, 5].as_slice());
     assert_eq!(q.num_records(), 3);
     assert_eq!(q.size(), 2);
 
     let r = q.shift().unwrap();
-    assert_eq!(r, vec![6, 7].as_slice());
+    assert_eq!(&r[4..], vec![6, 7].as_slice());
     assert_eq!(q.num_records(), 3);
     assert_eq!(q.size(), 1);
 
     let r = q.shift().unwrap();
-    assert_eq!(r, vec![8, 9, 10, 11].as_slice());
+    assert_eq!(&r[4..], vec![8, 9, 10, 11].as_slice());
     assert_eq!(q.num_records(), 0);
     assert_eq!(q.size(), 0);
 
@@ -232,19 +235,19 @@ mod tests {
 
   #[test]
   fn overflow() {
     let mut q = SharedQueue::new(RECOMMENDED_SIZE);
-    assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
+    assert!(q.push(1, &alloc_buf(RECOMMENDED_SIZE - 1 - (4 * 2))));
     assert_eq!(q.size(), 1);
-    assert!(!q.push(&alloc_buf(2)));
+    assert!(!q.push(1, &alloc_buf(2)));
     assert_eq!(q.size(), 1);
-    assert!(q.push(&alloc_buf(1)));
+    assert!(q.push(1, &alloc_buf(1)));
     assert_eq!(q.size(), 2);
 
-    assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
+    assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1 - 4);
     assert_eq!(q.size(), 1);
-    assert!(!q.push(&alloc_buf(1)));
+    assert!(!q.push(1, &alloc_buf(1)));
 
-    assert_eq!(q.shift().unwrap().len(), 1);
+    assert_eq!(q.shift().unwrap().len(), 1 + 4);
     assert_eq!(q.size(), 0);
   }
 
@@ -252,11 +255,11 @@ mod tests {
   fn full_records() {
     let mut q = SharedQueue::new(RECOMMENDED_SIZE);
     for _ in 0..MAX_RECORDS {
-      assert!(q.push(&alloc_buf(1)))
+      assert!(q.push(1, &alloc_buf(1)))
     }
-    assert_eq!(q.push(&alloc_buf(1)), false);
+    assert_eq!(q.push(1, &alloc_buf(1)), false);
     // Even if we shift one off, we still cannot push a new record.
-    assert_eq!(q.shift().unwrap().len(), 1);
-    assert_eq!(q.push(&alloc_buf(1)), false);
+    assert_eq!(q.shift().unwrap().len(), 1 + 4);
+    assert_eq!(q.push(1, &alloc_buf(1)), false);
   }
 }
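With this change, every record in the shared queue, and every overflowed response sent through deno_respond, is prefixed by its promise id as four big-endian bytes: SharedQueue::push() and the overflow path in isolate.rs write the prefix, and handleAsyncMsgFromRustInner() in shared_queue.js strips it before calling the async handler. Below is a small TypeScript sketch of that framing under the same assumptions; frameRecord and unframeRecord are illustrative names, not core APIs.

// Sketch only: [4-byte big-endian promiseId][record bytes], matching
// i32::to_be_bytes() on the Rust side and DataView.getInt32() in shared_queue.js.
function frameRecord(promiseId: number, record: Uint8Array): Uint8Array {
  const out = new Uint8Array(4 + record.byteLength);
  new DataView(out.buffer).setInt32(0, promiseId, false); // big-endian
  out.set(record, 4);
  return out;
}

function unframeRecord(
  framed: Uint8Array
): { promiseId: number; record: Uint8Array } {
  const view = new DataView(framed.buffer, framed.byteOffset, 4);
  const promiseId = view.getInt32(0, false); // big-endian
  const record = new Uint8Array(
    framed.buffer,
    framed.byteOffset + 4,
    framed.byteLength - 4
  );
  return { promiseId, record };
}

This framing is also why the shared_queue.rs tests above grow each pushed record by four bytes and compare against &r[4..] after shift().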