diff options
author | Inteon <42113979+inteon@users.noreply.github.com> | 2021-03-20 17:51:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-03-20 17:51:08 +0100 |
commit | 1251c893212d57303ecdfa8d953d1e487cb7ec7d (patch) | |
tree | 80b3a55872db0a4ee0c9e594601d330e39ca4873 /core/core.js | |
parent | 0d26a82ea9169c013e9b0f29c1ec418b28e273cf (diff) |
refactor: Move bin ops to deno_core and unify logic with json ops (#9457)
This commit moves implementation of bin ops to "deno_core" crates
as well as unifying logic between bin ops and json ops to reuse
as much code as possible (both in Rust and JavaScript).
Diffstat (limited to 'core/core.js')
-rw-r--r-- | core/core.js | 266 |
1 files changed, 197 insertions, 69 deletions
diff --git a/core/core.js b/core/core.js index f44bf253e..2de8e1fff 100644 --- a/core/core.js +++ b/core/core.js @@ -30,13 +30,22 @@ SharedQueue Binary Layout const core = window.Deno.core; const { recv, send } = core; + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////////// Dispatch ///////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const dispatch = send; + const dispatchByName = (opName, control, ...zeroCopy) => + dispatch(opsCache[opName], control, ...zeroCopy); + + //////////////////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////// Shared array buffer /////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + let sharedBytes; let shared32; - let asyncHandlers; - let opsCache = {}; - const errorMap = {}; function init() { const shared = core.shared; @@ -45,6 +54,7 @@ SharedQueue Binary Layout assert(shared32 == null); sharedBytes = new Uint8Array(shared); shared32 = new Int32Array(shared); + asyncHandlers = []; // Callers should not call core.recv, use setAsyncHandler. recv(handleAsyncMsgFromRust); @@ -150,15 +160,43 @@ SharedQueue Binary Layout return [opId, buf]; } + //////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////// Error handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const errorMap = {}; + + function registerErrorClass(errorName, className, args) { + if (typeof errorMap[errorName] !== "undefined") { + throw new TypeError(`Error class for "${errorName}" already registered`); + } + errorMap[errorName] = [className, args ?? []]; + } + + function handleError(className, message) { + if (typeof errorMap[className] === "undefined") { + return new Error( + `Unregistered error class: "${className}"\n` + + ` ${message}\n` + + ` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, + ); + } + + const [ErrorClass, args] = errorMap[className]; + return new ErrorClass(message, ...args); + } + + //////////////////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////// Async handling ////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + let asyncHandlers = []; + function setAsyncHandler(opId, cb) { assert(opId != null); asyncHandlers[opId] = cb; } - function setAsyncHandlerByName(opName, cb) { - setAsyncHandler(opsCache[opName], cb); - } - function handleAsyncMsgFromRust() { while (true) { const opIdBuf = shift(); @@ -166,108 +204,196 @@ SharedQueue Binary Layout break; } assert(asyncHandlers[opIdBuf[0]] != null); - asyncHandlers[opIdBuf[0]](opIdBuf[1]); + asyncHandlers[opIdBuf[0]](opIdBuf[1], true); } for (let i = 0; i < arguments.length; i += 2) { - asyncHandlers[arguments[i]](arguments[i + 1]); + asyncHandlers[arguments[i]](arguments[i + 1], false); } } - function dispatch(opName, control, ...zeroCopy) { - return send(opsCache[opName], control, ...zeroCopy); - } + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////// General sync & async ops handling //////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// - function registerErrorClass(errorName, className, args) { - if (typeof errorMap[errorName] !== "undefined") { - throw new TypeError(`Error class for "${errorName}" already registered`); + let nextRequestId = 1; + const promiseTable = {}; + + function asyncHandle(u8Array, isCopyNeeded, opResultParser) { + const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded); + if (error !== null) { + promiseTable[requestId][1](error); + } else { + promiseTable[requestId][0](result); } - errorMap[errorName] = [className, args ?? []]; + delete promiseTable[requestId]; } - function getErrorClassAndArgs(errorName) { - return errorMap[errorName] ?? [undefined, []]; + function opAsync(opName, opRequestBuilder, opResultParser) { + const opId = opsCache[opName]; + // Make sure requests of this type are handled by the asyncHandler + // The asyncHandler's role is to call the "promiseTable[requestId]" function + if (typeof asyncHandlers[opId] === "undefined") { + asyncHandlers[opId] = (buffer, isCopyNeeded) => + asyncHandle(buffer, isCopyNeeded, opResultParser); + } + + const requestId = nextRequestId++; + + // Create and store promise + const promise = new Promise((resolve, reject) => { + promiseTable[requestId] = [resolve, reject]; + }); + + // Synchronously dispatch async request + core.dispatch(opId, ...opRequestBuilder(requestId)); + + // Wait for async response + return promise; } - // Returns Uint8Array - function encodeJson(args) { - const s = JSON.stringify(args); - return core.encode(s); + function opSync(opName, opRequestBuilder, opResultParser) { + const opId = opsCache[opName]; + const u8Array = core.dispatch(opId, ...opRequestBuilder()); + + const [_, result, error] = opResultParser(u8Array, false); + if (error !== null) throw error; + return result; } - function decodeJson(ui8) { - const s = core.decode(ui8); - return JSON.parse(s); + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////// Bin ops handling ///////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const binRequestHeaderByteLength = 8 + 4; + const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength); + const scratchView = new DataView(scratchBuffer); + + function binOpBuildRequest(requestId, argument, zeroCopy) { + scratchView.setBigUint64(0, BigInt(requestId), true); + scratchView.setUint32(8, argument, true); + return [scratchView, ...zeroCopy]; } - let nextPromiseId = 1; - const promiseTable = {}; + function binOpParseResult(u8Array, isCopyNeeded) { + // Decode header value from u8Array + const headerByteLength = 8 + 2 * 4; + assert(u8Array.byteLength >= headerByteLength); + assert(u8Array.byteLength % 4 == 0); + const view = new DataView( + u8Array.buffer, + u8Array.byteOffset + u8Array.byteLength - headerByteLength, + headerByteLength, + ); + + const requestId = Number(view.getBigUint64(0, true)); + const status = view.getUint32(8, true); + const result = view.getUint32(12, true); + + // Error handling + if (status !== 0) { + const className = core.decode(u8Array.subarray(0, result)); + const message = core.decode(u8Array.subarray(result, -headerByteLength)) + .trim(); + + return [requestId, null, handleError(className, message)]; + } - function processResponse(res) { - if ("ok" in res) { - return res.ok; + if (u8Array.byteLength === headerByteLength) { + return [requestId, result, null]; } - const [ErrorClass, args] = getErrorClassAndArgs(res.err.className); - if (!ErrorClass) { - throw new Error( - `Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`, - ); + + // Rest of response buffer is passed as reference or as a copy + let respBuffer = null; + if (isCopyNeeded) { + // Copy part of the response array (if sent through shared array buf) + respBuffer = u8Array.slice(0, result); + } else { + // Create view on existing array (if sent through overflow) + respBuffer = u8Array.subarray(0, result); } - throw new ErrorClass(res.err.message, ...args); + + return [requestId, respBuffer, null]; } - async function jsonOpAsync(opName, args = null, ...zeroCopy) { - setAsyncHandler(opsCache[opName], jsonOpAsyncHandler); - - const promiseId = nextPromiseId++; - const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args)); - new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId)); - dispatch(opName, reqBuf, ...zeroCopy); - let resolve, reject; - const promise = new Promise((resolve_, reject_) => { - resolve = resolve_; - reject = reject_; - }); - promise.resolve = resolve; - promise.reject = reject; - promiseTable[promiseId] = promise; - return processResponse(await promise); + function binOpAsync(opName, argument = 0, ...zeroCopy) { + return opAsync( + opName, + (requestId) => binOpBuildRequest(requestId, argument, zeroCopy), + binOpParseResult, + ); + } + + function binOpSync(opName, argument = 0, ...zeroCopy) { + return opSync( + opName, + () => binOpBuildRequest(0, argument, zeroCopy), + binOpParseResult, + ); } - function jsonOpSync(opName, args = null, ...zeroCopy) { - const argsBuf = encodeJson(args); - const res = dispatch(opName, argsBuf, ...zeroCopy); - return processResponse(decodeJson(res)); + //////////////////////////////////////////////////////////////////////////////////////////// + ///////////////////////////////////// Json ops handling //////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////////// + + const jsonRequestHeaderLength = 8; + + function jsonOpBuildRequest(requestId, argument, zeroCopy) { + const u8Array = core.encode( + "\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument), + ); + new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true); + return [u8Array, ...zeroCopy]; } - function jsonOpAsyncHandler(buf) { - // Json Op. - const res = decodeJson(buf); - const promise = promiseTable[res.promiseId]; - delete promiseTable[res.promiseId]; - promise.resolve(res); + function jsonOpParseResult(u8Array, _) { + const data = JSON.parse(core.decode(u8Array)); + + if ("err" in data) { + return [ + data.requestId, + null, + handleError(data.err.className, data.err.message), + ]; + } + + return [data.requestId, data.ok, null]; + } + + function jsonOpAsync(opName, argument = null, ...zeroCopy) { + return opAsync( + opName, + (requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy), + jsonOpParseResult, + ); + } + + function jsonOpSync(opName, argument = null, ...zeroCopy) { + return opSync( + opName, + () => [core.encode(JSON.stringify(argument)), ...zeroCopy], + jsonOpParseResult, + ); } function resources() { return jsonOpSync("op_resources"); } - function close(rid) { - jsonOpSync("op_close", { rid }); + return jsonOpSync("op_close", { rid }); } Object.assign(window.Deno.core, { jsonOpAsync, jsonOpSync, - setAsyncHandler, - setAsyncHandlerByName, - dispatch: send, - dispatchByName: dispatch, + binOpAsync, + binOpSync, + dispatch, + dispatchByName, ops, close, resources, registerErrorClass, - getErrorClassAndArgs, sharedQueueInit: init, // sharedQueue is private but exposed for testing. sharedQueue: { @@ -279,5 +405,7 @@ SharedQueue Binary Layout reset, shift, }, + // setAsyncHandler is private but exposed for testing. + setAsyncHandler, }); })(this); |