summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevin (Kun) "Kassimo" Qian <kevinkassimo@gmail.com>2018-12-18 20:48:05 -0500
committerRyan Dahl <ry@tinyclouds.org>2018-12-18 20:48:05 -0500
commit3c8f564ab8c3087bac8256723aed9572faba756f (patch)
tree33846b8242d0c03c0c982675255ac88927d91756
parentee72e869f7073f8f3ccce55f32695df96e3d8113 (diff)
http: Request body & Streaming (denoland/deno_std#23)
Original: https://github.com/denoland/deno_std/commit/e0e677bb02ad1587743373fe59efcaba1f89d1ed
-rw-r--r--net/http.ts141
-rw-r--r--net/http_test.ts172
2 files changed, 298 insertions, 15 deletions
diff --git a/net/http.ts b/net/http.ts
index bd45aea0d..2b0e5477a 100644
--- a/net/http.ts
+++ b/net/http.ts
@@ -28,13 +28,16 @@ interface ServeEnv {
serveDeferred: Deferred;
}
-// Continuously read more requests from conn until EOF
-// Mutually calling with maybeHandleReq
-// TODO: make them async function after this change is done
-// https://github.com/tc39/ecma262/pull/1250
-// See https://v8.dev/blog/fast-async
-export function serveConn(env: ServeEnv, conn: Conn) {
- readRequest(conn).then(maybeHandleReq.bind(null, env, conn));
+/** Continuously read more requests from conn until EOF
+ * Calls maybeHandleReq.
+ * bufr is empty on a fresh TCP connection.
+ * Would be passed around and reused for later request on same conn
+ * TODO: make them async function after this change is done
+ * https://github.com/tc39/ecma262/pull/1250
+ * See https://v8.dev/blog/fast-async
+ */
+function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
+ readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
const [req, _err] = maybeReq;
@@ -44,8 +47,6 @@ function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
}
env.reqQueue.push(req); // push req to queue
env.serveDeferred.resolve(); // signal while loop to process it
- // TODO: protection against client req flooding
- serveConn(env, conn); // try read more (reusing connection)
}
export async function* serve(addr: string) {
@@ -77,6 +78,9 @@ export async function* serve(addr: string) {
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
+ // Continue read more from conn when user is done with the current req
+ // Moving this here makes it easier to manage
+ serveConn(env, result.conn, result.r);
}
}
listener.close();
@@ -121,8 +125,90 @@ export class ServerRequest {
method: string;
proto: string;
headers: Headers;
+ conn: Conn;
+ r: BufReader;
w: BufWriter;
+ public async *bodyStream() {
+ if (this.headers.has("content-length")) {
+ const len = +this.headers.get("content-length");
+ if (Number.isNaN(len)) {
+ return new Uint8Array(0);
+ }
+ let buf = new Uint8Array(1024);
+ let rr = await this.r.read(buf);
+ let nread = rr.nread;
+ while (!rr.eof && nread < len) {
+ yield buf.subarray(0, rr.nread);
+ buf = new Uint8Array(1024);
+ rr = await this.r.read(buf);
+ nread += rr.nread;
+ }
+ yield buf.subarray(0, rr.nread);
+ } else {
+ if (this.headers.has("transfer-encoding")) {
+ const transferEncodings = this.headers
+ .get("transfer-encoding")
+ .split(",")
+ .map(e => e.trim().toLowerCase());
+ if (transferEncodings.includes("chunked")) {
+ // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
+ const tp = new TextProtoReader(this.r);
+ let [line, _] = await tp.readLine();
+ // TODO: handle chunk extension
+ let [chunkSizeString, optExt] = line.split(";");
+ let chunkSize = parseInt(chunkSizeString, 16);
+ if (Number.isNaN(chunkSize) || chunkSize < 0) {
+ throw new Error("Invalid chunk size");
+ }
+ while (chunkSize > 0) {
+ let data = new Uint8Array(chunkSize);
+ let [nread, err] = await this.r.readFull(data);
+ if (nread !== chunkSize) {
+ throw new Error("Chunk data does not match size");
+ }
+ yield data;
+ await this.r.readLine(); // Consume \r\n
+ [line, _] = await tp.readLine();
+ chunkSize = parseInt(line, 16);
+ }
+ const [entityHeaders, err] = await tp.readMIMEHeader();
+ if (!err) {
+ for (let [k, v] of entityHeaders) {
+ this.headers.set(k, v);
+ }
+ }
+ /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6
+ length := 0
+ read chunk-size, chunk-extension (if any) and CRLF
+ while (chunk-size > 0) {
+ read chunk-data and CRLF
+ append chunk-data to entity-body
+ length := length + chunk-size
+ read chunk-size and CRLF
+ }
+ read entity-header
+ while (entity-header not empty) {
+ append entity-header to existing header fields
+ read entity-header
+ }
+ Content-Length := length
+ Remove "chunked" from Transfer-Encoding
+ */
+ return; // Must return here to avoid fall through
+ }
+ // TODO: handle other transfer-encoding types
+ }
+ // Otherwise...
+ yield new Uint8Array(0);
+ }
+ }
+
+ // Read the body of the request into a single Uint8Array
+ public async body(): Promise<Uint8Array> {
+ return readAllIterator(this.bodyStream());
+ }
+
private async _streamBody(body: Reader, bodyLength: number) {
const n = await copy(this.w, body);
assert(n == bodyLength);
@@ -187,12 +273,19 @@ export class ServerRequest {
}
}
-async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> {
- const bufr = new BufReader(c);
+async function readRequest(
+ c: Conn,
+ bufr?: BufReader
+): Promise<[ServerRequest, BufState]> {
+ if (!bufr) {
+ bufr = new BufReader(c);
+ }
const bufw = new BufWriter(c);
const req = new ServerRequest();
+ req.conn = c;
+ req.r = bufr!;
req.w = bufw;
- const tp = new TextProtoReader(bufr);
+ const tp = new TextProtoReader(bufr!);
let s: string;
let err: BufState;
@@ -206,7 +299,27 @@ async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> {
[req.headers, err] = await tp.readMIMEHeader();
- // TODO: handle body
-
return [req, err];
}
+
+async function readAllIterator(
+ it: AsyncIterableIterator<Uint8Array>
+): Promise<Uint8Array> {
+ const chunks = [];
+ let len = 0;
+ for await (const chunk of it) {
+ chunks.push(chunk);
+ len += chunk.length;
+ }
+ if (chunks.length === 0) {
+ // No need for copy
+ return chunks[0];
+ }
+ const collected = new Uint8Array(len);
+ let offset = 0;
+ for (let chunk of chunks) {
+ collected.set(chunk, offset);
+ offset += chunk.length;
+ }
+ return collected;
+}
diff --git a/net/http_test.ts b/net/http_test.ts
index cdb7f8303..97d07a5b4 100644
--- a/net/http_test.ts
+++ b/net/http_test.ts
@@ -18,13 +18,16 @@ import {
Response
} from "./http";
import { Buffer } from "deno";
-import { BufWriter } from "./bufio";
+import { BufWriter, BufReader } from "./bufio";
interface ResponseTest {
response: Response;
raw: string;
}
+const enc = new TextEncoder();
+const dec = new TextDecoder();
+
const responseTests: ResponseTest[] = [
// Default response
{
@@ -56,3 +59,170 @@ test(async function responseWrite() {
assertEqual(buf.toString(), testCase.raw);
}
});
+
+test(async function requestBodyWithContentLength() {
+ {
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("content-length", "5");
+ const buf = new Buffer(enc.encode("Hello"));
+ req.r = new BufReader(buf);
+ const body = dec.decode(await req.body());
+ assertEqual(body, "Hello");
+ }
+
+ // Larger than internal buf
+ {
+ const longText = "1234\n".repeat(1000);
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("Content-Length", "5000");
+ const buf = new Buffer(enc.encode(longText));
+ req.r = new BufReader(buf);
+ const body = dec.decode(await req.body());
+ assertEqual(body, longText);
+ }
+});
+
+test(async function requestBodyWithTransferEncoding() {
+ {
+ const shortText = "Hello";
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("transfer-encoding", "chunked");
+ let chunksData = "";
+ let chunkOffset = 0;
+ const maxChunkSize = 70;
+ while (chunkOffset < shortText.length) {
+ const chunkSize = Math.min(maxChunkSize, shortText.length - chunkOffset);
+ chunksData += `${chunkSize.toString(16)}\r\n${shortText.substr(
+ chunkOffset,
+ chunkSize
+ )}\r\n`;
+ chunkOffset += chunkSize;
+ }
+ chunksData += "0\r\n\r\n";
+ const buf = new Buffer(enc.encode(chunksData));
+ req.r = new BufReader(buf);
+ const body = dec.decode(await req.body());
+ assertEqual(body, shortText);
+ }
+
+ // Larger than internal buf
+ {
+ const longText = "1234\n".repeat(1000);
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("transfer-encoding", "chunked");
+ let chunksData = "";
+ let chunkOffset = 0;
+ const maxChunkSize = 70;
+ while (chunkOffset < longText.length) {
+ const chunkSize = Math.min(maxChunkSize, longText.length - chunkOffset);
+ chunksData += `${chunkSize.toString(16)}\r\n${longText.substr(
+ chunkOffset,
+ chunkSize
+ )}\r\n`;
+ chunkOffset += chunkSize;
+ }
+ chunksData += "0\r\n\r\n";
+ const buf = new Buffer(enc.encode(chunksData));
+ req.r = new BufReader(buf);
+ const body = dec.decode(await req.body());
+ assertEqual(body, longText);
+ }
+});
+
+test(async function requestBodyStreamWithContentLength() {
+ {
+ const shortText = "Hello";
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("content-length", "" + shortText.length);
+ const buf = new Buffer(enc.encode(shortText));
+ req.r = new BufReader(buf);
+ const it = await req.bodyStream();
+ let offset = 0;
+ for await (const chunk of it) {
+ const s = dec.decode(chunk);
+ assertEqual(shortText.substr(offset, s.length), s);
+ offset += s.length;
+ }
+ }
+
+ // Larger than internal buf
+ {
+ const longText = "1234\n".repeat(1000);
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("Content-Length", "5000");
+ const buf = new Buffer(enc.encode(longText));
+ req.r = new BufReader(buf);
+ const it = await req.bodyStream();
+ let offset = 0;
+ for await (const chunk of it) {
+ const s = dec.decode(chunk);
+ assertEqual(longText.substr(offset, s.length), s);
+ offset += s.length;
+ }
+ }
+});
+
+test(async function requestBodyStreamWithTransferEncoding() {
+ {
+ const shortText = "Hello";
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("transfer-encoding", "chunked");
+ let chunksData = "";
+ let chunkOffset = 0;
+ const maxChunkSize = 70;
+ while (chunkOffset < shortText.length) {
+ const chunkSize = Math.min(maxChunkSize, shortText.length - chunkOffset);
+ chunksData += `${chunkSize.toString(16)}\r\n${shortText.substr(
+ chunkOffset,
+ chunkSize
+ )}\r\n`;
+ chunkOffset += chunkSize;
+ }
+ chunksData += "0\r\n\r\n";
+ const buf = new Buffer(enc.encode(chunksData));
+ req.r = new BufReader(buf);
+ const it = await req.bodyStream();
+ let offset = 0;
+ for await (const chunk of it) {
+ const s = dec.decode(chunk);
+ assertEqual(shortText.substr(offset, s.length), s);
+ offset += s.length;
+ }
+ }
+
+ // Larger than internal buf
+ {
+ const longText = "1234\n".repeat(1000);
+ const req = new ServerRequest();
+ req.headers = new Headers();
+ req.headers.set("transfer-encoding", "chunked");
+ let chunksData = "";
+ let chunkOffset = 0;
+ const maxChunkSize = 70;
+ while (chunkOffset < longText.length) {
+ const chunkSize = Math.min(maxChunkSize, longText.length - chunkOffset);
+ chunksData += `${chunkSize.toString(16)}\r\n${longText.substr(
+ chunkOffset,
+ chunkSize
+ )}\r\n`;
+ chunkOffset += chunkSize;
+ }
+ chunksData += "0\r\n\r\n";
+ const buf = new Buffer(enc.encode(chunksData));
+ req.r = new BufReader(buf);
+ const it = await req.bodyStream();
+ let offset = 0;
+ for await (const chunk of it) {
+ const s = dec.decode(chunk);
+ assertEqual(longText.substr(offset, s.length), s);
+ offset += s.length;
+ }
+ }
+});