summaryrefslogtreecommitdiff
path: root/http/server.ts
diff options
context:
space:
mode:
Diffstat (limited to 'http/server.ts')
-rw-r--r--http/server.ts84
1 files changed, 66 insertions, 18 deletions
diff --git a/http/server.ts b/http/server.ts
index 17295f739..f1ef015ba 100644
--- a/http/server.ts
+++ b/http/server.ts
@@ -13,6 +13,42 @@ interface Deferred {
resolve: () => void;
reject: () => void;
}
+
+function deferred(isResolved = false): Deferred {
+ let resolve, reject;
+ const promise = new Promise((res, rej) => {
+ resolve = res;
+ reject = rej;
+ });
+ if (isResolved) {
+ resolve();
+ }
+ return {
+ promise,
+ resolve,
+ reject
+ };
+}
+
+interface HttpConn extends Conn {
+ // When read by a newly created request B, lastId is the id pointing to a previous
+ // request A, such that we must wait for responses to A to complete before
+ // writing B's response.
+ lastPipelineId: number;
+ pendingDeferredMap: Map<number, Deferred>;
+}
+
+function createHttpConn(c: Conn): HttpConn {
+ const httpConn = Object.assign(c, {
+ lastPipelineId: 0,
+ pendingDeferredMap: new Map()
+ });
+
+ const resolvedDeferred = deferred(true);
+ httpConn.pendingDeferredMap.set(0, resolvedDeferred);
+ return httpConn;
+}
+
function bufWriter(w: Writer): BufWriter {
if (w instanceof BufWriter) {
return w;
@@ -115,11 +151,12 @@ async function readAllIterator(
}
export class ServerRequest {
+ pipelineId: number;
url: string;
method: string;
proto: string;
headers: Headers;
- conn: Conn;
+ conn: HttpConn;
r: BufReader;
w: BufWriter;
@@ -204,23 +241,26 @@ export class ServerRequest {
}
async respond(r: Response): Promise<void> {
- return writeResponse(this.w, r);
+ // Check and wait if the previous request is done responding.
+ const lastPipelineId = this.pipelineId - 1;
+ const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
+ lastPipelineId
+ );
+ assert(!!lastPipelineDeferred);
+ await lastPipelineDeferred.promise;
+ // If yes, delete old deferred and proceed with writing.
+ this.conn.pendingDeferredMap.delete(lastPipelineId);
+ // Write our response!
+ await writeResponse(this.w, r);
+ // Signal the next pending request that it can start writing.
+ const currPipelineDeferred = this.conn.pendingDeferredMap.get(
+ this.pipelineId
+ );
+ assert(!!currPipelineDeferred);
+ currPipelineDeferred.resolve();
}
}
-function deferred(): Deferred {
- let resolve, reject;
- const promise = new Promise((res, rej) => {
- resolve = res;
- reject = rej;
- });
- return {
- promise,
- resolve,
- reject
- };
-}
-
interface ServeEnv {
reqQueue: ServerRequest[];
serveDeferred: Deferred;
@@ -235,7 +275,7 @@ interface ServeEnv {
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
- c: Conn,
+ c: HttpConn,
bufr?: BufReader
): Promise<[ServerRequest, BufState]> {
if (!bufr) {
@@ -243,6 +283,13 @@ async function readRequest(
}
const bufw = new BufWriter(c);
const req = new ServerRequest();
+
+ // Set and incr pipeline id;
+ req.pipelineId = ++c.lastPipelineId;
+ // Set a new pipeline deferred associated with this request
+ // for future requests to wait for.
+ c.pendingDeferredMap.set(req.pipelineId, deferred());
+
req.conn = c;
req.r = bufr!;
req.w = bufw;
@@ -277,7 +324,7 @@ function maybeHandleReq(
env.serveDeferred.resolve(); // signal while loop to process it
}
-function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader): void {
+function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
@@ -298,7 +345,8 @@ export async function* serve(
listener.accept().then(handleConn);
};
handleConn = (conn: Conn) => {
- serveConn(env, conn); // don't block
+ const httpConn = createHttpConn(conn);
+ serveConn(env, httpConn); // don't block
scheduleAccept(); // schedule next accept
};