From a295bb0d4255993103b6afe9ffdd2bd4e4c65c95 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 20 May 2019 09:17:26 -0400 Subject: Clean up HTTP async iterator code (denoland/deno_std#411) Original: https://github.com/denoland/deno_std/commit/68faf32f721d2a95c7b1c75661713c8118c077c7 --- http/server.ts | 231 ++++++++++++++++++++------------------------------------- 1 file changed, 81 insertions(+), 150 deletions(-) (limited to 'http/server.ts') diff --git a/http/server.ts b/http/server.ts index 484ecf808..281f8d302 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,55 +1,14 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. const { listen, copy, toAsyncIterator } = Deno; +type Listener = Deno.Listener; type Conn = Deno.Conn; type Reader = Deno.Reader; type Writer = Deno.Writer; import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; -import { assert } from "../testing/asserts.ts"; - -interface Deferred { - promise: Promise<{}>; - resolve: () => void; - reject: () => void; -} - -function deferred(isResolved = false): Deferred { - let resolve, reject; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} - -interface HttpConn extends Conn { - // When read by a newly created request B, lastId is the id pointing to a previous - // request A, such that we must wait for responses to A to complete before - // writing B's response. - lastPipelineId: number; - pendingDeferredMap: Map; -} - -function createHttpConn(c: Conn): HttpConn { - const httpConn = Object.assign(c, { - lastPipelineId: 0, - pendingDeferredMap: new Map() - }); - - const resolvedDeferred = deferred(true); - httpConn.pendingDeferredMap.set(0, resolvedDeferred); - return httpConn; -} +import { assert, fail } from "../testing/asserts.ts"; +import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts"; function bufWriter(w: Writer): BufWriter { if (w instanceof BufWriter) { @@ -58,6 +17,7 @@ function bufWriter(w: Writer): BufWriter { return new BufWriter(w); } } + export function setContentLength(r: Response): void { if (!r.headers) { r.headers = new Headers(); @@ -74,6 +34,7 @@ export function setContentLength(r: Response): void { } } } + async function writeChunkedBody(w: Writer, r: Reader): Promise { const writer = bufWriter(w); const encoder = new TextEncoder(); @@ -90,6 +51,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise { const endChunk = encoder.encode("0\r\n\r\n"); await writer.write(endChunk); } + export async function writeResponse(w: Writer, r: Response): Promise { const protoMajor = 1; const protoMinor = 1; @@ -131,6 +93,7 @@ export async function writeResponse(w: Writer, r: Response): Promise { } await writer.flush(); } + async function readAllIterator( it: AsyncIterableIterator ): Promise { @@ -154,14 +117,14 @@ async function readAllIterator( } export class ServerRequest { - pipelineId: number; url: string; method: string; proto: string; headers: Headers; - conn: HttpConn; + conn: Conn; r: BufReader; w: BufWriter; + done: Deferred = deferred(); public async *bodyStream(): AsyncIterableIterator { if (this.headers.has("content-length")) { @@ -244,134 +207,102 @@ export class ServerRequest { } async respond(r: Response): Promise { - // Check and wait if the previous request is done responding. - const lastPipelineId = this.pipelineId - 1; - const lastPipelineDeferred = this.conn.pendingDeferredMap.get( - lastPipelineId - ); - assert(!!lastPipelineDeferred); - await lastPipelineDeferred.promise; - // If yes, delete old deferred and proceed with writing. - this.conn.pendingDeferredMap.delete(lastPipelineId); // Write our response! await writeResponse(this.w, r); - // Signal the next pending request that it can start writing. - const currPipelineDeferred = this.conn.pendingDeferredMap.get( - this.pipelineId - ); - assert(!!currPipelineDeferred); - currPipelineDeferred.resolve(); + // Signal that this request has been processed and the next pipelined + // request on the same connection can be accepted. + this.done.resolve(); } } -interface ServeEnv { - reqQueue: ServerRequest[]; - serveDeferred: Deferred; -} - -/** Continuously read more requests from conn until EOF - * Calls maybeHandleReq. - * bufr is empty on a fresh TCP connection. - * Would be passed around and reused for later request on same conn - * TODO: make them async function after this change is done - * https://github.com/tc39/ecma262/pull/1250 - * See https://v8.dev/blog/fast-async - */ async function readRequest( - c: HttpConn, - bufr?: BufReader + conn: Conn, + bufr: BufReader ): Promise<[ServerRequest, BufState]> { - if (!bufr) { - bufr = new BufReader(c); - } - const bufw = new BufWriter(c); const req = new ServerRequest(); - - // Set and incr pipeline id; - req.pipelineId = ++c.lastPipelineId; - // Set a new pipeline deferred associated with this request - // for future requests to wait for. - c.pendingDeferredMap.set(req.pipelineId, deferred()); - - req.conn = c; - req.r = bufr!; - req.w = bufw; - const tp = new TextProtoReader(bufr!); - - let s: string; + req.conn = conn; + req.r = bufr; + req.w = new BufWriter(conn); + const tp = new TextProtoReader(bufr); let err: BufState; - // First line: GET /index.html HTTP/1.0 - [s, err] = await tp.readLine(); + let firstLine: string; + [firstLine, err] = await tp.readLine(); if (err) { return [null, err]; } - [req.method, req.url, req.proto] = s.split(" ", 3); - + [req.method, req.url, req.proto] = firstLine.split(" ", 3); [req.headers, err] = await tp.readMIMEHeader(); - return [req, err]; } -function maybeHandleReq( - env: ServeEnv, - conn: Conn, - maybeReq: [ServerRequest, BufState] -): void { - const [req, _err] = maybeReq; - if (_err) { - conn.close(); // assume EOF for now... - return; - } - env.reqQueue.push(req); // push req to queue - env.serveDeferred.resolve(); // signal while loop to process it -} +export class Server implements AsyncIterable { + private closing = false; -function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void { - readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); -} + constructor(public listener: Listener) {} -export async function* serve( - addr: string -): AsyncIterableIterator { - const listener = listen("tcp", addr); - const env: ServeEnv = { - reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred() - }; + close(): void { + this.closing = true; + this.listener.close(); + } - // Routine that keeps calling accept - let handleConn = (_conn: Conn): void => {}; - let scheduleAccept = (): void => {}; - const acceptRoutine = (): void => { - scheduleAccept = (): void => { - listener.accept().then(handleConn); - }; - handleConn = (conn: Conn): void => { - const httpConn = createHttpConn(conn); - serveConn(env, httpConn); // don't block - scheduleAccept(); // schedule next accept - }; + // Yields all HTTP requests on a single TCP connection. + private async *iterateHttpRequests( + conn: Conn + ): AsyncIterableIterator { + const bufr = new BufReader(conn); + let bufStateErr: BufState; + let req: ServerRequest; + + while (!this.closing) { + [req, bufStateErr] = await readRequest(conn, bufr); + if (bufStateErr) break; + yield req; + // Wait for the request to be processed before we accept a new request on + // this connection. + await req.done; + } - scheduleAccept(); - }; + if (bufStateErr === "EOF") { + // The connection was gracefully closed. + } else if (bufStateErr instanceof Error) { + // TODO(ry): send something back like a HTTP 500 status. + } else if (this.closing) { + // There are more requests incoming but the server is closing. + // TODO(ry): send a back a HTTP 503 Service Unavailable status. + } else { + fail(`unexpected BufState: ${bufStateErr}`); + } - acceptRoutine(); + conn.close(); + } - // Loop hack to allow yield (yield won't work in callbacks) - while (true) { - await env.serveDeferred.promise; - env.serveDeferred = deferred(); // use a new deferred - let queueToProcess = env.reqQueue; - env.reqQueue = []; - for (const result of queueToProcess) { - yield result; - // Continue read more from conn when user is done with the current req - // Moving this here makes it easier to manage - serveConn(env, result.conn, result.r); - } + // Accepts a new TCP connection and yields all HTTP requests that arrive on + // it. When a connection is accepted, it also creates a new iterator of the + // same kind and adds it to the request multiplexer so that another TCP + // connection can be accepted. + private async *acceptConnAndIterateHttpRequests( + mux: MuxAsyncIterator + ): AsyncIterableIterator { + if (this.closing) return; + // Wait for a new connection. + const conn = await this.listener.accept(); + // Try to accept another connection and add it to the multiplexer. + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + // Yield the requests that arrive on the just-accepted connection. + yield* this.iterateHttpRequests(conn); } - listener.close(); + + [Symbol.asyncIterator](): AsyncIterableIterator { + const mux: MuxAsyncIterator = new MuxAsyncIterator(); + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + return mux.iterate(); + } +} + +export function serve(addr: string): Server { + const listener = listen("tcp", addr); + return new Server(listener); } export async function listenAndServe( -- cgit v1.2.3