diff options
Diffstat (limited to 'http/server.ts')
| -rw-r--r-- | http/server.ts | 438 |
1 files changed, 191 insertions, 247 deletions
diff --git a/http/server.ts b/http/server.ts index a80becbd5..400171fc5 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,90 +1,63 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. - -import { Conn, copy, listen, Reader, toAsyncIterator, Writer } from "deno"; -import { BufReader, BufWriter } from "../io/bufio.ts"; +import { listen, Conn, toAsyncIterator, Reader, Writer, copy } from "deno"; +import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; import { assert } from "../testing/mod.ts"; -import { defer, Deferred } from "../util/deferred.ts"; -import { BodyReader, ChunkedBodyReader } from "./readers.ts"; -import { encode } from "../strings/strings.ts"; - -/** basic handler for http request */ -export type HttpHandler = (req: ServerRequest, res: ServerResponder) => unknown; -export type ServerRequest = { - /** request path with queries. always begin with / */ - url: string; - /** HTTP method */ - method: string; - /** requested protocol. like HTTP/1.1 */ - proto: string; - /** HTTP Headers */ - headers: Headers; - /** matched result for path pattern */ - match: RegExpMatchArray; - /** body stream. body with "transfer-encoding: chunked" will automatically be combined into original data */ - body: Reader; -}; - -/** basic responder for http response */ -export interface ServerResponder { - respond(response: ServerResponse): Promise<void>; - - respondJson(obj: any, headers?: Headers): Promise<void>; - - respondText(text: string, headers?: Headers): Promise<void>; - - readonly isResponded: boolean; +interface Deferred { + promise: Promise<{}>; + resolve: () => void; + reject: () => void; } -export interface ServerResponse { - /** - * HTTP status code - * @default 200 */ - status?: number; - headers?: Headers; - body?: Uint8Array | Reader; +function deferred(): Deferred { + let resolve, reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, + resolve, + reject + }; } interface ServeEnv { - reqQueue: { req: ServerRequest; conn: Conn }[]; + reqQueue: ServerRequest[]; serveDeferred: Deferred; } /** Continuously read more requests from conn until EOF * Calls maybeHandleReq. + * bufr is empty on a fresh TCP connection. + * Would be passed around and reused for later request on same conn * TODO: make them async function after this change is done * https://github.com/tc39/ecma262/pull/1250 * See https://v8.dev/blog/fast-async */ -function serveConn(env: ServeEnv, conn: Conn) { - readRequest(conn) - .then(maybeHandleReq.bind(null, env, conn)) - .catch(e => { - conn.close(); - }); +function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) { + readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); } -function maybeHandleReq(env: ServeEnv, conn: Conn, req: ServerRequest) { - env.reqQueue.push({ conn, req }); // push req to queue +function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) { + const [req, _err] = maybeReq; + if (_err) { + conn.close(); // assume EOF for now... + return; + } + env.reqQueue.push(req); // push req to queue env.serveDeferred.resolve(); // signal while loop to process it } -/** - * iterate new http request asynchronously - * @param addr listening address. like 127.0.0.1:80 - * @param cancel deferred object for cancellation of serving - * */ -export async function* serve( - addr: string, - cancel: Deferred = defer() -): AsyncIterableIterator<{ req: ServerRequest; res: ServerResponder }> { +export async function* serve(addr: string) { const listener = listen("tcp", addr); const env: ServeEnv = { reqQueue: [], // in case multiple promises are ready - serveDeferred: defer() + serveDeferred: deferred() }; + // Routine that keeps calling accept const acceptRoutine = () => { const handleConn = (conn: Conn) => { @@ -92,168 +65,47 @@ export async function* serve( scheduleAccept(); // schedule next accept }; const scheduleAccept = () => { - Promise.race([cancel.promise, listener.accept().then(handleConn)]); + listener.accept().then(handleConn); }; scheduleAccept(); }; + acceptRoutine(); + + // Loop hack to allow yield (yield won't work in callbacks) while (true) { - // do race between accept, serveDeferred and cancel - await Promise.race([env.serveDeferred.promise, cancel.promise]); - // cancellation deferred resolved - if (cancel.handled) { - break; - } - // next serve deferred - env.serveDeferred = defer(); - const queueToProcess = env.reqQueue; + await env.serveDeferred.promise; + env.serveDeferred = deferred(); // use a new deferred + let queueToProcess = env.reqQueue; env.reqQueue = []; - for (const { req, conn } of queueToProcess) { - if (req) { - const res = createResponder(conn); - yield { req, res }; - } - serveConn(env, conn); + for (const result of queueToProcess) { + yield result; + // Continue read more from conn when user is done with the current req + // Moving this here makes it easier to manage + serveConn(env, result.conn, result.r); } } listener.close(); } -export async function listenAndServe(addr: string, handler: HttpHandler) { +export async function listenAndServe( + addr: string, + handler: (req: ServerRequest) => void +) { const server = serve(addr); - for await (const { req, res } of server) { - await handler(req, res); - } -} - -export interface HttpServer { - handle(pattern: string | RegExp, handler: HttpHandler); - - listen(addr: string, cancel?: Deferred): Promise<void>; -} - -/** create HttpServer object */ -export function createServer(): HttpServer { - return new HttpServerImpl(); -} - -/** create ServerResponder object */ -export function createResponder(w: Writer): ServerResponder { - return new ServerResponderImpl(w); -} - -class HttpServerImpl implements HttpServer { - private handlers: { pattern: string | RegExp; handler: HttpHandler }[] = []; - - handle(pattern: string | RegExp, handler: HttpHandler) { - this.handlers.push({ pattern, handler }); - } - - async listen(addr: string, cancel: Deferred = defer()) { - for await (const { req, res } of serve(addr, cancel)) { - let { pathname } = new URL(req.url, addr); - const { index, match } = findLongestAndNearestMatch( - pathname, - this.handlers.map(v => v.pattern) - ); - req.match = match; - if (index > -1) { - const { handler } = this.handlers[index]; - await handler(req, res); - if (!res.isResponded) { - await res.respond({ - status: 500, - body: encode("Not Responded") - }); - } - } else { - await res.respond({ - status: 404, - body: encode("Not Found") - }); - } - } - } -} - -/** - * Find the match that appeared in the nearest position to the beginning of word. - * If positions are same, the longest one will be picked. - * Return -1 and null if no match found. - * */ -export function findLongestAndNearestMatch( - pathname: string, - patterns: (string | RegExp)[] -): { index: number; match: RegExpMatchArray } { - let lastMatchIndex = pathname.length; - let lastMatchLength = 0; - let match: RegExpMatchArray = null; - let index = -1; - for (let i = 0; i < patterns.length; i++) { - const pattern = patterns[i]; - const m = pathname.match(pattern); - if (!m) continue; - if ( - m.index < lastMatchIndex || - (m.index === lastMatchIndex && m[0].length > lastMatchLength) - ) { - index = i; - match = m; - lastMatchIndex = m.index; - lastMatchLength = m[0].length; - } + for await (const request of server) { + await handler(request); } - return { index, match }; } -class ServerResponderImpl implements ServerResponder { - constructor(private w: Writer) {} - - private _responded: boolean = false; - - get isResponded() { - return this._responded; - } - - private checkIfResponded() { - if (this.isResponded) { - throw new Error("http: already responded"); - } - } - - respond(response: ServerResponse): Promise<void> { - this.checkIfResponded(); - this._responded = true; - return writeResponse(this.w, response); - } - - respondJson(obj: any, headers: Headers = new Headers()): Promise<void> { - const body = encode(JSON.stringify(obj)); - if (!headers.has("content-type")) { - headers.set("content-type", "application/json"); - } - return this.respond({ - status: 200, - body, - headers - }); - } - - respondText(text: string, headers: Headers = new Headers()): Promise<void> { - const body = encode(text); - if (!headers.has("content-type")) { - headers.set("content-type", "text/plain"); - } - return this.respond({ - status: 200, - headers, - body - }); - } +export interface Response { + status?: number; + headers?: Headers; + body?: Uint8Array | Reader; } -export function setContentLength(r: ServerResponse): void { +export function setContentLength(r: Response): void { if (!r.headers) { r.headers = new Headers(); } @@ -270,6 +122,100 @@ export function setContentLength(r: ServerResponse): void { } } +export class ServerRequest { + url: string; + method: string; + proto: string; + headers: Headers; + conn: Conn; + r: BufReader; + w: BufWriter; + + public async *bodyStream() { + if (this.headers.has("content-length")) { + const len = +this.headers.get("content-length"); + if (Number.isNaN(len)) { + return new Uint8Array(0); + } + let buf = new Uint8Array(1024); + let rr = await this.r.read(buf); + let nread = rr.nread; + while (!rr.eof && nread < len) { + yield buf.subarray(0, rr.nread); + buf = new Uint8Array(1024); + rr = await this.r.read(buf); + nread += rr.nread; + } + yield buf.subarray(0, rr.nread); + } else { + if (this.headers.has("transfer-encoding")) { + const transferEncodings = this.headers + .get("transfer-encoding") + .split(",") + .map(e => e.trim().toLowerCase()); + if (transferEncodings.includes("chunked")) { + // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 + const tp = new TextProtoReader(this.r); + let [line, _] = await tp.readLine(); + // TODO: handle chunk extension + let [chunkSizeString, optExt] = line.split(";"); + let chunkSize = parseInt(chunkSizeString, 16); + if (Number.isNaN(chunkSize) || chunkSize < 0) { + throw new Error("Invalid chunk size"); + } + while (chunkSize > 0) { + let data = new Uint8Array(chunkSize); + let [nread, err] = await this.r.readFull(data); + if (nread !== chunkSize) { + throw new Error("Chunk data does not match size"); + } + yield data; + await this.r.readLine(); // Consume \r\n + [line, _] = await tp.readLine(); + chunkSize = parseInt(line, 16); + } + const [entityHeaders, err] = await tp.readMIMEHeader(); + if (!err) { + for (let [k, v] of entityHeaders) { + this.headers.set(k, v); + } + } + /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6 + length := 0 + read chunk-size, chunk-extension (if any) and CRLF + while (chunk-size > 0) { + read chunk-data and CRLF + append chunk-data to entity-body + length := length + chunk-size + read chunk-size and CRLF + } + read entity-header + while (entity-header not empty) { + append entity-header to existing header fields + read entity-header + } + Content-Length := length + Remove "chunked" from Transfer-Encoding + */ + return; // Must return here to avoid fall through + } + // TODO: handle other transfer-encoding types + } + // Otherwise... + yield new Uint8Array(0); + } + } + + // Read the body of the request into a single Uint8Array + public async body(): Promise<Uint8Array> { + return readAllIterator(this.bodyStream()); + } + + async respond(r: Response): Promise<void> { + return writeResponse(this.w, r); + } +} + function bufWriter(w: Writer): BufWriter { if (w instanceof BufWriter) { return w; @@ -278,10 +224,7 @@ function bufWriter(w: Writer): BufWriter { } } -export async function writeResponse( - w: Writer, - r: ServerResponse -): Promise<void> { +export async function writeResponse(w: Writer, r: Response): Promise<void> { const protoMajor = 1; const protoMinor = 1; const statusCode = r.status || 200; @@ -339,52 +282,53 @@ async function writeChunkedBody(w: Writer, r: Reader) { await writer.write(endChunk); } -export async function readRequest(conn: Reader): Promise<ServerRequest> { - const bufr = new BufReader(conn); +async function readRequest( + c: Conn, + bufr?: BufReader +): Promise<[ServerRequest, BufState]> { + if (!bufr) { + bufr = new BufReader(c); + } + const bufw = new BufWriter(c); + const req = new ServerRequest(); + req.conn = c; + req.r = bufr!; + req.w = bufw; const tp = new TextProtoReader(bufr!); + let s: string; + let err: BufState; + // First line: GET /index.html HTTP/1.0 - const [line, lineErr] = await tp.readLine(); - if (lineErr) { - throw lineErr; + [s, err] = await tp.readLine(); + if (err) { + return [null, err]; } - const [method, url, proto] = line.split(" ", 3); - const [headers, headersErr] = await tp.readMIMEHeader(); - if (headersErr) { - throw headersErr; - } - const contentLength = headers.get("content-length"); - const body = - headers.get("transfer-encoding") === "chunked" - ? new ChunkedBodyReader(bufr) - : new BodyReader(bufr, parseInt(contentLength)); - return { - method, - url, - proto, - headers, - body, - match: null - }; + [req.method, req.url, req.proto] = s.split(" ", 3); + + [req.headers, err] = await tp.readMIMEHeader(); + + return [req, err]; } -export async function readResponse(conn: Reader): Promise<ServerResponse> { - const bufr = new BufReader(conn); - const tp = new TextProtoReader(bufr!); - // First line: HTTP/1,1 200 OK - const [line, lineErr] = await tp.readLine(); - if (lineErr) { - throw lineErr; +async function readAllIterator( + it: AsyncIterableIterator<Uint8Array> +): Promise<Uint8Array> { + const chunks = []; + let len = 0; + for await (const chunk of it) { + chunks.push(chunk); + len += chunk.length; + } + if (chunks.length === 0) { + // No need for copy + return chunks[0]; } - const [proto, status, statusText] = line.split(" ", 3); - const [headers, headersErr] = await tp.readMIMEHeader(); - if (headersErr) { - throw headersErr; + const collected = new Uint8Array(len); + let offset = 0; + for (let chunk of chunks) { + collected.set(chunk, offset); + offset += chunk.length; } - const contentLength = headers.get("content-length"); - const body = - headers.get("transfer-encoding") === "chunked" - ? new ChunkedBodyReader(bufr) - : new BodyReader(bufr, parseInt(contentLength)); - return { status: parseInt(status), headers, body }; + return collected; } |
