summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevin (Kun) "Kassimo" Qian <kevinkassimo@gmail.com>2019-04-13 12:23:56 -0700
committerRyan Dahl <ry@tinyclouds.org>2019-04-13 15:23:56 -0400
commit236cedc7cba21132a2280c86ae4cf44c057ab5d8 (patch)
treebbaede3c311c377d9b4a22dc365875f7f642185f
parent733fbfd555a2db3365ac8ce19bc11e4a97fcfd47 (diff)
Enforce HTTP/1.1 pipeline response order (denoland/deno_std#331)
Original: https://github.com/denoland/deno_std/commit/144ef0e08d589fad2ca19eb4ef1ea20f1749bb5c
-rw-r--r--http/racing_server.ts53
-rw-r--r--http/racing_server_test.ts65
-rw-r--r--http/server.ts84
-rw-r--r--http/server_test.ts39
-rw-r--r--http/test.ts1
5 files changed, 224 insertions, 18 deletions
diff --git a/http/racing_server.ts b/http/racing_server.ts
new file mode 100644
index 000000000..c44fc1216
--- /dev/null
+++ b/http/racing_server.ts
@@ -0,0 +1,53 @@
+// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
+import { serve, ServerRequest } from "./server.ts";
+
+const addr = Deno.args[1] || "127.0.0.1:4501";
+const server = serve(addr);
+
+const body = new TextEncoder().encode("Hello 1\n");
+const body4 = new TextEncoder().encode("World 4\n");
+
+function sleep(ms: number): Promise<void> {
+ return new Promise(res => setTimeout(res, ms));
+}
+
+async function delayedRespond(request: ServerRequest): Promise<void> {
+ await sleep(3000);
+ await request.respond({ status: 200, body });
+}
+
+async function largeRespond(request: ServerRequest, c: string): Promise<void> {
+ const b = new Uint8Array(1024 * 1024);
+ b.fill(c.charCodeAt(0));
+ await request.respond({ status: 200, body: b });
+}
+
+async function main(): Promise<void> {
+ let step = 1;
+ for await (const request of server) {
+ switch (step) {
+ case 1:
+ // Try to wait long enough.
+ // For pipelining, this should cause all the following response
+ // to block.
+ delayedRespond(request);
+ break;
+ case 2:
+ // HUGE body.
+ largeRespond(request, "a");
+ break;
+ case 3:
+ // HUGE body.
+ largeRespond(request, "b");
+ break;
+ default:
+ request.respond({ status: 200, body: body4 });
+ break;
+ }
+ step++;
+ }
+}
+
+main();
+
+console.log("Racing server listening...\n");
diff --git a/http/racing_server_test.ts b/http/racing_server_test.ts
new file mode 100644
index 000000000..0c1a5c65f
--- /dev/null
+++ b/http/racing_server_test.ts
@@ -0,0 +1,65 @@
+const { dial, run } = Deno;
+
+import { test } from "../testing/mod.ts";
+import { assert, assertEquals } from "../testing/asserts.ts";
+import { BufReader } from "../io/bufio.ts";
+import { TextProtoReader } from "../textproto/mod.ts";
+
+let server;
+async function startServer(): Promise<void> {
+ server = run({
+ args: ["deno", "-A", "http/racing_server.ts"],
+ stdout: "piped"
+ });
+ // Once fileServer is ready it will write to its stdout.
+ const r = new TextProtoReader(new BufReader(server.stdout));
+ const [s, err] = await r.readLine();
+ assert(err == null);
+ assert(s.includes("Racing server listening..."));
+}
+function killServer(): void {
+ server.close();
+ server.stdout.close();
+}
+
+let input = `GET / HTTP/1.1
+
+GET / HTTP/1.1
+
+GET / HTTP/1.1
+
+GET / HTTP/1.1
+
+`;
+const HUGE_BODY_SIZE = 1024 * 1024;
+let output = `HTTP/1.1 200 OK
+content-length: 8
+
+Hello 1
+HTTP/1.1 200 OK
+content-length: ${HUGE_BODY_SIZE}
+
+${"a".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
+content-length: ${HUGE_BODY_SIZE}
+
+${"b".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
+content-length: 8
+
+World 4
+`;
+
+test(async function serverPipelineRace(): Promise<void> {
+ await startServer();
+
+ const conn = await dial("tcp", "127.0.0.1:4501");
+ const r = new TextProtoReader(new BufReader(conn));
+ await conn.write(new TextEncoder().encode(input));
+ const outLines = output.split("\n");
+ // length - 1 to disregard last empty line
+ for (let i = 0; i < outLines.length - 1; i++) {
+ const [s, err] = await r.readLine();
+ assert(!err);
+ assertEquals(s, outLines[i]);
+ }
+ killServer();
+});
diff --git a/http/server.ts b/http/server.ts
index 17295f739..f1ef015ba 100644
--- a/http/server.ts
+++ b/http/server.ts
@@ -13,6 +13,42 @@ interface Deferred {
resolve: () => void;
reject: () => void;
}
+
+function deferred(isResolved = false): Deferred {
+ let resolve, reject;
+ const promise = new Promise((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ if (isResolved) {
+ resolve();
+ }
+ return {
+ promise,
+ resolve,
+ reject
+ };
+}
+
+interface HttpConn extends Conn {
+ // When read by a newly created request B, lastId is the id pointing to a previous
+ // request A, such that we must wait for responses to A to complete before
+ // writing B's response.
+ lastPipelineId: number;
+ pendingDeferredMap: Map<number, Deferred>;
+}
+
+function createHttpConn(c: Conn): HttpConn {
+ const httpConn = Object.assign(c, {
+ lastPipelineId: 0,
+ pendingDeferredMap: new Map()
+ });
+
+ const resolvedDeferred = deferred(true);
+ httpConn.pendingDeferredMap.set(0, resolvedDeferred);
+ return httpConn;
+}
+
function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
return w;
@@ -115,11 +151,12 @@ async function readAllIterator(
}
export class ServerRequest {
+ pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
- conn: Conn;
+ conn: HttpConn;
r: BufReader;
w: BufWriter;
@@ -204,23 +241,26 @@ export class ServerRequest {
}
async respond(r: Response): Promise<void> {
- return writeResponse(this.w, r);
+ // Check and wait if the previous request is done responding.
+ const lastPipelineId = this.pipelineId - 1;
+ const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
+ lastPipelineId
+ );
+ assert(!!lastPipelineDeferred);
+ await lastPipelineDeferred.promise;
+ // If yes, delete old deferred and proceed with writing.
+ this.conn.pendingDeferredMap.delete(lastPipelineId);
+ // Write our response!
+ await writeResponse(this.w, r);
+ // Signal the next pending request that it can start writing.
+ const currPipelineDeferred = this.conn.pendingDeferredMap.get(
+ this.pipelineId
+ );
+ assert(!!currPipelineDeferred);
+ currPipelineDeferred.resolve();
}
}
-function deferred(): Deferred {
- let resolve, reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- return {
- promise,
- resolve,
- reject
- };
-}
-
interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
@@ -235,7 +275,7 @@ interface ServeEnv {
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
- c: Conn,
+ c: HttpConn,
bufr?: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
@@ -243,6 +283,13 @@ async function readRequest(
}
const bufw = new BufWriter(c);
const req = new ServerRequest();
+
+ // Set and incr pipeline id;
+ req.pipelineId = ++c.lastPipelineId;
+ // Set a new pipeline deferred associated with this request
+ // for future requests to wait for.
+ c.pendingDeferredMap.set(req.pipelineId, deferred());
+
req.conn = c;
req.r = bufr!;
req.w = bufw;
@@ -277,7 +324,7 @@ function maybeHandleReq(
env.serveDeferred.resolve(); // signal while loop to process it
}
-function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader): void {
+function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
@@ -298,7 +345,8 @@ export async function* serve(
listener.accept().then(handleConn);
};
handleConn = (conn: Conn) => {
- serveConn(env, conn); // don't block
+ const httpConn = createHttpConn(conn);
+ serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};
diff --git a/http/server_test.ts b/http/server_test.ts
index 4f1bc1c40..09a8df2bd 100644
--- a/http/server_test.ts
+++ b/http/server_test.ts
@@ -19,6 +19,28 @@ interface ResponseTest {
const enc = new TextEncoder();
const dec = new TextDecoder();
+interface Deferred {
+ promise: Promise<{}>;
+ resolve: () => void;
+ reject: () => void;
+}
+
+function deferred(isResolved = false): Deferred {
+ let resolve, reject;
+ const promise = new Promise((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ if (isResolved) {
+ resolve();
+ }
+ return {
+ promise,
+ resolve,
+ reject
+ };
+}
+
const responseTests: ResponseTest[] = [
// Default response
{
@@ -44,7 +66,24 @@ test(async function responseWrite() {
const buf = new Buffer();
const bufw = new BufWriter(buf);
const request = new ServerRequest();
+ request.pipelineId = 1;
request.w = bufw;
+ request.conn = {
+ localAddr: "",
+ remoteAddr: "",
+ rid: -1,
+ closeRead: () => {},
+ closeWrite: () => {},
+ read: async () => {
+ return { eof: true, nread: 0 };
+ },
+ write: async () => {
+ return -1;
+ },
+ close: () => {},
+ lastPipelineId: 0,
+ pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
+ };
await request.respond(testCase.response);
assertEquals(buf.toString(), testCase.raw);
diff --git a/http/test.ts b/http/test.ts
index 25b8af79f..938ea4458 100644
--- a/http/test.ts
+++ b/http/test.ts
@@ -1,3 +1,4 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import "./server_test.ts";
import "./file_server_test.ts";
+import "./racing_server_test.ts";