summaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/core.d.ts3
-rw-r--r--core/examples/http_bench.js20
-rw-r--r--core/examples/http_bench.rs47
-rw-r--r--core/isolate.rs108
-rw-r--r--core/lib.rs1
-rw-r--r--core/libdeno.rs6
-rw-r--r--core/libdeno/api.cc10
-rw-r--r--core/libdeno/binding.cc17
-rw-r--r--core/libdeno/deno.h30
-rw-r--r--core/libdeno/libdeno.d.ts3
-rw-r--r--core/libdeno/libdeno_test.cc51
-rw-r--r--core/libdeno/libdeno_test.js39
-rw-r--r--core/libdeno/modules_test.cc20
-rw-r--r--core/shared_queue.js44
-rw-r--r--core/shared_queue.rs62
-rw-r--r--core/shared_queue_test.js23
16 files changed, 302 insertions, 182 deletions
diff --git a/core/core.d.ts b/core/core.d.ts
index b1d1ac57f..1e9eb7c04 100644
--- a/core/core.d.ts
+++ b/core/core.d.ts
@@ -5,11 +5,12 @@
// Deno and therefore do not flow through to the runtime type library.
declare interface MessageCallback {
- (msg: Uint8Array): void;
+ (opId: number, msg: Uint8Array): void;
}
declare interface DenoCore {
dispatch(
+ opId: number,
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
): Uint8Array | null;
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index 8eb764b55..4c68f2be6 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -29,20 +29,19 @@ function createResolvable() {
return Object.assign(promise, methods);
}
-const scratch32 = new Int32Array(4);
+const scratch32 = new Int32Array(3);
const scratchBytes = new Uint8Array(
scratch32.buffer,
scratch32.byteOffset,
scratch32.byteLength
);
-assert(scratchBytes.byteLength === 4 * 4);
+assert(scratchBytes.byteLength === 3 * 4);
function send(promiseId, opId, arg, zeroCopy = null) {
scratch32[0] = promiseId;
- scratch32[1] = opId;
- scratch32[2] = arg;
- scratch32[3] = -1;
- return Deno.core.dispatch(scratchBytes, zeroCopy);
+ scratch32[1] = arg;
+ scratch32[2] = -1;
+ return Deno.core.dispatch(opId, scratchBytes, zeroCopy);
}
/** Returns Promise<number> */
@@ -55,13 +54,12 @@ function sendAsync(opId, arg, zeroCopy = null) {
}
function recordFromBuf(buf) {
- assert(buf.byteLength === 16);
+ assert(buf.byteLength === 3 * 4);
const buf32 = new Int32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
return {
promiseId: buf32[0],
- opId: buf32[1],
- arg: buf32[2],
- result: buf32[3]
+ arg: buf32[1],
+ result: buf32[2]
};
}
@@ -72,7 +70,7 @@ function sendSync(opId, arg) {
return record.result;
}
-function handleAsyncMsgFromRust(buf) {
+function handleAsyncMsgFromRust(opId, buf) {
const record = recordFromBuf(buf);
const { promiseId, result } = record;
const p = promiseMap.get(promiseId);
diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs
index 73a4720c2..3c077562d 100644
--- a/core/examples/http_bench.rs
+++ b/core/examples/http_bench.rs
@@ -36,25 +36,23 @@ impl log::Log for Logger {
fn flush(&self) {}
}
-const OP_LISTEN: i32 = 1;
-const OP_ACCEPT: i32 = 2;
-const OP_READ: i32 = 3;
-const OP_WRITE: i32 = 4;
-const OP_CLOSE: i32 = 5;
+const OP_LISTEN: OpId = 1;
+const OP_ACCEPT: OpId = 2;
+const OP_READ: OpId = 3;
+const OP_WRITE: OpId = 4;
+const OP_CLOSE: OpId = 5;
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
pub promise_id: i32,
- pub op_id: i32,
pub arg: i32,
pub result: i32,
}
impl Into<Buf> for Record {
fn into(self) -> Buf {
- let buf32 = vec![self.promise_id, self.op_id, self.arg, self.result]
- .into_boxed_slice();
- let ptr = Box::into_raw(buf32) as *mut [u8; 16];
+ let buf32 = vec![self.promise_id, self.arg, self.result].into_boxed_slice();
+ let ptr = Box::into_raw(buf32) as *mut [u8; 3 * 4];
unsafe { Box::from_raw(ptr) }
}
}
@@ -63,28 +61,26 @@ impl From<&[u8]> for Record {
fn from(s: &[u8]) -> Record {
#[allow(clippy::cast_ptr_alignment)]
let ptr = s.as_ptr() as *const i32;
- let ints = unsafe { std::slice::from_raw_parts(ptr, 4) };
+ let ints = unsafe { std::slice::from_raw_parts(ptr, 3) };
Record {
promise_id: ints[0],
- op_id: ints[1],
- arg: ints[2],
- result: ints[3],
+ arg: ints[1],
+ result: ints[2],
}
}
}
impl From<Buf> for Record {
fn from(buf: Buf) -> Record {
- assert_eq!(buf.len(), 4 * 4);
+ assert_eq!(buf.len(), 3 * 4);
#[allow(clippy::cast_ptr_alignment)]
- let ptr = Box::into_raw(buf) as *mut [i32; 4];
+ let ptr = Box::into_raw(buf) as *mut [i32; 3];
let ints: Box<[i32]> = unsafe { Box::from_raw(ptr) };
- assert_eq!(ints.len(), 4);
+ assert_eq!(ints.len(), 3);
Record {
promise_id: ints[0],
- op_id: ints[1],
- arg: ints[2],
- result: ints[3],
+ arg: ints[1],
+ result: ints[2],
}
}
}
@@ -93,7 +89,6 @@ impl From<Buf> for Record {
fn test_record_from() {
let r = Record {
promise_id: 1,
- op_id: 2,
arg: 3,
result: 4,
};
@@ -102,7 +97,7 @@ fn test_record_from() {
#[cfg(target_endian = "little")]
assert_eq!(
buf,
- vec![1u8, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
+ vec![1u8, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0].into_boxed_slice()
);
let actual = Record::from(buf);
assert_eq!(actual, expected);
@@ -111,10 +106,14 @@ fn test_record_from() {
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
-fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
+fn dispatch(
+ op_id: OpId,
+ control: &[u8],
+ zero_copy_buf: Option<PinnedBuf>,
+) -> CoreOp {
let record = Record::from(control);
let is_sync = record.promise_id == 0;
- let http_bench_op = match record.op_id {
+ let http_bench_op = match op_id {
OP_LISTEN => {
assert!(is_sync);
op_listen()
@@ -139,7 +138,7 @@ fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> CoreOp {
let rid = record.arg;
op_write(rid, zero_copy_buf)
}
- _ => panic!("bad op {}", record.op_id),
+ _ => panic!("bad op {}", op_id),
};
let mut record_a = record.clone();
let mut record_b = record.clone();
diff --git a/core/isolate.rs b/core/isolate.rs
index 0f693ff92..d3ac4457e 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -12,6 +12,7 @@ use crate::libdeno::deno_buf;
use crate::libdeno::deno_dyn_import_id;
use crate::libdeno::deno_mod;
use crate::libdeno::deno_pinned_buf;
+use crate::libdeno::OpId;
use crate::libdeno::PinnedBuf;
use crate::libdeno::Snapshot1;
use crate::libdeno::Snapshot2;
@@ -33,6 +34,9 @@ pub type Buf = Box<[u8]>;
pub type OpAsyncFuture<E> = Box<dyn Future<Item = Buf, Error = E> + Send>;
+type PendingOpFuture =
+ Box<dyn Future<Item = (OpId, Buf), Error = CoreError> + Send>;
+
pub enum Op<E> {
Sync(Buf),
Async(OpAsyncFuture<E>),
@@ -40,10 +44,13 @@ pub enum Op<E> {
pub type CoreError = ();
-type CoreOpAsyncFuture = OpAsyncFuture<CoreError>;
-
pub type CoreOp = Op<CoreError>;
+pub type OpResult<E> = Result<Op<E>, E>;
+
+/// Args: op_id, control_buf, zero_copy_buf
+type CoreDispatchFn = dyn Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp;
+
/// Stores a script used to initalize a Isolate
pub struct Script<'a> {
pub source: &'a str,
@@ -76,10 +83,6 @@ pub enum StartupData<'a> {
None,
}
-pub type OpResult<E> = Result<Op<E>, E>;
-
-type CoreDispatchFn = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp;
-
pub type DynImportFuture =
Box<dyn Future<Item = deno_mod, Error = ErrBox> + Send>;
type DynImportFn = dyn Fn(&str, &str) -> DynImportFuture;
@@ -121,7 +124,7 @@ pub struct Isolate {
js_error_create: Arc<JSErrorCreateFn>,
needs_init: bool,
shared: SharedQueue,
- pending_ops: FuturesUnordered<CoreOpAsyncFuture>,
+ pending_ops: FuturesUnordered<PendingOpFuture>,
pending_dyn_imports: FuturesUnordered<DynImport>,
have_unpolled_ops: bool,
startup_script: Option<OwnedScript>,
@@ -198,7 +201,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch().
pub fn set_dispatch<F>(&mut self, f: F)
where
- F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
+ F: Fn(OpId, &[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{
self.dispatch = Some(Arc::new(f));
}
@@ -265,13 +268,14 @@ impl Isolate {
extern "C" fn pre_dispatch(
user_data: *mut c_void,
- control_argv0: deno_buf,
+ op_id: OpId,
+ control_buf: deno_buf,
zero_copy_buf: deno_pinned_buf,
) {
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
let op = if let Some(ref f) = isolate.dispatch {
- f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf))
+ f(op_id, control_buf.as_ref(), PinnedBuf::new(zero_copy_buf))
} else {
panic!("isolate.dispatch not set")
};
@@ -280,13 +284,17 @@ impl Isolate {
match op {
Op::Sync(buf) => {
// For sync messages, we always return the response via Deno.core.send's
- // return value.
- // TODO(ry) check that if JSError thrown during respond(), that it will be
- // picked up.
- let _ = isolate.respond(Some(&buf));
+ // return value. Sync messages ignore the op_id.
+ let op_id = 0;
+ isolate
+ .respond(Some((op_id, &buf)))
+ // Because this is a sync op, deno_respond() does not actually call
+ // into JavaScript. We should not get an error here.
+ .expect("unexpected error");
}
Op::Async(fut) => {
- isolate.pending_ops.push(fut);
+ let fut2 = fut.map(move |buf| (op_id, buf));
+ isolate.pending_ops.push(Box::new(fut2));
isolate.have_unpolled_ops = true;
}
}
@@ -347,13 +355,16 @@ impl Isolate {
}
}
- fn respond(&mut self, maybe_buf: Option<&[u8]>) -> Result<(), ErrBox> {
- let buf = match maybe_buf {
- None => deno_buf::empty(),
- Some(r) => deno_buf::from(r),
+ fn respond(
+ &mut self,
+ maybe_buf: Option<(OpId, &[u8])>,
+ ) -> Result<(), ErrBox> {
+ let (op_id, buf) = match maybe_buf {
+ None => (0, deno_buf::empty()),
+ Some((op_id, r)) => (op_id, deno_buf::from(r)),
};
unsafe {
- libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), buf)
+ libdeno::deno_respond(self.libdeno_isolate, self.as_raw_ptr(), op_id, buf)
}
self.check_last_exception()
}
@@ -541,7 +552,7 @@ impl Future for Isolate {
fn poll(&mut self) -> Poll<(), ErrBox> {
self.shared_init();
- let mut overflow_response: Option<Buf> = None;
+ let mut overflow_response: Option<(OpId, Buf)> = None;
loop {
// If there are any pending dyn_import futures, do those first.
@@ -567,13 +578,13 @@ impl Future for Isolate {
Err(_) => panic!("unexpected op error"),
Ok(Ready(None)) => break,
Ok(NotReady) => break,
- Ok(Ready(Some(buf))) => {
- let successful_push = self.shared.push(&buf);
+ Ok(Ready(Some((op_id, buf)))) => {
+ let successful_push = self.shared.push(op_id, &buf);
if !successful_push {
// If we couldn't push the response to the shared queue, because
// there wasn't enough size, we will return the buffer via the
// legacy route, using the argument of deno_respond.
- overflow_response = Some(buf);
+ overflow_response = Some((op_id, buf));
break;
}
}
@@ -592,8 +603,8 @@ impl Future for Isolate {
if overflow_response.is_some() {
// Lock the current thread for V8.
let locker = LockerScope::new(self.libdeno_isolate);
- let buf = overflow_response.take().unwrap();
- self.respond(Some(&buf))?;
+ let (op_id, buf) = overflow_response.take().unwrap();
+ self.respond(Some((op_id, &buf)))?;
drop(locker);
}
@@ -633,10 +644,11 @@ impl IsolateHandle {
}
}
-pub fn js_check(r: Result<(), ErrBox>) {
+pub fn js_check<T>(r: Result<T, ErrBox>) -> T {
if let Err(e) = r {
panic!(e.to_string());
}
+ r.unwrap()
}
#[cfg(test)]
@@ -689,7 +701,8 @@ pub mod tests {
let dispatch_count_ = dispatch_count.clone();
let mut isolate = Isolate::new(StartupData::None, false);
- isolate.set_dispatch(move |control, _| -> CoreOp {
+ isolate.set_dispatch(move |op_id, control, _| -> CoreOp {
+ println!("op_id {}", op_id);
dispatch_count_.fetch_add(1, Ordering::Relaxed);
match mode {
Mode::AsyncImmediate => {
@@ -749,9 +762,9 @@ pub mod tests {
"filename.js",
r#"
let control = new Uint8Array([42]);
- Deno.core.send(control);
+ Deno.core.send(42, control);
async function main() {
- Deno.core.send(control);
+ Deno.core.send(42, control);
}
main();
"#,
@@ -770,7 +783,7 @@ pub mod tests {
import { b } from 'b.js'
if (b() != 'b') throw Error();
let control = new Uint8Array([42]);
- Deno.core.send(control);
+ Deno.core.send(42, control);
"#,
)
.unwrap();
@@ -816,7 +829,7 @@ pub mod tests {
"setup2.js",
r#"
let nrecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
nrecv++;
});
"#,
@@ -827,7 +840,7 @@ pub mod tests {
r#"
assert(nrecv == 0);
let control = new Uint8Array([42]);
- Deno.core.send(control);
+ Deno.core.send(42, control);
assert(nrecv == 0);
"#,
));
@@ -838,7 +851,7 @@ pub mod tests {
"check2.js",
r#"
assert(nrecv == 1);
- Deno.core.send(control);
+ Deno.core.send(42, control);
assert(nrecv == 1);
"#,
));
@@ -1016,10 +1029,10 @@ pub mod tests {
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response instanceof Uint8Array);
assert(response.length == 1);
assert(response[0] == 43);
@@ -1038,10 +1051,10 @@ pub mod tests {
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response instanceof Uint8Array);
assert(response.length == 100 * 1024 * 1024);
assert(response[0] == 99);
@@ -1059,21 +1072,22 @@ pub mod tests {
"overflow_req_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId == 99);
assert(buf.byteLength === 1);
assert(buf[0] === 43);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
// Async messages always have null response.
assert(response == null);
assert(asyncRecv == 0);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert_eq!(Async::Ready(()), isolate.poll().unwrap());
+ assert_eq!(Async::Ready(()), js_check(isolate.poll()));
js_check(isolate.execute("check.js", "assert(asyncRecv == 1);"));
});
}
@@ -1088,14 +1102,15 @@ pub mod tests {
"overflow_res_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId == 99);
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response == null);
assert(asyncRecv == 0);
"#,
@@ -1116,19 +1131,20 @@ pub mod tests {
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((buf) => {
+ Deno.core.setAsyncHandler((opId, buf) => {
+ assert(opId === 99);
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
});
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(control);
+ let response = Deno.core.dispatch(99, control);
assert(response == null);
assert(asyncRecv == 0);
// Dispatch another message to verify that pending ops
// are done even if shared space overflows
- Deno.core.dispatch(control);
+ Deno.core.dispatch(99, control);
"#,
));
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
diff --git a/core/lib.rs b/core/lib.rs
index 61521aecb..9be1c3891 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -18,6 +18,7 @@ pub use crate::flags::v8_set_flags;
pub use crate::isolate::*;
pub use crate::js_errors::*;
pub use crate::libdeno::deno_mod;
+pub use crate::libdeno::OpId;
pub use crate::libdeno::PinnedBuf;
pub use crate::module_specifier::*;
pub use crate::modules::*;
diff --git a/core/libdeno.rs b/core/libdeno.rs
index c402d8754..071f6ddf5 100644
--- a/core/libdeno.rs
+++ b/core/libdeno.rs
@@ -12,6 +12,8 @@ use std::ptr::null;
use std::ptr::NonNull;
use std::slice;
+pub type OpId = u32;
+
// TODO(F001): change this definition to `extern { pub type isolate; }`
// After RFC 1861 is stablized. See https://github.com/rust-lang/rust/issues/43467.
#[repr(C)]
@@ -188,7 +190,8 @@ impl Snapshot2<'_> {
#[allow(non_camel_case_types)]
type deno_recv_cb = unsafe extern "C" fn(
user_data: *mut c_void,
- control_buf: deno_buf, // deprecated
+ op_id: OpId,
+ control_buf: deno_buf,
zero_copy_buf: deno_pinned_buf,
);
@@ -266,6 +269,7 @@ extern "C" {
pub fn deno_respond(
i: *const isolate,
user_data: *const c_void,
+ op_id: OpId,
buf: deno_buf,
);
pub fn deno_pinned_buf_delete(buf: &mut deno_pinned_buf);
diff --git a/core/libdeno/api.cc b/core/libdeno/api.cc
index 61eeb43ca..1e6b5dfbf 100644
--- a/core/libdeno/api.cc
+++ b/core/libdeno/api.cc
@@ -159,10 +159,11 @@ void deno_pinned_buf_delete(deno_pinned_buf* buf) {
auto _ = deno::PinnedBuf(buf);
}
-void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
+void deno_respond(Deno* d_, void* user_data, deno_op_id op_id, deno_buf buf) {
auto* d = unwrap(d_);
if (d->current_args_ != nullptr) {
// Synchronous response.
+ // Note op_id is not passed back in the case of synchronous response.
if (buf.data_ptr != nullptr) {
auto ab = deno::ImportBuf(d, buf);
d->current_args_->GetReturnValue().Set(ab);
@@ -187,12 +188,13 @@ void deno_respond(Deno* d_, void* user_data, deno_buf buf) {
return;
}
- v8::Local<v8::Value> args[1];
+ v8::Local<v8::Value> args[2];
int argc = 0;
if (buf.data_ptr != nullptr) {
- args[0] = deno::ImportBuf(d, buf);
- argc = 1;
+ args[0] = v8::Integer::New(d->isolate_, op_id);
+ args[1] = deno::ImportBuf(d, buf);
+ argc = 2;
}
auto v = recv_->Call(context, context->Global(), argc, args);
diff --git a/core/libdeno/binding.cc b/core/libdeno/binding.cc
index da582a3bf..291e62f01 100644
--- a/core/libdeno/binding.cc
+++ b/core/libdeno/binding.cc
@@ -223,22 +223,29 @@ void Send(const v8::FunctionCallbackInfo<v8::Value>& args) {
v8::HandleScope handle_scope(isolate);
deno_buf control = {nullptr, 0};
- if (args[0]->IsArrayBufferView()) {
- auto view = v8::Local<v8::ArrayBufferView>::Cast(args[0]);
+
+ int32_t op_id = 0;
+ if (args[0]->IsInt32()) {
+ auto context = d->context_.Get(isolate);
+ op_id = args[0]->Int32Value(context).FromJust();
+ }
+
+ if (args[1]->IsArrayBufferView()) {
+ auto view = v8::Local<v8::ArrayBufferView>::Cast(args[1]);
auto data =
reinterpret_cast<uint8_t*>(view->Buffer()->GetContents().Data());
control = {data + view->ByteOffset(), view->ByteLength()};
}
PinnedBuf zero_copy =
- args[1]->IsArrayBufferView()
- ? PinnedBuf(v8::Local<v8::ArrayBufferView>::Cast(args[1]))
+ args[2]->IsArrayBufferView()
+ ? PinnedBuf(v8::Local<v8::ArrayBufferView>::Cast(args[2]))
: PinnedBuf();
DCHECK_NULL(d->current_args_);
d->current_args_ = &args;
- d->recv_cb_(d->user_data_, control, zero_copy.IntoRaw());
+ d->recv_cb_(d->user_data_, op_id, control, zero_copy.IntoRaw());
if (d->current_args_ == nullptr) {
// This indicates that deno_repond() was called already.
diff --git a/core/libdeno/deno.h b/core/libdeno/deno.h
index fe5214848..2c248a87e 100644
--- a/core/libdeno/deno.h
+++ b/core/libdeno/deno.h
@@ -28,10 +28,22 @@ typedef struct {
typedef struct deno_s Deno;
-// A callback to receive a message from a libdeno.send() javascript call.
+typedef uint32_t deno_op_id;
+
+// A callback to receive a message from a Deno.core.send() javascript call.
// control_buf is valid for only for the lifetime of this callback.
// data_buf is valid until deno_respond() is called.
-typedef void (*deno_recv_cb)(void* user_data, deno_buf control_buf,
+//
+// op_id corresponds to the first argument of Deno.core.send().
+// op_id is an extra user-defined integer valued which is not interpreted by
+// libdeno.
+//
+// control_buf corresponds to the second argument of Deno.core.send().
+//
+// zero_copy_buf corresponds to the third argument of Deno.core.send().
+// The user must call deno_pinned_buf_delete on each zero_copy_buf received.
+typedef void (*deno_recv_cb)(void* user_data, deno_op_id op_id,
+ deno_buf control_buf,
deno_pinned_buf zero_copy_buf);
typedef int deno_dyn_import_id;
@@ -49,7 +61,7 @@ typedef struct {
int will_snapshot; // Default 0. If calling deno_snapshot_new 1.
deno_snapshot load_snapshot; // A startup snapshot to use.
deno_buf shared; // Shared buffer to be mapped to libdeno.shared
- deno_recv_cb recv_cb; // Maps to libdeno.send() calls.
+ deno_recv_cb recv_cb; // Maps to Deno.core.send() calls.
deno_dyn_import_cb dyn_import_cb;
} deno_config;
@@ -78,21 +90,25 @@ void deno_unlock(Deno* d);
void deno_execute(Deno* d, void* user_data, const char* js_filename,
const char* js_source);
-// deno_respond sends up to one message back for every deno_recv_cb made.
+// deno_respond sends one message back for every deno_recv_cb made.
//
-// If this is called during deno_recv_cb, the issuing libdeno.send() in
+// If this is called during deno_recv_cb, the issuing Deno.core.send() in
// javascript will synchronously return the specified buf as an ArrayBuffer (or
// null if buf is empty).
//
// If this is called after deno_recv_cb has returned, the deno_respond
-// will call into the JS callback specified by libdeno.recv().
+// will call into the JS callback specified by Deno.core.recv().
//
// (Ideally, but not currently: After calling deno_respond(), the caller no
// longer owns `buf` and must not use it; deno_respond() is responsible for
// releasing its memory.)
//
+// op_id is an extra user-defined integer valued which is not currently
+// interpreted by libdeno. But it should probably correspond to the op_id in
+// deno_recv_cb.
+//
// If a JS exception was encountered, deno_last_exception() will be non-NULL.
-void deno_respond(Deno* d, void* user_data, deno_buf buf);
+void deno_respond(Deno* d, void* user_data, deno_op_id op_id, deno_buf buf);
// consumes zero_copy
void deno_pinned_buf_delete(deno_pinned_buf* buf);
diff --git a/core/libdeno/libdeno.d.ts b/core/libdeno/libdeno.d.ts
index 1bc7367d9..8a26e49ca 100644
--- a/core/libdeno/libdeno.d.ts
+++ b/core/libdeno/libdeno.d.ts
@@ -13,13 +13,14 @@ interface EvalErrorInfo {
}
declare interface MessageCallback {
- (msg: Uint8Array): void;
+ (opId: number, msg: Uint8Array): void;
}
declare interface DenoCore {
recv(cb: MessageCallback): void;
send(
+ opId: number,
control: null | ArrayBufferView,
data?: ArrayBufferView
): null | Uint8Array;
diff --git a/core/libdeno/libdeno_test.cc b/core/libdeno/libdeno_test.cc
index 16a4a11f6..a72793944 100644
--- a/core/libdeno/libdeno_test.cc
+++ b/core/libdeno/libdeno_test.cc
@@ -49,7 +49,8 @@ void assert_null(deno_pinned_buf b) {
TEST(LibDenoTest, RecvReturnEmpty) {
static int count = 0;
- auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) {
+ auto recv_cb = [](auto _, deno_op_id op_id, auto buf, auto zero_copy_buf) {
+ EXPECT_EQ(op_id, 42u);
assert_null(zero_copy_buf);
count++;
EXPECT_EQ(static_cast<size_t>(3), buf.data_len);
@@ -64,9 +65,43 @@ TEST(LibDenoTest, RecvReturnEmpty) {
deno_delete(d);
}
+TEST(LibDenoTest, BasicRecv) {
+ static int count = 0;
+ auto recv_cb = [](auto user_data, deno_op_id op_id, auto buf,
+ auto zero_copy_buf) {
+ EXPECT_EQ(op_id, 42u);
+ // auto d = reinterpret_cast<Deno*>(user_data);
+ assert_null(zero_copy_buf);
+ count++;
+ EXPECT_EQ(static_cast<size_t>(3), buf.data_len);
+ EXPECT_EQ(buf.data_ptr[0], 1);
+ EXPECT_EQ(buf.data_ptr[1], 2);
+ EXPECT_EQ(buf.data_ptr[2], 3);
+ };
+ Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr});
+ deno_execute(d, d, "a.js", "BasicRecv()");
+ EXPECT_EQ(nullptr, deno_last_exception(d));
+ EXPECT_EQ(count, 1);
+ deno_check_promise_errors(d);
+ EXPECT_EQ(deno_last_exception(d), nullptr);
+ {
+ deno_lock(d);
+ uint8_t response[] = {'b', 'a', 'r'};
+ deno_respond(d, nullptr, 43, {response, sizeof response});
+ deno_unlock(d);
+ }
+ EXPECT_EQ(count, 2);
+ EXPECT_EQ(nullptr, deno_last_exception(d));
+ deno_check_promise_errors(d);
+ EXPECT_EQ(deno_last_exception(d), nullptr);
+ deno_delete(d);
+}
+
TEST(LibDenoTest, RecvReturnBar) {
static int count = 0;
- auto recv_cb = [](auto user_data, auto buf, auto zero_copy_buf) {
+ auto recv_cb = [](auto user_data, deno_op_id op_id, auto buf,
+ auto zero_copy_buf) {
+ EXPECT_EQ(op_id, 42u);
auto d = reinterpret_cast<Deno*>(user_data);
assert_null(zero_copy_buf);
count++;
@@ -75,7 +110,7 @@ TEST(LibDenoTest, RecvReturnBar) {
EXPECT_EQ(buf.data_ptr[1], 'b');
EXPECT_EQ(buf.data_ptr[2], 'c');
uint8_t response[] = {'b', 'a', 'r'};
- deno_respond(d, user_data, {response, sizeof response});
+ deno_respond(d, user_data, op_id, {response, sizeof response});
};
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr});
deno_execute(d, d, "a.js", "RecvReturnBar()");
@@ -126,8 +161,9 @@ TEST(LibDenoTest, GlobalErrorHandling) {
TEST(LibDenoTest, ZeroCopyBuf) {
static int count = 0;
static deno_pinned_buf zero_copy_buf2;
- auto recv_cb = [](auto user_data, deno_buf buf,
+ auto recv_cb = [](auto user_data, deno_op_id op_id, deno_buf buf,
deno_pinned_buf zero_copy_buf) {
+ EXPECT_EQ(op_id, 42u);
count++;
EXPECT_NE(zero_copy_buf.pin, nullptr);
zero_copy_buf.data_ptr[0] = 4;
@@ -155,7 +191,9 @@ TEST(LibDenoTest, ZeroCopyBuf) {
TEST(LibDenoTest, CheckPromiseErrors) {
static int count = 0;
- auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) { count++; };
+ auto recv_cb = [](auto _, deno_op_id op_id, auto buf, auto zero_copy_buf) {
+ count++;
+ };
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, nullptr});
EXPECT_EQ(deno_last_exception(d), nullptr);
deno_execute(d, nullptr, "a.js", "CheckPromiseErrors()");
@@ -264,7 +302,8 @@ TEST(LibDenoTest, SharedAtomics) {
TEST(LibDenoTest, WasmInstantiate) {
static int count = 0;
- auto recv_cb = [](auto _, auto buf, auto zero_copy_buf) {
+ auto recv_cb = [](auto _, deno_op_id op_id, auto buf, auto zero_copy_buf) {
+ EXPECT_EQ(op_id, 42u);
EXPECT_EQ(buf.data_len, 1u);
EXPECT_EQ(buf.data_ptr[0], 42);
count++;
diff --git a/core/libdeno/libdeno_test.js b/core/libdeno/libdeno_test.js
index 006c71666..1c7655391 100644
--- a/core/libdeno/libdeno_test.js
+++ b/core/libdeno/libdeno_test.js
@@ -28,15 +28,30 @@ global.TypedArraySnapshots = () => {
global.RecvReturnEmpty = () => {
const m1 = new Uint8Array("abc".split("").map(c => c.charCodeAt(0)));
const m2 = m1.slice();
- const r1 = Deno.core.send(m1);
+ const r1 = Deno.core.send(42, m1);
assert(r1 == null);
- const r2 = Deno.core.send(m2);
+ const r2 = Deno.core.send(42, m2);
assert(r2 == null);
};
+global.BasicRecv = () => {
+ const m = new Uint8Array([1, 2, 3]);
+ Deno.core.recv((opId, buf) => {
+ assert(opId === 43);
+ assert(buf instanceof Uint8Array);
+ assert(buf.byteLength === 3);
+ const s = String.fromCharCode(...buf);
+ assert(s === "bar");
+ const r = Deno.core.send(42, m);
+ assert(!r); // async
+ });
+ const r = Deno.core.send(42, m);
+ assert(!r); // async
+};
+
global.RecvReturnBar = () => {
const m = new Uint8Array("abc".split("").map(c => c.charCodeAt(0)));
- const r = Deno.core.send(m);
+ const r = Deno.core.send(42, m);
assert(r instanceof Uint8Array);
assert(r.byteLength === 3);
const rstr = String.fromCharCode(...r);
@@ -58,7 +73,7 @@ global.SendRecvSlice = () => {
buf[0] = 100 + i;
buf[buf.length - 1] = 100 - i;
// On the native side, the slice is shortened by 19 bytes.
- buf = Deno.core.send(buf);
+ buf = Deno.core.send(42, buf);
assert(buf.byteOffset === i * 11);
assert(buf.byteLength === abLen - i * 30 - 19);
assert(buf.buffer.byteLength == abLen);
@@ -78,17 +93,17 @@ global.JSSendArrayBufferViewTypes = () => {
const ab1 = new ArrayBuffer(4321);
const u8 = new Uint8Array(ab1, 2468, 1000);
u8[0] = 1;
- Deno.core.send(u8);
+ Deno.core.send(42, u8);
// Send Uint32Array.
const ab2 = new ArrayBuffer(4321);
const u32 = new Uint32Array(ab2, 2468, 1000 / Uint32Array.BYTES_PER_ELEMENT);
u32[0] = 0x02020202;
- Deno.core.send(u32);
+ Deno.core.send(42, u32);
// Send DataView.
const ab3 = new ArrayBuffer(4321);
const dv = new DataView(ab3, 2468, 1000);
dv.setUint8(0, 3);
- Deno.core.send(dv);
+ Deno.core.send(42, dv);
};
// The following join has caused SnapshotBug to segfault when using kKeep.
@@ -110,7 +125,7 @@ global.ZeroCopyBuf = () => {
const b = zeroCopyBuf;
// The second parameter of send should modified by the
// privileged side.
- const r = Deno.core.send(a, b);
+ const r = Deno.core.send(42, a, b);
assert(r == null);
// b is different.
assert(b[0] === 4);
@@ -129,7 +144,7 @@ global.CheckPromiseErrors = () => {
try {
await fn();
} catch (e) {
- Deno.core.send(new Uint8Array([42]));
+ Deno.core.send(42, new Uint8Array([42]));
}
})();
};
@@ -239,17 +254,17 @@ global.WasmInstantiate = () => {
]);
(async () => {
- Deno.core.send(new Uint8Array([42]));
+ Deno.core.send(42, new Uint8Array([42]));
const wasm = await WebAssembly.instantiate(bytes);
- Deno.core.send(new Uint8Array([42]));
+ Deno.core.send(42, new Uint8Array([42]));
const result = wasm.instance.exports.add(1, 3);
if (result != 4) {
throw Error("bad");
}
// To signal success, we send back a fixed buffer.
- Deno.core.send(new Uint8Array([42]));
+ Deno.core.send(42, new Uint8Array([42]));
})();
};
diff --git a/core/libdeno/modules_test.cc b/core/libdeno/modules_test.cc
index 987e88791..e11231528 100644
--- a/core/libdeno/modules_test.cc
+++ b/core/libdeno/modules_test.cc
@@ -2,9 +2,11 @@
#include "test.h"
static int exec_count = 0;
-void recv_cb(void* user_data, deno_buf buf, deno_pinned_buf zero_copy_buf) {
+void recv_cb(void* user_data, deno_op_id op_id, deno_buf buf,
+ deno_pinned_buf zero_copy_buf) {
// We use this to check that scripts have executed.
EXPECT_EQ(1u, buf.data_len);
+ EXPECT_EQ(42u, op_id);
EXPECT_EQ(buf.data_ptr[0], 4);
EXPECT_EQ(zero_copy_buf.data_ptr, nullptr);
EXPECT_EQ(zero_copy_buf.data_len, 0u);
@@ -20,7 +22,7 @@ TEST(ModulesTest, Resolution) {
static deno_mod a = deno_mod_new(d, true, "a.js",
"import { b } from 'b.js'\n"
"if (b() != 'b') throw Error();\n"
- "Deno.core.send(new Uint8Array([4]));");
+ "Deno.core.send(42, new Uint8Array([4]));");
EXPECT_NE(a, 0);
EXPECT_EQ(nullptr, deno_last_exception(d));
@@ -72,7 +74,7 @@ TEST(ModulesTest, ResolutionError) {
static deno_mod a = deno_mod_new(d, true, "a.js",
"import 'bad'\n"
- "Deno.core.send(new Uint8Array([4]));");
+ "Deno.core.send(42, new Uint8Array([4]));");
EXPECT_NE(a, 0);
EXPECT_EQ(nullptr, deno_last_exception(d));
@@ -106,7 +108,7 @@ TEST(ModulesTest, ImportMetaUrl) {
static deno_mod a =
deno_mod_new(d, true, "a.js",
"if ('a.js' != import.meta.url) throw 'hmm'\n"
- "Deno.core.send(new Uint8Array([4]));");
+ "Deno.core.send(42, new Uint8Array([4]));");
EXPECT_NE(a, 0);
EXPECT_EQ(nullptr, deno_last_exception(d));
@@ -165,7 +167,7 @@ TEST(ModulesTest, DynamicImportSuccess) {
" let mod = await import('foo'); \n"
" assert(mod.b() === 'b'); \n"
// Send a message to signify that we're done.
- " Deno.core.send(new Uint8Array([4])); \n"
+ " Deno.core.send(42, new Uint8Array([4])); \n"
"})(); \n";
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, dyn_import_cb});
static deno_mod a = deno_mod_new(d, true, "a.js", src);
@@ -206,7 +208,7 @@ TEST(ModulesTest, DynamicImportError) {
"(async () => { \n"
" let mod = await import('foo'); \n"
// The following should be unreachable.
- " Deno.core.send(new Uint8Array([4])); \n"
+ " Deno.core.send(42, new Uint8Array([4])); \n"
"})(); \n";
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, dyn_import_cb});
static deno_mod a = deno_mod_new(d, true, "a.js", src);
@@ -249,7 +251,7 @@ TEST(ModulesTest, DynamicImportAsync) {
" mod = await import('foo'); \n"
" assert(mod.b() === 'b'); \n"
// Send a message to signify that we're done.
- " Deno.core.send(new Uint8Array([4])); \n"
+ " Deno.core.send(42, new Uint8Array([4])); \n"
"})(); \n";
Deno* d = deno_new(deno_config{0, snapshot, empty, recv_cb, dyn_import_cb});
static deno_mod a = deno_mod_new(d, true, "a.js", src);
@@ -327,7 +329,7 @@ TEST(ModulesTest, DynamicImportThrows) {
"(async () => { \n"
" let mod = await import('b.js'); \n"
// unreachable
- " Deno.core.send(new Uint8Array([4])); \n"
+ " Deno.core.send(42, new Uint8Array([4])); \n"
"})(); \n";
static deno_mod a = deno_mod_new(d, true, "a.js", a_src);
EXPECT_NE(a, 0);
@@ -401,7 +403,7 @@ TEST(ModulesTest, DynamicImportSyntaxError) {
"(async () => { \n"
" let mod = await import('b.js'); \n"
// unreachable
- " Deno.core.send(new Uint8Array([4])); \n"
+ " Deno.core.send(42, new Uint8Array([4])); \n"
"})(); \n";
static deno_mod a = deno_mod_new(d, true, "a.js", src);
EXPECT_NE(a, 0);
diff --git a/core/shared_queue.js b/core/shared_queue.js
index 1b338b052..b69f1b422 100644
--- a/core/shared_queue.js
+++ b/core/shared_queue.js
@@ -26,7 +26,7 @@ SharedQueue Binary Layout
const INDEX_NUM_SHIFTED_OFF = 1;
const INDEX_HEAD = 2;
const INDEX_OFFSETS = 3;
- const INDEX_RECORDS = 3 + MAX_RECORDS;
+ const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS;
const HEAD_INIT = 4 * INDEX_RECORDS;
// Available on start due to bindings.
@@ -84,13 +84,17 @@ SharedQueue Binary Layout
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}
- function setEnd(index, end) {
- shared32[INDEX_OFFSETS + index] = end;
+ // TODO(ry) rename to setMeta
+ function setMeta(index, end, opId) {
+ shared32[INDEX_OFFSETS + 2 * index] = end;
+ shared32[INDEX_OFFSETS + 2 * index + 1] = opId;
}
- function getEnd(index) {
+ function getMeta(index) {
if (index < numRecords()) {
- return shared32[INDEX_OFFSETS + index];
+ const buf = shared32[INDEX_OFFSETS + 2 * index];
+ const opId = shared32[INDEX_OFFSETS + 2 * index + 1];
+ return [opId, buf];
} else {
return null;
}
@@ -101,14 +105,14 @@ SharedQueue Binary Layout
if (index == 0) {
return HEAD_INIT;
} else {
- return shared32[INDEX_OFFSETS + index - 1];
+ return shared32[INDEX_OFFSETS + 2 * (index - 1)];
}
} else {
return null;
}
}
- function push(buf) {
+ function push(opId, buf) {
let off = head();
let end = off + buf.byteLength;
let index = numRecords();
@@ -116,7 +120,7 @@ SharedQueue Binary Layout
// console.log("shared_queue.js push fail");
return false;
}
- setEnd(index, end);
+ setMeta(index, end, opId);
assert(end - off == buf.byteLength);
sharedBytes.set(buf, off);
shared32[INDEX_NUM_RECORDS] += 1;
@@ -132,8 +136,8 @@ SharedQueue Binary Layout
return null;
}
- let off = getOffset(i);
- let end = getEnd(i);
+ const off = getOffset(i);
+ const [opId, end] = getMeta(i);
if (size() > 1) {
shared32[INDEX_NUM_SHIFTED_OFF] += 1;
@@ -143,7 +147,8 @@ SharedQueue Binary Layout
assert(off != null);
assert(end != null);
- return sharedBytes.subarray(off, end);
+ const buf = sharedBytes.subarray(off, end);
+ return [opId, buf];
}
let asyncHandler;
@@ -153,19 +158,24 @@ SharedQueue Binary Layout
asyncHandler = cb;
}
- function handleAsyncMsgFromRust(buf) {
+ function handleAsyncMsgFromRust(opId, buf) {
if (buf) {
- asyncHandler(buf);
+ // This is the overflow_response case of deno::Isolate::poll().
+ asyncHandler(opId, buf);
} else {
- while ((buf = shift()) != null) {
- asyncHandler(buf);
+ while (true) {
+ let opIdBuf = shift();
+ if (opIdBuf == null) {
+ break;
+ }
+ asyncHandler(...opIdBuf);
}
}
}
- function dispatch(control, zeroCopy = null) {
+ function dispatch(opId, control, zeroCopy = null) {
maybeInit();
- return Deno.core.send(control, zeroCopy);
+ return Deno.core.send(opId, control, zeroCopy);
}
const denoCore = {
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
index 616272f8d..11c8e2127 100644
--- a/core/shared_queue.rs
+++ b/core/shared_queue.rs
@@ -17,6 +17,7 @@ SharedQueue Binary Layout
*/
use crate::libdeno::deno_buf;
+use crate::libdeno::OpId;
const MAX_RECORDS: usize = 100;
/// Total number of records added.
@@ -27,7 +28,7 @@ const INDEX_NUM_SHIFTED_OFF: usize = 1;
/// It grows monotonically.
const INDEX_HEAD: usize = 2;
const INDEX_OFFSETS: usize = 3;
-const INDEX_RECORDS: usize = 3 + MAX_RECORDS;
+const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;
/// Byte offset of where the records begin. Also where the head starts.
const HEAD_INIT: usize = 4 * INDEX_RECORDS;
/// A rough guess at how big we should make the shared buffer in bytes.
@@ -98,16 +99,19 @@ impl SharedQueue {
s[INDEX_NUM_SHIFTED_OFF] as usize
}
- fn set_end(&mut self, index: usize, end: usize) {
+ fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
let s = self.as_u32_slice_mut();
- s[INDEX_OFFSETS + index] = end as u32;
+ s[INDEX_OFFSETS + 2 * index] = end as u32;
+ s[INDEX_OFFSETS + 2 * index + 1] = op_id;
}
#[cfg(test)]
- fn get_end(&self, index: usize) -> Option<usize> {
+ fn get_meta(&self, index: usize) -> Option<(OpId, usize)> {
if index < self.num_records() {
let s = self.as_u32_slice();
- Some(s[INDEX_OFFSETS + index] as usize)
+ let end = s[INDEX_OFFSETS + 2 * index] as usize;
+ let op_id = s[INDEX_OFFSETS + 2 * index + 1];
+ Some((op_id, end))
} else {
None
}
@@ -120,7 +124,7 @@ impl SharedQueue {
HEAD_INIT
} else {
let s = self.as_u32_slice();
- s[INDEX_OFFSETS + index - 1] as usize
+ s[INDEX_OFFSETS + 2 * (index - 1)] as usize
})
} else {
None
@@ -129,7 +133,7 @@ impl SharedQueue {
/// Returns none if empty.
#[cfg(test)]
- pub fn shift(&mut self) -> Option<&[u8]> {
+ pub fn shift(&mut self) -> Option<(OpId, &[u8])> {
let u32_slice = self.as_u32_slice();
let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
if self.size() == 0 {
@@ -138,7 +142,7 @@ impl SharedQueue {
}
let off = self.get_offset(i).unwrap();
- let end = self.get_end(i).unwrap();
+ let (op_id, end) = self.get_meta(i).unwrap();
if self.size() > 1 {
let u32_slice = self.as_u32_slice_mut();
@@ -146,16 +150,16 @@ impl SharedQueue {
} else {
self.reset();
}
- debug!(
+ println!(
"rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
self.num_records(),
self.num_shifted_off(),
self.head()
);
- Some(&self.bytes[off..end])
+ Some((op_id, &self.bytes[off..end]))
}
- pub fn push(&mut self, record: &[u8]) -> bool {
+ pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
let off = self.head();
let end = off + record.len();
let index = self.num_records();
@@ -163,7 +167,7 @@ impl SharedQueue {
debug!("WARNING the sharedQueue overflowed");
return false;
}
- self.set_end(index, end);
+ self.set_meta(index, end, op_id);
assert_eq!(end - off, record.len());
self.bytes[off..end].copy_from_slice(record);
let u32_slice = self.as_u32_slice_mut();
@@ -193,28 +197,28 @@ mod tests {
let r = vec![1u8, 2, 3, 4, 5].into_boxed_slice();
let len = r.len() + h;
- assert!(q.push(&r));
+ assert!(q.push(0, &r));
assert_eq!(q.head(), len);
let r = vec![6, 7].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(0, &r));
let r = vec![8, 9, 10, 11].into_boxed_slice();
- assert!(q.push(&r));
+ assert!(q.push(0, &r));
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 3);
- let r = q.shift().unwrap();
+ let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![1, 2, 3, 4, 5].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 2);
- let r = q.shift().unwrap();
+ let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![6, 7].as_slice());
assert_eq!(q.num_records(), 3);
assert_eq!(q.size(), 1);
- let r = q.shift().unwrap();
+ let (_op_id, r) = q.shift().unwrap();
assert_eq!(r, vec![8, 9, 10, 11].as_slice());
assert_eq!(q.num_records(), 0);
assert_eq!(q.size(), 0);
@@ -235,19 +239,21 @@ mod tests {
#[test]
fn overflow() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- assert!(q.push(&alloc_buf(RECOMMENDED_SIZE - 1)));
+ assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 1)));
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(2)));
+ assert!(!q.push(0, &alloc_buf(2)));
assert_eq!(q.size(), 1);
- assert!(q.push(&alloc_buf(1)));
+ assert!(q.push(0, &alloc_buf(1)));
assert_eq!(q.size(), 2);
- assert_eq!(q.shift().unwrap().len(), RECOMMENDED_SIZE - 1);
+ let (_op_id, buf) = q.shift().unwrap();
+ assert_eq!(buf.len(), RECOMMENDED_SIZE - 1);
assert_eq!(q.size(), 1);
- assert!(!q.push(&alloc_buf(1)));
+ assert!(!q.push(0, &alloc_buf(1)));
- assert_eq!(q.shift().unwrap().len(), 1);
+ let (_op_id, buf) = q.shift().unwrap();
+ assert_eq!(buf.len(), 1);
assert_eq!(q.size(), 0);
}
@@ -255,11 +261,11 @@ mod tests {
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
- assert!(q.push(&alloc_buf(1)))
+ assert!(q.push(0, &alloc_buf(1)))
}
- assert_eq!(q.push(&alloc_buf(1)), false);
+ assert_eq!(q.push(0, &alloc_buf(1)), false);
// Even if we shift one off, we still cannot push a new record.
- assert_eq!(q.shift().unwrap().len(), 1);
- assert_eq!(q.push(&alloc_buf(1)), false);
+ let _ignored = q.shift().unwrap();
+ assert_eq!(q.push(0, &alloc_buf(1)), false);
}
}
diff --git a/core/shared_queue_test.js b/core/shared_queue_test.js
index e2597f3bc..682d41d1e 100644
--- a/core/shared_queue_test.js
+++ b/core/shared_queue_test.js
@@ -11,14 +11,15 @@ function fullRecords(q) {
q.reset();
const oneByte = new Uint8Array([42]);
for (let i = 0; i < q.MAX_RECORDS; i++) {
- assert(q.push(oneByte));
+ assert(q.push(99, oneByte));
}
- assert(!q.push(oneByte));
- r = q.shift();
+ assert(!q.push(99, oneByte));
+ const [opId, r] = q.shift();
+ assert(opId == 99);
assert(r.byteLength == 1);
assert(r[0] == 42);
// Even if we shift one off, we still cannot push a new record.
- assert(!q.push(oneByte));
+ assert(!q.push(99, oneByte));
}
function main() {
@@ -29,18 +30,19 @@ function main() {
let r = new Uint8Array([1, 2, 3, 4, 5]);
let len = r.byteLength + h;
- assert(q.push(r));
+ assert(q.push(99, r));
assert(q.head() == len);
r = new Uint8Array([6, 7]);
- assert(q.push(r));
+ assert(q.push(99, r));
r = new Uint8Array([8, 9, 10, 11]);
- assert(q.push(r));
+ assert(q.push(99, r));
assert(q.numRecords() == 3);
assert(q.size() == 3);
- r = q.shift();
+ let opId;
+ [opId, r] = q.shift();
assert(r.byteLength == 5);
assert(r[0] == 1);
assert(r[1] == 2);
@@ -50,14 +52,15 @@ function main() {
assert(q.numRecords() == 3);
assert(q.size() == 2);
- r = q.shift();
+ [opId, r] = q.shift();
assert(r.byteLength == 2);
assert(r[0] == 6);
assert(r[1] == 7);
assert(q.numRecords() == 3);
assert(q.size() == 1);
- r = q.shift();
+ [opId, r] = q.shift();
+ assert(opId == 99);
assert(r.byteLength == 4);
assert(r[0] == 8);
assert(r[1] == 9);