diff options
Diffstat (limited to 'http/server.ts')
| -rw-r--r-- | http/server.ts | 84 |
1 files changed, 66 insertions, 18 deletions
diff --git a/http/server.ts b/http/server.ts index 17295f739..f1ef015ba 100644 --- a/http/server.ts +++ b/http/server.ts @@ -13,6 +13,42 @@ interface Deferred { resolve: () => void; reject: () => void; } + +function deferred(isResolved = false): Deferred { + let resolve, reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + if (isResolved) { + resolve(); + } + return { + promise, + resolve, + reject + }; +} + +interface HttpConn extends Conn { + // When read by a newly created request B, lastId is the id pointing to a previous + // request A, such that we must wait for responses to A to complete before + // writing B's response. + lastPipelineId: number; + pendingDeferredMap: Map<number, Deferred>; +} + +function createHttpConn(c: Conn): HttpConn { + const httpConn = Object.assign(c, { + lastPipelineId: 0, + pendingDeferredMap: new Map() + }); + + const resolvedDeferred = deferred(true); + httpConn.pendingDeferredMap.set(0, resolvedDeferred); + return httpConn; +} + function bufWriter(w: Writer): BufWriter { if (w instanceof BufWriter) { return w; @@ -115,11 +151,12 @@ async function readAllIterator( } export class ServerRequest { + pipelineId: number; url: string; method: string; proto: string; headers: Headers; - conn: Conn; + conn: HttpConn; r: BufReader; w: BufWriter; @@ -204,23 +241,26 @@ export class ServerRequest { } async respond(r: Response): Promise<void> { - return writeResponse(this.w, r); + // Check and wait if the previous request is done responding. + const lastPipelineId = this.pipelineId - 1; + const lastPipelineDeferred = this.conn.pendingDeferredMap.get( + lastPipelineId + ); + assert(!!lastPipelineDeferred); + await lastPipelineDeferred.promise; + // If yes, delete old deferred and proceed with writing. + this.conn.pendingDeferredMap.delete(lastPipelineId); + // Write our response! + await writeResponse(this.w, r); + // Signal the next pending request that it can start writing. + const currPipelineDeferred = this.conn.pendingDeferredMap.get( + this.pipelineId + ); + assert(!!currPipelineDeferred); + currPipelineDeferred.resolve(); } } -function deferred(): Deferred { - let resolve, reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { - promise, - resolve, - reject - }; -} - interface ServeEnv { reqQueue: ServerRequest[]; serveDeferred: Deferred; @@ -235,7 +275,7 @@ interface ServeEnv { * See https://v8.dev/blog/fast-async */ async function readRequest( - c: Conn, + c: HttpConn, bufr?: BufReader ): Promise<[ServerRequest, BufState]> { if (!bufr) { @@ -243,6 +283,13 @@ async function readRequest( } const bufw = new BufWriter(c); const req = new ServerRequest(); + + // Set and incr pipeline id; + req.pipelineId = ++c.lastPipelineId; + // Set a new pipeline deferred associated with this request + // for future requests to wait for. + c.pendingDeferredMap.set(req.pipelineId, deferred()); + req.conn = c; req.r = bufr!; req.w = bufw; @@ -277,7 +324,7 @@ function maybeHandleReq( env.serveDeferred.resolve(); // signal while loop to process it } -function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader): void { +function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void { readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); } @@ -298,7 +345,8 @@ export async function* serve( listener.accept().then(handleConn); }; handleConn = (conn: Conn) => { - serveConn(env, conn); // don't block + const httpConn = createHttpConn(conn); + serveConn(env, httpConn); // don't block scheduleAccept(); // schedule next accept }; |
