summaryrefslogtreecommitdiff
path: root/core/core.js
diff options
context:
space:
mode:
Diffstat (limited to 'core/core.js')
-rw-r--r--core/core.js401
1 files changed, 57 insertions, 344 deletions
diff --git a/core/core.js b/core/core.js
index 2de8e1fff..7c39c1bae 100644
--- a/core/core.js
+++ b/core/core.js
@@ -1,171 +1,37 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
-/*
-SharedQueue Binary Layout
-+-------------------------------+-------------------------------+
-| NUM_RECORDS (32) |
-+---------------------------------------------------------------+
-| NUM_SHIFTED_OFF (32) |
-+---------------------------------------------------------------+
-| HEAD (32) |
-+---------------------------------------------------------------+
-| OFFSETS (32) |
-+---------------------------------------------------------------+
-| RECORD_ENDS (*MAX_RECORDS) ...
-+---------------------------------------------------------------+
-| RECORDS (*MAX_RECORDS) ...
-+---------------------------------------------------------------+
- */
"use strict";
((window) => {
- const MAX_RECORDS = 100;
- const INDEX_NUM_RECORDS = 0;
- const INDEX_NUM_SHIFTED_OFF = 1;
- const INDEX_HEAD = 2;
- const INDEX_OFFSETS = 3;
- const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS;
- const HEAD_INIT = 4 * INDEX_RECORDS;
-
// Available on start due to bindings.
const core = window.Deno.core;
const { recv, send } = core;
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////////////// Dispatch /////////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const dispatch = send;
- const dispatchByName = (opName, control, ...zeroCopy) =>
- dispatch(opsCache[opName], control, ...zeroCopy);
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- //////////////////////////////////// Shared array buffer ///////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- let sharedBytes;
- let shared32;
-
let opsCache = {};
+ const errorMap = {};
+ let nextPromiseId = 1;
+ const promiseTable = new Map();
function init() {
- const shared = core.shared;
- assert(shared.byteLength > 0);
- assert(sharedBytes == null);
- assert(shared32 == null);
- sharedBytes = new Uint8Array(shared);
- shared32 = new Int32Array(shared);
-
- asyncHandlers = [];
- // Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
}
function ops() {
// op id 0 is a special value to retrieve the map of registered ops.
- const opsMapBytes = send(0);
- const opsMapJson = String.fromCharCode.apply(null, opsMapBytes);
- opsCache = JSON.parse(opsMapJson);
- return { ...opsCache };
- }
-
- function assert(cond) {
- if (!cond) {
- throw Error("assert");
- }
- }
-
- function reset() {
- shared32[INDEX_NUM_RECORDS] = 0;
- shared32[INDEX_NUM_SHIFTED_OFF] = 0;
- shared32[INDEX_HEAD] = HEAD_INIT;
- }
-
- function head() {
- return shared32[INDEX_HEAD];
- }
-
- function numRecords() {
- return shared32[INDEX_NUM_RECORDS];
- }
-
- function size() {
- return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
- }
-
- function setMeta(index, end, opId) {
- shared32[INDEX_OFFSETS + 2 * index] = end;
- shared32[INDEX_OFFSETS + 2 * index + 1] = opId;
- }
-
- function getMeta(index) {
- if (index >= numRecords()) {
- return null;
- }
- const buf = shared32[INDEX_OFFSETS + 2 * index];
- const opId = shared32[INDEX_OFFSETS + 2 * index + 1];
- return [opId, buf];
+ const newOpsCache = Object.fromEntries(send(0));
+ opsCache = Object.freeze(newOpsCache);
+ return opsCache;
}
- function getOffset(index) {
- if (index >= numRecords()) {
- return null;
- }
- if (index == 0) {
- return HEAD_INIT;
- }
- const prevEnd = shared32[INDEX_OFFSETS + 2 * (index - 1)];
- return (prevEnd + 3) & ~3;
- }
-
- function push(opId, buf) {
- const off = head();
- const end = off + buf.byteLength;
- const alignedEnd = (end + 3) & ~3;
- const index = numRecords();
- const shouldNotPush = alignedEnd > shared32.byteLength ||
- index >= MAX_RECORDS;
- if (shouldNotPush) {
- // console.log("shared_queue.js push fail");
- return false;
+ function handleAsyncMsgFromRust() {
+ for (let i = 0; i < arguments.length; i += 2) {
+ opAsyncHandler(arguments[i], arguments[i + 1]);
}
- setMeta(index, end, opId);
- assert(alignedEnd % 4 === 0);
- assert(end - off == buf.byteLength);
- sharedBytes.set(buf, off);
- shared32[INDEX_NUM_RECORDS] += 1;
- shared32[INDEX_HEAD] = alignedEnd;
- return true;
}
- /// Returns null if empty.
- function shift() {
- const i = shared32[INDEX_NUM_SHIFTED_OFF];
- if (size() == 0) {
- assert(i == 0);
- return null;
- }
-
- const off = getOffset(i);
- const [opId, end] = getMeta(i);
-
- if (size() > 1) {
- shared32[INDEX_NUM_SHIFTED_OFF] += 1;
- } else {
- reset();
- }
-
- assert(off != null);
- assert(end != null);
- const buf = sharedBytes.subarray(off, end);
- return [opId, buf];
+ function dispatch(opName, promiseId, control, zeroCopy) {
+ return send(opsCache[opName], promiseId, control, zeroCopy);
}
- ////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////// Error handling //////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const errorMap = {};
-
function registerErrorClass(errorName, className, args) {
if (typeof errorMap[errorName] !== "undefined") {
throw new TypeError(`Error class for "${errorName}" already registered`);
@@ -173,239 +39,86 @@ SharedQueue Binary Layout
errorMap[errorName] = [className, args ?? []];
}
- function handleError(className, message) {
- if (typeof errorMap[className] === "undefined") {
- return new Error(
- `Unregistered error class: "${className}"\n` +
- ` ${message}\n` +
- ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
- );
- }
-
- const [ErrorClass, args] = errorMap[className];
- return new ErrorClass(message, ...args);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- ////////////////////////////////////// Async handling //////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- let asyncHandlers = [];
-
- function setAsyncHandler(opId, cb) {
- assert(opId != null);
- asyncHandlers[opId] = cb;
+ function getErrorClassAndArgs(errorName) {
+ return errorMap[errorName] ?? [undefined, []];
}
- function handleAsyncMsgFromRust() {
- while (true) {
- const opIdBuf = shift();
- if (opIdBuf == null) {
- break;
- }
- assert(asyncHandlers[opIdBuf[0]] != null);
- asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
- }
-
- for (let i = 0; i < arguments.length; i += 2) {
- asyncHandlers[arguments[i]](arguments[i + 1], false);
+ function processResponse(res) {
+ // const [ok, err] = res;
+ if (res[1] === null) {
+ return res[0];
}
+ throw processErr(res[1]);
}
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////// General sync & async ops handling ////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- let nextRequestId = 1;
- const promiseTable = {};
-
- function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
- const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
- if (error !== null) {
- promiseTable[requestId][1](error);
- } else {
- promiseTable[requestId][0](result);
+ function processErr(err) {
+ const [ErrorClass, args] = getErrorClassAndArgs(err.className);
+ if (!ErrorClass) {
+ return new Error(
+ `Unregistered error class: "${err.className}"\n ${err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
+ );
}
- delete promiseTable[requestId];
+ return new ErrorClass(err.message, ...args);
}
- function opAsync(opName, opRequestBuilder, opResultParser) {
- const opId = opsCache[opName];
- // Make sure requests of this type are handled by the asyncHandler
- // The asyncHandler's role is to call the "promiseTable[requestId]" function
- if (typeof asyncHandlers[opId] === "undefined") {
- asyncHandlers[opId] = (buffer, isCopyNeeded) =>
- asyncHandle(buffer, isCopyNeeded, opResultParser);
- }
-
- const requestId = nextRequestId++;
-
- // Create and store promise
- const promise = new Promise((resolve, reject) => {
- promiseTable[requestId] = [resolve, reject];
+ function jsonOpAsync(opName, args = null, zeroCopy = null) {
+ const promiseId = nextPromiseId++;
+ const maybeError = dispatch(opName, promiseId, args, zeroCopy);
+ // Handle sync error (e.g: error parsing args)
+ if (maybeError) processResponse(maybeError);
+ let resolve, reject;
+ const promise = new Promise((resolve_, reject_) => {
+ resolve = resolve_;
+ reject = reject_;
});
-
- // Synchronously dispatch async request
- core.dispatch(opId, ...opRequestBuilder(requestId));
-
- // Wait for async response
+ promise.resolve = resolve;
+ promise.reject = reject;
+ promiseTable.set(promiseId, promise);
return promise;
}
- function opSync(opName, opRequestBuilder, opResultParser) {
- const opId = opsCache[opName];
- const u8Array = core.dispatch(opId, ...opRequestBuilder());
-
- const [_, result, error] = opResultParser(u8Array, false);
- if (error !== null) throw error;
- return result;
+ function jsonOpSync(opName, args = null, zeroCopy = null) {
+ return processResponse(dispatch(opName, null, args, zeroCopy));
}
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////////// Bin ops handling /////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const binRequestHeaderByteLength = 8 + 4;
- const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
- const scratchView = new DataView(scratchBuffer);
-
- function binOpBuildRequest(requestId, argument, zeroCopy) {
- scratchView.setBigUint64(0, BigInt(requestId), true);
- scratchView.setUint32(8, argument, true);
- return [scratchView, ...zeroCopy];
- }
-
- function binOpParseResult(u8Array, isCopyNeeded) {
- // Decode header value from u8Array
- const headerByteLength = 8 + 2 * 4;
- assert(u8Array.byteLength >= headerByteLength);
- assert(u8Array.byteLength % 4 == 0);
- const view = new DataView(
- u8Array.buffer,
- u8Array.byteOffset + u8Array.byteLength - headerByteLength,
- headerByteLength,
- );
-
- const requestId = Number(view.getBigUint64(0, true));
- const status = view.getUint32(8, true);
- const result = view.getUint32(12, true);
-
- // Error handling
- if (status !== 0) {
- const className = core.decode(u8Array.subarray(0, result));
- const message = core.decode(u8Array.subarray(result, -headerByteLength))
- .trim();
-
- return [requestId, null, handleError(className, message)];
- }
-
- if (u8Array.byteLength === headerByteLength) {
- return [requestId, result, null];
- }
-
- // Rest of response buffer is passed as reference or as a copy
- let respBuffer = null;
- if (isCopyNeeded) {
- // Copy part of the response array (if sent through shared array buf)
- respBuffer = u8Array.slice(0, result);
+ function opAsyncHandler(promiseId, res) {
+ // const [ok, err] = res;
+ const promise = promiseTable.get(promiseId);
+ promiseTable.delete(promiseId);
+ if (!res[1]) {
+ promise.resolve(res[0]);
} else {
- // Create view on existing array (if sent through overflow)
- respBuffer = u8Array.subarray(0, result);
- }
-
- return [requestId, respBuffer, null];
- }
-
- function binOpAsync(opName, argument = 0, ...zeroCopy) {
- return opAsync(
- opName,
- (requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
- binOpParseResult,
- );
- }
-
- function binOpSync(opName, argument = 0, ...zeroCopy) {
- return opSync(
- opName,
- () => binOpBuildRequest(0, argument, zeroCopy),
- binOpParseResult,
- );
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////
- ///////////////////////////////////// Json ops handling ////////////////////////////////////
- ////////////////////////////////////////////////////////////////////////////////////////////
-
- const jsonRequestHeaderLength = 8;
-
- function jsonOpBuildRequest(requestId, argument, zeroCopy) {
- const u8Array = core.encode(
- "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
- );
- new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
- return [u8Array, ...zeroCopy];
- }
-
- function jsonOpParseResult(u8Array, _) {
- const data = JSON.parse(core.decode(u8Array));
-
- if ("err" in data) {
- return [
- data.requestId,
- null,
- handleError(data.err.className, data.err.message),
- ];
+ promise.reject(processErr(res[1]));
}
-
- return [data.requestId, data.ok, null];
}
- function jsonOpAsync(opName, argument = null, ...zeroCopy) {
- return opAsync(
- opName,
- (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
- jsonOpParseResult,
- );
+ function binOpSync(opName, args = null, zeroCopy = null) {
+ return jsonOpSync(opName, args, zeroCopy);
}
- function jsonOpSync(opName, argument = null, ...zeroCopy) {
- return opSync(
- opName,
- () => [core.encode(JSON.stringify(argument)), ...zeroCopy],
- jsonOpParseResult,
- );
+ function binOpAsync(opName, args = null, zeroCopy = null) {
+ return jsonOpAsync(opName, args, zeroCopy);
}
function resources() {
return jsonOpSync("op_resources");
}
+
function close(rid) {
- return jsonOpSync("op_close", { rid });
+ jsonOpSync("op_close", { rid });
}
Object.assign(window.Deno.core, {
- jsonOpAsync,
- jsonOpSync,
binOpAsync,
binOpSync,
- dispatch,
- dispatchByName,
+ jsonOpAsync,
+ jsonOpSync,
+ dispatch: send,
+ dispatchByName: dispatch,
ops,
close,
resources,
registerErrorClass,
- sharedQueueInit: init,
- // sharedQueue is private but exposed for testing.
- sharedQueue: {
- MAX_RECORDS,
- head,
- numRecords,
- size,
- push,
- reset,
- shift,
- },
- // setAsyncHandler is private but exposed for testing.
- setAsyncHandler,
+ init,
});
})(this);