summaryrefslogtreecommitdiff
path: root/core/core.js
diff options
context:
space:
mode:
Diffstat (limited to 'core/core.js')
-rw-r--r--core/core.js266
1 files changed, 197 insertions, 69 deletions
diff --git a/core/core.js b/core/core.js
index f44bf253e..2de8e1fff 100644
--- a/core/core.js
+++ b/core/core.js
@@ -30,13 +30,22 @@ SharedQueue Binary Layout
const core = window.Deno.core;
const { recv, send } = core;
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////////// Dispatch /////////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const dispatch = send;
+ const dispatchByName = (opName, control, ...zeroCopy) =>
+ dispatch(opsCache[opName], control, ...zeroCopy);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ //////////////////////////////////// Shared array buffer ///////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
let sharedBytes;
let shared32;
- let asyncHandlers;
-
let opsCache = {};
- const errorMap = {};
function init() {
const shared = core.shared;
@@ -45,6 +54,7 @@ SharedQueue Binary Layout
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
+
asyncHandlers = [];
// Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
@@ -150,15 +160,43 @@ SharedQueue Binary Layout
return [opId, buf];
}
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////// Error handling //////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const errorMap = {};
+
+ function registerErrorClass(errorName, className, args) {
+ if (typeof errorMap[errorName] !== "undefined") {
+ throw new TypeError(`Error class for "${errorName}" already registered`);
+ }
+ errorMap[errorName] = [className, args ?? []];
+ }
+
+ function handleError(className, message) {
+ if (typeof errorMap[className] === "undefined") {
+ return new Error(
+ `Unregistered error class: "${className}"\n` +
+ ` ${message}\n` +
+ ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
+ );
+ }
+
+ const [ErrorClass, args] = errorMap[className];
+ return new ErrorClass(message, ...args);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ////////////////////////////////////// Async handling //////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ let asyncHandlers = [];
+
function setAsyncHandler(opId, cb) {
assert(opId != null);
asyncHandlers[opId] = cb;
}
- function setAsyncHandlerByName(opName, cb) {
- setAsyncHandler(opsCache[opName], cb);
- }
-
function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
@@ -166,108 +204,196 @@ SharedQueue Binary Layout
break;
}
assert(asyncHandlers[opIdBuf[0]] != null);
- asyncHandlers[opIdBuf[0]](opIdBuf[1]);
+ asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
}
for (let i = 0; i < arguments.length; i += 2) {
- asyncHandlers[arguments[i]](arguments[i + 1]);
+ asyncHandlers[arguments[i]](arguments[i + 1], false);
}
}
- function dispatch(opName, control, ...zeroCopy) {
- return send(opsCache[opName], control, ...zeroCopy);
- }
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////// General sync & async ops handling ////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
- function registerErrorClass(errorName, className, args) {
- if (typeof errorMap[errorName] !== "undefined") {
- throw new TypeError(`Error class for "${errorName}" already registered`);
+ let nextRequestId = 1;
+ const promiseTable = {};
+
+ function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
+ const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
+ if (error !== null) {
+ promiseTable[requestId][1](error);
+ } else {
+ promiseTable[requestId][0](result);
}
- errorMap[errorName] = [className, args ?? []];
+ delete promiseTable[requestId];
}
- function getErrorClassAndArgs(errorName) {
- return errorMap[errorName] ?? [undefined, []];
+ function opAsync(opName, opRequestBuilder, opResultParser) {
+ const opId = opsCache[opName];
+ // Make sure requests of this type are handled by the asyncHandler
+ // The asyncHandler's role is to call the "promiseTable[requestId]" function
+ if (typeof asyncHandlers[opId] === "undefined") {
+ asyncHandlers[opId] = (buffer, isCopyNeeded) =>
+ asyncHandle(buffer, isCopyNeeded, opResultParser);
+ }
+
+ const requestId = nextRequestId++;
+
+ // Create and store promise
+ const promise = new Promise((resolve, reject) => {
+ promiseTable[requestId] = [resolve, reject];
+ });
+
+ // Synchronously dispatch async request
+ core.dispatch(opId, ...opRequestBuilder(requestId));
+
+ // Wait for async response
+ return promise;
}
- // Returns Uint8Array
- function encodeJson(args) {
- const s = JSON.stringify(args);
- return core.encode(s);
+ function opSync(opName, opRequestBuilder, opResultParser) {
+ const opId = opsCache[opName];
+ const u8Array = core.dispatch(opId, ...opRequestBuilder());
+
+ const [_, result, error] = opResultParser(u8Array, false);
+ if (error !== null) throw error;
+ return result;
}
- function decodeJson(ui8) {
- const s = core.decode(ui8);
- return JSON.parse(s);
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////// Bin ops handling /////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const binRequestHeaderByteLength = 8 + 4;
+ const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
+ const scratchView = new DataView(scratchBuffer);
+
+ function binOpBuildRequest(requestId, argument, zeroCopy) {
+ scratchView.setBigUint64(0, BigInt(requestId), true);
+ scratchView.setUint32(8, argument, true);
+ return [scratchView, ...zeroCopy];
}
- let nextPromiseId = 1;
- const promiseTable = {};
+ function binOpParseResult(u8Array, isCopyNeeded) {
+ // Decode header value from u8Array
+ const headerByteLength = 8 + 2 * 4;
+ assert(u8Array.byteLength >= headerByteLength);
+ assert(u8Array.byteLength % 4 == 0);
+ const view = new DataView(
+ u8Array.buffer,
+ u8Array.byteOffset + u8Array.byteLength - headerByteLength,
+ headerByteLength,
+ );
+
+ const requestId = Number(view.getBigUint64(0, true));
+ const status = view.getUint32(8, true);
+ const result = view.getUint32(12, true);
+
+ // Error handling
+ if (status !== 0) {
+ const className = core.decode(u8Array.subarray(0, result));
+ const message = core.decode(u8Array.subarray(result, -headerByteLength))
+ .trim();
+
+ return [requestId, null, handleError(className, message)];
+ }
- function processResponse(res) {
- if ("ok" in res) {
- return res.ok;
+ if (u8Array.byteLength === headerByteLength) {
+ return [requestId, result, null];
}
- const [ErrorClass, args] = getErrorClassAndArgs(res.err.className);
- if (!ErrorClass) {
- throw new Error(
- `Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
- );
+
+ // Rest of response buffer is passed as reference or as a copy
+ let respBuffer = null;
+ if (isCopyNeeded) {
+ // Copy part of the response array (if sent through shared array buf)
+ respBuffer = u8Array.slice(0, result);
+ } else {
+ // Create view on existing array (if sent through overflow)
+ respBuffer = u8Array.subarray(0, result);
}
- throw new ErrorClass(res.err.message, ...args);
+
+ return [requestId, respBuffer, null];
}
- async function jsonOpAsync(opName, args = null, ...zeroCopy) {
- setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);
-
- const promiseId = nextPromiseId++;
- const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args));
- new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId));
- dispatch(opName, reqBuf, ...zeroCopy);
- let resolve, reject;
- const promise = new Promise((resolve_, reject_) => {
- resolve = resolve_;
- reject = reject_;
- });
- promise.resolve = resolve;
- promise.reject = reject;
- promiseTable[promiseId] = promise;
- return processResponse(await promise);
+ function binOpAsync(opName, argument = 0, ...zeroCopy) {
+ return opAsync(
+ opName,
+ (requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
+ binOpParseResult,
+ );
+ }
+
+ function binOpSync(opName, argument = 0, ...zeroCopy) {
+ return opSync(
+ opName,
+ () => binOpBuildRequest(0, argument, zeroCopy),
+ binOpParseResult,
+ );
}
- function jsonOpSync(opName, args = null, ...zeroCopy) {
- const argsBuf = encodeJson(args);
- const res = dispatch(opName, argsBuf, ...zeroCopy);
- return processResponse(decodeJson(res));
+ ////////////////////////////////////////////////////////////////////////////////////////////
+ ///////////////////////////////////// Json ops handling ////////////////////////////////////
+ ////////////////////////////////////////////////////////////////////////////////////////////
+
+ const jsonRequestHeaderLength = 8;
+
+ function jsonOpBuildRequest(requestId, argument, zeroCopy) {
+ const u8Array = core.encode(
+ "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
+ );
+ new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
+ return [u8Array, ...zeroCopy];
}
- function jsonOpAsyncHandler(buf) {
- // Json Op.
- const res = decodeJson(buf);
- const promise = promiseTable[res.promiseId];
- delete promiseTable[res.promiseId];
- promise.resolve(res);
+ function jsonOpParseResult(u8Array, _) {
+ const data = JSON.parse(core.decode(u8Array));
+
+ if ("err" in data) {
+ return [
+ data.requestId,
+ null,
+ handleError(data.err.className, data.err.message),
+ ];
+ }
+
+ return [data.requestId, data.ok, null];
+ }
+
+ function jsonOpAsync(opName, argument = null, ...zeroCopy) {
+ return opAsync(
+ opName,
+ (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
+ jsonOpParseResult,
+ );
+ }
+
+ function jsonOpSync(opName, argument = null, ...zeroCopy) {
+ return opSync(
+ opName,
+ () => [core.encode(JSON.stringify(argument)), ...zeroCopy],
+ jsonOpParseResult,
+ );
}
function resources() {
return jsonOpSync("op_resources");
}
-
function close(rid) {
- jsonOpSync("op_close", { rid });
+ return jsonOpSync("op_close", { rid });
}
Object.assign(window.Deno.core, {
jsonOpAsync,
jsonOpSync,
- setAsyncHandler,
- setAsyncHandlerByName,
- dispatch: send,
- dispatchByName: dispatch,
+ binOpAsync,
+ binOpSync,
+ dispatch,
+ dispatchByName,
ops,
close,
resources,
registerErrorClass,
- getErrorClassAndArgs,
sharedQueueInit: init,
// sharedQueue is private but exposed for testing.
sharedQueue: {
@@ -279,5 +405,7 @@ SharedQueue Binary Layout
reset,
shift,
},
+ // setAsyncHandler is private but exposed for testing.
+ setAsyncHandler,
});
})(this);