diff options
| author | Yusuke Sakurai <kerokerokerop@gmail.com> | 2019-02-16 01:03:57 +0900 |
|---|---|---|
| committer | Ryan Dahl <ry@tinyclouds.org> | 2019-02-15 11:03:57 -0500 |
| commit | 57f4e6a86448263c9f1c75934e829e048c76f572 (patch) | |
| tree | b4e46dcb4a43fec2694e289702fba2cd5e475c50 /http/server.ts | |
| parent | 34ece9f2ede9c63af2678feb15ef5290a74c8d2f (diff) | |
Redesign of http server module (denoland/deno_std#188)
Original: https://github.com/denoland/deno_std/commit/8569f15207bdc12c2c8ca81e9d020955be54918b
Diffstat (limited to 'http/server.ts')
| -rw-r--r-- | http/server.ts | 418 |
1 files changed, 227 insertions, 191 deletions
diff --git a/http/server.ts b/http/server.ts index 400171fc5..a5c5677c2 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,63 +1,90 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { listen, Conn, toAsyncIterator, Reader, Writer, copy } from "deno"; -import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; + +import { Conn, copy, listen, Reader, toAsyncIterator, Writer } from "deno"; +import { BufReader, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; import { assert } from "../testing/mod.ts"; +import { defer, Deferred } from "../util/deferred.ts"; +import { BodyReader, ChunkedBodyReader } from "./readers.ts"; +import { encode } from "../strings/strings.ts"; + +/** basic handler for http request */ +export type HttpHandler = (req: ServerRequest, res: ServerResponder) => unknown; -interface Deferred { - promise: Promise<{}>; - resolve: () => void; - reject: () => void; +export type ServerRequest = { + /** request path with queries. always begin with / */ + url: string; + /** HTTP method */ + method: string; + /** requested protocol. like HTTP/1.1 */ + proto: string; + /** HTTP Headers */ + headers: Headers; + /** matched result for path pattern */ + match: RegExpMatchArray; + /** body stream. body with "transfer-encoding: chunked" will automatically be combined into original data */ + body: Reader; +}; + +/** basic responder for http response */ +export interface ServerResponder { + respond(response: ServerResponse): Promise<void>; + + respondJson(obj: any, headers?: Headers): Promise<void>; + + respondText(text: string, headers?: Headers): Promise<void>; + + readonly isResponded: boolean; } -function deferred(): Deferred { - let resolve, reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { - promise, - resolve, - reject - }; +export interface ServerResponse { + /** + * HTTP status code + * @default 200 */ + status?: number; + headers?: Headers; + body?: Uint8Array | Reader; } interface ServeEnv { - reqQueue: ServerRequest[]; + reqQueue: { req: ServerRequest; conn: Conn }[]; serveDeferred: Deferred; } /** Continuously read more requests from conn until EOF * Calls maybeHandleReq. - * bufr is empty on a fresh TCP connection. - * Would be passed around and reused for later request on same conn * TODO: make them async function after this change is done * https://github.com/tc39/ecma262/pull/1250 * See https://v8.dev/blog/fast-async */ -function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) { - readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); +function serveConn(env: ServeEnv, conn: Conn) { + readRequest(conn) + .then(maybeHandleReq.bind(null, env, conn)) + .catch(e => { + conn.close(); + }); } -function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) { - const [req, _err] = maybeReq; - if (_err) { - conn.close(); // assume EOF for now... - return; - } - env.reqQueue.push(req); // push req to queue +function maybeHandleReq(env: ServeEnv, conn: Conn, req: ServerRequest) { + env.reqQueue.push({ conn, req }); // push req to queue env.serveDeferred.resolve(); // signal while loop to process it } -export async function* serve(addr: string) { +/** + * iterate new http request asynchronously + * @param addr listening address. like 127.0.0.1:80 + * @param cancel deferred object for cancellation of serving + * */ +export async function* serve( + addr: string, + cancel: Deferred = defer() +): AsyncIterableIterator<{ req: ServerRequest; res: ServerResponder }> { const listener = listen("tcp", addr); const env: ServeEnv = { reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred() + serveDeferred: defer() }; - // Routine that keeps calling accept const acceptRoutine = () => { const handleConn = (conn: Conn) => { @@ -65,154 +92,161 @@ export async function* serve(addr: string) { scheduleAccept(); // schedule next accept }; const scheduleAccept = () => { - listener.accept().then(handleConn); + Promise.race([cancel.promise, listener.accept().then(handleConn)]); }; scheduleAccept(); }; - acceptRoutine(); - - // Loop hack to allow yield (yield won't work in callbacks) while (true) { - await env.serveDeferred.promise; - env.serveDeferred = deferred(); // use a new deferred - let queueToProcess = env.reqQueue; + // do race between accept, serveDeferred and cancel + await Promise.race([env.serveDeferred.promise, cancel.promise]); + // cancellation deferred resolved + if (cancel.handled) { + break; + } + // next serve deferred + env.serveDeferred = defer(); + const queueToProcess = env.reqQueue; env.reqQueue = []; - for (const result of queueToProcess) { - yield result; - // Continue read more from conn when user is done with the current req - // Moving this here makes it easier to manage - serveConn(env, result.conn, result.r); + for (const { req, conn } of queueToProcess) { + if (req) { + const res = createResponder(conn); + yield { req, res }; + } + serveConn(env, conn); } } listener.close(); } -export async function listenAndServe( - addr: string, - handler: (req: ServerRequest) => void -) { +export async function listenAndServe(addr: string, handler: HttpHandler) { const server = serve(addr); - for await (const request of server) { - await handler(request); + for await (const { req, res } of server) { + await handler(req, res); } } -export interface Response { - status?: number; - headers?: Headers; - body?: Uint8Array | Reader; +export interface HttpServer { + handle(pattern: string | RegExp, handler: HttpHandler); + + listen(addr: string, cancel?: Deferred): Promise<void>; } -export function setContentLength(r: Response): void { - if (!r.headers) { - r.headers = new Headers(); +/** create HttpServer object */ +export function createServer(): HttpServer { + return new HttpServerImpl(); +} + +/** create ServerResponder object */ +export function createResponder(w: Writer): ServerResponder { + return new ServerResponderImpl(w); +} + +class HttpServerImpl implements HttpServer { + private handlers: { pattern: string | RegExp; handler: HttpHandler }[] = []; + + handle(pattern: string | RegExp, handler: HttpHandler) { + this.handlers.push({ pattern, handler }); } - if (r.body) { - if (!r.headers.has("content-length")) { - if (r.body instanceof Uint8Array) { - const bodyLength = r.body.byteLength; - r.headers.append("Content-Length", bodyLength.toString()); + async listen(addr: string, cancel: Deferred = defer()) { + for await (const { req, res } of serve(addr, cancel)) { + let lastMatch: RegExpMatchArray; + let lastHandler: HttpHandler; + for (const { pattern, handler } of this.handlers) { + const match = req.url.match(pattern); + if (!match) { + continue; + } + if (!lastMatch) { + lastMatch = match; + lastHandler = handler; + } else if (match[0].length > lastMatch[0].length) { + // use longest match + lastMatch = match; + lastHandler = handler; + } + } + req.match = lastMatch; + if (lastHandler) { + await lastHandler(req, res); + if (!res.isResponded) { + await res.respond({ + status: 500, + body: encode("Not Responded") + }); + } } else { - r.headers.append("Transfer-Encoding", "chunked"); + await res.respond({ + status: 404, + body: encode("Not Found") + }); } } } } -export class ServerRequest { - url: string; - method: string; - proto: string; - headers: Headers; - conn: Conn; - r: BufReader; - w: BufWriter; - - public async *bodyStream() { - if (this.headers.has("content-length")) { - const len = +this.headers.get("content-length"); - if (Number.isNaN(len)) { - return new Uint8Array(0); - } - let buf = new Uint8Array(1024); - let rr = await this.r.read(buf); - let nread = rr.nread; - while (!rr.eof && nread < len) { - yield buf.subarray(0, rr.nread); - buf = new Uint8Array(1024); - rr = await this.r.read(buf); - nread += rr.nread; - } - yield buf.subarray(0, rr.nread); - } else { - if (this.headers.has("transfer-encoding")) { - const transferEncodings = this.headers - .get("transfer-encoding") - .split(",") - .map(e => e.trim().toLowerCase()); - if (transferEncodings.includes("chunked")) { - // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 - const tp = new TextProtoReader(this.r); - let [line, _] = await tp.readLine(); - // TODO: handle chunk extension - let [chunkSizeString, optExt] = line.split(";"); - let chunkSize = parseInt(chunkSizeString, 16); - if (Number.isNaN(chunkSize) || chunkSize < 0) { - throw new Error("Invalid chunk size"); - } - while (chunkSize > 0) { - let data = new Uint8Array(chunkSize); - let [nread, err] = await this.r.readFull(data); - if (nread !== chunkSize) { - throw new Error("Chunk data does not match size"); - } - yield data; - await this.r.readLine(); // Consume \r\n - [line, _] = await tp.readLine(); - chunkSize = parseInt(line, 16); - } - const [entityHeaders, err] = await tp.readMIMEHeader(); - if (!err) { - for (let [k, v] of entityHeaders) { - this.headers.set(k, v); - } - } - /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6 - length := 0 - read chunk-size, chunk-extension (if any) and CRLF - while (chunk-size > 0) { - read chunk-data and CRLF - append chunk-data to entity-body - length := length + chunk-size - read chunk-size and CRLF - } - read entity-header - while (entity-header not empty) { - append entity-header to existing header fields - read entity-header - } - Content-Length := length - Remove "chunked" from Transfer-Encoding - */ - return; // Must return here to avoid fall through - } - // TODO: handle other transfer-encoding types - } - // Otherwise... - yield new Uint8Array(0); +class ServerResponderImpl implements ServerResponder { + constructor(private w: Writer) {} + + private _responded: boolean = false; + + get isResponded() { + return this._responded; + } + + private checkIfResponded() { + if (this.isResponded) { + throw new Error("http: already responded"); } } - // Read the body of the request into a single Uint8Array - public async body(): Promise<Uint8Array> { - return readAllIterator(this.bodyStream()); + respond(response: ServerResponse): Promise<void> { + this.checkIfResponded(); + this._responded = true; + return writeResponse(this.w, response); } - async respond(r: Response): Promise<void> { - return writeResponse(this.w, r); + respondJson(obj: any, headers: Headers = new Headers()): Promise<void> { + const body = encode(JSON.stringify(obj)); + if (!headers.has("content-type")) { + headers.set("content-type", "application/json"); + } + return this.respond({ + status: 200, + body, + headers + }); + } + + respondText(text: string, headers: Headers = new Headers()): Promise<void> { + const body = encode(text); + if (!headers.has("content-type")) { + headers.set("content-type", "text/plain"); + } + return this.respond({ + status: 200, + headers, + body + }); + } +} + +export function setContentLength(r: ServerResponse): void { + if (!r.headers) { + r.headers = new Headers(); + } + + if (r.body) { + if (!r.headers.has("content-length")) { + if (r.body instanceof Uint8Array) { + const bodyLength = r.body.byteLength; + r.headers.append("Content-Length", bodyLength.toString()); + } else { + r.headers.append("Transfer-Encoding", "chunked"); + } + } } } @@ -224,7 +258,10 @@ function bufWriter(w: Writer): BufWriter { } } -export async function writeResponse(w: Writer, r: Response): Promise<void> { +export async function writeResponse( + w: Writer, + r: ServerResponse +): Promise<void> { const protoMajor = 1; const protoMinor = 1; const statusCode = r.status || 200; @@ -282,53 +319,52 @@ async function writeChunkedBody(w: Writer, r: Reader) { await writer.write(endChunk); } -async function readRequest( - c: Conn, - bufr?: BufReader -): Promise<[ServerRequest, BufState]> { - if (!bufr) { - bufr = new BufReader(c); - } - const bufw = new BufWriter(c); - const req = new ServerRequest(); - req.conn = c; - req.r = bufr!; - req.w = bufw; +export async function readRequest(conn: Reader): Promise<ServerRequest> { + const bufr = new BufReader(conn); const tp = new TextProtoReader(bufr!); - let s: string; - let err: BufState; - // First line: GET /index.html HTTP/1.0 - [s, err] = await tp.readLine(); - if (err) { - return [null, err]; + const [line, lineErr] = await tp.readLine(); + if (lineErr) { + throw lineErr; } - [req.method, req.url, req.proto] = s.split(" ", 3); - - [req.headers, err] = await tp.readMIMEHeader(); - - return [req, err]; + const [method, url, proto] = line.split(" ", 3); + const [headers, headersErr] = await tp.readMIMEHeader(); + if (headersErr) { + throw headersErr; + } + const contentLength = headers.get("content-length"); + const body = + headers.get("transfer-encoding") === "chunked" + ? new ChunkedBodyReader(bufr) + : new BodyReader(bufr, parseInt(contentLength)); + return { + method, + url, + proto, + headers, + body, + match: null + }; } -async function readAllIterator( - it: AsyncIterableIterator<Uint8Array> -): Promise<Uint8Array> { - const chunks = []; - let len = 0; - for await (const chunk of it) { - chunks.push(chunk); - len += chunk.length; - } - if (chunks.length === 0) { - // No need for copy - return chunks[0]; +export async function readResponse(conn: Reader): Promise<ServerResponse> { + const bufr = new BufReader(conn); + const tp = new TextProtoReader(bufr!); + // First line: HTTP/1,1 200 OK + const [line, lineErr] = await tp.readLine(); + if (lineErr) { + throw lineErr; } - const collected = new Uint8Array(len); - let offset = 0; - for (let chunk of chunks) { - collected.set(chunk, offset); - offset += chunk.length; + const [proto, status, statusText] = line.split(" ", 3); + const [headers, headersErr] = await tp.readMIMEHeader(); + if (headersErr) { + throw headersErr; } - return collected; + const contentLength = headers.get("content-length"); + const body = + headers.get("transfer-encoding") === "chunked" + ? new ChunkedBodyReader(bufr) + : new BodyReader(bufr, parseInt(contentLength)); + return { status: parseInt(status), headers, body }; } |
