summaryrefslogtreecommitdiff
path: root/http/server.ts
diff options
context:
space:
mode:
Diffstat (limited to 'http/server.ts')
-rw-r--r--http/server.ts418
1 files changed, 227 insertions, 191 deletions
diff --git a/http/server.ts b/http/server.ts
index 400171fc5..a5c5677c2 100644
--- a/http/server.ts
+++ b/http/server.ts
@@ -1,63 +1,90 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
-import { listen, Conn, toAsyncIterator, Reader, Writer, copy } from "deno";
-import { BufReader, BufState, BufWriter } from "../io/bufio.ts";
+
+import { Conn, copy, listen, Reader, toAsyncIterator, Writer } from "deno";
+import { BufReader, BufWriter } from "../io/bufio.ts";
import { TextProtoReader } from "../textproto/mod.ts";
import { STATUS_TEXT } from "./http_status.ts";
import { assert } from "../testing/mod.ts";
+import { defer, Deferred } from "../util/deferred.ts";
+import { BodyReader, ChunkedBodyReader } from "./readers.ts";
+import { encode } from "../strings/strings.ts";
+
+/** basic handler for http request */
+export type HttpHandler = (req: ServerRequest, res: ServerResponder) => unknown;
-interface Deferred {
- promise: Promise<{}>;
- resolve: () => void;
- reject: () => void;
+export type ServerRequest = {
+ /** request path with queries. always begin with / */
+ url: string;
+ /** HTTP method */
+ method: string;
+ /** requested protocol. like HTTP/1.1 */
+ proto: string;
+ /** HTTP Headers */
+ headers: Headers;
+ /** matched result for path pattern */
+ match: RegExpMatchArray;
+ /** body stream. body with "transfer-encoding: chunked" will automatically be combined into original data */
+ body: Reader;
+};
+
+/** basic responder for http response */
+export interface ServerResponder {
+ respond(response: ServerResponse): Promise<void>;
+
+ respondJson(obj: any, headers?: Headers): Promise<void>;
+
+ respondText(text: string, headers?: Headers): Promise<void>;
+
+ readonly isResponded: boolean;
}
-function deferred(): Deferred {
- let resolve, reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- return {
- promise,
- resolve,
- reject
- };
+export interface ServerResponse {
+ /**
+ * HTTP status code
+ * @default 200 */
+ status?: number;
+ headers?: Headers;
+ body?: Uint8Array | Reader;
}
interface ServeEnv {
- reqQueue: ServerRequest[];
+ reqQueue: { req: ServerRequest; conn: Conn }[];
serveDeferred: Deferred;
}
/** Continuously read more requests from conn until EOF
* Calls maybeHandleReq.
- * bufr is empty on a fresh TCP connection.
- * Would be passed around and reused for later request on same conn
* TODO: make them async function after this change is done
* https://github.com/tc39/ecma262/pull/1250
* See https://v8.dev/blog/fast-async
*/
-function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
- readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
+function serveConn(env: ServeEnv, conn: Conn) {
+ readRequest(conn)
+ .then(maybeHandleReq.bind(null, env, conn))
+ .catch(e => {
+ conn.close();
+ });
}
-function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
- const [req, _err] = maybeReq;
- if (_err) {
- conn.close(); // assume EOF for now...
- return;
- }
- env.reqQueue.push(req); // push req to queue
+function maybeHandleReq(env: ServeEnv, conn: Conn, req: ServerRequest) {
+ env.reqQueue.push({ conn, req }); // push req to queue
env.serveDeferred.resolve(); // signal while loop to process it
}
-export async function* serve(addr: string) {
+/**
+ * iterate new http request asynchronously
+ * @param addr listening address. like 127.0.0.1:80
+ * @param cancel deferred object for cancellation of serving
+ * */
+export async function* serve(
+ addr: string,
+ cancel: Deferred = defer()
+): AsyncIterableIterator<{ req: ServerRequest; res: ServerResponder }> {
const listener = listen("tcp", addr);
const env: ServeEnv = {
reqQueue: [], // in case multiple promises are ready
- serveDeferred: deferred()
+ serveDeferred: defer()
};
-
// Routine that keeps calling accept
const acceptRoutine = () => {
const handleConn = (conn: Conn) => {
@@ -65,154 +92,161 @@ export async function* serve(addr: string) {
scheduleAccept(); // schedule next accept
};
const scheduleAccept = () => {
- listener.accept().then(handleConn);
+ Promise.race([cancel.promise, listener.accept().then(handleConn)]);
};
scheduleAccept();
};
-
acceptRoutine();
-
- // Loop hack to allow yield (yield won't work in callbacks)
while (true) {
- await env.serveDeferred.promise;
- env.serveDeferred = deferred(); // use a new deferred
- let queueToProcess = env.reqQueue;
+ // do race between accept, serveDeferred and cancel
+ await Promise.race([env.serveDeferred.promise, cancel.promise]);
+ // cancellation deferred resolved
+ if (cancel.handled) {
+ break;
+ }
+ // next serve deferred
+ env.serveDeferred = defer();
+ const queueToProcess = env.reqQueue;
env.reqQueue = [];
- for (const result of queueToProcess) {
- yield result;
- // Continue read more from conn when user is done with the current req
- // Moving this here makes it easier to manage
- serveConn(env, result.conn, result.r);
+ for (const { req, conn } of queueToProcess) {
+ if (req) {
+ const res = createResponder(conn);
+ yield { req, res };
+ }
+ serveConn(env, conn);
}
}
listener.close();
}
-export async function listenAndServe(
- addr: string,
- handler: (req: ServerRequest) => void
-) {
+export async function listenAndServe(addr: string, handler: HttpHandler) {
const server = serve(addr);
- for await (const request of server) {
- await handler(request);
+ for await (const { req, res } of server) {
+ await handler(req, res);
}
}
-export interface Response {
- status?: number;
- headers?: Headers;
- body?: Uint8Array | Reader;
+export interface HttpServer {
+ handle(pattern: string | RegExp, handler: HttpHandler);
+
+ listen(addr: string, cancel?: Deferred): Promise<void>;
}
-export function setContentLength(r: Response): void {
- if (!r.headers) {
- r.headers = new Headers();
+/** create HttpServer object */
+export function createServer(): HttpServer {
+ return new HttpServerImpl();
+}
+
+/** create ServerResponder object */
+export function createResponder(w: Writer): ServerResponder {
+ return new ServerResponderImpl(w);
+}
+
+class HttpServerImpl implements HttpServer {
+ private handlers: { pattern: string | RegExp; handler: HttpHandler }[] = [];
+
+ handle(pattern: string | RegExp, handler: HttpHandler) {
+ this.handlers.push({ pattern, handler });
}
- if (r.body) {
- if (!r.headers.has("content-length")) {
- if (r.body instanceof Uint8Array) {
- const bodyLength = r.body.byteLength;
- r.headers.append("Content-Length", bodyLength.toString());
+ async listen(addr: string, cancel: Deferred = defer()) {
+ for await (const { req, res } of serve(addr, cancel)) {
+ let lastMatch: RegExpMatchArray;
+ let lastHandler: HttpHandler;
+ for (const { pattern, handler } of this.handlers) {
+ const match = req.url.match(pattern);
+ if (!match) {
+ continue;
+ }
+ if (!lastMatch) {
+ lastMatch = match;
+ lastHandler = handler;
+ } else if (match[0].length > lastMatch[0].length) {
+ // use longest match
+ lastMatch = match;
+ lastHandler = handler;
+ }
+ }
+ req.match = lastMatch;
+ if (lastHandler) {
+ await lastHandler(req, res);
+ if (!res.isResponded) {
+ await res.respond({
+ status: 500,
+ body: encode("Not Responded")
+ });
+ }
} else {
- r.headers.append("Transfer-Encoding", "chunked");
+ await res.respond({
+ status: 404,
+ body: encode("Not Found")
+ });
}
}
}
}
-export class ServerRequest {
- url: string;
- method: string;
- proto: string;
- headers: Headers;
- conn: Conn;
- r: BufReader;
- w: BufWriter;
-
- public async *bodyStream() {
- if (this.headers.has("content-length")) {
- const len = +this.headers.get("content-length");
- if (Number.isNaN(len)) {
- return new Uint8Array(0);
- }
- let buf = new Uint8Array(1024);
- let rr = await this.r.read(buf);
- let nread = rr.nread;
- while (!rr.eof && nread < len) {
- yield buf.subarray(0, rr.nread);
- buf = new Uint8Array(1024);
- rr = await this.r.read(buf);
- nread += rr.nread;
- }
- yield buf.subarray(0, rr.nread);
- } else {
- if (this.headers.has("transfer-encoding")) {
- const transferEncodings = this.headers
- .get("transfer-encoding")
- .split(",")
- .map(e => e.trim().toLowerCase());
- if (transferEncodings.includes("chunked")) {
- // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
- const tp = new TextProtoReader(this.r);
- let [line, _] = await tp.readLine();
- // TODO: handle chunk extension
- let [chunkSizeString, optExt] = line.split(";");
- let chunkSize = parseInt(chunkSizeString, 16);
- if (Number.isNaN(chunkSize) || chunkSize < 0) {
- throw new Error("Invalid chunk size");
- }
- while (chunkSize > 0) {
- let data = new Uint8Array(chunkSize);
- let [nread, err] = await this.r.readFull(data);
- if (nread !== chunkSize) {
- throw new Error("Chunk data does not match size");
- }
- yield data;
- await this.r.readLine(); // Consume \r\n
- [line, _] = await tp.readLine();
- chunkSize = parseInt(line, 16);
- }
- const [entityHeaders, err] = await tp.readMIMEHeader();
- if (!err) {
- for (let [k, v] of entityHeaders) {
- this.headers.set(k, v);
- }
- }
- /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6
- length := 0
- read chunk-size, chunk-extension (if any) and CRLF
- while (chunk-size > 0) {
- read chunk-data and CRLF
- append chunk-data to entity-body
- length := length + chunk-size
- read chunk-size and CRLF
- }
- read entity-header
- while (entity-header not empty) {
- append entity-header to existing header fields
- read entity-header
- }
- Content-Length := length
- Remove "chunked" from Transfer-Encoding
- */
- return; // Must return here to avoid fall through
- }
- // TODO: handle other transfer-encoding types
- }
- // Otherwise...
- yield new Uint8Array(0);
+class ServerResponderImpl implements ServerResponder {
+ constructor(private w: Writer) {}
+
+ private _responded: boolean = false;
+
+ get isResponded() {
+ return this._responded;
+ }
+
+ private checkIfResponded() {
+ if (this.isResponded) {
+ throw new Error("http: already responded");
}
}
- // Read the body of the request into a single Uint8Array
- public async body(): Promise<Uint8Array> {
- return readAllIterator(this.bodyStream());
+ respond(response: ServerResponse): Promise<void> {
+ this.checkIfResponded();
+ this._responded = true;
+ return writeResponse(this.w, response);
}
- async respond(r: Response): Promise<void> {
- return writeResponse(this.w, r);
+ respondJson(obj: any, headers: Headers = new Headers()): Promise<void> {
+ const body = encode(JSON.stringify(obj));
+ if (!headers.has("content-type")) {
+ headers.set("content-type", "application/json");
+ }
+ return this.respond({
+ status: 200,
+ body,
+ headers
+ });
+ }
+
+ respondText(text: string, headers: Headers = new Headers()): Promise<void> {
+ const body = encode(text);
+ if (!headers.has("content-type")) {
+ headers.set("content-type", "text/plain");
+ }
+ return this.respond({
+ status: 200,
+ headers,
+ body
+ });
+ }
+}
+
+export function setContentLength(r: ServerResponse): void {
+ if (!r.headers) {
+ r.headers = new Headers();
+ }
+
+ if (r.body) {
+ if (!r.headers.has("content-length")) {
+ if (r.body instanceof Uint8Array) {
+ const bodyLength = r.body.byteLength;
+ r.headers.append("Content-Length", bodyLength.toString());
+ } else {
+ r.headers.append("Transfer-Encoding", "chunked");
+ }
+ }
}
}
@@ -224,7 +258,10 @@ function bufWriter(w: Writer): BufWriter {
}
}
-export async function writeResponse(w: Writer, r: Response): Promise<void> {
+export async function writeResponse(
+ w: Writer,
+ r: ServerResponse
+): Promise<void> {
const protoMajor = 1;
const protoMinor = 1;
const statusCode = r.status || 200;
@@ -282,53 +319,52 @@ async function writeChunkedBody(w: Writer, r: Reader) {
await writer.write(endChunk);
}
-async function readRequest(
- c: Conn,
- bufr?: BufReader
-): Promise<[ServerRequest, BufState]> {
- if (!bufr) {
- bufr = new BufReader(c);
- }
- const bufw = new BufWriter(c);
- const req = new ServerRequest();
- req.conn = c;
- req.r = bufr!;
- req.w = bufw;
+export async function readRequest(conn: Reader): Promise<ServerRequest> {
+ const bufr = new BufReader(conn);
const tp = new TextProtoReader(bufr!);
- let s: string;
- let err: BufState;
-
// First line: GET /index.html HTTP/1.0
- [s, err] = await tp.readLine();
- if (err) {
- return [null, err];
+ const [line, lineErr] = await tp.readLine();
+ if (lineErr) {
+ throw lineErr;
}
- [req.method, req.url, req.proto] = s.split(" ", 3);
-
- [req.headers, err] = await tp.readMIMEHeader();
-
- return [req, err];
+ const [method, url, proto] = line.split(" ", 3);
+ const [headers, headersErr] = await tp.readMIMEHeader();
+ if (headersErr) {
+ throw headersErr;
+ }
+ const contentLength = headers.get("content-length");
+ const body =
+ headers.get("transfer-encoding") === "chunked"
+ ? new ChunkedBodyReader(bufr)
+ : new BodyReader(bufr, parseInt(contentLength));
+ return {
+ method,
+ url,
+ proto,
+ headers,
+ body,
+ match: null
+ };
}
-async function readAllIterator(
- it: AsyncIterableIterator<Uint8Array>
-): Promise<Uint8Array> {
- const chunks = [];
- let len = 0;
- for await (const chunk of it) {
- chunks.push(chunk);
- len += chunk.length;
- }
- if (chunks.length === 0) {
- // No need for copy
- return chunks[0];
+export async function readResponse(conn: Reader): Promise<ServerResponse> {
+ const bufr = new BufReader(conn);
+ const tp = new TextProtoReader(bufr!);
+ // First line: HTTP/1,1 200 OK
+ const [line, lineErr] = await tp.readLine();
+ if (lineErr) {
+ throw lineErr;
}
- const collected = new Uint8Array(len);
- let offset = 0;
- for (let chunk of chunks) {
- collected.set(chunk, offset);
- offset += chunk.length;
+ const [proto, status, statusText] = line.split(" ", 3);
+ const [headers, headersErr] = await tp.readMIMEHeader();
+ if (headersErr) {
+ throw headersErr;
}
- return collected;
+ const contentLength = headers.get("content-length");
+ const body =
+ headers.get("transfer-encoding") === "chunked"
+ ? new ChunkedBodyReader(bufr)
+ : new BodyReader(bufr, parseInt(contentLength));
+ return { status: parseInt(status), headers, body };
}