summaryrefslogtreecommitdiff
path: root/net/http.ts
diff options
context:
space:
mode:
Diffstat (limited to 'net/http.ts')
-rw-r--r--net/http.ts141
1 files changed, 127 insertions, 14 deletions
diff --git a/net/http.ts b/net/http.ts
index bd45aea0d..2b0e5477a 100644
--- a/net/http.ts
+++ b/net/http.ts
@@ -28,13 +28,16 @@ interface ServeEnv {
serveDeferred: Deferred;
}
-// Continuously read more requests from conn until EOF
-// Mutually calling with maybeHandleReq
-// TODO: make them async function after this change is done
-// https://github.com/tc39/ecma262/pull/1250
-// See https://v8.dev/blog/fast-async
-export function serveConn(env: ServeEnv, conn: Conn) {
- readRequest(conn).then(maybeHandleReq.bind(null, env, conn));
/** Continuously reads more requests from conn until EOF.
 * Each read result is handed to maybeHandleReq.
 * bufr is empty on a fresh TCP connection; it is passed around and reused
 * for later requests on the same connection.
 * TODO: make these async functions after this change is done
 * https://github.com/tc39/ecma262/pull/1250
 * See https://v8.dev/blog/fast-async
 */
function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
  readRequest(conn, bufr).then(maybeReq => maybeHandleReq(env, conn, maybeReq));
}
function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
const [req, _err] = maybeReq;
@@ -44,8 +47,6 @@ function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
}
env.reqQueue.push(req); // push req to queue
env.serveDeferred.resolve(); // signal while loop to process it
- // TODO: protection against client req flooding
- serveConn(env, conn); // try read more (reusing connection)
}
export async function* serve(addr: string) {
@@ -77,6 +78,9 @@ export async function* serve(addr: string) {
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
+ // Continue read more from conn when user is done with the current req
+ // Moving this here makes it easier to manage
+ serveConn(env, result.conn, result.r);
}
}
listener.close();
@@ -121,8 +125,90 @@ export class ServerRequest {
method: string;
proto: string;
headers: Headers;
+ conn: Conn;
+ r: BufReader;
w: BufWriter;
+ public async *bodyStream() {
+ if (this.headers.has("content-length")) {
+ const len = +this.headers.get("content-length");
+ if (Number.isNaN(len)) {
+ return new Uint8Array(0);
+ }
+ let buf = new Uint8Array(1024);
+ let rr = await this.r.read(buf);
+ let nread = rr.nread;
+ while (!rr.eof && nread < len) {
+ yield buf.subarray(0, rr.nread);
+ buf = new Uint8Array(1024);
+ rr = await this.r.read(buf);
+ nread += rr.nread;
+ }
+ yield buf.subarray(0, rr.nread);
+ } else {
+ if (this.headers.has("transfer-encoding")) {
+ const transferEncodings = this.headers
+ .get("transfer-encoding")
+ .split(",")
+ .map(e => e.trim().toLowerCase());
+ if (transferEncodings.includes("chunked")) {
+ // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
+ const tp = new TextProtoReader(this.r);
+ let [line, _] = await tp.readLine();
+ // TODO: handle chunk extension
+ let [chunkSizeString, optExt] = line.split(";");
+ let chunkSize = parseInt(chunkSizeString, 16);
+ if (Number.isNaN(chunkSize) || chunkSize < 0) {
+ throw new Error("Invalid chunk size");
+ }
+ while (chunkSize > 0) {
+ let data = new Uint8Array(chunkSize);
+ let [nread, err] = await this.r.readFull(data);
+ if (nread !== chunkSize) {
+ throw new Error("Chunk data does not match size");
+ }
+ yield data;
+ await this.r.readLine(); // Consume \r\n
+ [line, _] = await tp.readLine();
+ chunkSize = parseInt(line, 16);
+ }
+ const [entityHeaders, err] = await tp.readMIMEHeader();
+ if (!err) {
+ for (let [k, v] of entityHeaders) {
+ this.headers.set(k, v);
+ }
+ }
+ /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6
+ length := 0
+ read chunk-size, chunk-extension (if any) and CRLF
+ while (chunk-size > 0) {
+ read chunk-data and CRLF
+ append chunk-data to entity-body
+ length := length + chunk-size
+ read chunk-size and CRLF
+ }
+ read entity-header
+ while (entity-header not empty) {
+ append entity-header to existing header fields
+ read entity-header
+ }
+ Content-Length := length
+ Remove "chunked" from Transfer-Encoding
+ */
+ return; // Must return here to avoid fall through
+ }
+ // TODO: handle other transfer-encoding types
+ }
+ // Otherwise...
+ yield new Uint8Array(0);
+ }
+ }
+
+ // Read the body of the request into a single Uint8Array
+ public async body(): Promise<Uint8Array> {
+ return readAllIterator(this.bodyStream());
+ }
+
private async _streamBody(body: Reader, bodyLength: number) {
const n = await copy(this.w, body);
assert(n == bodyLength);
@@ -187,12 +273,19 @@ export class ServerRequest {
}
}
-async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> {
- const bufr = new BufReader(c);
+async function readRequest(
+ c: Conn,
+ bufr?: BufReader
+): Promise<[ServerRequest, BufState]> {
+ if (!bufr) {
+ bufr = new BufReader(c);
+ }
const bufw = new BufWriter(c);
const req = new ServerRequest();
+ req.conn = c;
+ req.r = bufr!;
req.w = bufw;
- const tp = new TextProtoReader(bufr);
+ const tp = new TextProtoReader(bufr!);
let s: string;
let err: BufState;
@@ -206,7 +299,27 @@ async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> {
[req.headers, err] = await tp.readMIMEHeader();
- // TODO: handle body
-
return [req, err];
}
+
+async function readAllIterator(
+ it: AsyncIterableIterator<Uint8Array>
+): Promise<Uint8Array> {
+ const chunks = [];
+ let len = 0;
+ for await (const chunk of it) {
+ chunks.push(chunk);
+ len += chunk.length;
+ }
+ if (chunks.length === 0) {
+ // No need for copy
+ return chunks[0];
+ }
+ const collected = new Uint8Array(len);
+ let offset = 0;
+ for (let chunk of chunks) {
+ collected.set(chunk, offset);
+ offset += chunk.length;
+ }
+ return collected;
+}