summaryrefslogtreecommitdiff
path: root/http
diff options
context:
space:
mode:
Diffstat (limited to 'http')
-rw-r--r--http/server.ts348
1 files changed, 173 insertions, 175 deletions
diff --git a/http/server.ts b/http/server.ts
index f3b92ee90..5cf658cf3 100644
--- a/http/server.ts
+++ b/http/server.ts
@@ -13,116 +13,105 @@ interface Deferred {
resolve: () => void;
reject: () => void;
}
-
-function deferred(): Deferred {
- let resolve, reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- return {
- promise,
- resolve,
- reject
- };
-}
-
-interface ServeEnv {
- reqQueue: ServerRequest[];
- serveDeferred: Deferred;
-}
-
-/** Continuously read more requests from conn until EOF
- * Calls maybeHandleReq.
- * bufr is empty on a fresh TCP connection.
- * Would be passed around and reused for later request on same conn
- * TODO: make them async function after this change is done
- * https://github.com/tc39/ecma262/pull/1250
- * See https://v8.dev/blog/fast-async
- */
-function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
- readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
-}
-
-function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
- const [req, _err] = maybeReq;
- if (_err) {
- conn.close(); // assume EOF for now...
- return;
+function bufWriter(w: Writer): BufWriter {
+ if (w instanceof BufWriter) {
+ return w;
+ } else {
+ return new BufWriter(w);
}
- env.reqQueue.push(req); // push req to queue
- env.serveDeferred.resolve(); // signal while loop to process it
}
+export function setContentLength(r: Response): void {
+ if (!r.headers) {
+ r.headers = new Headers();
+ }
-export async function* serve(addr: string) {
- const listener = listen("tcp", addr);
- const env: ServeEnv = {
- reqQueue: [], // in case multiple promises are ready
- serveDeferred: deferred()
- };
-
- // Routine that keeps calling accept
- const acceptRoutine = () => {
- const handleConn = (conn: Conn) => {
- serveConn(env, conn); // don't block
- scheduleAccept(); // schedule next accept
- };
- const scheduleAccept = () => {
- listener.accept().then(handleConn);
- };
- scheduleAccept();
- };
-
- acceptRoutine();
-
- // Loop hack to allow yield (yield won't work in callbacks)
- while (true) {
- await env.serveDeferred.promise;
- env.serveDeferred = deferred(); // use a new deferred
- let queueToProcess = env.reqQueue;
- env.reqQueue = [];
- for (const result of queueToProcess) {
- yield result;
- // Continue read more from conn when user is done with the current req
- // Moving this here makes it easier to manage
- serveConn(env, result.conn, result.r);
+ if (r.body) {
+ if (!r.headers.has("content-length")) {
+ if (r.body instanceof Uint8Array) {
+ const bodyLength = r.body.byteLength;
+ r.headers.append("Content-Length", bodyLength.toString());
+ } else {
+ r.headers.append("Transfer-Encoding", "chunked");
+ }
}
}
- listener.close();
}
+async function writeChunkedBody(w: Writer, r: Reader) {
+ const writer = bufWriter(w);
+ const encoder = new TextEncoder();
-export async function listenAndServe(
- addr: string,
- handler: (req: ServerRequest) => void
-) {
- const server = serve(addr);
-
- for await (const request of server) {
- await handler(request);
+ for await (const chunk of toAsyncIterator(r)) {
+ const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
+ const end = encoder.encode("\r\n");
+ await writer.write(start);
+ await writer.write(chunk);
+ await writer.write(end);
}
-}
-export interface Response {
- status?: number;
- headers?: Headers;
- body?: Uint8Array | Reader;
+ const endChunk = encoder.encode("0\r\n\r\n");
+ await writer.write(endChunk);
}
+export async function writeResponse(w: Writer, r: Response): Promise<void> {
+ const protoMajor = 1;
+ const protoMinor = 1;
+ const statusCode = r.status || 200;
+ const statusText = STATUS_TEXT.get(statusCode);
+ const writer = bufWriter(w);
+ if (!statusText) {
+ throw Error("bad status code");
+ }
-export function setContentLength(r: Response): void {
- if (!r.headers) {
- r.headers = new Headers();
+ let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`;
+
+ setContentLength(r);
+
+ if (r.headers) {
+ for (const [key, value] of r.headers) {
+ out += `${key}: ${value}\r\n`;
+ }
}
+ out += "\r\n";
+
+ const header = new TextEncoder().encode(out);
+ let n = await writer.write(header);
+ assert(header.byteLength == n);
if (r.body) {
- if (!r.headers.has("content-length")) {
- if (r.body instanceof Uint8Array) {
- const bodyLength = r.body.byteLength;
- r.headers.append("Content-Length", bodyLength.toString());
+ if (r.body instanceof Uint8Array) {
+ n = await writer.write(r.body);
+ assert(r.body.byteLength == n);
+ } else {
+ if (r.headers.has("content-length")) {
+ const bodyLength = parseInt(r.headers.get("content-length"));
+ const n = await copy(writer, r.body);
+ assert(n == bodyLength);
} else {
- r.headers.append("Transfer-Encoding", "chunked");
+ await writeChunkedBody(writer, r.body);
}
}
}
+ await writer.flush();
+}
+async function readAllIterator(
+ it: AsyncIterableIterator<Uint8Array>
+): Promise<Uint8Array> {
+ const chunks = [];
+ let len = 0;
+ for await (const chunk of it) {
+ chunks.push(chunk);
+ len += chunk.length;
+ }
+ if (chunks.length === 0) {
+ // No need for copy
+ return chunks[0];
+ }
+ const collected = new Uint8Array(len);
+ let offset = 0;
+ for (let chunk of chunks) {
+ collected.set(chunk, offset);
+ offset += chunk.length;
+ }
+ return collected;
}
export class ServerRequest {
@@ -159,22 +148,22 @@ export class ServerRequest {
if (transferEncodings.includes("chunked")) {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(this.r);
- let [line, _] = await tp.readLine();
+ let [line] = await tp.readLine();
// TODO: handle chunk extension
- let [chunkSizeString, optExt] = line.split(";");
+ let [chunkSizeString] = line.split(";");
let chunkSize = parseInt(chunkSizeString, 16);
if (Number.isNaN(chunkSize) || chunkSize < 0) {
throw new Error("Invalid chunk size");
}
while (chunkSize > 0) {
let data = new Uint8Array(chunkSize);
- let [nread, err] = await this.r.readFull(data);
+ let [nread] = await this.r.readFull(data);
if (nread !== chunkSize) {
throw new Error("Chunk data does not match size");
}
yield data;
await this.r.readLine(); // Consume \r\n
- [line, _] = await tp.readLine();
+ [line] = await tp.readLine();
chunkSize = parseInt(line, 16);
}
const [entityHeaders, err] = await tp.readMIMEHeader();
@@ -219,72 +208,32 @@ export class ServerRequest {
}
}
-function bufWriter(w: Writer): BufWriter {
- if (w instanceof BufWriter) {
- return w;
- } else {
- return new BufWriter(w);
- }
-}
-
-export async function writeResponse(w: Writer, r: Response): Promise<void> {
- const protoMajor = 1;
- const protoMinor = 1;
- const statusCode = r.status || 200;
- const statusText = STATUS_TEXT.get(statusCode);
- const writer = bufWriter(w);
- if (!statusText) {
- throw Error("bad status code");
- }
-
- let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`;
-
- setContentLength(r);
-
- if (r.headers) {
- for (const [key, value] of r.headers) {
- out += `${key}: ${value}\r\n`;
- }
- }
- out += "\r\n";
-
- const header = new TextEncoder().encode(out);
- let n = await writer.write(header);
- assert(header.byteLength == n);
-
- if (r.body) {
- if (r.body instanceof Uint8Array) {
- n = await writer.write(r.body);
- assert(r.body.byteLength == n);
- } else {
- if (r.headers.has("content-length")) {
- const bodyLength = parseInt(r.headers.get("content-length"));
- const n = await copy(writer, r.body);
- assert(n == bodyLength);
- } else {
- await writeChunkedBody(writer, r.body);
- }
- }
- }
- await writer.flush();
+function deferred(): Deferred {
+ let resolve, reject;
+ const promise = new Promise((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ return {
+ promise,
+ resolve,
+ reject
+ };
}
-async function writeChunkedBody(w: Writer, r: Reader) {
- const writer = bufWriter(w);
- const encoder = new TextEncoder();
-
- for await (const chunk of toAsyncIterator(r)) {
- const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
- const end = encoder.encode("\r\n");
- await writer.write(start);
- await writer.write(chunk);
- await writer.write(end);
- }
-
- const endChunk = encoder.encode("0\r\n\r\n");
- await writer.write(endChunk);
+interface ServeEnv {
+ reqQueue: ServerRequest[];
+ serveDeferred: Deferred;
}
+/** Continuously read more requests from conn until EOF
+ * Calls maybeHandleReq.
+ * bufr is empty on a fresh TCP connection.
+ * Would be passed around and reused for later request on same conn
+ * TODO: make them async function after this change is done
+ * https://github.com/tc39/ecma262/pull/1250
+ * See https://v8.dev/blog/fast-async
+ */
async function readRequest(
c: Conn,
bufr?: BufReader
@@ -314,24 +263,73 @@ async function readRequest(
return [req, err];
}
-async function readAllIterator(
- it: AsyncIterableIterator<Uint8Array>
-): Promise<Uint8Array> {
- const chunks = [];
- let len = 0;
- for await (const chunk of it) {
- chunks.push(chunk);
- len += chunk.length;
+function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
+ const [req, _err] = maybeReq;
+ if (_err) {
+ conn.close(); // assume EOF for now...
+ return;
}
- if (chunks.length === 0) {
- // No need for copy
- return chunks[0];
+ env.reqQueue.push(req); // push req to queue
+ env.serveDeferred.resolve(); // signal while loop to process it
+}
+
+function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
+ readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
+}
+
+export async function* serve(addr: string) {
+ const listener = listen("tcp", addr);
+ const env: ServeEnv = {
+ reqQueue: [], // in case multiple promises are ready
+ serveDeferred: deferred()
+ };
+
+ // Routine that keeps calling accept
+ let handleConn = (_conn: Conn) => {};
+ let scheduleAccept = () => {};
+ const acceptRoutine = () => {
+ scheduleAccept = () => {
+ listener.accept().then(handleConn);
+ };
+ handleConn = (conn: Conn) => {
+ serveConn(env, conn); // don't block
+ scheduleAccept(); // schedule next accept
+ };
+
+ scheduleAccept();
+ };
+
+ acceptRoutine();
+
+ // Loop hack to allow yield (yield won't work in callbacks)
+ while (true) {
+ await env.serveDeferred.promise;
+ env.serveDeferred = deferred(); // use a new deferred
+ let queueToProcess = env.reqQueue;
+ env.reqQueue = [];
+ for (const result of queueToProcess) {
+ yield result;
+ // Continue read more from conn when user is done with the current req
+ // Moving this here makes it easier to manage
+ serveConn(env, result.conn, result.r);
+ }
}
- const collected = new Uint8Array(len);
- let offset = 0;
- for (let chunk of chunks) {
- collected.set(chunk, offset);
- offset += chunk.length;
+ listener.close();
+}
+
+export async function listenAndServe(
+ addr: string,
+ handler: (req: ServerRequest) => void
+) {
+ const server = serve(addr);
+
+ for await (const request of server) {
+ await handler(request);
}
- return collected;
+}
+
+export interface Response {
+ status?: number;
+ headers?: Headers;
+ body?: Uint8Array | Reader;
}