summaryrefslogtreecommitdiff
path: root/js
diff options
context:
space:
mode:
authorRyan Dahl <ry@tinyclouds.org>2018-09-05 22:13:36 -0400
committerRyan Dahl <ry@tinyclouds.org>2018-09-09 18:47:22 -0400
commit0d03fafbfec4545098023b7147c5f8fb6ae06f99 (patch)
treef7c3e10ac0956c4a1ae19c91a711de54de677965 /js
parentff6eefdf87560986274799132d44b00e0a288c21 (diff)
Map promises onto futures.
Refactors handlers.rs The idea is that all Deno "ops" (aka bindings) should map onto a Rust Future. By setting the "sync" flag in the Base message users can determine if the future is executed immediately or put on the event loop. In the case of async futures, a promise is automatically created. Errors are automatically forwarded and raised. TODO: - The file system ops in src/handler.rs are not using the thread pool yet. This will be done in the future using tokio_threadpool::blocking. That is, if you try to call them asynchronously, you will get a promise and it will act asynchronous, but currently it will be blocking. - Handlers in src/handler.rs returned boxed futures. This was to make it easy while developing. We should try to remove this allocation.
Diffstat (limited to 'js')
-rw-r--r--js/errors.ts13
-rw-r--r--js/fbs_util.ts68
-rw-r--r--js/fetch.ts109
-rw-r--r--js/main.ts29
-rw-r--r--js/timers.ts63
-rw-r--r--js/util.ts8
6 files changed, 149 insertions, 141 deletions
diff --git a/js/errors.ts b/js/errors.ts
index 11d4cd509..2d0572e6d 100644
--- a/js/errors.ts
+++ b/js/errors.ts
@@ -10,8 +10,17 @@ export class DenoError<T extends fbs.ErrorKind> extends Error {
// @internal
export function maybeThrowError(base: fbs.Base): void {
+ const err = maybeError(base);
+ if (err != null) {
+ throw err;
+ }
+}
+
+export function maybeError(base: fbs.Base): null | DenoError<fbs.ErrorKind> {
const kind = base.errorKind();
- if (kind !== fbs.ErrorKind.NoError) {
- throw new DenoError(kind, base.error()!);
+ if (kind === fbs.ErrorKind.NoError) {
+ return null;
+ } else {
+ return new DenoError(kind, base.error()!);
}
}
diff --git a/js/fbs_util.ts b/js/fbs_util.ts
index bb623d54d..16f3b6ca2 100644
--- a/js/fbs_util.ts
+++ b/js/fbs_util.ts
@@ -1,27 +1,77 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
+// TODO Rename this file to //js/dispatch.ts
import { libdeno } from "./libdeno";
import { flatbuffers } from "flatbuffers";
-import { maybeThrowError } from "./errors";
import { deno as fbs } from "gen/msg_generated";
+import * as errors from "./errors";
+import * as util from "./util";
+
+let nextCmdId = 0;
+const promiseTable = new Map<number, util.Resolvable<fbs.Base>>();
+
+export function handleAsyncMsgFromRust(ui8: Uint8Array) {
+ const bb = new flatbuffers.ByteBuffer(ui8);
+ const base = fbs.Base.getRootAsBase(bb);
+
+ const cmdId = base.cmdId();
+ const promise = promiseTable.get(cmdId);
+ util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
+ promiseTable.delete(cmdId);
+ const err = errors.maybeError(base);
+ if (err != null) {
+ promise!.reject(err);
+ } else {
+ promise!.resolve(base);
+ }
+}
// @internal
+export function sendAsync(
+ builder: flatbuffers.Builder,
+ msgType: fbs.Any,
+ msg: flatbuffers.Offset
+): Promise<fbs.Base> {
+ const [cmdId, resBuf] = sendInternal(builder, msgType, msg, false);
+ util.assert(resBuf == null);
+ const promise = util.createResolvable<fbs.Base>();
+ promiseTable.set(cmdId, promise);
+ return promise;
+}
+
+// TODO Rename to sendSync
+// @internal
export function send(
builder: flatbuffers.Builder,
msgType: fbs.Any,
msg: flatbuffers.Offset
): null | fbs.Base {
- fbs.Base.startBase(builder);
- fbs.Base.addMsg(builder, msg);
- fbs.Base.addMsgType(builder, msgType);
- builder.finish(fbs.Base.endBase(builder));
-
- const resBuf = libdeno.send(builder.asUint8Array());
+ const [cmdId, resBuf] = sendInternal(builder, msgType, msg, true);
+ util.assert(cmdId >= 0);
if (resBuf == null) {
return null;
} else {
- const bb = new flatbuffers.ByteBuffer(new Uint8Array(resBuf!));
+ const u8 = new Uint8Array(resBuf!);
+ // console.log("recv sync message", util.hexdump(u8));
+ const bb = new flatbuffers.ByteBuffer(u8);
const baseRes = fbs.Base.getRootAsBase(bb);
- maybeThrowError(baseRes);
+ errors.maybeThrowError(baseRes);
return baseRes;
}
}
+
+function sendInternal(
+ builder: flatbuffers.Builder,
+ msgType: fbs.Any,
+ msg: flatbuffers.Offset,
+ sync = true
+): [number, null | Uint8Array] {
+ const cmdId = nextCmdId++;
+ fbs.Base.startBase(builder);
+ fbs.Base.addMsg(builder, msg);
+ fbs.Base.addMsgType(builder, msgType);
+ fbs.Base.addSync(builder, sync);
+ fbs.Base.addCmdId(builder, cmdId);
+ builder.finish(fbs.Base.endBase(builder));
+
+ return [cmdId, libdeno.send(builder.asUint8Array())];
+}
diff --git a/js/fetch.ts b/js/fetch.ts
index 2d57fa65c..20af1d03d 100644
--- a/js/fetch.ts
+++ b/js/fetch.ts
@@ -8,7 +8,7 @@ import {
notImplemented
} from "./util";
import { flatbuffers } from "flatbuffers";
-import { send } from "./fbs_util";
+import { sendAsync } from "./fbs_util";
import { deno as fbs } from "gen/msg_generated";
import {
Headers,
@@ -20,16 +20,6 @@ import {
} from "./fetch_types";
import { TextDecoder } from "./text_encoding";
-/** @internal */
-export function onFetchRes(base: fbs.Base, msg: fbs.FetchRes) {
- const id = msg.id();
- const req = fetchRequests.get(id);
- assert(req != null, `Couldn't find FetchRequest id ${id}`);
- req!.onMsg(base, msg);
-}
-
-const fetchRequests = new Map<number, FetchRequest>();
-
class DenoHeaders implements Headers {
append(name: string, value: string): void {
assert(false, "Implement me");
@@ -58,10 +48,9 @@ class DenoHeaders implements Headers {
}
class FetchResponse implements Response {
- readonly url: string;
+ readonly url: string = "";
body: null;
bodyUsed = false; // TODO
- status = 0;
statusText = "FIXME"; // TODO
readonly type = "basic"; // TODO
redirected = false; // TODO
@@ -71,10 +60,12 @@ class FetchResponse implements Response {
private first = true;
private bodyWaiter: Resolvable<ArrayBuffer>;
- constructor(readonly req: FetchRequest) {
- this.url = req.url;
+ constructor(readonly status: number, readonly body_: ArrayBuffer) {
this.bodyWaiter = createResolvable();
this.trailer = createResolvable();
+ setTimeout(() => {
+ this.bodyWaiter.resolve(body_);
+ }, 0);
}
arrayBuffer(): Promise<ArrayBuffer> {
@@ -114,78 +105,50 @@ class FetchResponse implements Response {
onHeader?: (res: FetchResponse) => void;
onError?: (error: Error) => void;
- onMsg(base: fbs.Base, msg: fbs.FetchRes) {
+ onMsg(base: fbs.Base) {
+ /*
const error = base.error();
if (error != null) {
assert(this.onError != null);
this.onError!(new Error(error));
return;
}
+ */
if (this.first) {
this.first = false;
- this.status = msg.status();
- assert(this.onHeader != null);
- this.onHeader!(this);
- } else {
- // Body message. Assuming it all comes in one message now.
- const bodyArray = msg.bodyArray();
- assert(bodyArray != null);
- const ab = typedArrayToArrayBuffer(bodyArray!);
- this.bodyWaiter.resolve(ab);
}
}
}
-let nextFetchId = 0;
-//TODO implements Request
-class FetchRequest {
- private readonly id: number;
- response: FetchResponse;
- constructor(readonly url: string) {
- this.id = nextFetchId++;
- fetchRequests.set(this.id, this);
- this.response = new FetchResponse(this);
- }
-
- onMsg(base: fbs.Base, msg: fbs.FetchRes) {
- this.response.onMsg(base, msg);
- }
-
- destroy() {
- fetchRequests.delete(this.id);
- }
-
- start() {
- log("dispatch FETCH_REQ", this.id, this.url);
-
- // Send FetchReq message
- const builder = new flatbuffers.Builder();
- const url = builder.createString(this.url);
- fbs.FetchReq.startFetchReq(builder);
- fbs.FetchReq.addId(builder, this.id);
- fbs.FetchReq.addUrl(builder, url);
- const msg = fbs.FetchReq.endFetchReq(builder);
- send(builder, fbs.Any.FetchReq, msg);
- }
-}
-
-export function fetch(
+export async function fetch(
input?: Request | string,
init?: RequestInit
): Promise<Response> {
- const fetchReq = new FetchRequest(input as string);
- const response = fetchReq.response;
- const promise = new Promise<Response>((resolve, reject) => {
- response.onHeader = (response: FetchResponse) => {
- log("onHeader");
- resolve(response);
- };
- response.onError = (error: Error) => {
- log("onError", error);
- reject(error);
- };
- });
- fetchReq.start();
- return promise;
+ const url = input as string;
+ log("dispatch FETCH_REQ", url);
+
+ // Send FetchReq message
+ const builder = new flatbuffers.Builder();
+ const url_ = builder.createString(url);
+ fbs.FetchReq.startFetchReq(builder);
+ fbs.FetchReq.addUrl(builder, url_);
+ const resBase = await sendAsync(
+ builder,
+ fbs.Any.FetchReq,
+ fbs.FetchReq.endFetchReq(builder)
+ );
+
+ // Decode FetchRes
+ assert(fbs.Any.FetchRes === resBase.msgType());
+ const msg = new fbs.FetchRes();
+ assert(resBase.msg(msg) != null);
+
+ const status = msg.status();
+ const bodyArray = msg.bodyArray();
+ assert(bodyArray != null);
+ const body = typedArrayToArrayBuffer(bodyArray!);
+
+ const response = new FetchResponse(status, body);
+ return response;
}
diff --git a/js/main.ts b/js/main.ts
index 538d0cc27..e45562d99 100644
--- a/js/main.ts
+++ b/js/main.ts
@@ -5,10 +5,8 @@ import { assert, log, setLogDebug } from "./util";
import * as os from "./os";
import { DenoCompiler } from "./compiler";
import { libdeno } from "./libdeno";
-import * as timers from "./timers";
-import { onFetchRes } from "./fetch";
import { argv } from "./deno";
-import { send } from "./fbs_util";
+import { send, handleAsyncMsgFromRust } from "./fbs_util";
function sendStart(): fbs.StartRes {
const builder = new flatbuffers.Builder();
@@ -22,29 +20,6 @@ function sendStart(): fbs.StartRes {
return startRes;
}
-function onMessage(ui8: Uint8Array) {
- const bb = new flatbuffers.ByteBuffer(ui8);
- const base = fbs.Base.getRootAsBase(bb);
- switch (base.msgType()) {
- case fbs.Any.FetchRes: {
- const msg = new fbs.FetchRes();
- assert(base.msg(msg) != null);
- onFetchRes(base, msg);
- break;
- }
- case fbs.Any.TimerReady: {
- const msg = new fbs.TimerReady();
- assert(base.msg(msg) != null);
- timers.onMessage(msg);
- break;
- }
- default: {
- assert(false, "Unhandled message type");
- break;
- }
- }
-}
-
function onGlobalError(
message: string,
source: string,
@@ -58,7 +33,7 @@ function onGlobalError(
/* tslint:disable-next-line:no-default-export */
export default function denoMain() {
- libdeno.recv(onMessage);
+ libdeno.recv(handleAsyncMsgFromRust);
libdeno.setGlobalErrorHandler(onGlobalError);
const compiler = DenoCompiler.instance();
diff --git a/js/timers.ts b/js/timers.ts
index d09af0295..6b23c64f1 100644
--- a/js/timers.ts
+++ b/js/timers.ts
@@ -3,7 +3,7 @@ import { assert } from "./util";
import * as util from "./util";
import { deno as fbs } from "gen/msg_generated";
import { flatbuffers } from "flatbuffers";
-import { send } from "./fbs_util";
+import { send, sendAsync } from "./fbs_util";
let nextTimerId = 1;
@@ -19,50 +19,51 @@ interface Timer {
delay: number; // milliseconds
}
-const timers = new Map<number, Timer>();
-
-/** @internal */
-export function onMessage(msg: fbs.TimerReady) {
- const timerReadyId = msg.id();
- const timerReadyDone = msg.done();
- const timer = timers.get(timerReadyId);
- if (!timer) {
- return;
- }
- timer.cb(...timer.args);
- if (timerReadyDone) {
- timers.delete(timerReadyId);
- }
-}
-
function startTimer(
+ id: number,
cb: TimerCallback,
delay: number,
interval: boolean,
// tslint:disable-next-line:no-any
args: any[]
-): number {
- const timer = {
- id: nextTimerId++,
+): void {
+ const timer: Timer = {
+ id,
interval,
delay,
args,
cb
};
- timers.set(timer.id, timer);
-
util.log("timers.ts startTimer");
// Send TimerStart message
const builder = new flatbuffers.Builder();
fbs.TimerStart.startTimerStart(builder);
fbs.TimerStart.addId(builder, timer.id);
- fbs.TimerStart.addInterval(builder, timer.interval);
fbs.TimerStart.addDelay(builder, timer.delay);
const msg = fbs.TimerStart.endTimerStart(builder);
- const baseRes = send(builder, fbs.Any.TimerStart, msg);
- assert(baseRes == null);
- return timer.id;
+
+ sendAsync(builder, fbs.Any.TimerStart, msg).then(
+ baseRes => {
+ assert(fbs.Any.TimerReady === baseRes!.msgType());
+ const msg = new fbs.TimerReady();
+ assert(baseRes!.msg(msg) != null);
+ assert(msg.id() === timer.id);
+ if (msg.canceled()) {
+ util.log("timer canceled message");
+ } else {
+ cb(...args);
+ if (interval) {
+ // TODO Faking setInterval with setTimeout.
+ // We need a new timer implementation, this is just a stopgap.
+ startTimer(id, cb, delay, true, args);
+ }
+ }
+ },
+ error => {
+ throw error;
+ }
+ );
}
export function setTimeout(
@@ -71,7 +72,9 @@ export function setTimeout(
// tslint:disable-next-line:no-any
...args: any[]
): number {
- return startTimer(cb, delay, false, args);
+ const id = nextTimerId++;
+ startTimer(id, cb, delay, false, args);
+ return id;
}
export function setInterval(
@@ -80,12 +83,12 @@ export function setInterval(
// tslint:disable-next-line:no-any
...args: any[]
): number {
- return startTimer(cb, delay, true, args);
+ const id = nextTimerId++;
+ startTimer(id, cb, delay, true, args);
+ return id;
}
export function clearTimer(id: number) {
- timers.delete(id);
-
const builder = new flatbuffers.Builder();
fbs.TimerClear.startTimerClear(builder);
fbs.TimerClear.addId(builder, id);
diff --git a/js/util.ts b/js/util.ts
index 6971ca1b9..efe0dcf84 100644
--- a/js/util.ts
+++ b/js/util.ts
@@ -83,3 +83,11 @@ export function notImplemented(): never {
export function unreachable(): never {
throw new Error("Code not reachable");
}
+
+export function hexdump(u8: Uint8Array): string {
+ return Array.prototype.map
+ .call(u8, (x: number) => {
+ return ("00" + x.toString(16)).slice(-2);
+ })
+ .join(" ");
+}