summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Finch <andyfinch7@gmail.com>2020-01-17 08:26:11 -0500
committerRy Dahl <ry@tinyclouds.org>2020-01-17 08:26:11 -0500
commitfe5662058e0c7767e7b2724451f9d5a3f133ffbf (patch)
treebe43744898f64af6605246ba95ede55a869f7ab4
parentd8ad81d3fb5b4b9502ea021edfc99201538553e6 (diff)
feat: support individual async handler for each op (#3690)
-rw-r--r--cli/js/dispatch.ts57
-rw-r--r--cli/js/dispatch_json.ts2
-rw-r--r--cli/js/dispatch_minimal.ts15
-rw-r--r--cli/js/os.ts2
-rw-r--r--cli/js/plugins.ts4
-rw-r--r--core/examples/http_bench.js4
-rw-r--r--core/isolate.rs15
-rw-r--r--core/shared_queue.js16
-rw-r--r--deno_typescript/lib.deno_core.d.ts6
9 files changed, 37 insertions, 84 deletions
diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts
index 31c42c224..d658c5de7 100644
--- a/cli/js/dispatch.ts
+++ b/cli/js/dispatch.ts
@@ -85,57 +85,12 @@ export function setPluginAsyncHandler(
PLUGIN_ASYNC_HANDLER_MAP.set(opId, handler);
}
-export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void {
- switch (opId) {
- case OP_WRITE:
- case OP_READ:
- minimal.asyncMsgFromRust(opId, ui8);
- break;
- case OP_GET_DIR:
- case OP_EXIT:
- case OP_IS_TTY:
- case OP_ENV:
- case OP_EXEC_PATH:
- case OP_UTIME:
- case OP_OPEN:
- case OP_SEEK:
- case OP_FETCH:
- case OP_REPL_START:
- case OP_REPL_READLINE:
- case OP_ACCEPT:
- case OP_ACCEPT_TLS:
- case OP_DIAL:
- case OP_GLOBAL_TIMER:
- case OP_HOST_GET_WORKER_CLOSED:
- case OP_HOST_GET_MESSAGE:
- case OP_WORKER_GET_MESSAGE:
- case OP_RUN_STATUS:
- case OP_MKDIR:
- case OP_CHMOD:
- case OP_CHOWN:
- case OP_REMOVE:
- case OP_COPY_FILE:
- case OP_STAT:
- case OP_REALPATH:
- case OP_READ_DIR:
- case OP_RENAME:
- case OP_LINK:
- case OP_SYMLINK:
- case OP_READ_LINK:
- case OP_TRUNCATE:
- case OP_MAKE_TEMP_DIR:
- case OP_DIAL_TLS:
- case OP_FETCH_SOURCE_FILES:
- case OP_COMPILE:
- case OP_TRANSPILE:
- json.asyncMsgFromRust(opId, ui8);
- break;
+export function getAsyncHandler(opName: string): (msg: Uint8Array) => void {
+ switch (opName) {
+ case "OP_WRITE":
+ case "OP_READ":
+ return minimal.asyncMsgFromRust;
default:
- const handler = PLUGIN_ASYNC_HANDLER_MAP.get(opId);
- if (handler) {
- handler(ui8);
- } else {
- throw Error("bad async opId");
- }
+ return json.asyncMsgFromRust;
}
}
diff --git a/cli/js/dispatch_json.ts b/cli/js/dispatch_json.ts
index 07095ea4b..adccb69c6 100644
--- a/cli/js/dispatch_json.ts
+++ b/cli/js/dispatch_json.ts
@@ -43,7 +43,7 @@ function unwrapResponse(res: JsonResponse): Ok {
return res.ok;
}
-export function asyncMsgFromRust(opId: number, resUi8: Uint8Array): void {
+export function asyncMsgFromRust(resUi8: Uint8Array): void {
const res = decode(resUi8);
util.assert(res.promiseId != null);
diff --git a/cli/js/dispatch_minimal.ts b/cli/js/dispatch_minimal.ts
index 32b282c20..1ce3fbaef 100644
--- a/cli/js/dispatch_minimal.ts
+++ b/cli/js/dispatch_minimal.ts
@@ -18,7 +18,6 @@ function nextPromiseId(): number {
export interface RecordMinimal {
promiseId: number;
- opId: number; // Maybe better called dispatchId
arg: number;
result: number;
err?: {
@@ -27,10 +26,7 @@ export interface RecordMinimal {
};
}
-export function recordFromBufMinimal(
- opId: number,
- ui8: Uint8Array
-): RecordMinimal {
+export function recordFromBufMinimal(ui8: Uint8Array): RecordMinimal {
const header = ui8.slice(0, 12);
const buf32 = new Int32Array(
header.buffer,
@@ -52,7 +48,6 @@ export function recordFromBufMinimal(
return {
promiseId,
- opId,
arg,
result,
err
@@ -74,8 +69,8 @@ const scratchBytes = new Uint8Array(
);
util.assert(scratchBytes.byteLength === scratch32.length * 4);
-export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void {
- const record = recordFromBufMinimal(opId, ui8);
+export function asyncMsgFromRust(ui8: Uint8Array): void {
+ const record = recordFromBufMinimal(ui8);
const { promiseId } = record;
const promise = promiseTableMin.get(promiseId);
promiseTableMin.delete(promiseId);
@@ -95,7 +90,7 @@ export async function sendAsyncMinimal(
const promise = util.createResolvable<RecordMinimal>();
const buf = core.dispatch(opId, scratchBytes, zeroCopy);
if (buf) {
- const record = recordFromBufMinimal(opId, buf);
+ const record = recordFromBufMinimal(buf);
// Sync result.
promise.resolve(record);
} else {
@@ -115,6 +110,6 @@ export function sendSyncMinimal(
scratch32[0] = 0; // promiseId 0 indicates sync
scratch32[1] = arg;
const res = core.dispatch(opId, scratchBytes, zeroCopy)!;
- const resRecord = recordFromBufMinimal(opId, res);
+ const resRecord = recordFromBufMinimal(res);
return unwrapResponse(resRecord);
}
diff --git a/cli/js/os.ts b/cli/js/os.ts
index e0dd0752b..9b4301aea 100644
--- a/cli/js/os.ts
+++ b/cli/js/os.ts
@@ -89,7 +89,6 @@ interface Start {
// the runtime and the compiler environments.
// @internal
export function start(preserveDenoNamespace = true, source?: string): Start {
- core.setAsyncHandler(dispatch.asyncMsgFromRust);
const ops = core.ops();
// TODO(bartlomieju): this is a prototype, we should come up with
// something a bit more sophisticated
@@ -98,6 +97,7 @@ export function start(preserveDenoNamespace = true, source?: string): Start {
// Assign op ids to actual variables
// TODO(ry) This type casting is gross and should be fixed.
((dispatch as unknown) as { [key: string]: number })[opName] = opId;
+ core.setAsyncHandler(opId, dispatch.getAsyncHandler(opName));
}
// First we send an empty `Start` message to let the privileged side know we
// are ready. The response should be a `StartRes` message containing the CLI
diff --git a/cli/js/plugins.ts b/cli/js/plugins.ts
index 324ae3408..4d2072c79 100644
--- a/cli/js/plugins.ts
+++ b/cli/js/plugins.ts
@@ -1,5 +1,5 @@
import { sendSync } from "./dispatch_json.ts";
-import { OP_OPEN_PLUGIN, setPluginAsyncHandler } from "./dispatch.ts";
+import { OP_OPEN_PLUGIN } from "./dispatch.ts";
import { core } from "./core.ts";
export interface AsyncHandler {
@@ -25,7 +25,7 @@ class PluginOpImpl implements PluginOp {
}
setAsyncHandler(handler: AsyncHandler): void {
- setPluginAsyncHandler(this.opId, handler);
+ core.setAsyncHandler(this.opId, handler);
}
}
diff --git a/core/examples/http_bench.js b/core/examples/http_bench.js
index db57401cc..dc204dfca 100644
--- a/core/examples/http_bench.js
+++ b/core/examples/http_bench.js
@@ -123,8 +123,10 @@ async function serve(rid) {
let ops;
async function main() {
- Deno.core.setAsyncHandler(handleAsyncMsgFromRust);
ops = Deno.core.ops();
+ for (const opName in ops) {
+ Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust);
+ }
Deno.core.print("http_bench.js start\n");
diff --git a/core/isolate.rs b/core/isolate.rs
index 50fcadb48..9c13f0e4d 100644
--- a/core/isolate.rs
+++ b/core/isolate.rs
@@ -900,7 +900,7 @@ pub mod tests {
"setup2.js",
r#"
let nrecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
+ Deno.core.setAsyncHandler(1, (buf) => {
nrecv++;
});
"#,
@@ -1021,7 +1021,7 @@ pub mod tests {
"overflow_req_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array(100 * 1024 * 1024);
let response = Deno.core.dispatch(1, control);
@@ -1043,7 +1043,7 @@ pub mod tests {
"overflow_res_sync.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
+ Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
// Large message that will overflow the shared space.
let control = new Uint8Array([42]);
let response = Deno.core.dispatch(1, control);
@@ -1064,8 +1064,7 @@ pub mod tests {
"overflow_req_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
- assert(opId == 1);
+ Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 4);
assert(buf[0] === 43);
asyncRecv++;
@@ -1097,8 +1096,7 @@ pub mod tests {
"overflow_res_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
- assert(opId == 1);
+ Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
@@ -1126,8 +1124,7 @@ pub mod tests {
"overflow_res_multiple_dispatch_async.js",
r#"
let asyncRecv = 0;
- Deno.core.setAsyncHandler((opId, buf) => {
- assert(opId === 1);
+ Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024);
assert(buf[0] === 4);
asyncRecv++;
diff --git a/core/shared_queue.js b/core/shared_queue.js
index 430261511..093cc223f 100644
--- a/core/shared_queue.js
+++ b/core/shared_queue.js
@@ -38,6 +38,9 @@ SharedQueue Binary Layout
let sharedBytes;
let shared32;
+
+ let asyncHandlers;
+
let initialized = false;
function maybeInit() {
@@ -54,6 +57,7 @@ SharedQueue Binary Layout
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
+ asyncHandlers = [];
// Callers should not call Deno.core.recv, use setAsyncHandler.
Deno.core.recv(handleAsyncMsgFromRust);
}
@@ -157,24 +161,24 @@ SharedQueue Binary Layout
return [opId, buf];
}
- let asyncHandler;
- function setAsyncHandler(cb) {
+ function setAsyncHandler(opId, cb) {
maybeInit();
- assert(asyncHandler == null);
- asyncHandler = cb;
+ assert(opId != null);
+ asyncHandlers[opId] = cb;
}
function handleAsyncMsgFromRust(opId, buf) {
if (buf) {
// This is the overflow_response case of deno::Isolate::poll().
- asyncHandler(opId, buf);
+ asyncHandlers[opId](buf);
} else {
while (true) {
const opIdBuf = shift();
if (opIdBuf == null) {
break;
}
- asyncHandler(...opIdBuf);
+ assert(asyncHandlers[opIdBuf[0]] != null);
+ asyncHandlers[opIdBuf[0]](opIdBuf[1]);
}
}
}
diff --git a/deno_typescript/lib.deno_core.d.ts b/deno_typescript/lib.deno_core.d.ts
index 88a2de3c6..21ce6a67f 100644
--- a/deno_typescript/lib.deno_core.d.ts
+++ b/deno_typescript/lib.deno_core.d.ts
@@ -5,7 +5,7 @@
// Deno and therefore do not flow through to the runtime type library.
declare interface MessageCallback {
- (opId: number, msg: Uint8Array): void;
+ (msg: Uint8Array): void;
}
interface EvalErrorInfo {
@@ -27,7 +27,7 @@ declare interface DenoCore {
control: Uint8Array,
zeroCopy?: ArrayBufferView | null
): Uint8Array | null;
- setAsyncHandler(cb: MessageCallback): void;
+ setAsyncHandler(opId: number, cb: MessageCallback): void;
sharedQueue: {
head(): number;
numRecords(): number;
@@ -39,7 +39,7 @@ declare interface DenoCore {
ops(): Record<string, number>;
- recv(cb: MessageCallback): void;
+ recv(cb: (opId: number, msg: Uint8Array) => void): void;
send(
opId: number,