Diffstat (limited to 'core')
-rw-r--r--  core/Cargo.toml                       |    1
-rw-r--r--  core/benches/op_baseline.rs           |    8
-rw-r--r--  core/bindings.rs                      |  126
-rw-r--r--  core/core.js                          |  401
-rw-r--r--  core/core_test.js                     |   88
-rw-r--r--  core/examples/hello_world.rs          |   53
-rw-r--r--  core/examples/http_bench_bin_ops.js   |   11
-rw-r--r--  core/examples/http_bench_json_ops.js  |   20
-rw-r--r--  core/examples/http_bench_json_ops.rs  |   45
-rw-r--r--  core/lib.deno_core.d.ts               |    4
-rw-r--r--  core/lib.rs                           |    6
-rw-r--r--  core/ops.rs                           |  154
-rw-r--r--  core/ops_bin.rs                       |  340
-rw-r--r--  core/ops_json.rs                      |  104
-rw-r--r--  core/plugin_api.rs                    |    1
-rw-r--r--  core/runtime.rs                       |  512
-rw-r--r--  core/shared_queue.rs                  |  313
17 files changed, 509 insertions, 1678 deletions
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 450aa7600..ad01d2d26 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
[dependencies]
anyhow = "1.0.38"
+erased-serde = "0.3.13"
futures = "0.3.12"
indexmap = "1.6.1"
lazy_static = "1.4.0"
diff --git a/core/benches/op_baseline.rs b/core/benches/op_baseline.rs
index ed10c2d16..9c288fe27 100644
--- a/core/benches/op_baseline.rs
+++ b/core/benches/op_baseline.rs
@@ -5,12 +5,14 @@ use deno_core::json_op_sync;
use deno_core::v8;
use deno_core::JsRuntime;
use deno_core::Op;
+use deno_core::OpResponse;
fn create_js_runtime() -> JsRuntime {
let mut runtime = JsRuntime::new(Default::default());
runtime.register_op("pi_bin", bin_op_sync(|_, _, _| Ok(314159)));
runtime.register_op("pi_json", json_op_sync(|_, _: (), _| Ok(314159)));
- runtime.register_op("nop", |_, _| Op::Sync(Box::new(9_u64.to_le_bytes())));
+ runtime
+ .register_op("nop", |_, _, _| Op::Sync(OpResponse::Value(Box::new(9))));
// Init ops
runtime
@@ -43,7 +45,7 @@ fn bench_op_pi_bin(b: &mut Bencher) {
bench_runtime_js(
b,
r#"for(let i=0; i < 1e3; i++) {
- Deno.core.binOpSync("pi_bin", 0);
+ Deno.core.binOpSync("pi_bin", 0, nopView);
}"#,
);
}
@@ -61,7 +63,7 @@ fn bench_op_nop(b: &mut Bencher) {
bench_runtime_js(
b,
r#"for(let i=0; i < 1e3; i++) {
- Deno.core.dispatchByName("nop", nopView);
+ Deno.core.dispatchByName("nop", null, null, nopView);
}"#,
);
}
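The bench now registers raw ops with the new three-argument callback and dispatches with an explicit promise-id slot. Below is a minimal sketch of that signature, mirroring the bench above and assuming this revision's deno_core API; the op names and the trailing print are illustrative only.

use deno_core::{json_op_sync, JsRuntime, Op, OpResponse};

fn main() {
  let mut runtime = JsRuntime::new(Default::default());

  // Raw op: (state, structured payload, optional buffer) -> Op.
  // OpResponse::Value is serialized back to JS via serde_v8.
  runtime.register_op("nop", |_state, _payload, _buf| {
    Op::Sync(OpResponse::Value(Box::new(9)))
  });

  // JSON op: args arrive through the structured payload, not a control buffer.
  runtime
    .register_op("pi_json", json_op_sync(|_state, _args: (), _bufs| Ok(314159)));

  // Populate the op-name -> op-id cache, then dispatch:
  // dispatchByName(name, promiseId, args, zeroCopyBuf).
  runtime
    .execute(
      "<usage>",
      "Deno.core.ops(); Deno.core.print(`${Deno.core.dispatchByName('nop', null, null, null)}\\n`);",
    )
    .unwrap();
}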
diff --git a/core/bindings.rs b/core/bindings.rs
index 4665803be..3484d3cbd 100644
--- a/core/bindings.rs
+++ b/core/bindings.rs
@@ -1,11 +1,13 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::error::AnyError;
-use crate::runtime::JsRuntimeState;
use crate::JsRuntime;
use crate::Op;
use crate::OpId;
+use crate::OpPayload;
+use crate::OpResponse;
use crate::OpTable;
+use crate::PromiseId;
use crate::ZeroCopyBuf;
use futures::future::FutureExt;
use rusty_v8 as v8;
@@ -38,9 +40,6 @@ lazy_static::lazy_static! {
function: eval_context.map_fn_to()
},
v8::ExternalReference {
- getter: shared_getter.map_fn_to()
- },
- v8::ExternalReference {
function: queue_microtask.map_fn_to()
},
v8::ExternalReference {
@@ -142,9 +141,6 @@ pub fn initialize_context<'s>(
set_func(scope, core_val, "getProxyDetails", get_proxy_details);
set_func(scope, core_val, "heapStats", heap_stats);
- let shared_key = v8::String::new(scope, "shared").unwrap();
- core_val.set_accessor(scope, shared_key.into(), shared_getter);
-
// Direct bindings on `window`.
set_func(scope, global, "queueMicrotask", queue_microtask);
@@ -380,59 +376,86 @@ fn send<'s>(
let mut state = state_rc.borrow_mut();
let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
+ .map(|l| l.value() as OpId)
.map_err(AnyError::from)
- .and_then(|l| OpId::try_from(l.value()).map_err(AnyError::from))
{
Ok(op_id) => op_id,
Err(err) => {
- let msg = format!("invalid op id: {}", err);
- let msg = v8::String::new(scope, &msg).unwrap();
- let exc = v8::Exception::type_error(scope, msg);
- scope.throw_exception(exc);
+ throw_type_error(scope, format!("invalid op id: {}", err));
return;
}
};
- let buf_iter = (1..args.length()).map(|idx| {
- v8::Local::<v8::ArrayBufferView>::try_from(args.get(idx))
- .map(|view| ZeroCopyBuf::new(scope, view))
- .map_err(|err| {
- let msg = format!("Invalid argument at position {}: {}", idx, err);
- let msg = v8::String::new(scope, &msg).unwrap();
- v8::Exception::type_error(scope, msg)
- })
- });
-
- let bufs = match buf_iter.collect::<Result<_, _>>() {
- Ok(bufs) => bufs,
- Err(exc) => {
- scope.throw_exception(exc);
+  // send(0) returns the entries of all registered ops; handle it as a special case
+ if op_id == 0 {
+ // TODO: Serialize as HashMap when serde_v8 supports maps ...
+ let ops = OpTable::op_entries(state.op_state.clone());
+ rv.set(to_v8(scope, ops).unwrap());
+ return;
+ }
+
+ // PromiseId
+ let arg1 = args.get(1);
+ let promise_id = if arg1.is_null_or_undefined() {
+ Ok(0) // Accept null or undefined as 0
+ } else {
+ // Otherwise expect int
+ v8::Local::<v8::Integer>::try_from(arg1)
+ .map(|l| l.value() as PromiseId)
+ .map_err(AnyError::from)
+ };
+ // Fail if promise id invalid (not null/undefined or int)
+ let promise_id: PromiseId = match promise_id {
+ Ok(promise_id) => promise_id,
+ Err(err) => {
+ throw_type_error(scope, format!("invalid promise id: {}", err));
return;
}
};
- let op = OpTable::route_op(op_id, state.op_state.clone(), bufs);
- assert_eq!(state.shared.size(), 0);
- match op {
- Op::Sync(buf) if !buf.is_empty() => {
- rv.set(boxed_slice_to_uint8array(scope, buf).into());
+ // Structured args
+ let v = args.get(2);
+
+ // Buf arg (optional)
+ let arg3 = args.get(3);
+ let buf: Option<ZeroCopyBuf> = if arg3.is_null_or_undefined() {
+ None
+ } else {
+ match v8::Local::<v8::ArrayBufferView>::try_from(arg3)
+ .map(|view| ZeroCopyBuf::new(scope, view))
+ .map_err(AnyError::from)
+ {
+ Ok(buf) => Some(buf),
+ Err(err) => {
+ throw_type_error(scope, format!("Err with buf arg: {}", err));
+ return;
+ }
}
- Op::Sync(_) => {}
+ };
+
+ let payload = OpPayload::new(scope, v);
+ let op = OpTable::route_op(op_id, state.op_state.clone(), payload, buf);
+ match op {
+ Op::Sync(resp) => match resp {
+ OpResponse::Value(v) => {
+ rv.set(to_v8(scope, v).unwrap());
+ }
+ OpResponse::Buffer(buf) => {
+ rv.set(boxed_slice_to_uint8array(scope, buf).into());
+ }
+ },
Op::Async(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
+ let fut2 = fut.map(move |resp| (promise_id, resp));
state.pending_ops.push(fut2.boxed_local());
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
- let fut2 = fut.map(move |buf| (op_id, buf));
+ let fut2 = fut.map(move |resp| (promise_id, resp));
state.pending_unref_ops.push(fut2.boxed_local());
state.have_unpolled_ops = true;
}
Op::NotFound => {
- let msg = format!("Unknown op id: {}", op_id);
- let msg = v8::String::new(scope, &msg).unwrap();
- let exc = v8::Exception::type_error(scope, msg);
- scope.throw_exception(exc);
+ throw_type_error(scope, format!("Unknown op id: {}", op_id));
}
}
}
@@ -711,33 +734,6 @@ fn queue_microtask(
};
}
-fn shared_getter(
- scope: &mut v8::HandleScope,
- _name: v8::Local<v8::Name>,
- _args: v8::PropertyCallbackArguments,
- mut rv: v8::ReturnValue,
-) {
- let state_rc = JsRuntime::state(scope);
- let mut state = state_rc.borrow_mut();
- let JsRuntimeState {
- shared_ab, shared, ..
- } = &mut *state;
-
- // Lazily initialize the persistent external ArrayBuffer.
- let shared_ab = match shared_ab {
- Some(ref ab) => v8::Local::new(scope, ab),
- slot @ None => {
- let ab = v8::SharedArrayBuffer::with_backing_store(
- scope,
- shared.get_backing_store(),
- );
- slot.replace(v8::Global::new(scope, ab));
- ab
- }
- };
- rv.set(shared_ab.into())
-}
-
// Called by V8 during `Isolate::mod_instantiate`.
pub fn module_resolve_callback<'s>(
context: v8::Local<'s, v8::Context>,
diff --git a/core/core.js b/core/core.js
index 2de8e1fff..7c39c1bae 100644
--- a/core/core.js
+++ b/core/core.js
@@ -1,171 +1,37 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-/*
-SharedQueue Binary Layout
-+-------------------------------+-------------------------------+
-| NUM_RECORDS (32) |
-+---------------------------------------------------------------+
-| NUM_SHIFTED_OFF (32) |
-+---------------------------------------------------------------+
-| HEAD (32) |
-+---------------------------------------------------------------+
-| OFFSETS (32) |
-+---------------------------------------------------------------+
-| RECORD_ENDS (*MAX_RECORDS) ...
-+---------------------------------------------------------------+
-| RECORDS (*MAX_RECORDS) ...
-+---------------------------------------------------------------+
- */
"use strict";
((window) => {
- const MAX_RECORDS = 100;
- const INDEX_NUM_RECORDS = 0;
- const INDEX_NUM_SHIFTED_OFF = 1;
- const INDEX_HEAD = 2;
- const INDEX_OFFSETS = 3;
- const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS;
- const HEAD_INIT = 4 * INDEX_RECORDS;
-
// Available on start due to bindings.
const core = window.Deno.core;
const { recv, send } = core;
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////////////// Dispatch /////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const dispatch = send;
- const dispatchByName = (opName, control, ...zeroCopy) =>
- dispatch(opsCache[opName], control, ...zeroCopy);
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- //////////////////////////////////// Shared array buffer ///////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- let sharedBytes;
- let shared32;
-
let opsCache = {};
+ const errorMap = {};
+ let nextPromiseId = 1;
+ const promiseTable = new Map();
function init() {
- const shared = core.shared;
- assert(shared.byteLength > 0);
- assert(sharedBytes == null);
- assert(shared32 == null);
- sharedBytes = new Uint8Array(shared);
- shared32 = new Int32Array(shared);
-
- asyncHandlers = [];
- // Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
}
function ops() {
// op id 0 is a special value to retrieve the map of registered ops.
- const opsMapBytes = send(0);
- const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
- opsCache = JSON.parse(opsMapJson);
- return { ...opsCache };
- }
-
- function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
- }
-
- function reset() {
- shared32[INDEX_NUM_RECORDS] = 0;
- shared32[INDEX_NUM_SHIFTED_OFF] = 0;
- shared32[INDEX_HEAD] = HEAD_INIT;
- }
-
- function head() {
- return shared32[INDEX_HEAD];
- }
-
- function numRecords() {
- return shared32[INDEX_NUM_RECORDS];
- }
-
- function size() {
- return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
- }
-
- function setMeta(index, end, opId) {
- shared32[INDEX_OFFSETS + 2 * index] = end;
- shared32[INDEX_OFFSETS + 2 * index + 1] = opId;
- }
-
- function getMeta(index) {
- if (index >= numRecords()) {
- return null;
- }
- const buf = shared32[INDEX_OFFSETS + 2 * index];
- const opId = shared32[INDEX_OFFSETS + 2 * index + 1];
- return [opId, buf];
+ const newOpsCache = Object.fromEntries(send(0));
+ opsCache = Object.freeze(newOpsCache);
+ return opsCache;
}
- function getOffset(index) {
- if (index >= numRecords()) {
- return null;
- }
- if (index == 0) {
- return HEAD_INIT;
- }
- const prevEnd = shared32[INDEX_OFFSETS + 2 * (index - 1)];
- return (prevEnd + 3) & ~3;
- }
-
- function push(opId, buf) {
- const off = head();
- const end = off + buf.byteLength;
- const alignedEnd = (end + 3) & ~3;
- const index = numRecords();
- const shouldNotPush = alignedEnd > shared32.byteLength ||
- index >= MAX_RECORDS;
- if (shouldNotPush) {
- // console.log("shared_queue.js push fail");
- return false;
+ function handleAsyncMsgFromRust() {
+ for (let i = 0; i < arguments.length; i += 2) {
+ opAsyncHandler(arguments[i], arguments[i + 1]);
}
- setMeta(index, end, opId);
- assert(alignedEnd % 4 === 0);
- assert(end - off == buf.byteLength);
- sharedBytes.set(buf, off);
- shared32[INDEX_NUM_RECORDS] += 1;
- shared32[INDEX_HEAD] = alignedEnd;
- return true;
}
- /// Returns null if empty.
- function shift() {
- const i = shared32[INDEX_NUM_SHIFTED_OFF];
- if (size() == 0) {
- assert(i == 0);
- return null;
- }
-
- const off = getOffset(i);
- const [opId, end] = getMeta(i);
-
- if (size() > 1) {
- shared32[INDEX_NUM_SHIFTED_OFF] += 1;
- } else {
- reset();
- }
-
- assert(off != null);
- assert(end != null);
- const buf = sharedBytes.subarray(off, end);
- return [opId, buf];
+ function dispatch(opName, promiseId, control, zeroCopy) {
+ return send(opsCache[opName], promiseId, control, zeroCopy);
}
- ////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////// Error handling //////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const errorMap = {};
-
function registerErrorClass(errorName, className, args) {
if (typeof errorMap[errorName] !== "undefined") {
throw new TypeError(`Error class for "${errorName}" already registered`);
@@ -173,239 +39,86 @@ SharedQueue Binary Layout
errorMap[errorName] = [className, args ?? []];
}
- function handleError(className, message) {
- if (typeof errorMap[className] === "undefined") {
- return new Error(
- `Unregistered error class: "${className}"\n` +
- ` ${message}\n` +
- ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
- );
- }
-
- const [ErrorClass, args] = errorMap[className];
- return new ErrorClass(message, ...args);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////// Async handling //////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- let asyncHandlers = [];
-
- function setAsyncHandler(opId, cb) {
- assert(opId != null);
- asyncHandlers[opId] = cb;
+ function getErrorClassAndArgs(errorName) {
+ return errorMap[errorName] ?? [undefined, []];
}
- function handleAsyncMsgFromRust() {
- while (true) {
- const opIdBuf = shift();
- if (opIdBuf == null) {
- break;
- }
- assert(asyncHandlers[opIdBuf[0]] != null);
- asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
- }
-
- for (let i = 0; i < arguments.length; i += 2) {
- asyncHandlers[arguments[i]](arguments[i + 1], false);
+ function processResponse(res) {
+ // const [ok, err] = res;
+ if (res[1] === null) {
+ return res[0];
}
+ throw processErr(res[1]);
}
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////// General sync & async ops handling ////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- let nextRequestId = 1;
- const promiseTable = {};
-
- function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
- const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
- if (error !== null) {
- promiseTable[requestId][1](error);
- } else {
- promiseTable[requestId][0](result);
+ function processErr(err) {
+ const [ErrorClass, args] = getErrorClassAndArgs(err.className);
+ if (!ErrorClass) {
+ return new Error(
+ `Unregistered error class: "${err.className}"\n ${err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
+ );
}
- delete promiseTable[requestId];
+ return new ErrorClass(err.message, ...args);
}
- function opAsync(opName, opRequestBuilder, opResultParser) {
- const opId = opsCache[opName];
- // Make sure requests of this type are handled by the asyncHandler
- // The asyncHandler's role is to call the "promiseTable[requestId]" function
- if (typeof asyncHandlers[opId] === "undefined") {
- asyncHandlers[opId] = (buffer, isCopyNeeded) =>
- asyncHandle(buffer, isCopyNeeded, opResultParser);
- }
-
- const requestId = nextRequestId++;
-
- // Create and store promise
- const promise = new Promise((resolve, reject) => {
- promiseTable[requestId] = [resolve, reject];
+ function jsonOpAsync(opName, args = null, zeroCopy = null) {
+ const promiseId = nextPromiseId++;
+ const maybeError = dispatch(opName, promiseId, args, zeroCopy);
+ // Handle sync error (e.g: error parsing args)
+ if (maybeError) processResponse(maybeError);
+ let resolve, reject;
+ const promise = new Promise((resolve_, reject_) => {
+ resolve = resolve_;
+ reject = reject_;
});
-
- // Synchronously dispatch async request
- core.dispatch(opId, ...opRequestBuilder(requestId));
-
- // Wait for async response
+ promise.resolve = resolve;
+ promise.reject = reject;
+ promiseTable.set(promiseId, promise);
return promise;
}
- function opSync(opName, opRequestBuilder, opResultParser) {
- const opId = opsCache[opName];
- const u8Array = core.dispatch(opId, ...opRequestBuilder());
-
- const [_, result, error] = opResultParser(u8Array, false);
- if (error !== null) throw error;
- return result;
+ function jsonOpSync(opName, args = null, zeroCopy = null) {
+ return processResponse(dispatch(opName, null, args, zeroCopy));
}
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////////// Bin ops handling /////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const binRequestHeaderByteLength = 8 + 4;
- const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
- const scratchView = new DataView(scratchBuffer);
-
- function binOpBuildRequest(requestId, argument, zeroCopy) {
- scratchView.setBigUint64(0, BigInt(requestId), true);
- scratchView.setUint32(8, argument, true);
- return [scratchView, ...zeroCopy];
- }
-
- function binOpParseResult(u8Array, isCopyNeeded) {
- // Decode header value from u8Array
- const headerByteLength = 8 + 2 * 4;
- assert(u8Array.byteLength >= headerByteLength);
- assert(u8Array.byteLength % 4 == 0);
- const view = new DataView(
- u8Array.buffer,
- u8Array.byteOffset + u8Array.byteLength - headerByteLength,
- headerByteLength,
- );
-
- const requestId = Number(view.getBigUint64(0, true));
- const status = view.getUint32(8, true);
- const result = view.getUint32(12, true);
-
- // Error handling
- if (status !== 0) {
- const className = core.decode(u8Array.subarray(0, result));
- const message = core.decode(u8Array.subarray(result, -headerByteLength))
- .trim();
-
- return [requestId, null, handleError(className, message)];
- }
-
- if (u8Array.byteLength === headerByteLength) {
- return [requestId, result, null];
- }
-
- // Rest of response buffer is passed as reference or as a copy
- let respBuffer = null;
- if (isCopyNeeded) {
- // Copy part of the response array (if sent through shared array buf)
- respBuffer = u8Array.slice(0, result);
+ function opAsyncHandler(promiseId, res) {
+ // const [ok, err] = res;
+ const promise = promiseTable.get(promiseId);
+ promiseTable.delete(promiseId);
+ if (!res[1]) {
+ promise.resolve(res[0]);
} else {
- // Create view on existing array (if sent through overflow)
- respBuffer = u8Array.subarray(0, result);
- }
-
- return [requestId, respBuffer, null];
- }
-
- function binOpAsync(opName, argument = 0, ...zeroCopy) {
- return opAsync(
- opName,
- (requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
- binOpParseResult,
- );
- }
-
- function binOpSync(opName, argument = 0, ...zeroCopy) {
- return opSync(
- opName,
- () => binOpBuildRequest(0, argument, zeroCopy),
- binOpParseResult,
- );
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////////// Json ops handling ////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const jsonRequestHeaderLength = 8;
-
- function jsonOpBuildRequest(requestId, argument, zeroCopy) {
- const u8Array = core.encode(
- "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
- );
- new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
- return [u8Array, ...zeroCopy];
- }
-
- function jsonOpParseResult(u8Array, _) {
- const data = JSON.parse(core.decode(u8Array));
-
- if ("err" in data) {
- return [
- data.requestId,
- null,
- handleError(data.err.className, data.err.message),
- ];
+ promise.reject(processErr(res[1]));
}
-
- return [data.requestId, data.ok, null];
}
- function jsonOpAsync(opName, argument = null, ...zeroCopy) {
- return opAsync(
- opName,
- (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
- jsonOpParseResult,
- );
+ function binOpSync(opName, args = null, zeroCopy = null) {
+ return jsonOpSync(opName, args, zeroCopy);
}
- function jsonOpSync(opName, argument = null, ...zeroCopy) {
- return opSync(
- opName,
- () => [core.encode(JSON.stringify(argument)), ...zeroCopy],
- jsonOpParseResult,
- );
+ function binOpAsync(opName, args = null, zeroCopy = null) {
+ return jsonOpAsync(opName, args, zeroCopy);
}
function resources() {
return jsonOpSync("op_resources");
}
+
function close(rid) {
- return jsonOpSync("op_close", { rid });
+ jsonOpSync("op_close", { rid });
}
Object.assign(window.Deno.core, {
- jsonOpAsync,
- jsonOpSync,
binOpAsync,
binOpSync,
- dispatch,
- dispatchByName,
+ jsonOpAsync,
+ jsonOpSync,
+ dispatch: send,
+ dispatchByName: dispatch,
ops,
close,
resources,
registerErrorClass,
- sharedQueueInit: init,
- // sharedQueue is private but exposed for testing.
- sharedQueue: {
- MAX_RECORDS,
- head,
- numRecords,
- size,
- push,
- reset,
- shift,
- },
- // setAsyncHandler is private but exposed for testing.
- setAsyncHandler,
+ init,
});
})(this);
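core.js now keys pending promises by promiseId and interprets every op result as an [ok, err] pair, mapping err.className back to a registered error class. Below is a minimal sketch of the sync error path, assuming this revision's API and the default error-class name of "Error"; the op name and message are made up.

use deno_core::error::{generic_error, AnyError};
use deno_core::{json_op_sync, JsRuntime};

fn main() {
  let mut runtime = JsRuntime::new(Default::default());
  runtime.register_op(
    "op_fail",
    json_op_sync(|_state, _args: (), _bufs| -> Result<(), AnyError> {
      Err(generic_error("boom"))
    }),
  );
  runtime
    .execute(
      "<usage>",
      r#"
Deno.core.ops();
// Ops now return an [ok, err] pair; jsonOpSync unwraps ok or throws the
// error class registered for err.className (falling back to a plain Error).
Deno.core.registerErrorClass('Error', Error);
try {
  Deno.core.jsonOpSync('op_fail');
} catch (e) {
  Deno.core.print(`caught ${e.constructor.name}: ${e.message}\n`);
}
"#,
    )
    .unwrap();
}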
diff --git a/core/core_test.js b/core/core_test.js
deleted file mode 100644
index 89385a0aa..000000000
--- a/core/core_test.js
+++ /dev/null
@@ -1,88 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-"use strict";
-
-function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
-}
-
-// Check overflow (corresponds to full_records test in rust)
-function fullRecords(q) {
- q.reset();
- const oneByte = new Uint8Array([42]);
- for (let i = 0; i < q.MAX_RECORDS; i++) {
- assert(q.push(1, oneByte));
- }
- assert(!q.push(1, oneByte));
- const [opId, r] = q.shift();
- assert(opId == 1);
- assert(r.byteLength == 1);
- assert(r[0] == 42);
- // Even if we shift one off, we still cannot push a new record.
- assert(!q.push(1, oneByte));
-}
-
-function main() {
- const q = Deno.core.sharedQueue;
-
- const h = q.head();
- assert(h > 0);
-
- // This record's len is not divisible by
- // 4 so after pushing it to the queue,
- // next record offset should be aligned to 4.
- let r = new Uint8Array([1, 2, 3, 4, 5]);
- const len = r.byteLength + h;
- assert(q.push(1, r));
- // Record should be aligned to 4 bytes
- assert(q.head() == len + 3);
-
- r = new Uint8Array([6, 7]);
- assert(q.push(1, r));
-
- r = new Uint8Array([8, 9, 10, 11]);
- assert(q.push(1, r));
- assert(q.numRecords() == 3);
- assert(q.size() == 3);
-
- let opId;
- [opId, r] = q.shift();
- assert(r.byteLength == 5);
- assert(r[0] == 1);
- assert(r[1] == 2);
- assert(r[2] == 3);
- assert(r[3] == 4);
- assert(r[4] == 5);
- assert(q.numRecords() == 3);
- assert(q.size() == 2);
-
- [opId, r] = q.shift();
- assert(r.byteLength == 2);
- assert(r[0] == 6);
- assert(r[1] == 7);
- assert(q.numRecords() == 3);
- assert(q.size() == 1);
-
- [opId, r] = q.shift();
- assert(opId == 1);
- assert(r.byteLength == 4);
- assert(r[0] == 8);
- assert(r[1] == 9);
- assert(r[2] == 10);
- assert(r[3] == 11);
- assert(q.numRecords() == 0);
- assert(q.size() == 0);
-
- assert(q.shift() == null);
- assert(q.shift() == null);
- assert(q.numRecords() == 0);
- assert(q.size() == 0);
-
- fullRecords(q);
-
- Deno.core.print("shared_queue_test.js ok\n");
- q.reset();
-}
-
-main();
diff --git a/core/examples/hello_world.rs b/core/examples/hello_world.rs
index c46fc1d98..3b63d2bda 100644
--- a/core/examples/hello_world.rs
+++ b/core/examples/hello_world.rs
@@ -2,11 +2,8 @@
//! This example shows you how to define ops in Rust and then call them from
//! JavaScript.
-use anyhow::anyhow;
use deno_core::json_op_sync;
use deno_core::JsRuntime;
-use deno_core::Op;
-use serde_json::Value;
use std::io::Write;
fn main() {
@@ -21,55 +18,52 @@ fn main() {
//
// The second one just transforms some input and returns it to JavaScript.
- // Register the op for outputting bytes to stdout.
+ // Register the op for outputting a string to stdout.
// It can be invoked with Deno.core.dispatch and the id this method returns
// or Deno.core.dispatchByName and the name provided.
runtime.register_op(
"op_print",
- // The op_fn callback takes a state object OpState
- // and a vector of ZeroCopyBuf's, which are mutable references
- // to ArrayBuffer's in JavaScript.
- |_state, zero_copy| {
+    // The op_fn callback takes a state object OpState,
+    // a structured arg of type `T`, and an optional ZeroCopyBuf
+    // (a mutable reference to a JavaScript ArrayBuffer).
+ json_op_sync(|_state, msg: Option<String>, zero_copy| {
let mut out = std::io::stdout();
+ // Write msg to stdout
+ if let Some(msg) = msg {
+ out.write_all(msg.as_bytes()).unwrap();
+ }
+
// Write the contents of every buffer to stdout
for buf in zero_copy {
out.write_all(&buf).unwrap();
}
- Op::Sync(Box::new([])) // No meaningful result
- },
+ Ok(()) // No meaningful result
+ }),
);
// Register the JSON op for summing a number array.
- // A JSON op is just an op where the first ZeroCopyBuf is a serialized JSON
- // value, the return value is also a serialized JSON value. It can be invoked
- // with Deno.core.jsonOpSync and the name.
runtime.register_op(
"op_sum",
// The json_op_sync function automatically deserializes
// the first ZeroCopyBuf and serializes the return value
// to reduce boilerplate
- json_op_sync(|_state, json: Vec<f64>, zero_copy| {
- // We check that we only got the JSON value.
- if !zero_copy.is_empty() {
- Err(anyhow!("Expected exactly one argument"))
- } else {
- // And if we did, do our actual task
- let sum = json.iter().fold(0.0, |a, v| a + v);
-
- // Finally we return a JSON value
- Ok(Value::from(sum))
- }
+ json_op_sync(|_state, nums: Vec<f64>, _| {
+ // Sum inputs
+ let sum = nums.iter().fold(0.0, |a, v| a + v);
+ // return as a Result<f64, AnyError>
+ Ok(sum)
}),
);
// Now we see how to invoke the ops we just defined. The runtime automatically
// contains a Deno.core object with several functions for interacting with it.
// You can find its definition in core.js.
- runtime.execute(
- "<init>",
- r#"
+ runtime
+ .execute(
+ "<init>",
+ r#"
// First we initialize the ops cache.
// This maps op names to their id's.
Deno.core.ops();
@@ -78,14 +72,15 @@ Deno.core.ops();
// our op_print op to display the stringified argument.
const _newline = new Uint8Array([10]);
function print(value) {
- Deno.core.dispatchByName('op_print', Deno.core.encode(value.toString()), _newline);
+ Deno.core.dispatchByName('op_print', 0, value.toString(), _newline);
}
// Finally we register the error class used by op_sum
// so that it throws the correct class.
Deno.core.registerErrorClass('Error', Error);
"#,
- ).unwrap();
+ )
+ .unwrap();
// Now we can finally use this in an example.
runtime
diff --git a/core/examples/http_bench_bin_ops.js b/core/examples/http_bench_bin_ops.js
index 18f98419f..cf5e275b1 100644
--- a/core/examples/http_bench_bin_ops.js
+++ b/core/examples/http_bench_bin_ops.js
@@ -9,14 +9,19 @@ const responseBuf = new Uint8Array(
.map((c) => c.charCodeAt(0)),
);
+// This buffer exists purely to avoid triggering the bin-op buf assert.
+// In practice all deno bin ops accept buffers; this bench is an exception.
+// TODO(@AaronO): remove once we drop variadic BufVec compat
+const nopBuffer = new Uint8Array();
+
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
- return Deno.core.binOpSync("listen");
+ return Deno.core.binOpSync("listen", 0, nopBuffer);
}
/** Accepts a connection, returns rid. */
function accept(rid) {
- return Deno.core.binOpAsync("accept", rid);
+ return Deno.core.binOpAsync("accept", rid, nopBuffer);
}
/**
@@ -33,7 +38,7 @@ function write(rid, data) {
}
function close(rid) {
- Deno.core.binOpSync("close", rid);
+ Deno.core.binOpSync("close", rid, nopBuffer);
}
async function serve(rid) {
diff --git a/core/examples/http_bench_json_ops.js b/core/examples/http_bench_json_ops.js
index 071df100f..791fcc499 100644
--- a/core/examples/http_bench_json_ops.js
+++ b/core/examples/http_bench_json_ops.js
@@ -11,33 +11,29 @@ const responseBuf = new Uint8Array(
/** Listens on 0.0.0.0:4500, returns rid. */
function listen() {
- const { rid } = Deno.core.jsonOpSync("listen");
- return rid;
+ return Deno.core.jsonOpSync("listen");
}
/** Accepts a connection, returns rid. */
-async function accept(serverRid) {
- const { rid } = await Deno.core.jsonOpAsync("accept", { rid: serverRid });
- return rid;
+function accept(serverRid) {
+ return Deno.core.jsonOpAsync("accept", serverRid);
}
/**
* Reads a packet from the rid, presumably an http request. data is ignored.
* Returns bytes read.
*/
-async function read(rid, data) {
- const { nread } = await Deno.core.jsonOpAsync("read", { rid }, data);
- return nread;
+function read(rid, data) {
+ return Deno.core.jsonOpAsync("read", rid, data);
}
/** Writes a fixed HTTP response to the socket rid. Returns bytes written. */
-async function write(rid, data) {
- const { nwritten } = await Deno.core.jsonOpAsync("write", { rid }, data);
- return nwritten;
+function write(rid, data) {
+ return Deno.core.jsonOpAsync("write", rid, data);
}
function close(rid) {
- Deno.core.jsonOpSync("close", { rid });
+ Deno.core.jsonOpSync("close", rid);
}
async function serve(rid) {
diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs
index bc96ce478..b1116d757 100644
--- a/core/examples/http_bench_json_ops.rs
+++ b/core/examples/http_bench_json_ops.rs
@@ -9,10 +9,8 @@ use deno_core::JsRuntime;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
+use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
-use serde::Deserialize;
-use serde::Serialize;
-use serde_json::Value;
use std::cell::RefCell;
use std::convert::TryFrom;
use std::env;
@@ -121,11 +119,6 @@ fn create_js_runtime() -> JsRuntime {
runtime
}
-#[derive(Deserialize, Serialize)]
-struct ResourceId {
- rid: u32,
-}
-
fn op_listen(
state: &mut OpState,
_args: (),
@@ -137,71 +130,71 @@ fn op_listen(
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table.add(listener);
- Ok(ResourceId { rid })
+ Ok(rid)
}
fn op_close(
state: &mut OpState,
- args: ResourceId,
+ rid: ResourceId,
_buf: &mut [ZeroCopyBuf],
) -> Result<(), AnyError> {
- log::debug!("close rid={}", args.rid);
+ log::debug!("close rid={}", rid);
state
.resource_table
- .close(args.rid)
+ .close(rid)
.map(|_| ())
.ok_or_else(bad_resource_id)
}
async fn op_accept(
state: Rc<RefCell<OpState>>,
- args: ResourceId,
+ rid: ResourceId,
_bufs: BufVec,
) -> Result<ResourceId, AnyError> {
- log::debug!("accept rid={}", args.rid);
+ log::debug!("accept rid={}", rid);
let listener = state
.borrow()
.resource_table
- .get::<TcpListener>(args.rid)
+ .get::<TcpListener>(rid)
.ok_or_else(bad_resource_id)?;
let stream = listener.accept().await?;
let rid = state.borrow_mut().resource_table.add(stream);
- Ok(ResourceId { rid })
+ Ok(rid)
}
async fn op_read(
state: Rc<RefCell<OpState>>,
- args: ResourceId,
+ rid: ResourceId,
mut bufs: BufVec,
-) -> Result<Value, AnyError> {
+) -> Result<usize, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
- log::debug!("read rid={}", args.rid);
+ log::debug!("read rid={}", rid);
let stream = state
.borrow()
.resource_table
- .get::<TcpStream>(args.rid)
+ .get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nread = stream.read(&mut bufs[0]).await?;
- Ok(serde_json::json!({ "nread": nread }))
+ Ok(nread)
}
async fn op_write(
state: Rc<RefCell<OpState>>,
- args: ResourceId,
+ rid: ResourceId,
bufs: BufVec,
-) -> Result<Value, AnyError> {
+) -> Result<usize, AnyError> {
assert_eq!(bufs.len(), 1, "Invalid number of arguments");
- log::debug!("write rid={}", args.rid);
+ log::debug!("write rid={}", rid);
let stream = state
.borrow()
.resource_table
- .get::<TcpStream>(args.rid)
+ .get::<TcpStream>(rid)
.ok_or_else(bad_resource_id)?;
let nwritten = stream.write(&bufs[0]).await?;
- Ok(serde_json::json!({ "nwritten": nwritten }))
+ Ok(nwritten)
}
fn main() {
diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts
index 004ed0529..b12879a9b 100644
--- a/core/lib.deno_core.d.ts
+++ b/core/lib.deno_core.d.ts
@@ -11,14 +11,14 @@ declare namespace Deno {
function jsonOpSync(
opName: string,
args?: any,
- ...zeroCopy: Uint8Array[]
+ zeroCopy?: Uint8Array,
): any;
/** Send a JSON op to Rust, and asynchronously receive the result. */
function jsonOpAsync(
opName: string,
args?: any,
- ...zeroCopy: Uint8Array[]
+ zeroCopy?: Uint8Array,
): Promise<any>;
/**
diff --git a/core/lib.rs b/core/lib.rs
index ea6968b60..12ccc1d8a 100644
--- a/core/lib.rs
+++ b/core/lib.rs
@@ -14,7 +14,6 @@ mod ops_json;
pub mod plugin_api;
mod resources;
mod runtime;
-mod shared_queue;
mod zero_copy_buf;
// Re-exports
@@ -56,12 +55,17 @@ pub use crate::modules::RecursiveModuleLoad;
pub use crate::normalize_path::normalize_path;
pub use crate::ops::op_close;
pub use crate::ops::op_resources;
+pub use crate::ops::serialize_op_result;
pub use crate::ops::Op;
pub use crate::ops::OpAsyncFuture;
pub use crate::ops::OpFn;
pub use crate::ops::OpId;
+pub use crate::ops::OpPayload;
+pub use crate::ops::OpResponse;
pub use crate::ops::OpState;
pub use crate::ops::OpTable;
+pub use crate::ops::PromiseId;
+pub use crate::ops::Serializable;
pub use crate::ops_bin::bin_op_async;
pub use crate::ops_bin::bin_op_sync;
pub use crate::ops_bin::ValueOrVector;
diff --git a/core/ops.rs b/core/ops.rs
index 212a713ad..3af60d072 100644
--- a/core/ops.rs
+++ b/core/ops.rs
@@ -6,10 +6,12 @@ use crate::error::AnyError;
use crate::gotham_state::GothamState;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
-use crate::BufVec;
use crate::ZeroCopyBuf;
use futures::Future;
use indexmap::IndexMap;
+use rusty_v8 as v8;
+use serde::de::DeserializeOwned;
+use serde::Serialize;
use serde_json::json;
use serde_json::Value;
use std::cell::RefCell;
@@ -20,12 +22,50 @@ use std::ops::DerefMut;
use std::pin::Pin;
use std::rc::Rc;
-pub type OpAsyncFuture = Pin<Box<dyn Future<Output = Box<[u8]>>>>;
-pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static;
+pub use erased_serde::Serialize as Serializable;
+pub type PromiseId = u64;
+pub type OpAsyncFuture = Pin<Box<dyn Future<Output = OpResponse>>>;
+pub type OpFn =
+ dyn Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static;
pub type OpId = usize;
+pub struct OpPayload<'a, 'b, 'c> {
+ pub(crate) scope: Option<&'a mut v8::HandleScope<'b>>,
+ pub(crate) value: Option<v8::Local<'c, v8::Value>>,
+}
+
+impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
+ pub fn new(
+ scope: &'a mut v8::HandleScope<'b>,
+ value: v8::Local<'c, v8::Value>,
+ ) -> Self {
+ Self {
+ scope: Some(scope),
+ value: Some(value),
+ }
+ }
+
+ pub fn empty() -> Self {
+ Self {
+ scope: None,
+ value: None,
+ }
+ }
+
+ pub fn deserialize<T: DeserializeOwned>(self) -> Result<T, AnyError> {
+ serde_v8::from_v8(self.scope.unwrap(), self.value.unwrap())
+ .map_err(AnyError::from)
+ .map_err(|e| type_error(format!("Error parsing args: {}", e)))
+ }
+}
+
+pub enum OpResponse {
+ Value(Box<dyn Serializable>),
+ Buffer(Box<[u8]>),
+}
+
pub enum Op {
- Sync(Box<[u8]>),
+ Sync(OpResponse),
Async(OpAsyncFuture),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
@@ -33,6 +73,32 @@ pub enum Op {
NotFound,
}
+#[derive(Serialize)]
+pub struct OpResult<R>(Option<R>, Option<OpError>);
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct OpError {
+ class_name: &'static str,
+ message: String,
+}
+
+pub fn serialize_op_result<R: Serialize + 'static>(
+ result: Result<R, AnyError>,
+ state: Rc<RefCell<OpState>>,
+) -> OpResponse {
+ OpResponse::Value(Box::new(match result {
+ Ok(v) => OpResult::<R>(Some(v), None),
+ Err(err) => OpResult::<R>(
+ None,
+ Some(OpError {
+ class_name: (state.borrow().get_error_class_fn)(&err),
+ message: err.to_string(),
+ }),
+ ),
+ }))
+}
+
/// Maintains the resources and ops inside a JS runtime.
pub struct OpState {
pub resource_table: ResourceTable,
@@ -73,41 +139,43 @@ pub struct OpTable(IndexMap<String, Rc<OpFn>>);
impl OpTable {
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
where
- F: Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static,
+ F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static,
{
let (op_id, prev) = self.0.insert_full(name.to_owned(), Rc::new(op_fn));
assert!(prev.is_none());
op_id
}
+ pub fn op_entries(state: Rc<RefCell<OpState>>) -> Vec<(String, OpId)> {
+ state.borrow().op_table.0.keys().cloned().zip(0..).collect()
+ }
+
pub fn route_op(
op_id: OpId,
state: Rc<RefCell<OpState>>,
- bufs: BufVec,
+ payload: OpPayload,
+ buf: Option<ZeroCopyBuf>,
) -> Op {
- if op_id == 0 {
- let ops: HashMap<String, OpId> =
- state.borrow().op_table.0.keys().cloned().zip(0..).collect();
- let buf = serde_json::to_vec(&ops).map(Into::into).unwrap();
- Op::Sync(buf)
- } else {
- let op_fn = state
- .borrow()
- .op_table
- .0
- .get_index(op_id)
- .map(|(_, op_fn)| op_fn.clone());
- match op_fn {
- Some(f) => (f)(state, bufs),
- None => Op::NotFound,
- }
+ let op_fn = state
+ .borrow()
+ .op_table
+ .0
+ .get_index(op_id)
+ .map(|(_, op_fn)| op_fn.clone());
+ match op_fn {
+ Some(f) => (f)(state, payload, buf),
+ None => Op::NotFound,
}
}
}
impl Default for OpTable {
fn default() -> Self {
- fn dummy(_state: Rc<RefCell<OpState>>, _bufs: BufVec) -> Op {
+ fn dummy(
+ _state: Rc<RefCell<OpState>>,
+ _p: OpPayload,
+ _b: Option<ZeroCopyBuf>,
+ ) -> Op {
unreachable!()
}
Self(once(("ops".to_owned(), Rc::new(dummy) as _)).collect())
@@ -164,24 +232,36 @@ mod tests {
let bar_id;
{
let op_table = &mut state.borrow_mut().op_table;
- foo_id = op_table.register_op("foo", |_, _| Op::Sync(b"oof!"[..].into()));
+ foo_id = op_table.register_op("foo", |_, _, _| {
+ Op::Sync(OpResponse::Buffer(b"oof!"[..].into()))
+ });
assert_eq!(foo_id, 1);
- bar_id = op_table.register_op("bar", |_, _| Op::Sync(b"rab!"[..].into()));
+ bar_id = op_table.register_op("bar", |_, _, _| {
+ Op::Sync(OpResponse::Buffer(b"rab!"[..].into()))
+ });
assert_eq!(bar_id, 2);
}
- let foo_res = OpTable::route_op(foo_id, state.clone(), Default::default());
- assert!(matches!(foo_res, Op::Sync(buf) if &*buf == b"oof!"));
- let bar_res = OpTable::route_op(bar_id, state.clone(), Default::default());
- assert!(matches!(bar_res, Op::Sync(buf) if &*buf == b"rab!"));
-
- let catalog_res = OpTable::route_op(0, state, Default::default());
- let mut catalog_entries = match catalog_res {
- Op::Sync(buf) => serde_json::from_slice::<HashMap<String, OpId>>(&buf)
- .map(|map| map.into_iter().collect::<Vec<_>>())
- .unwrap(),
- _ => panic!("unexpected `Op` variant"),
- };
+ let foo_res = OpTable::route_op(
+ foo_id,
+ state.clone(),
+ OpPayload::empty(),
+ Default::default(),
+ );
+ assert!(
+ matches!(foo_res, Op::Sync(OpResponse::Buffer(buf)) if &*buf == b"oof!")
+ );
+ let bar_res = OpTable::route_op(
+ bar_id,
+ state.clone(),
+ OpPayload::empty(),
+ Default::default(),
+ );
+ assert!(
+ matches!(bar_res, Op::Sync(OpResponse::Buffer(buf)) if &*buf == b"rab!")
+ );
+
+ let mut catalog_entries = OpTable::op_entries(state);
catalog_entries.sort_by(|(_, id1), (_, id2)| id1.partial_cmp(id2).unwrap());
assert_eq!(
catalog_entries,
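For reference, serialize_op_result wraps every op result in OpResult, a two-field tuple struct, which serde turns into the two-element array that core.js destructures as `const [ok, err] = res`. The sketch below only visualizes that shape with serde_json (the real path goes through serde_v8); the struct definitions mirror the ones added above.

use serde::Serialize;

#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct OpError {
  class_name: &'static str,
  message: String,
}

// Mirrors ops::OpResult<R>(Option<R>, Option<OpError>).
#[derive(Serialize)]
struct OpResult<R>(Option<R>, Option<OpError>);

fn main() {
  let ok = OpResult::<u64>(Some(42), None);
  let err = OpResult::<u64>(
    None,
    Some(OpError { class_name: "Error", message: "boom".to_string() }),
  );
  // Prints: [42,null]
  println!("{}", serde_json::to_string(&ok).unwrap());
  // Prints: [null,{"className":"Error","message":"boom"}]
  println!("{}", serde_json::to_string(&err).unwrap());
}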
diff --git a/core/ops_bin.rs b/core/ops_bin.rs
index 3e13c23d5..cdd2d630d 100644
--- a/core/ops_bin.rs
+++ b/core/ops_bin.rs
@@ -1,54 +1,23 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
+use crate::error::type_error;
use crate::error::AnyError;
use crate::futures::future::FutureExt;
+use crate::serialize_op_result;
use crate::BufVec;
use crate::Op;
use crate::OpFn;
+use crate::OpPayload;
+use crate::OpResponse;
use crate::OpState;
use crate::ZeroCopyBuf;
use std::boxed::Box;
use std::cell::RefCell;
-use std::convert::TryInto;
use std::future::Future;
use std::rc::Rc;
-#[derive(Copy, Clone, Debug, PartialEq)]
-pub struct RequestHeader {
- pub request_id: u64,
- pub argument: u32,
-}
-
-impl RequestHeader {
- pub fn from_raw(bytes: &[u8]) -> Option<Self> {
- if bytes.len() < 3 * 4 {
- return None;
- }
-
- Some(Self {
- request_id: u64::from_le_bytes(bytes[0..8].try_into().unwrap()),
- argument: u32::from_le_bytes(bytes[8..12].try_into().unwrap()),
- })
- }
-}
-
-#[derive(Copy, Clone, Debug, PartialEq)]
-pub struct ResponseHeader {
- pub request_id: u64,
- pub status: u32,
- pub result: u32,
-}
-
-impl From<ResponseHeader> for [u8; 16] {
- fn from(r: ResponseHeader) -> Self {
- let mut resp_header = [0u8; 16];
- resp_header[0..8].copy_from_slice(&r.request_id.to_le_bytes());
- resp_header[8..12].copy_from_slice(&r.status.to_le_bytes());
- resp_header[12..16].copy_from_slice(&r.result.to_le_bytes());
- resp_header
- }
-}
-
+// TODO: rewrite this to have consistent buffer returns,
+// possibly via direct serde_v8 support
pub trait ValueOrVector {
fn value(&self) -> u32;
fn vector(self) -> Option<Vec<u8>>;
@@ -72,10 +41,6 @@ impl ValueOrVector for u32 {
}
}
-fn gen_padding_32bit(len: usize) -> &'static [u8] {
- &[b' ', b' ', b' '][0..(4 - (len & 3)) & 3]
-}
-
/// Creates an op that passes data synchronously using raw ui8 buffer.
///
/// The provided function `op_fn` has the following parameters:
@@ -104,78 +69,47 @@ where
F: Fn(&mut OpState, u32, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
R: ValueOrVector,
{
- Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
- let mut bufs_iter = bufs.into_iter();
- let record_buf = bufs_iter.next().expect("Expected record at position 0");
- let mut zero_copy = bufs_iter.collect::<BufVec>();
-
- let req_header = match RequestHeader::from_raw(&record_buf) {
- Some(r) => r,
- None => {
- let error_class = b"TypeError";
- let error_message = b"Unparsable control buffer";
- let len = error_class.len() + error_message.len();
- let padding = gen_padding_32bit(len);
- let resp_header = ResponseHeader {
- request_id: 0,
- status: 1,
- result: error_class.len() as u32,
- };
- return Op::Sync(
- error_class
- .iter()
- .chain(error_message.iter())
- .chain(padding)
- .chain(&Into::<[u8; 16]>::into(resp_header))
- .cloned()
- .collect(),
- );
- }
- };
+ Box::new(move |state, payload, buf| -> Op {
+ let min_arg: u32 = payload.deserialize().unwrap();
+ // For sig compat map Option<ZeroCopyBuf> to BufVec
+ let mut bufs: BufVec = match buf {
+ Some(b) => vec![b],
+ None => vec![],
+ }
+ .into();
+ // Bin op buffer arg assert
+ if bufs.is_empty() {
+ return Op::Sync(serialize_bin_result::<u32>(
+ Err(type_error("bin-ops require a non-null buffer arg")),
+ state,
+ ));
+ }
- match op_fn(&mut state.borrow_mut(), req_header.argument, &mut zero_copy) {
- Ok(possibly_vector) => {
- let resp_header = ResponseHeader {
- request_id: req_header.request_id,
- status: 0,
- result: possibly_vector.value(),
- };
- let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
+ let result = op_fn(&mut state.borrow_mut(), min_arg, &mut bufs);
+ Op::Sync(serialize_bin_result(result, state))
+ })
+}
- let resp_vector = match possibly_vector.vector() {
- Some(mut vector) => {
- let padding = gen_padding_32bit(vector.len());
- vector.extend(padding);
- vector.extend(&resp_encoded_header);
- vector
- }
- None => resp_encoded_header.to_vec(),
- };
- Op::Sync(resp_vector.into_boxed_slice())
- }
- Err(error) => {
- let error_class =
- (state.borrow().get_error_class_fn)(&error).as_bytes();
- let error_message = error.to_string().as_bytes().to_owned();
- let len = error_class.len() + error_message.len();
- let padding = gen_padding_32bit(len);
- let resp_header = ResponseHeader {
- request_id: req_header.request_id,
- status: 1,
- result: error_class.len() as u32,
- };
- return Op::Sync(
- error_class
- .iter()
- .chain(error_message.iter())
- .chain(padding)
- .chain(&Into::<[u8; 16]>::into(resp_header))
- .cloned()
- .collect(),
- );
+// wraps serialize_op_result but handles ValueOrVector
+fn serialize_bin_result<R>(
+ result: Result<R, AnyError>,
+ state: Rc<RefCell<OpState>>,
+) -> OpResponse
+where
+ R: ValueOrVector,
+{
+ match result {
+ Ok(v) => {
+ let min_val = v.value();
+ match v.vector() {
+        // Warning! this is incorrect, but buffers aren't used ATM; will fix in a future PR
+ Some(vec) => OpResponse::Buffer(vec.into()),
+ // u32
+ None => serialize_op_result(Ok(min_val), state),
}
}
- })
+ Err(e) => serialize_op_result::<()>(Err(e), state),
+ }
}
/// Creates an op that passes data asynchronously using raw ui8 buffer.
@@ -208,170 +142,30 @@ where
R: Future<Output = Result<RV, AnyError>> + 'static,
RV: ValueOrVector,
{
- Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
- let mut bufs_iter = bufs.into_iter();
- let record_buf = bufs_iter.next().expect("Expected record at position 0");
- let zero_copy = bufs_iter.collect::<BufVec>();
-
- let req_header = match RequestHeader::from_raw(&record_buf) {
- Some(r) => r,
- None => {
- let error_class = b"TypeError";
- let error_message = b"Unparsable control buffer";
- let len = error_class.len() + error_message.len();
- let padding = gen_padding_32bit(len);
- let resp_header = ResponseHeader {
- request_id: 0,
- status: 1,
- result: error_class.len() as u32,
- };
- return Op::Sync(
- error_class
- .iter()
- .chain(error_message.iter())
- .chain(padding)
- .chain(&Into::<[u8; 16]>::into(resp_header))
- .cloned()
- .collect(),
- );
+ Box::new(
+ move |state: Rc<RefCell<OpState>>,
+ p: OpPayload,
+ b: Option<ZeroCopyBuf>|
+ -> Op {
+ let min_arg: u32 = p.deserialize().unwrap();
+ // For sig compat map Option<ZeroCopyBuf> to BufVec
+ let bufs: BufVec = match b {
+ Some(b) => vec![b],
+ None => vec![],
+ }
+ .into();
+ // Bin op buffer arg assert
+ if bufs.is_empty() {
+ return Op::Sync(serialize_bin_result::<u32>(
+ Err(type_error("bin-ops require a non-null buffer arg")),
+ state,
+ ));
}
- };
-
- let fut =
- op_fn(state.clone(), req_header.argument, zero_copy).map(move |result| {
- match result {
- Ok(possibly_vector) => {
- let resp_header = ResponseHeader {
- request_id: req_header.request_id,
- status: 0,
- result: possibly_vector.value(),
- };
- let resp_encoded_header = Into::<[u8; 16]>::into(resp_header);
-
- let resp_vector = match possibly_vector.vector() {
- Some(mut vector) => {
- let padding = gen_padding_32bit(vector.len());
- vector.extend(padding);
- vector.extend(&resp_encoded_header);
- vector
- }
- None => resp_encoded_header.to_vec(),
- };
- resp_vector.into_boxed_slice()
- }
- Err(error) => {
- let error_class =
- (state.borrow().get_error_class_fn)(&error).as_bytes();
- let error_message = error.to_string().as_bytes().to_owned();
- let len = error_class.len() + error_message.len();
- let padding = gen_padding_32bit(len);
- let resp_header = ResponseHeader {
- request_id: req_header.request_id,
- status: 1,
- result: error_class.len() as u32,
- };
-
- error_class
- .iter()
- .chain(error_message.iter())
- .chain(padding)
- .chain(&Into::<[u8; 16]>::into(resp_header))
- .cloned()
- .collect()
- }
- }
- });
- let temp = Box::pin(fut);
- Op::Async(temp)
- })
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn padding() {
- assert_eq!(gen_padding_32bit(0), &[] as &[u8]);
- assert_eq!(gen_padding_32bit(1), &[b' ', b' ', b' ']);
- assert_eq!(gen_padding_32bit(2), &[b' ', b' ']);
- assert_eq!(gen_padding_32bit(3), &[b' ']);
- assert_eq!(gen_padding_32bit(4), &[] as &[u8]);
- assert_eq!(gen_padding_32bit(5), &[b' ', b' ', b' ']);
- }
-
- #[test]
- fn response_header_to_bytes() {
- // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
- let resp_header = ResponseHeader {
- request_id: 0x0102030405060708u64,
- status: 0x090A0B0Cu32,
- result: 0x0D0E0F10u32,
- };
-
- // All numbers are always little-endian encoded, as the js side also wants this to be fixed
- assert_eq!(
- &Into::<[u8; 16]>::into(resp_header),
- &[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 16, 15, 14, 13]
- );
- }
-
- #[test]
- fn response_header_to_bytes_max_value() {
- // Max size of an js Number is 1^53 - 1, so use this value as max for 64bit ´request_id´
- let resp_header = ResponseHeader {
- request_id: (1u64 << 53u64) - 1u64,
- status: 0xFFFFFFFFu32,
- result: 0xFFFFFFFFu32,
- };
-
- // All numbers are always little-endian encoded, as the js side also wants this to be fixed
- assert_eq!(
- &Into::<[u8; 16]>::into(resp_header),
- &[
- 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255, 255, 255, 255,
- 255
- ]
- );
- }
-
- #[test]
- fn request_header_from_bytes() {
- let req_header =
- RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9])
- .unwrap();
-
- assert_eq!(req_header.request_id, 0x0102030405060708u64);
- assert_eq!(req_header.argument, 0x090A0B0Cu32);
- }
-
- #[test]
- fn request_header_from_bytes_max_value() {
- let req_header = RequestHeader::from_raw(&[
- 255, 255, 255, 255, 255, 255, 31, 0, 255, 255, 255, 255,
- ])
- .unwrap();
-
- assert_eq!(req_header.request_id, (1u64 << 53u64) - 1u64);
- assert_eq!(req_header.argument, 0xFFFFFFFFu32);
- }
-
- #[test]
- fn request_header_from_bytes_too_short() {
- let req_header =
- RequestHeader::from_raw(&[8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10]);
-
- assert_eq!(req_header, None);
- }
-
- #[test]
- fn request_header_from_bytes_long() {
- let req_header = RequestHeader::from_raw(&[
- 8, 7, 6, 5, 4, 3, 2, 1, 12, 11, 10, 9, 13, 14, 15, 16, 17, 18, 19, 20, 21,
- ])
- .unwrap();
- assert_eq!(req_header.request_id, 0x0102030405060708u64);
- assert_eq!(req_header.argument, 0x090A0B0Cu32);
- }
+ let fut = op_fn(state.clone(), min_arg, bufs)
+ .map(move |result| serialize_bin_result(result, state));
+ let temp = Box::pin(fut);
+ Op::Async(temp)
+ },
+ )
}
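Bin ops keep their u32 scalar argument but now ride the same payload/buffer plumbing, and the wrappers reject a missing buffer (which is why the bin-ops bench above passes a dummy Uint8Array). A minimal sketch, assuming this revision's API; the op name is illustrative.

use deno_core::{bin_op_sync, JsRuntime};

fn main() {
  let mut runtime = JsRuntime::new(Default::default());
  // The scalar arg arrives via the structured payload as a u32;
  // the (required) buffer carries the byte payload.
  runtime.register_op(
    "op_len",
    bin_op_sync(|_state, _arg: u32, bufs| Ok(bufs[0].len() as u32)),
  );
  runtime
    .execute(
      "<usage>",
      "Deno.core.ops(); Deno.core.print(`len: ${Deno.core.binOpSync('op_len', 0, new Uint8Array(4))}\\n`);",
    )
    .unwrap();
}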
diff --git a/core/ops_json.rs b/core/ops_json.rs
index 0ef91ed33..ee336830b 100644
--- a/core/ops_json.rs
+++ b/core/ops_json.rs
@@ -1,37 +1,19 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-use crate::error::type_error;
use crate::error::AnyError;
+use crate::serialize_op_result;
use crate::BufVec;
use crate::Op;
use crate::OpFn;
+use crate::OpPayload;
use crate::OpState;
use crate::ZeroCopyBuf;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::cell::RefCell;
-use std::convert::TryInto;
use std::future::Future;
use std::rc::Rc;
-fn json_serialize_op_result<R: Serialize>(
- request_id: Option<u64>,
- result: Result<R, AnyError>,
- get_error_class_fn: crate::runtime::GetErrorClassFn,
-) -> Box<[u8]> {
- let value = match result {
- Ok(v) => serde_json::json!({ "ok": v, "requestId": request_id }),
- Err(err) => serde_json::json!({
- "requestId": request_id,
- "err": {
- "className": (get_error_class_fn)(&err),
- "message": err.to_string(),
- }
- }),
- };
- serde_json::to_vec(&value).unwrap().into_boxed_slice()
-}
-
/// Creates an op that passes data synchronously using JSON.
///
/// The provided function `op_fn` has the following parameters:
@@ -59,15 +41,20 @@ pub fn json_op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
where
F: Fn(&mut OpState, V, &mut [ZeroCopyBuf]) -> Result<R, AnyError> + 'static,
V: DeserializeOwned,
- R: Serialize,
+ R: Serialize + 'static,
{
- Box::new(move |state: Rc<RefCell<OpState>>, mut bufs: BufVec| -> Op {
- let result = serde_json::from_slice(&bufs[0])
- .map_err(AnyError::from)
- .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs[1..]));
- let buf =
- json_serialize_op_result(None, result, state.borrow().get_error_class_fn);
- Op::Sync(buf)
+ Box::new(move |state, payload, buf: Option<ZeroCopyBuf>| -> Op {
+ // For sig compat map Option<ZeroCopyBuf> to BufVec
+ let mut bufs: BufVec = match buf {
+ Some(b) => vec![b],
+ None => vec![],
+ }
+ .into();
+
+ let result = payload
+ .deserialize()
+ .and_then(|args| op_fn(&mut state.borrow_mut(), args, &mut bufs));
+ Op::Sync(serialize_op_result(result, state))
})
}
@@ -100,35 +87,38 @@ where
F: Fn(Rc<RefCell<OpState>>, V, BufVec) -> R + 'static,
V: DeserializeOwned,
R: Future<Output = Result<RV, AnyError>> + 'static,
- RV: Serialize,
+ RV: Serialize + 'static,
{
- let try_dispatch_op =
- move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
- let request_id = bufs[0]
- .get(0..8)
- .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
- .ok_or_else(|| type_error("missing or invalid `requestId`"))?;
- let args = serde_json::from_slice(&bufs[0][8..])?;
- let bufs = bufs[1..].into();
- use crate::futures::FutureExt;
- let fut = op_fn(state.clone(), args, bufs).map(move |result| {
- json_serialize_op_result(
- Some(request_id),
- result,
- state.borrow().get_error_class_fn,
- )
- });
- Ok(Op::Async(Box::pin(fut)))
- };
-
- Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
- match try_dispatch_op(state.clone(), bufs) {
- Ok(op) => op,
- Err(err) => Op::Sync(json_serialize_op_result(
- None,
- Err::<(), AnyError>(err),
- state.borrow().get_error_class_fn,
- )),
+ let try_dispatch_op = move |state: Rc<RefCell<OpState>>,
+ p: OpPayload,
+ b: Option<ZeroCopyBuf>|
+ -> Result<Op, AnyError> {
+ // For sig compat map Option<ZeroCopyBuf> to BufVec
+ let bufs: BufVec = match b {
+ Some(b) => vec![b],
+ None => vec![],
}
- })
+ .into();
+ // Parse args
+ let args = p.deserialize()?;
+
+ use crate::futures::FutureExt;
+ let fut = op_fn(state.clone(), args, bufs)
+ .map(move |result| serialize_op_result(result, state));
+ Ok(Op::Async(Box::pin(fut)))
+ };
+
+ Box::new(
+ move |state: Rc<RefCell<OpState>>,
+ p: OpPayload,
+ b: Option<ZeroCopyBuf>|
+ -> Op {
+ match try_dispatch_op(state.clone(), p, b) {
+ Ok(op) => op,
+ Err(err) => {
+ Op::Sync(serialize_op_result(Err::<(), AnyError>(err), state))
+ }
+ }
+ },
+ )
}
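Async JSON ops no longer embed a request id in the control buffer; the promise id travels out-of-band, and the op future's serialized result resolves the stored promise in core.js. A minimal sketch of that flow, assuming this revision's json_op_async and a JsRuntime::run_event_loop driver; the op name is illustrative.

use deno_core::error::AnyError;
use deno_core::futures::executor::block_on;
use deno_core::{json_op_async, JsRuntime};

fn main() {
  let mut runtime = JsRuntime::new(Default::default());
  runtime.register_op(
    "op_incr",
    // Args deserialize from the structured payload; the future's result is
    // serialized with serialize_op_result and settles the JS-side promise.
    json_op_async(|_state, n: u64, _bufs| async move { Ok::<u64, AnyError>(n + 1) }),
  );
  runtime
    .execute(
      "<usage>",
      r#"
Deno.core.ops();
Deno.core.jsonOpAsync('op_incr', 41).then((n) => Deno.core.print(`got ${n}\n`));
"#,
    )
    .unwrap();
  // Drive pending ops to completion so the promise above resolves.
  block_on(runtime.run_event_loop()).unwrap();
}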
diff --git a/core/plugin_api.rs b/core/plugin_api.rs
index 98eb3f2ca..f91b28403 100644
--- a/core/plugin_api.rs
+++ b/core/plugin_api.rs
@@ -10,6 +10,7 @@
pub use crate::Op;
pub use crate::OpId;
+pub use crate::OpResponse;
pub use crate::ZeroCopyBuf;
pub type InitFn = fn(&mut dyn Interface);
diff --git a/core/runtime.rs b/core/runtime.rs
index 80fe90d2f..1f9e62f4f 100644
--- a/core/runtime.rs
+++ b/core/runtime.rs
@@ -20,10 +20,11 @@ use crate::modules::NoopModuleLoader;
use crate::modules::PrepareLoadFuture;
use crate::modules::RecursiveModuleLoad;
use crate::ops::*;
-use crate::shared_queue::SharedQueue;
-use crate::shared_queue::RECOMMENDED_SIZE;
-use crate::BufVec;
+use crate::OpPayload;
+use crate::OpResponse;
use crate::OpState;
+use crate::PromiseId;
+use crate::ZeroCopyBuf;
use futures::channel::mpsc;
use futures::future::poll_fn;
use futures::stream::FuturesUnordered;
@@ -45,7 +46,7 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
-type PendingOpFuture = Pin<Box<dyn Future<Output = (OpId, Box<[u8]>)>>>;
+type PendingOpFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResponse)>>>;
pub enum Snapshot {
Static(&'static [u8]),
@@ -99,7 +100,6 @@ struct ModEvaluate {
/// embedder slots.
pub(crate) struct JsRuntimeState {
pub global_context: Option<v8::Global<v8::Context>>,
- pub(crate) shared_ab: Option<v8::Global<v8::SharedArrayBuffer>>,
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions:
@@ -107,7 +107,6 @@ pub(crate) struct JsRuntimeState {
pending_dyn_mod_evaluate: HashMap<ModuleLoadId, DynImportModEvaluate>,
pending_mod_evaluate: Option<ModEvaluate>,
pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>,
- pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) have_unpolled_ops: bool,
@@ -276,11 +275,9 @@ impl JsRuntime {
pending_promise_exceptions: HashMap::new(),
pending_dyn_mod_evaluate: HashMap::new(),
pending_mod_evaluate: None,
- shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
js_error_create_fn,
- shared: SharedQueue::new(RECOMMENDED_SIZE),
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
op_state: Rc::new(RefCell::new(op_state)),
@@ -305,7 +302,7 @@ impl JsRuntime {
}
if !options.will_snapshot {
- js_runtime.shared_queue_init();
+ js_runtime.core_js_init();
}
js_runtime
@@ -350,16 +347,13 @@ impl JsRuntime {
.unwrap();
}
- /// Executes a JavaScript code to initialize shared queue binding
- /// between Rust and JS.
+ /// Executes JavaScript code to initialize core.js,
+ /// specifically the js_recv_cb setter
///
/// This function mustn't be called during snapshotting.
- fn shared_queue_init(&mut self) {
+ fn core_js_init(&mut self) {
self
- .execute(
- "deno:core/shared_queue_init.js",
- "Deno.core.sharedQueueInit()",
- )
+ .execute("deno:core/init.js", "Deno.core.init()")
.unwrap();
}
@@ -448,7 +442,7 @@ impl JsRuntime {
/// * [json_op_async()](fn.json_op_async.html)
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
where
- F: Fn(Rc<RefCell<OpState>>, BufVec) -> Op + 'static,
+ F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static,
{
Self::state(self.v8_isolate())
.borrow_mut()
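
With the shared queue gone, a raw op registered here receives a structured OpPayload plus at most one optional zero-copy buffer, and it answers with an OpResponse instead of writing bytes into shared memory. A minimal sketch of the new raw signature, modeled on the test dispatcher later in this file; the op name "add_one" and the arithmetic are illustrative, and the crate-root re-exports of OpPayload and OpResponse are assumed:

    use std::cell::RefCell;
    use std::rc::Rc;

    use deno_core::{JsRuntime, Op, OpPayload, OpResponse, OpState, ZeroCopyBuf};

    fn main() {
      let mut runtime = JsRuntime::new(Default::default());
      runtime.register_op(
        "add_one",
        |_state: Rc<RefCell<OpState>>,
         payload: OpPayload,
         _buf: Option<ZeroCopyBuf>|
         -> Op {
          // Deserialize the argument sent from JS (a plain integer here).
          let n: u64 = payload.deserialize().unwrap();
          // Answer synchronously with a serializable value.
          Op::Sync(OpResponse::Value(Box::new(n + 1)))
        },
      );
    }

Higher-level helpers such as json_op_sync and json_op_async build on this raw form, so most ops should not need to touch OpPayload directly.
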
@@ -516,8 +510,8 @@ impl JsRuntime {
// Ops
{
- let overflow_response = self.poll_pending_ops(cx);
- self.async_op_response(overflow_response)?;
+ let async_responses = self.poll_pending_ops(cx);
+ self.async_op_response(async_responses)?;
self.drain_macrotasks()?;
self.check_promise_exceptions()?;
}
@@ -1325,9 +1319,12 @@ impl JsRuntime {
self.mod_instantiate(root_id).map(|_| root_id)
}
- fn poll_pending_ops(&mut self, cx: &mut Context) -> Vec<(OpId, Box<[u8]>)> {
+ fn poll_pending_ops(
+ &mut self,
+ cx: &mut Context,
+ ) -> Vec<(PromiseId, OpResponse)> {
let state_rc = Self::state(self.v8_isolate());
- let mut overflow_response: Vec<(OpId, Box<[u8]>)> = Vec::new();
+ let mut async_responses: Vec<(PromiseId, OpResponse)> = Vec::new();
let mut state = state_rc.borrow_mut();
@@ -1339,11 +1336,8 @@ impl JsRuntime {
match pending_r {
Poll::Ready(None) => break,
Poll::Pending => break,
- Poll::Ready(Some((op_id, buf))) => {
- let successful_push = state.shared.push(op_id, &buf);
- if !successful_push {
- overflow_response.push((op_id, buf));
- }
+ Poll::Ready(Some((promise_id, resp))) => {
+ async_responses.push((promise_id, resp));
}
};
}
@@ -1353,16 +1347,13 @@ impl JsRuntime {
match unref_r {
Poll::Ready(None) => break,
Poll::Pending => break,
- Poll::Ready(Some((op_id, buf))) => {
- let successful_push = state.shared.push(op_id, &buf);
- if !successful_push {
- overflow_response.push((op_id, buf));
- }
+ Poll::Ready(Some((promise_id, resp))) => {
+ async_responses.push((promise_id, resp));
}
};
}
- overflow_response
+ async_responses
}
fn check_promise_exceptions(&mut self) -> Result<(), AnyError> {
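
poll_pending_ops now simply drains every completed future into a Vec<(PromiseId, OpResponse)> batch; there is no shared-queue push that can fail and spill into an overflow path. The core pattern is the usual "poll a FuturesUnordered until it reports Pending" loop, sketched here standalone with a noop waker (the runtime polls with the real task Context it is given):

    use futures::stream::{FuturesUnordered, StreamExt};
    use futures::task::noop_waker_ref;
    use std::future::Future;
    use std::task::{Context, Poll};

    // Collect every already-completed item from the set, stopping as soon as
    // the set reports Pending (or runs out of futures entirely).
    fn drain_ready<F: Future>(pending: &mut FuturesUnordered<F>) -> Vec<F::Output> {
      let mut cx = Context::from_waker(noop_waker_ref());
      let mut ready = Vec::new();
      while let Poll::Ready(Some(item)) = pending.poll_next_unpin(&mut cx) {
        ready.push(item);
      }
      ready
    }

In the runtime this drain runs twice per tick, once for pending_ops and once for pending_unref_ops, and the combined batch is handed to async_op_response.
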
@@ -1391,17 +1382,15 @@ impl JsRuntime {
exception_to_err_result(scope, exception, true)
}
- // Respond using shared queue and optionally overflown response
+ // Send finished responses to JS
fn async_op_response(
&mut self,
- overflown_responses: Vec<(OpId, Box<[u8]>)>,
+ async_responses: Vec<(PromiseId, OpResponse)>,
) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
- let shared_queue_size = state_rc.borrow().shared.size();
- let overflown_responses_size = overflown_responses.len();
-
- if shared_queue_size == 0 && overflown_responses_size == 0 {
+ let async_responses_size = async_responses.len();
+ if async_responses_size == 0 {
return Ok(());
}
@@ -1422,26 +1411,32 @@ impl JsRuntime {
let tc_scope = &mut v8::TryCatch::new(scope);
+    // We return async responses to JS in unbounded batches (this may change).
+    // Each batch is a flat vector of tuples:
+    // `[promise_id1, op_result1, promise_id2, op_result2, ...]`
+    // where promise_id is a plain integer and op_result is an ops::OpResult
+    // that holds either a value or an error, encoded as a tuple.
+    // The batch is received in JS via the special `arguments` variable,
+    // and each tuple is then used to resolve or reject the matching promise.
let mut args: Vec<v8::Local<v8::Value>> =
- Vec::with_capacity(2 * overflown_responses_size);
- for overflown_response in overflown_responses {
- let (op_id, buf) = overflown_response;
- args.push(v8::Integer::new(tc_scope, op_id as i32).into());
- args.push(bindings::boxed_slice_to_uint8array(tc_scope, buf).into());
+ Vec::with_capacity(2 * async_responses_size);
+    for (promise_id, resp) in async_responses {
+ args.push(v8::Integer::new(tc_scope, promise_id as i32).into());
+ args.push(match resp {
+ OpResponse::Value(value) => serde_v8::to_v8(tc_scope, value).unwrap(),
+ OpResponse::Buffer(buf) => {
+ bindings::boxed_slice_to_uint8array(tc_scope, buf).into()
+ }
+ });
}
- if shared_queue_size > 0 || overflown_responses_size > 0 {
+ if async_responses_size > 0 {
js_recv_cb.call(tc_scope, global, args.as_slice());
}
match tc_scope.exception() {
- None => {
- // The other side should have shifted off all the messages.
- let shared_queue_size = state_rc.borrow().shared.size();
- assert_eq!(shared_queue_size, 0);
-
- Ok(())
- }
+ None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception, false),
}
}
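
The batch handed to js_recv_cb is a flat argument list that alternates promise ids and results; OpResponse::Value results go through serde_v8 while OpResponse::Buffer results become Uint8Arrays. The sketch below only illustrates that flattening, away from V8, by rendering the result slot as a string; PromiseId is assumed to be a plain integer (it is cast to i32 above):

    use deno_core::{OpResponse, PromiseId};

    // Each (promise_id, response) pair occupies two consecutive slots:
    // [id1, result1, id2, result2, ...]. In the runtime the result slot is a
    // v8::Value; here it is a descriptive String so the sketch stays V8-free.
    fn flatten_batch(responses: Vec<(PromiseId, OpResponse)>) -> Vec<String> {
      let mut args = Vec::with_capacity(2 * responses.len());
      for (promise_id, resp) in responses {
        args.push(promise_id.to_string());
        args.push(match resp {
          OpResponse::Value(_) => "value serialized via serde_v8".to_string(),
          OpResponse::Buffer(buf) => format!("Uint8Array of {} bytes", buf.len()),
        });
      }
      args
    }

On the JS side, core.js receives the same flat list through `arguments` and walks it two entries at a time to resolve or reject the promise registered under each id.
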
@@ -1485,7 +1480,6 @@ impl JsRuntime {
pub mod tests {
use super::*;
use crate::modules::ModuleSourceFuture;
- use crate::BufVec;
use futures::future::lazy;
use futures::FutureExt;
use std::io;
@@ -1501,31 +1495,10 @@ pub mod tests {
futures::executor::block_on(lazy(move |cx| f(cx)));
}
- fn poll_until_ready(
- runtime: &mut JsRuntime,
- max_poll_count: usize,
- ) -> Result<(), AnyError> {
- let mut cx = Context::from_waker(futures::task::noop_waker_ref());
- for _ in 0..max_poll_count {
- match runtime.poll_event_loop(&mut cx) {
- Poll::Pending => continue,
- Poll::Ready(val) => return val,
- }
- }
- panic!(
- "JsRuntime still not ready after polling {} times.",
- max_poll_count
- )
- }
-
enum Mode {
Async,
AsyncUnref,
- AsyncZeroCopy(u8),
- OverflowReqSync,
- OverflowResSync,
- OverflowReqAsync,
- OverflowResAsync,
+ AsyncZeroCopy(bool),
}
struct TestState {
@@ -1533,68 +1506,39 @@ pub mod tests {
dispatch_count: Arc<AtomicUsize>,
}
- fn dispatch(op_state: Rc<RefCell<OpState>>, bufs: BufVec) -> Op {
+ fn dispatch(
+ op_state: Rc<RefCell<OpState>>,
+ payload: OpPayload,
+ buf: Option<ZeroCopyBuf>,
+ ) -> Op {
let op_state_ = op_state.borrow();
let test_state = op_state_.borrow::<TestState>();
test_state.dispatch_count.fetch_add(1, Ordering::Relaxed);
match test_state.mode {
Mode::Async => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
+ let control: u8 = payload.deserialize().unwrap();
+ assert_eq!(control, 42);
+ let resp = OpResponse::Value(Box::new(43));
+ Op::Async(Box::pin(futures::future::ready(resp)))
}
Mode::AsyncUnref => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
+ let control: u8 = payload.deserialize().unwrap();
+ assert_eq!(control, 42);
let fut = async {
          // This future never finishes.
futures::future::pending::<()>().await;
- vec![43u8].into_boxed_slice()
+ OpResponse::Value(Box::new(43))
};
- Op::AsyncUnref(fut.boxed())
+ Op::AsyncUnref(Box::pin(fut))
}
- Mode::AsyncZeroCopy(count) => {
- assert_eq!(bufs.len(), count as usize);
- bufs.iter().enumerate().for_each(|(idx, buf)| {
+ Mode::AsyncZeroCopy(has_buffer) => {
+ assert_eq!(buf.is_some(), has_buffer);
+ if let Some(buf) = buf {
assert_eq!(buf.len(), 1);
- assert_eq!(idx, buf[0] as usize);
- });
+ }
- let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
- }
- Mode::OverflowReqSync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
- let buf = vec![43u8].into_boxed_slice();
- Op::Sync(buf)
- }
- Mode::OverflowResSync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 99;
- let buf = vec.into_boxed_slice();
- Op::Sync(buf)
- }
- Mode::OverflowReqAsync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 100 * 1024 * 1024);
- let buf = vec![43u8].into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
- }
- Mode::OverflowResAsync => {
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 4;
- let buf = vec.into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
+ let resp = OpResponse::Value(Box::new(43));
+ Op::Async(Box::pin(futures::future::ready(resp)))
}
}
}
@@ -1633,10 +1577,10 @@ pub mod tests {
.execute(
"filename.js",
r#"
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
async function main() {
- Deno.core.send(1, control);
+ Deno.core.send(1, null, control);
}
main();
"#,
@@ -1647,7 +1591,7 @@ pub mod tests {
#[test]
fn test_dispatch_no_zero_copy_buf() {
- let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(0));
+ let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false));
runtime
.execute(
"filename.js",
@@ -1661,14 +1605,13 @@ pub mod tests {
#[test]
fn test_dispatch_stack_zero_copy_bufs() {
- let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(2));
+ let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(true));
runtime
.execute(
"filename.js",
r#"
let zero_copy_a = new Uint8Array([0]);
- let zero_copy_b = new Uint8Array([1]);
- Deno.core.send(1, zero_copy_a, zero_copy_b);
+ Deno.core.send(1, null, null, zero_copy_a);
"#,
)
.unwrap();
@@ -1676,23 +1619,7 @@ pub mod tests {
}
#[test]
- fn test_dispatch_heap_zero_copy_bufs() {
- let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(5));
- runtime.execute(
- "filename.js",
- r#"
- let zero_copy_a = new Uint8Array([0]);
- let zero_copy_b = new Uint8Array([1]);
- let zero_copy_c = new Uint8Array([2]);
- let zero_copy_d = new Uint8Array([3]);
- let zero_copy_e = new Uint8Array([4]);
- Deno.core.send(1, zero_copy_a, zero_copy_b, zero_copy_c, zero_copy_d, zero_copy_e);
- "#,
- ).unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- }
-
- #[test]
+ #[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed
fn test_poll_async_delayed_ops() {
run_in_task(|cx| {
let (mut runtime, dispatch_count) = setup(Mode::Async);
@@ -1714,8 +1641,8 @@ pub mod tests {
"check1.js",
r#"
assert(nrecv == 0);
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
assert(nrecv == 0);
"#,
)
@@ -1728,7 +1655,7 @@ pub mod tests {
"check2.js",
r#"
assert(nrecv == 1);
- Deno.core.send(1, control);
+ Deno.core.send(1, null, control);
assert(nrecv == 1);
"#,
)
@@ -1743,6 +1670,7 @@ pub mod tests {
}
#[test]
+ #[ignore] // TODO(ry) re-enable? setAsyncHandler has been removed
fn test_poll_async_optional_ops() {
run_in_task(|cx| {
let (mut runtime, dispatch_count) = setup(Mode::AsyncUnref);
@@ -1754,8 +1682,8 @@ pub mod tests {
// This handler will never be called
assert(false);
});
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
"#,
)
.unwrap();
@@ -1818,261 +1746,9 @@ pub mod tests {
}
#[test]
- fn overflow_req_sync() {
- let (mut runtime, dispatch_count) = setup(Mode::OverflowReqSync);
- runtime
- .execute(
- "overflow_req_sync.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
- // Large message that will overflow the shared space.
- let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(1, control);
- assert(response instanceof Uint8Array);
- assert(response.length == 1);
- assert(response[0] == 43);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- }
-
- #[test]
- fn overflow_res_sync() {
- // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
- // should optimize this.
- let (mut runtime, dispatch_count) = setup(Mode::OverflowResSync);
- runtime
- .execute(
- "overflow_res_sync.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response instanceof Uint8Array);
- assert(response.length == 100 * 1024 * 1024);
- assert(response[0] == 99);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- }
-
- #[test]
- fn overflow_req_async() {
- run_in_task(|cx| {
- let (mut runtime, dispatch_count) = setup(Mode::OverflowReqAsync);
- runtime
- .execute(
- "overflow_req_async.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 1);
- assert(buf[0] === 43);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array(100 * 1024 * 1024);
- let response = Deno.core.dispatch(1, control);
- // Async messages always have null response.
- assert(response == null);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
- runtime
- .execute("check.js", "assert(asyncRecv == 1);")
- .unwrap();
- });
- }
-
- #[test]
- fn overflow_res_async_combined_with_unref() {
- run_in_task(|cx| {
- let mut runtime = JsRuntime::new(Default::default());
-
- runtime.register_op(
- "test1",
- |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 4;
- let buf = vec.into_boxed_slice();
- Op::Async(futures::future::ready(buf).boxed())
- },
- );
-
- runtime.register_op(
- "test2",
- |_op_state: Rc<RefCell<OpState>>, _bufs: BufVec| -> Op {
- let mut vec = vec![0u8; 100 * 1024 * 1024];
- vec[0] = 4;
- let buf = vec.into_boxed_slice();
- Op::AsyncUnref(futures::future::ready(buf).boxed())
- },
- );
-
- runtime
- .execute(
- "overflow_res_async_combined_with_unref.js",
- r#"
- function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
- }
-
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- Deno.core.setAsyncHandler(2, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- let control = new Uint8Array(1);
- let response1 = Deno.core.dispatch(1, control);
- // Async messages always have null response.
- assert(response1 == null);
- assert(asyncRecv == 0);
- let response2 = Deno.core.dispatch(2, control);
- // Async messages always have null response.
- assert(response2 == null);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert!(matches!(runtime.poll_event_loop(cx), Poll::Ready(Ok(_))));
- runtime
- .execute("check.js", "assert(asyncRecv == 2);")
- .unwrap();
- });
- }
-
- #[test]
- fn overflow_res_async() {
- run_in_task(|_cx| {
- // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
- // should optimize this.
- let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync);
- runtime
- .execute(
- "overflow_res_async.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response == null);
- assert(asyncRecv == 0);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
- poll_until_ready(&mut runtime, 3).unwrap();
- runtime
- .execute("check.js", "assert(asyncRecv == 1);")
- .unwrap();
- });
- }
-
- #[test]
- fn overflow_res_multiple_dispatch_async() {
- // TODO(ry) This test is quite slow due to memcpy-ing 100MB into JS. We
- // should optimize this.
- run_in_task(|_cx| {
- let (mut runtime, dispatch_count) = setup(Mode::OverflowResAsync);
- runtime
- .execute(
- "overflow_res_multiple_dispatch_async.js",
- r#"
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- assert(buf.byteLength === 100 * 1024 * 1024);
- assert(buf[0] === 4);
- asyncRecv++;
- });
- // Large message that will overflow the shared space.
- let control = new Uint8Array([42]);
- let response = Deno.core.dispatch(1, control);
- assert(response == null);
- assert(asyncRecv == 0);
- // Dispatch another message to verify that pending ops
- // are done even if shared space overflows
- Deno.core.dispatch(1, control);
- "#,
- )
- .unwrap();
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- poll_until_ready(&mut runtime, 3).unwrap();
- runtime
- .execute("check.js", "assert(asyncRecv == 2);")
- .unwrap();
- });
- }
-
- #[test]
- fn shared_queue_not_empty_when_js_error() {
- run_in_task(|_cx| {
- let dispatch_count = Arc::new(AtomicUsize::new(0));
- let mut runtime = JsRuntime::new(Default::default());
- let op_state = runtime.op_state();
- op_state.borrow_mut().put(TestState {
- mode: Mode::Async,
- dispatch_count: dispatch_count.clone(),
- });
-
- runtime.register_op("test", dispatch);
- runtime
- .execute(
- "shared_queue_not_empty_when_js_error.js",
- r#"
- const assert = (cond) => {if (!cond) throw Error("assert")};
- let asyncRecv = 0;
- Deno.core.setAsyncHandler(1, (buf) => {
- asyncRecv++;
- throw Error('x');
- });
-
- Deno.core.dispatch(1, new Uint8Array([42]));
- Deno.core.dispatch(1, new Uint8Array([42]));
- "#,
- )
- .unwrap();
-
- assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
- if poll_until_ready(&mut runtime, 3).is_ok() {
- panic!("Thrown error was not detected!")
- }
- runtime
- .execute("check.js", "assert(asyncRecv == 1);")
- .unwrap();
-
- let state_rc = JsRuntime::state(runtime.v8_isolate());
- let shared_queue_size = state_rc.borrow().shared.size();
- assert_eq!(shared_queue_size, 1);
- });
- }
-
- #[test]
fn test_pre_dispatch() {
run_in_task(|mut cx| {
- let (mut runtime, _dispatch_count) = setup(Mode::OverflowResAsync);
+ let (mut runtime, _dispatch_count) = setup(Mode::Async);
runtime
.execute(
"bad_op_id.js",
@@ -2094,19 +1770,6 @@ pub mod tests {
}
#[test]
- fn core_test_js() {
- run_in_task(|mut cx| {
- let (mut runtime, _dispatch_count) = setup(Mode::Async);
- runtime
- .execute("core_test.js", include_str!("core_test.js"))
- .unwrap();
- if let Poll::Ready(Err(_)) = runtime.poll_event_loop(&mut cx) {
- unreachable!();
- }
- });
- }
-
- #[test]
fn syntax_error() {
let mut runtime = JsRuntime::new(Default::default());
let src = "hocuspocus(";
@@ -2315,13 +1978,12 @@ pub mod tests {
let dispatch_count = Arc::new(AtomicUsize::new(0));
let dispatch_count_ = dispatch_count.clone();
- let dispatcher = move |_state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
+ let dispatcher = move |_state, payload: OpPayload, _buf| -> Op {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
- assert_eq!(bufs.len(), 1);
- assert_eq!(bufs[0].len(), 1);
- assert_eq!(bufs[0][0], 42);
- let buf = [43u8, 0, 0, 0][..].into();
- Op::Async(futures::future::ready(buf).boxed())
+ let control: u8 = payload.deserialize().unwrap();
+ assert_eq!(control, 42);
+ let resp = OpResponse::Value(Box::new(43));
+ Op::Async(Box::pin(futures::future::ready(resp)))
};
let mut runtime = JsRuntime::new(RuntimeOptions {
@@ -2353,8 +2015,8 @@ pub mod tests {
r#"
import { b } from './b.js'
if (b() != 'b') throw Error();
- let control = new Uint8Array([42]);
- Deno.core.send(1, control);
+ let control = 42;
+ Deno.core.send(1, null, control);
"#,
)
.unwrap();
diff --git a/core/shared_queue.rs b/core/shared_queue.rs
deleted file mode 100644
index dda54a4df..000000000
--- a/core/shared_queue.rs
+++ /dev/null
@@ -1,313 +0,0 @@
-// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-/*
-SharedQueue Binary Layout
-+-------------------------------+-------------------------------+
-| NUM_RECORDS (32) |
-+---------------------------------------------------------------+
-| NUM_SHIFTED_OFF (32) |
-+---------------------------------------------------------------+
-| HEAD (32) |
-+---------------------------------------------------------------+
-| OFFSETS (32) |
-+---------------------------------------------------------------+
-| RECORD_ENDS (*MAX_RECORDS) ...
-+---------------------------------------------------------------+
-| RECORDS (*MAX_RECORDS) ...
-+---------------------------------------------------------------+
- */
-
-use crate::bindings;
-use crate::ops::OpId;
-use log::debug;
-use rusty_v8 as v8;
-use std::convert::TryInto;
-
-const MAX_RECORDS: usize = 100;
-/// Total number of records added.
-const INDEX_NUM_RECORDS: usize = 0;
-/// Number of records that have been shifted off.
-const INDEX_NUM_SHIFTED_OFF: usize = 1;
-/// The head is the number of initialized bytes in SharedQueue.
-/// It grows monotonically.
-const INDEX_HEAD: usize = 2;
-const INDEX_OFFSETS: usize = 3;
-const INDEX_RECORDS: usize = INDEX_OFFSETS + 2 * MAX_RECORDS;
-/// Byte offset of where the records begin. Also where the head starts.
-const HEAD_INIT: usize = 4 * INDEX_RECORDS;
-/// A rough guess at how big we should make the shared buffer in bytes.
-pub const RECOMMENDED_SIZE: usize = 128 * MAX_RECORDS;
-
-pub struct SharedQueue {
- buf: v8::SharedRef<v8::BackingStore>,
-}
-
-impl SharedQueue {
- pub fn new(len: usize) -> Self {
- let buf = vec![0; HEAD_INIT + len].into_boxed_slice();
- let buf = v8::SharedArrayBuffer::new_backing_store_from_boxed_slice(buf);
- let mut q = Self {
- buf: buf.make_shared(),
- };
- q.reset();
- q
- }
-
- pub fn get_backing_store(&mut self) -> &mut v8::SharedRef<v8::BackingStore> {
- &mut self.buf
- }
-
- pub fn bytes(&self) -> &[u8] {
- unsafe {
- bindings::get_backing_store_slice(&self.buf, 0, self.buf.byte_length())
- }
- }
-
- pub fn bytes_mut(&mut self) -> &mut [u8] {
- unsafe {
- bindings::get_backing_store_slice_mut(
- &self.buf,
- 0,
- self.buf.byte_length(),
- )
- }
- }
-
- fn reset(&mut self) {
- debug!("rust:shared_queue:reset");
- let s: &mut [u32] = self.as_u32_slice_mut();
- s[INDEX_NUM_RECORDS] = 0;
- s[INDEX_NUM_SHIFTED_OFF] = 0;
- s[INDEX_HEAD] = HEAD_INIT as u32;
- }
-
- fn as_u32_slice(&self) -> &[u32] {
- let p = self.bytes().as_ptr();
- // Assert pointer is 32 bit aligned before casting.
- assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
- #[allow(clippy::cast_ptr_alignment)]
- let p32 = p as *const u32;
- unsafe { std::slice::from_raw_parts(p32, self.bytes().len() / 4) }
- }
-
- fn as_u32_slice_mut(&mut self) -> &mut [u32] {
- let p = self.bytes_mut().as_mut_ptr();
- // Assert pointer is 32 bit aligned before casting.
- assert_eq!((p as usize) % std::mem::align_of::<u32>(), 0);
- #[allow(clippy::cast_ptr_alignment)]
- let p32 = p as *mut u32;
- unsafe { std::slice::from_raw_parts_mut(p32, self.bytes().len() / 4) }
- }
-
- pub fn size(&self) -> usize {
- let s = self.as_u32_slice();
- (s[INDEX_NUM_RECORDS] - s[INDEX_NUM_SHIFTED_OFF]) as usize
- }
-
- fn num_records(&self) -> usize {
- let s = self.as_u32_slice();
- s[INDEX_NUM_RECORDS] as usize
- }
-
- fn head(&self) -> usize {
- let s = self.as_u32_slice();
- s[INDEX_HEAD] as usize
- }
-
- fn num_shifted_off(&self) -> usize {
- let s = self.as_u32_slice();
- s[INDEX_NUM_SHIFTED_OFF] as usize
- }
-
- fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
- let s = self.as_u32_slice_mut();
- s[INDEX_OFFSETS + 2 * index] = end as u32;
- s[INDEX_OFFSETS + 2 * index + 1] = op_id.try_into().unwrap();
- }
-
- #[cfg(test)]
- fn get_meta(&self, index: usize) -> Option<(OpId, usize)> {
- if index < self.num_records() {
- let s = self.as_u32_slice();
- let end = s[INDEX_OFFSETS + 2 * index] as usize;
- let op_id = s[INDEX_OFFSETS + 2 * index + 1] as OpId;
- Some((op_id, end))
- } else {
- None
- }
- }
-
- #[cfg(test)]
- fn get_offset(&self, index: usize) -> Option<usize> {
- if index < self.num_records() {
- Some(if index == 0 {
- HEAD_INIT
- } else {
- let s = self.as_u32_slice();
- let prev_end = s[INDEX_OFFSETS + 2 * (index - 1)] as usize;
- (prev_end + 3) & !3
- })
- } else {
- None
- }
- }
-
- /// Returns none if empty.
- #[cfg(test)]
- pub fn shift(&mut self) -> Option<(OpId, &[u8])> {
- let u32_slice = self.as_u32_slice();
- let i = u32_slice[INDEX_NUM_SHIFTED_OFF] as usize;
- if self.size() == 0 {
- assert_eq!(i, 0);
- return None;
- }
-
- let off = self.get_offset(i).unwrap();
- let (op_id, end) = self.get_meta(i).unwrap();
- if self.size() > 1 {
- let u32_slice = self.as_u32_slice_mut();
- u32_slice[INDEX_NUM_SHIFTED_OFF] += 1;
- } else {
- self.reset();
- }
- println!(
- "rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
- self.num_records(),
- self.num_shifted_off(),
- self.head()
- );
- Some((op_id, &self.bytes()[off..end]))
- }
-
- /// Because JS-side may cast popped message to Int32Array it is required
- /// that every message is aligned to 4-bytes.
- pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
- let off = self.head();
- assert_eq!(off % 4, 0);
- let end = off + record.len();
- let aligned_end = (end + 3) & !3;
- debug!(
- "rust:shared_queue:pre-push: op={}, off={}, end={}, len={}, aligned_end={}",
- op_id,
- off,
- end,
- record.len(),
- aligned_end,
- );
- let index = self.num_records();
- if aligned_end > self.bytes().len() || index >= MAX_RECORDS {
- debug!("WARNING the sharedQueue overflowed");
- return false;
- }
- assert_eq!(aligned_end % 4, 0);
- self.set_meta(index, end, op_id);
- assert_eq!(end - off, record.len());
- self.bytes_mut()[off..end].copy_from_slice(record);
- let u32_slice = self.as_u32_slice_mut();
- u32_slice[INDEX_NUM_RECORDS] += 1;
- u32_slice[INDEX_HEAD] = aligned_end as u32;
- debug!(
- "rust:shared_queue:push: num_records={}, num_shifted_off={}, head={}",
- self.num_records(),
- self.num_shifted_off(),
- self.head()
- );
- true
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn basic() {
- let mut q = SharedQueue::new(RECOMMENDED_SIZE);
-
- let h = q.head();
- assert!(h > 0);
-
- let r = vec![1u8, 2, 3, 4].into_boxed_slice();
- let len = r.len() + h;
- assert!(q.push(0, &r));
- assert_eq!(q.head(), len);
-
- let r = vec![5, 6, 7, 8].into_boxed_slice();
- assert!(q.push(0, &r));
-
- let r = vec![9, 10, 11, 12].into_boxed_slice();
- assert!(q.push(0, &r));
- assert_eq!(q.num_records(), 3);
- assert_eq!(q.size(), 3);
-
- let (_op_id, r) = q.shift().unwrap();
- assert_eq!(r, vec![1, 2, 3, 4].as_slice());
- assert_eq!(q.num_records(), 3);
- assert_eq!(q.size(), 2);
-
- let (_op_id, r) = q.shift().unwrap();
- assert_eq!(r, vec![5, 6, 7, 8].as_slice());
- assert_eq!(q.num_records(), 3);
- assert_eq!(q.size(), 1);
-
- let (_op_id, r) = q.shift().unwrap();
- assert_eq!(r, vec![9, 10, 11, 12].as_slice());
- assert_eq!(q.num_records(), 0);
- assert_eq!(q.size(), 0);
-
- assert!(q.shift().is_none());
- assert!(q.shift().is_none());
-
- assert_eq!(q.num_records(), 0);
- assert_eq!(q.size(), 0);
- }
-
- fn alloc_buf(byte_length: usize) -> Box<[u8]> {
- vec![0; byte_length].into_boxed_slice()
- }
-
- #[test]
- fn overflow() {
- let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- assert!(q.push(0, &alloc_buf(RECOMMENDED_SIZE - 5)));
- assert_eq!(q.size(), 1);
- assert!(!q.push(0, &alloc_buf(6)));
- assert_eq!(q.size(), 1);
- assert!(q.push(0, &alloc_buf(1)));
- assert_eq!(q.size(), 2);
-
- let (_op_id, buf) = q.shift().unwrap();
- assert_eq!(buf.len(), RECOMMENDED_SIZE - 5);
- assert_eq!(q.size(), 1);
-
- assert!(!q.push(0, &alloc_buf(1)));
-
- let (_op_id, buf) = q.shift().unwrap();
- assert_eq!(buf.len(), 1);
- assert_eq!(q.size(), 0);
- }
-
- #[test]
- fn full_records() {
- let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- for _ in 0..MAX_RECORDS {
- assert!(q.push(0, &alloc_buf(1)))
- }
- assert_eq!(q.push(0, &alloc_buf(1)), false);
- // Even if we shift one off, we still cannot push a new record.
- let _ignored = q.shift().unwrap();
- assert_eq!(q.push(0, &alloc_buf(1)), false);
- }
-
- #[test]
- fn allow_any_buf_length() {
- let mut q = SharedQueue::new(RECOMMENDED_SIZE);
- // Check that `record` that has length not a multiple of 4 will
- // not cause panic. Still make sure that records are always
- // aligned to 4 bytes.
- for i in 1..9 {
- q.push(0, &alloc_buf(i));
- assert_eq!(q.num_records(), i);
- assert_eq!(q.head() % 4, 0);
- }
- }
-}
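
For reference, the queue deleted above kept every record 4-byte aligned so the JS side could safely view popped messages as an Int32Array; push() rounded each record's end offset up with (end + 3) & !3. A tiny worked example of that alignment rule on its own:

    // Round a byte offset up to the next multiple of 4, as the removed
    // SharedQueue::push did when computing aligned_end.
    fn align4(end: usize) -> usize {
      (end + 3) & !3
    }

    fn main() {
      assert_eq!(align4(0), 0);
      assert_eq!(align4(1), 4);
      assert_eq!(align4(5), 8);
      assert_eq!(align4(8), 8); // already-aligned offsets are unchanged
    }

With responses now delivered as serde-serialized values or standalone buffers, none of this layout bookkeeping (record counts, shift offsets, overflow fallbacks) is needed any more.
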