summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--http/http_bench.ts5
-rw-r--r--http/server.ts231
-rw-r--r--http/server_test.ts32
-rwxr-xr-xtest.ts1
-rw-r--r--util/async.ts85
-rw-r--r--util/async_test.ts34
-rw-r--r--util/test.ts1
7 files changed, 207 insertions, 182 deletions
diff --git a/http/http_bench.ts b/http/http_bench.ts
index 6d72d4be6..06043f9e4 100644
--- a/http/http_bench.ts
+++ b/http/http_bench.ts
@@ -3,13 +3,12 @@ import { serve } from "./server.ts";
const addr = Deno.args[1] || "127.0.0.1:4500";
const server = serve(addr);
-
const body = new TextEncoder().encode("Hello World");
async function main(): Promise<void> {
console.log(`http://${addr}/`);
- for await (const request of server) {
- request.respond({ status: 200, body });
+ for await (const req of server) {
+ req.respond({ body });
}
}
diff --git a/http/server.ts b/http/server.ts
index 484ecf808..281f8d302 100644
--- a/http/server.ts
+++ b/http/server.ts
@@ -1,55 +1,14 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
const { listen, copy, toAsyncIterator } = Deno;
+type Listener = Deno.Listener;
type Conn = Deno.Conn;
type Reader = Deno.Reader;
type Writer = Deno.Writer;
import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
-import { assert } from "../testing/asserts.ts";
-
-interface Deferred {
- promise: Promise<{}>;
- resolve: () => void;
- reject: () => void;
-}
-
-function deferred(isResolved = false): Deferred {
- let resolve, reject;
- const promise = new Promise(
- (res, rej): void => {
- resolve = res;
- reject = rej;
- }
- );
- if (isResolved) {
- resolve();
- }
- return {
- promise,
- resolve,
- reject
- };
-}
-
-interface HttpConn extends Conn {
- // When read by a newly created request B, lastId is the id pointing to a previous
- // request A, such that we must wait for responses to A to complete before
- // writing B's response.
- lastPipelineId: number;
- pendingDeferredMap: Map<number, Deferred>;
-}
-
-function createHttpConn(c: Conn): HttpConn {
- const httpConn = Object.assign(c, {
- lastPipelineId: 0,
- pendingDeferredMap: new Map()
- });
-
- const resolvedDeferred = deferred(true);
- httpConn.pendingDeferredMap.set(0, resolvedDeferred);
- return httpConn;
-}
+import { assert, fail } from "../testing/asserts.ts";
+import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts";
function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
@@ -58,6 +17,7 @@ function bufWriter(w: Writer): BufWriter {
return new BufWriter(w);
}
}
+
export function setContentLength(r: Response): void {
if (!r.headers) {
r.headers = new Headers();
@@ -74,6 +34,7 @@ export function setContentLength(r: Response): void {
}
}
}
+
async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const writer = bufWriter(w);
const encoder = new TextEncoder();
@@ -90,6 +51,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise<void> {
const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
}
+
export async function writeResponse(w: Writer, r: Response): Promise<void> {
const protoMajor = 1;
const protoMinor = 1;
@@ -131,6 +93,7 @@ export async function writeResponse(w: Writer, r: Response): Promise<void> {
}
await writer.flush();
}
+
async function readAllIterator(
it: AsyncIterableIterator<Uint8Array>
): Promise<Uint8Array> {
@@ -154,14 +117,14 @@ async function readAllIterator(
}
export class ServerRequest {
- pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
- conn: HttpConn;
+ conn: Conn;
r: BufReader;
w: BufWriter;
+ done: Deferred<void> = deferred();
public async *bodyStream(): AsyncIterableIterator<Uint8Array> {
if (this.headers.has("content-length")) {
@@ -244,134 +207,102 @@ export class ServerRequest {
}
async respond(r: Response): Promise<void> {
- // Check and wait if the previous request is done responding.
- const lastPipelineId = this.pipelineId - 1;
- const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
- lastPipelineId
- );
- assert(!!lastPipelineDeferred);
- await lastPipelineDeferred.promise;
- // If yes, delete old deferred and proceed with writing.
- this.conn.pendingDeferredMap.delete(lastPipelineId);
// Write our response!
await writeResponse(this.w, r);
- // Signal the next pending request that it can start writing.
- const currPipelineDeferred = this.conn.pendingDeferredMap.get(
- this.pipelineId
- );
- assert(!!currPipelineDeferred);
- currPipelineDeferred.resolve();
+ // Signal that this request has been processed and the next pipelined
+ // request on the same connection can be accepted.
+ this.done.resolve();
}
}
-interface ServeEnv {
- reqQueue: ServerRequest[];
- serveDeferred: Deferred;
-}
-
-/** Continuously read more requests from conn until EOF
- * Calls maybeHandleReq.
- * bufr is empty on a fresh TCP connection.
- * Would be passed around and reused for later request on same conn
- * TODO: make them async function after this change is done
- * https://github.com/tc39/ecma262/pull/1250
- * See https://v8.dev/blog/fast-async
- */
async function readRequest(
- c: HttpConn,
- bufr?: BufReader
+ conn: Conn,
+ bufr: BufReader
): Promise<[ServerRequest, BufState]> {
- if (!bufr) {
- bufr = new BufReader(c);
- }
- const bufw = new BufWriter(c);
const req = new ServerRequest();
-
- // Set and incr pipeline id;
- req.pipelineId = ++c.lastPipelineId;
- // Set a new pipeline deferred associated with this request
- // for future requests to wait for.
- c.pendingDeferredMap.set(req.pipelineId, deferred());
-
- req.conn = c;
- req.r = bufr!;
- req.w = bufw;
- const tp = new TextProtoReader(bufr!);
-
- let s: string;
+ req.conn = conn;
+ req.r = bufr;
+ req.w = new BufWriter(conn);
+ const tp = new TextProtoReader(bufr);
let err: BufState;
-
// First line: GET /index.html HTTP/1.0
- [s, err] = await tp.readLine();
+ let firstLine: string;
+ [firstLine, err] = await tp.readLine();
if (err) {
return [null, err];
}
- [req.method, req.url, req.proto] = s.split(" ", 3);
-
+ [req.method, req.url, req.proto] = firstLine.split(" ", 3);
[req.headers, err] = await tp.readMIMEHeader();
-
return [req, err];
}
-function maybeHandleReq(
- env: ServeEnv,
- conn: Conn,
- maybeReq: [ServerRequest, BufState]
-): void {
- const [req, _err] = maybeReq;
- if (_err) {
- conn.close(); // assume EOF for now...
- return;
- }
- env.reqQueue.push(req); // push req to queue
- env.serveDeferred.resolve(); // signal while loop to process it
-}
+export class Server implements AsyncIterable<ServerRequest> {
+ private closing = false;
-function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
- readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
-}
+ constructor(public listener: Listener) {}
-export async function* serve(
- addr: string
-): AsyncIterableIterator<ServerRequest> {
- const listener = listen("tcp", addr);
- const env: ServeEnv = {
- reqQueue: [], // in case multiple promises are ready
- serveDeferred: deferred()
- };
+ close(): void {
+ this.closing = true;
+ this.listener.close();
+ }
- // Routine that keeps calling accept
- let handleConn = (_conn: Conn): void => {};
- let scheduleAccept = (): void => {};
- const acceptRoutine = (): void => {
- scheduleAccept = (): void => {
- listener.accept().then(handleConn);
- };
- handleConn = (conn: Conn): void => {
- const httpConn = createHttpConn(conn);
- serveConn(env, httpConn); // don't block
- scheduleAccept(); // schedule next accept
- };
+ // Yields all HTTP requests on a single TCP connection.
+ private async *iterateHttpRequests(
+ conn: Conn
+ ): AsyncIterableIterator<ServerRequest> {
+ const bufr = new BufReader(conn);
+ let bufStateErr: BufState;
+ let req: ServerRequest;
+
+ while (!this.closing) {
+ [req, bufStateErr] = await readRequest(conn, bufr);
+ if (bufStateErr) break;
+ yield req;
+ // Wait for the request to be processed before we accept a new request on
+ // this connection.
+ await req.done;
+ }
- scheduleAccept();
- };
+ if (bufStateErr === "EOF") {
+ // The connection was gracefully closed.
+ } else if (bufStateErr instanceof Error) {
+ // TODO(ry): send something back like a HTTP 500 status.
+ } else if (this.closing) {
+ // There are more requests incoming but the server is closing.
+ // TODO(ry): send a back a HTTP 503 Service Unavailable status.
+ } else {
+ fail(`unexpected BufState: ${bufStateErr}`);
+ }
- acceptRoutine();
+ conn.close();
+ }
- // Loop hack to allow yield (yield won't work in callbacks)
- while (true) {
- await env.serveDeferred.promise;
- env.serveDeferred = deferred(); // use a new deferred
- let queueToProcess = env.reqQueue;
- env.reqQueue = [];
- for (const result of queueToProcess) {
- yield result;
- // Continue read more from conn when user is done with the current req
- // Moving this here makes it easier to manage
- serveConn(env, result.conn, result.r);
- }
+ // Accepts a new TCP connection and yields all HTTP requests that arrive on
+ // it. When a connection is accepted, it also creates a new iterator of the
+ // same kind and adds it to the request multiplexer so that another TCP
+ // connection can be accepted.
+ private async *acceptConnAndIterateHttpRequests(
+ mux: MuxAsyncIterator<ServerRequest>
+ ): AsyncIterableIterator<ServerRequest> {
+ if (this.closing) return;
+ // Wait for a new connection.
+ const conn = await this.listener.accept();
+ // Try to accept another connection and add it to the multiplexer.
+ mux.add(this.acceptConnAndIterateHttpRequests(mux));
+ // Yield the requests that arrive on the just-accepted connection.
+ yield* this.iterateHttpRequests(conn);
}
- listener.close();
+
+ [Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
+ const mux: MuxAsyncIterator<ServerRequest> = new MuxAsyncIterator();
+ mux.add(this.acceptConnAndIterateHttpRequests(mux));
+ return mux.iterate();
+ }
+}
+
+export function serve(addr: string): Server {
+ const listener = listen("tcp", addr);
+ return new Server(listener);
}
export async function listenAndServe(
diff --git a/http/server_test.ts b/http/server_test.ts
index 82a368395..396a0321a 100644
--- a/http/server_test.ts
+++ b/http/server_test.ts
@@ -22,31 +22,6 @@ const dec = new TextDecoder();
type Handler = () => void;
-interface Deferred {
- promise: Promise<{}>;
- resolve: Handler;
- reject: Handler;
-}
-
-function deferred(isResolved = false): Deferred {
- let resolve: Handler = (): void => void 0;
- let reject: Handler = (): void => void 0;
- const promise = new Promise(
- (res, rej): void => {
- resolve = res;
- reject = rej;
- }
- );
- if (isResolved) {
- resolve();
- }
- return {
- promise,
- resolve,
- reject
- };
-}
-
const responseTests: ResponseTest[] = [
// Default response
{
@@ -72,8 +47,8 @@ test(async function responseWrite(): Promise<void> {
const buf = new Buffer();
const bufw = new BufWriter(buf);
const request = new ServerRequest();
- request.pipelineId = 1;
request.w = bufw;
+
request.conn = {
localAddr: "",
remoteAddr: "",
@@ -86,13 +61,12 @@ test(async function responseWrite(): Promise<void> {
write: async (): Promise<number> => {
return -1;
},
- close: (): void => {},
- lastPipelineId: 0,
- pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
+ close: (): void => {}
};
await request.respond(testCase.response);
assertEquals(buf.toString(), testCase.raw);
+ await request.done;
}
});
diff --git a/test.ts b/test.ts
index cc921cf9f..221bbc985 100755
--- a/test.ts
+++ b/test.ts
@@ -16,6 +16,7 @@ import "./strings/test.ts";
import "./testing/test.ts";
import "./textproto/test.ts";
import "./toml/test.ts";
+import "./util/test.ts";
import "./ws/test.ts";
import "./testing/main.ts";
diff --git a/util/async.ts b/util/async.ts
new file mode 100644
index 000000000..f9f2477d0
--- /dev/null
+++ b/util/async.ts
@@ -0,0 +1,85 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+
+// TODO(ry) It'd be better to make Deferred a class that inherits from
+// Promise, rather than an interface. This is possible in ES2016, however
+// typescript produces broken code when targeting ES5 code.
+// See https://github.com/Microsoft/TypeScript/issues/15202
+// At the time of writing, the github issue is closed but the problem remains.
+export interface Deferred<T> extends Promise<T> {
+ resolve: (value?: T | PromiseLike<T>) => void;
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ reject: (reason?: any) => void;
+}
+
+/** Creates a Promise with the `reject` and `resolve` functions
+ * placed as methods on the promise object itself. It allows you to do:
+ *
+ * const p = deferred<number>();
+ * // ...
+ * p.resolve(42);
+ */
+export function deferred<T>(): Deferred<T> {
+ let methods;
+ const promise = new Promise<T>(
+ (resolve, reject): void => {
+ methods = { resolve, reject };
+ }
+ );
+ return Object.assign(promise, methods) as Deferred<T>;
+}
+
+interface TaggedYieldedValue<T> {
+ iterator: AsyncIterableIterator<T>;
+ value: T;
+}
+
+/** The MuxAsyncIterator class multiplexes multiple async iterators into a
+ * single stream. It currently makes a few assumptions:
+ * - The iterators do not throw.
+ * - The final result (the value returned and not yielded from the iterator)
+ * does not matter; if there is any, it is discarded.
+ */
+export class MuxAsyncIterator<T> implements AsyncIterable<T> {
+ private iteratorCount = 0;
+ private yields: Array<TaggedYieldedValue<T>> = [];
+ private signal: Deferred<void> = deferred();
+
+ add(iterator: AsyncIterableIterator<T>): void {
+ ++this.iteratorCount;
+ this.callIteratorNext(iterator);
+ }
+
+ private async callIteratorNext(
+ iterator: AsyncIterableIterator<T>
+ ): Promise<void> {
+ const { value, done } = await iterator.next();
+ if (done) {
+ --this.iteratorCount;
+ } else {
+ this.yields.push({ iterator, value });
+ }
+ this.signal.resolve();
+ }
+
+ async *iterate(): AsyncIterableIterator<T> {
+ while (this.iteratorCount > 0) {
+ // Sleep until any of the wrapped iterators yields.
+ await this.signal;
+
+ // Note that while we're looping over `yields`, new items may be added.
+ for (let i = 0; i < this.yields.length; i++) {
+ const { iterator, value } = this.yields[i];
+ yield value;
+ this.callIteratorNext(iterator);
+ }
+
+ // Clear the `yields` list and reset the `signal` promise.
+ this.yields.length = 0;
+ this.signal = deferred();
+ }
+ }
+
+ [Symbol.asyncIterator](): AsyncIterableIterator<T> {
+ return this.iterate();
+ }
+}
diff --git a/util/async_test.ts b/util/async_test.ts
new file mode 100644
index 000000000..c704002d4
--- /dev/null
+++ b/util/async_test.ts
@@ -0,0 +1,34 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+import { test, runIfMain } from "../testing/mod.ts";
+import { assertEquals } from "../testing/asserts.ts";
+import { MuxAsyncIterator, deferred } from "./async.ts";
+
+test(async function asyncDeferred(): Promise<void> {
+ const d = deferred<number>();
+ d.resolve(12);
+});
+
+async function* gen123(): AsyncIterableIterator<number> {
+ yield 1;
+ yield 2;
+ yield 3;
+}
+
+async function* gen456(): AsyncIterableIterator<number> {
+ yield 4;
+ yield 5;
+ yield 6;
+}
+
+test(async function asyncMuxAsyncIterator(): Promise<void> {
+ const mux = new MuxAsyncIterator<number>();
+ mux.add(gen123());
+ mux.add(gen456());
+ const results = new Set();
+ for await (const value of mux) {
+ results.add(value);
+ }
+ assertEquals(results.size, 6);
+});
+
+runIfMain(import.meta);
diff --git a/util/test.ts b/util/test.ts
index a617c10ab..ede984904 100644
--- a/util/test.ts
+++ b/util/test.ts
@@ -1 +1,2 @@
+import "./async_test.ts";
import "./deep_assign_test.ts";