diff options
Diffstat (limited to 'std/http/server.ts')
-rw-r--r-- | std/http/server.ts | 408 |
1 files changed, 408 insertions, 0 deletions
diff --git a/std/http/server.ts b/std/http/server.ts new file mode 100644 index 000000000..f1ced0577 --- /dev/null +++ b/std/http/server.ts @@ -0,0 +1,408 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +const { listen, copy, toAsyncIterator } = Deno; +type Listener = Deno.Listener; +type Conn = Deno.Conn; +type Reader = Deno.Reader; +type Writer = Deno.Writer; +import { BufReader, BufWriter, UnexpectedEOFError } from "../io/bufio.ts"; +import { TextProtoReader } from "../textproto/mod.ts"; +import { STATUS_TEXT } from "./http_status.ts"; +import { assert } from "../testing/asserts.ts"; +import { + collectUint8Arrays, + deferred, + Deferred, + MuxAsyncIterator +} from "../util/async.ts"; + +function bufWriter(w: Writer): BufWriter { + if (w instanceof BufWriter) { + return w; + } else { + return new BufWriter(w); + } +} + +export function setContentLength(r: Response): void { + if (!r.headers) { + r.headers = new Headers(); + } + + if (r.body) { + if (!r.headers.has("content-length")) { + if (r.body instanceof Uint8Array) { + const bodyLength = r.body.byteLength; + r.headers.append("Content-Length", bodyLength.toString()); + } else { + r.headers.append("Transfer-Encoding", "chunked"); + } + } + } +} + +async function writeChunkedBody(w: Writer, r: Reader): Promise<void> { + const writer = bufWriter(w); + const encoder = new TextEncoder(); + + for await (const chunk of toAsyncIterator(r)) { + if (chunk.byteLength <= 0) continue; + const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`); + const end = encoder.encode("\r\n"); + await writer.write(start); + await writer.write(chunk); + await writer.write(end); + } + + const endChunk = encoder.encode("0\r\n\r\n"); + await writer.write(endChunk); +} + +export async function writeResponse(w: Writer, r: Response): Promise<void> { + const protoMajor = 1; + const protoMinor = 1; + const statusCode = r.status || 200; + const statusText = STATUS_TEXT.get(statusCode); + const writer = bufWriter(w); + if (!statusText) { + throw Error("bad status code"); + } + if (!r.body) { + r.body = new Uint8Array(); + } + + let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`; + + setContentLength(r); + const headers = r.headers!; + + for (const [key, value] of headers!) { + out += `${key}: ${value}\r\n`; + } + out += "\r\n"; + + const header = new TextEncoder().encode(out); + const n = await writer.write(header); + assert(n === header.byteLength); + + if (r.body instanceof Uint8Array) { + const n = await writer.write(r.body); + assert(n === r.body.byteLength); + } else if (headers.has("content-length")) { + const bodyLength = parseInt(headers.get("content-length")!); + const n = await copy(writer, r.body); + assert(n === bodyLength); + } else { + await writeChunkedBody(writer, r.body); + } + await writer.flush(); +} + +export class ServerRequest { + url!: string; + method!: string; + proto!: string; + protoMinor!: number; + protoMajor!: number; + headers!: Headers; + conn!: Conn; + r!: BufReader; + w!: BufWriter; + done: Deferred<void> = deferred(); + + public async *bodyStream(): AsyncIterableIterator<Uint8Array> { + if (this.headers.has("content-length")) { + const len = +this.headers.get("content-length")!; + if (Number.isNaN(len)) { + return new Uint8Array(0); + } + let buf = new Uint8Array(1024); + let rr = await this.r.read(buf); + let nread = rr === Deno.EOF ? 0 : rr; + let nreadTotal = nread; + while (rr !== Deno.EOF && nreadTotal < len) { + yield buf.subarray(0, nread); + buf = new Uint8Array(1024); + rr = await this.r.read(buf); + nread = rr === Deno.EOF ? 0 : rr; + nreadTotal += nread; + } + yield buf.subarray(0, nread); + } else { + if (this.headers.has("transfer-encoding")) { + const transferEncodings = this.headers + .get("transfer-encoding")! + .split(",") + .map((e): string => e.trim().toLowerCase()); + if (transferEncodings.includes("chunked")) { + // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 + const tp = new TextProtoReader(this.r); + let line = await tp.readLine(); + if (line === Deno.EOF) throw new UnexpectedEOFError(); + // TODO: handle chunk extension + const [chunkSizeString] = line.split(";"); + let chunkSize = parseInt(chunkSizeString, 16); + if (Number.isNaN(chunkSize) || chunkSize < 0) { + throw new Error("Invalid chunk size"); + } + while (chunkSize > 0) { + const data = new Uint8Array(chunkSize); + if ((await this.r.readFull(data)) === Deno.EOF) { + throw new UnexpectedEOFError(); + } + yield data; + await this.r.readLine(); // Consume \r\n + line = await tp.readLine(); + if (line === Deno.EOF) throw new UnexpectedEOFError(); + chunkSize = parseInt(line, 16); + } + const entityHeaders = await tp.readMIMEHeader(); + if (entityHeaders !== Deno.EOF) { + for (const [k, v] of entityHeaders) { + this.headers.set(k, v); + } + } + /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6 + length := 0 + read chunk-size, chunk-extension (if any) and CRLF + while (chunk-size > 0) { + read chunk-data and CRLF + append chunk-data to entity-body + length := length + chunk-size + read chunk-size and CRLF + } + read entity-header + while (entity-header not empty) { + append entity-header to existing header fields + read entity-header + } + Content-Length := length + Remove "chunked" from Transfer-Encoding + */ + return; // Must return here to avoid fall through + } + // TODO: handle other transfer-encoding types + } + // Otherwise... + yield new Uint8Array(0); + } + } + + // Read the body of the request into a single Uint8Array + public async body(): Promise<Uint8Array> { + return collectUint8Arrays(this.bodyStream()); + } + + async respond(r: Response): Promise<void> { + // Write our response! + await writeResponse(this.w, r); + // Signal that this request has been processed and the next pipelined + // request on the same connection can be accepted. + this.done.resolve(); + } +} + +function fixLength(req: ServerRequest): void { + const contentLength = req.headers.get("Content-Length"); + if (contentLength) { + const arrClen = contentLength.split(","); + if (arrClen.length > 1) { + const distinct = [...new Set(arrClen.map((e): string => e.trim()))]; + if (distinct.length > 1) { + throw Error("cannot contain multiple Content-Length headers"); + } else { + req.headers.set("Content-Length", distinct[0]); + } + } + const c = req.headers.get("Content-Length"); + if (req.method === "HEAD" && c && c !== "0") { + throw Error("http: method cannot contain a Content-Length"); + } + if (c && req.headers.has("transfer-encoding")) { + // A sender MUST NOT send a Content-Length header field in any message + // that contains a Transfer-Encoding header field. + // rfc: https://tools.ietf.org/html/rfc7230#section-3.3.2 + throw new Error( + "http: Transfer-Encoding and Content-Length cannot be send together" + ); + } + } +} + +// ParseHTTPVersion parses a HTTP version string. +// "HTTP/1.0" returns (1, 0, true). +// Ported from https://github.com/golang/go/blob/f5c43b9/src/net/http/request.go#L766-L792 +export function parseHTTPVersion(vers: string): [number, number] { + switch (vers) { + case "HTTP/1.1": + return [1, 1]; + + case "HTTP/1.0": + return [1, 0]; + + default: { + const Big = 1000000; // arbitrary upper bound + const digitReg = /^\d+$/; // test if string is only digit + + if (!vers.startsWith("HTTP/")) { + break; + } + + const dot = vers.indexOf("."); + if (dot < 0) { + break; + } + + const majorStr = vers.substring(vers.indexOf("/") + 1, dot); + const major = parseInt(majorStr); + if ( + !digitReg.test(majorStr) || + isNaN(major) || + major < 0 || + major > Big + ) { + break; + } + + const minorStr = vers.substring(dot + 1); + const minor = parseInt(minorStr); + if ( + !digitReg.test(minorStr) || + isNaN(minor) || + minor < 0 || + minor > Big + ) { + break; + } + + return [major, minor]; + } + } + + throw new Error(`malformed HTTP version ${vers}`); +} + +export async function readRequest( + conn: Conn, + bufr: BufReader +): Promise<ServerRequest | Deno.EOF> { + const tp = new TextProtoReader(bufr); + const firstLine = await tp.readLine(); // e.g. GET /index.html HTTP/1.0 + if (firstLine === Deno.EOF) return Deno.EOF; + const headers = await tp.readMIMEHeader(); + if (headers === Deno.EOF) throw new UnexpectedEOFError(); + + const req = new ServerRequest(); + req.conn = conn; + req.r = bufr; + [req.method, req.url, req.proto] = firstLine.split(" ", 3); + [req.protoMinor, req.protoMajor] = parseHTTPVersion(req.proto); + req.headers = headers; + fixLength(req); + return req; +} + +export class Server implements AsyncIterable<ServerRequest> { + private closing = false; + + constructor(public listener: Listener) {} + + close(): void { + this.closing = true; + this.listener.close(); + } + + // Yields all HTTP requests on a single TCP connection. + private async *iterateHttpRequests( + conn: Conn + ): AsyncIterableIterator<ServerRequest> { + const bufr = new BufReader(conn); + const w = new BufWriter(conn); + let req: ServerRequest | Deno.EOF; + let err: Error | undefined; + + while (!this.closing) { + try { + req = await readRequest(conn, bufr); + } catch (e) { + err = e; + break; + } + if (req === Deno.EOF) { + break; + } + + req.w = w; + yield req; + + // Wait for the request to be processed before we accept a new request on + // this connection. + await req!.done; + } + + if (req! === Deno.EOF) { + // The connection was gracefully closed. + } else if (err) { + // An error was thrown while parsing request headers. + try { + await writeResponse(req!.w, { + status: 400, + body: new TextEncoder().encode(`${err.message}\r\n\r\n`) + }); + } catch (_) { + // The connection is destroyed. + // Ignores the error. + } + } else if (this.closing) { + // There are more requests incoming but the server is closing. + // TODO(ry): send a back a HTTP 503 Service Unavailable status. + } + + conn.close(); + } + + // Accepts a new TCP connection and yields all HTTP requests that arrive on + // it. When a connection is accepted, it also creates a new iterator of the + // same kind and adds it to the request multiplexer so that another TCP + // connection can be accepted. + private async *acceptConnAndIterateHttpRequests( + mux: MuxAsyncIterator<ServerRequest> + ): AsyncIterableIterator<ServerRequest> { + if (this.closing) return; + // Wait for a new connection. + const conn = await this.listener.accept(); + // Try to accept another connection and add it to the multiplexer. + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + // Yield the requests that arrive on the just-accepted connection. + yield* this.iterateHttpRequests(conn); + } + + [Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> { + const mux: MuxAsyncIterator<ServerRequest> = new MuxAsyncIterator(); + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + return mux.iterate(); + } +} + +export function serve(addr: string): Server { + // TODO(ry) Update serve to also take { hostname, port }. + const [hostname, port] = addr.split(":"); + const listener = listen({ hostname, port: Number(port) }); + return new Server(listener); +} + +export async function listenAndServe( + addr: string, + handler: (req: ServerRequest) => void +): Promise<void> { + const server = serve(addr); + + for await (const request of server) { + handler(request); + } +} + +export interface Response { + status?: number; + headers?: Headers; + body?: Uint8Array | Reader; +} |