diff options
| author | Ryan Dahl <ry@tinyclouds.org> | 2019-05-20 09:17:26 -0400 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-05-20 09:17:26 -0400 |
| commit | a295bb0d4255993103b6afe9ffdd2bd4e4c65c95 (patch) | |
| tree | 48d491488e8ac12e2f666ea360962a494e819884 /http | |
| parent | 227d92e046220de30195432ed5235ccb19c91fc6 (diff) | |
Clean up HTTP async iterator code (denoland/deno_std#411)
Original: https://github.com/denoland/deno_std/commit/68faf32f721d2a95c7b1c75661713c8118c077c7
Diffstat (limited to 'http')
| -rw-r--r-- | http/http_bench.ts | 5 | ||||
| -rw-r--r-- | http/server.ts | 231 | ||||
| -rw-r--r-- | http/server_test.ts | 32 |
3 files changed, 86 insertions, 182 deletions
diff --git a/http/http_bench.ts b/http/http_bench.ts index 6d72d4be6..06043f9e4 100644 --- a/http/http_bench.ts +++ b/http/http_bench.ts @@ -3,13 +3,12 @@ import { serve } from "./server.ts"; const addr = Deno.args[1] || "127.0.0.1:4500"; const server = serve(addr); - const body = new TextEncoder().encode("Hello World"); async function main(): Promise<void> { console.log(`http://${addr}/`); - for await (const request of server) { - request.respond({ status: 200, body }); + for await (const req of server) { + req.respond({ body }); } } diff --git a/http/server.ts b/http/server.ts index 484ecf808..281f8d302 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,55 +1,14 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. const { listen, copy, toAsyncIterator } = Deno; +type Listener = Deno.Listener; type Conn = Deno.Conn; type Reader = Deno.Reader; type Writer = Deno.Writer; import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; -import { assert } from "../testing/asserts.ts"; - -interface Deferred { - promise: Promise<{}>; - resolve: () => void; - reject: () => void; -} - -function deferred(isResolved = false): Deferred { - let resolve, reject; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} - -interface HttpConn extends Conn { - // When read by a newly created request B, lastId is the id pointing to a previous - // request A, such that we must wait for responses to A to complete before - // writing B's response. - lastPipelineId: number; - pendingDeferredMap: Map<number, Deferred>; -} - -function createHttpConn(c: Conn): HttpConn { - const httpConn = Object.assign(c, { - lastPipelineId: 0, - pendingDeferredMap: new Map() - }); - - const resolvedDeferred = deferred(true); - httpConn.pendingDeferredMap.set(0, resolvedDeferred); - return httpConn; -} +import { assert, fail } from "../testing/asserts.ts"; +import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts"; function bufWriter(w: Writer): BufWriter { if (w instanceof BufWriter) { @@ -58,6 +17,7 @@ function bufWriter(w: Writer): BufWriter { return new BufWriter(w); } } + export function setContentLength(r: Response): void { if (!r.headers) { r.headers = new Headers(); @@ -74,6 +34,7 @@ export function setContentLength(r: Response): void { } } } + async function writeChunkedBody(w: Writer, r: Reader): Promise<void> { const writer = bufWriter(w); const encoder = new TextEncoder(); @@ -90,6 +51,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise<void> { const endChunk = encoder.encode("0\r\n\r\n"); await writer.write(endChunk); } + export async function writeResponse(w: Writer, r: Response): Promise<void> { const protoMajor = 1; const protoMinor = 1; @@ -131,6 +93,7 @@ export async function writeResponse(w: Writer, r: Response): Promise<void> { } await writer.flush(); } + async function readAllIterator( it: AsyncIterableIterator<Uint8Array> ): Promise<Uint8Array> { @@ -154,14 +117,14 @@ async function readAllIterator( } export class ServerRequest { - pipelineId: number; url: string; method: string; proto: string; headers: Headers; - conn: HttpConn; + conn: Conn; r: BufReader; w: BufWriter; + done: Deferred<void> = deferred(); public async *bodyStream(): AsyncIterableIterator<Uint8Array> { if (this.headers.has("content-length")) { @@ -244,134 +207,102 @@ export class ServerRequest { } async respond(r: Response): Promise<void> { - // Check and wait if the previous request is done responding. - const lastPipelineId = this.pipelineId - 1; - const lastPipelineDeferred = this.conn.pendingDeferredMap.get( - lastPipelineId - ); - assert(!!lastPipelineDeferred); - await lastPipelineDeferred.promise; - // If yes, delete old deferred and proceed with writing. - this.conn.pendingDeferredMap.delete(lastPipelineId); // Write our response! await writeResponse(this.w, r); - // Signal the next pending request that it can start writing. - const currPipelineDeferred = this.conn.pendingDeferredMap.get( - this.pipelineId - ); - assert(!!currPipelineDeferred); - currPipelineDeferred.resolve(); + // Signal that this request has been processed and the next pipelined + // request on the same connection can be accepted. + this.done.resolve(); } } -interface ServeEnv { - reqQueue: ServerRequest[]; - serveDeferred: Deferred; -} - -/** Continuously read more requests from conn until EOF - * Calls maybeHandleReq. - * bufr is empty on a fresh TCP connection. - * Would be passed around and reused for later request on same conn - * TODO: make them async function after this change is done - * https://github.com/tc39/ecma262/pull/1250 - * See https://v8.dev/blog/fast-async - */ async function readRequest( - c: HttpConn, - bufr?: BufReader + conn: Conn, + bufr: BufReader ): Promise<[ServerRequest, BufState]> { - if (!bufr) { - bufr = new BufReader(c); - } - const bufw = new BufWriter(c); const req = new ServerRequest(); - - // Set and incr pipeline id; - req.pipelineId = ++c.lastPipelineId; - // Set a new pipeline deferred associated with this request - // for future requests to wait for. - c.pendingDeferredMap.set(req.pipelineId, deferred()); - - req.conn = c; - req.r = bufr!; - req.w = bufw; - const tp = new TextProtoReader(bufr!); - - let s: string; + req.conn = conn; + req.r = bufr; + req.w = new BufWriter(conn); + const tp = new TextProtoReader(bufr); let err: BufState; - // First line: GET /index.html HTTP/1.0 - [s, err] = await tp.readLine(); + let firstLine: string; + [firstLine, err] = await tp.readLine(); if (err) { return [null, err]; } - [req.method, req.url, req.proto] = s.split(" ", 3); - + [req.method, req.url, req.proto] = firstLine.split(" ", 3); [req.headers, err] = await tp.readMIMEHeader(); - return [req, err]; } -function maybeHandleReq( - env: ServeEnv, - conn: Conn, - maybeReq: [ServerRequest, BufState] -): void { - const [req, _err] = maybeReq; - if (_err) { - conn.close(); // assume EOF for now... - return; - } - env.reqQueue.push(req); // push req to queue - env.serveDeferred.resolve(); // signal while loop to process it -} +export class Server implements AsyncIterable<ServerRequest> { + private closing = false; -function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void { - readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); -} + constructor(public listener: Listener) {} -export async function* serve( - addr: string -): AsyncIterableIterator<ServerRequest> { - const listener = listen("tcp", addr); - const env: ServeEnv = { - reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred() - }; + close(): void { + this.closing = true; + this.listener.close(); + } - // Routine that keeps calling accept - let handleConn = (_conn: Conn): void => {}; - let scheduleAccept = (): void => {}; - const acceptRoutine = (): void => { - scheduleAccept = (): void => { - listener.accept().then(handleConn); - }; - handleConn = (conn: Conn): void => { - const httpConn = createHttpConn(conn); - serveConn(env, httpConn); // don't block - scheduleAccept(); // schedule next accept - }; + // Yields all HTTP requests on a single TCP connection. + private async *iterateHttpRequests( + conn: Conn + ): AsyncIterableIterator<ServerRequest> { + const bufr = new BufReader(conn); + let bufStateErr: BufState; + let req: ServerRequest; + + while (!this.closing) { + [req, bufStateErr] = await readRequest(conn, bufr); + if (bufStateErr) break; + yield req; + // Wait for the request to be processed before we accept a new request on + // this connection. + await req.done; + } - scheduleAccept(); - }; + if (bufStateErr === "EOF") { + // The connection was gracefully closed. + } else if (bufStateErr instanceof Error) { + // TODO(ry): send something back like a HTTP 500 status. + } else if (this.closing) { + // There are more requests incoming but the server is closing. + // TODO(ry): send a back a HTTP 503 Service Unavailable status. + } else { + fail(`unexpected BufState: ${bufStateErr}`); + } - acceptRoutine(); + conn.close(); + } - // Loop hack to allow yield (yield won't work in callbacks) - while (true) { - await env.serveDeferred.promise; - env.serveDeferred = deferred(); // use a new deferred - let queueToProcess = env.reqQueue; - env.reqQueue = []; - for (const result of queueToProcess) { - yield result; - // Continue read more from conn when user is done with the current req - // Moving this here makes it easier to manage - serveConn(env, result.conn, result.r); - } + // Accepts a new TCP connection and yields all HTTP requests that arrive on + // it. When a connection is accepted, it also creates a new iterator of the + // same kind and adds it to the request multiplexer so that another TCP + // connection can be accepted. + private async *acceptConnAndIterateHttpRequests( + mux: MuxAsyncIterator<ServerRequest> + ): AsyncIterableIterator<ServerRequest> { + if (this.closing) return; + // Wait for a new connection. + const conn = await this.listener.accept(); + // Try to accept another connection and add it to the multiplexer. + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + // Yield the requests that arrive on the just-accepted connection. + yield* this.iterateHttpRequests(conn); } - listener.close(); + + [Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> { + const mux: MuxAsyncIterator<ServerRequest> = new MuxAsyncIterator(); + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + return mux.iterate(); + } +} + +export function serve(addr: string): Server { + const listener = listen("tcp", addr); + return new Server(listener); } export async function listenAndServe( diff --git a/http/server_test.ts b/http/server_test.ts index 82a368395..396a0321a 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -22,31 +22,6 @@ const dec = new TextDecoder(); type Handler = () => void; -interface Deferred { - promise: Promise<{}>; - resolve: Handler; - reject: Handler; -} - -function deferred(isResolved = false): Deferred { - let resolve: Handler = (): void => void 0; - let reject: Handler = (): void => void 0; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} - const responseTests: ResponseTest[] = [ // Default response { @@ -72,8 +47,8 @@ test(async function responseWrite(): Promise<void> { const buf = new Buffer(); const bufw = new BufWriter(buf); const request = new ServerRequest(); - request.pipelineId = 1; request.w = bufw; + request.conn = { localAddr: "", remoteAddr: "", @@ -86,13 +61,12 @@ test(async function responseWrite(): Promise<void> { write: async (): Promise<number> => { return -1; }, - close: (): void => {}, - lastPipelineId: 0, - pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]]) + close: (): void => {} }; await request.respond(testCase.response); assertEquals(buf.toString(), testCase.raw); + await request.done; } }); |
