diff options
author | Bert Belder <bertbelder@gmail.com> | 2021-10-04 18:50:40 -0700 |
---|---|---|
committer | Bert Belder <bertbelder@gmail.com> | 2021-11-10 14:51:43 -0800 |
commit | 72a6231a614e71a57c4f8ce5f9de68ab97171dd1 (patch) | |
tree | b19bbd749ad67f606ef331fee00bfe2b34477633 /ext/http/01_http.js | |
parent | 0cc8a9741a16efe3e37167731238b33d26887fd0 (diff) |
refactor(ext/http): rewrite hyper integration and fix bug (#12732)
Fixes: #12193
Fixes: #12251
Closes: #12714
Diffstat (limited to 'ext/http/01_http.js')
-rw-r--r-- | ext/http/01_http.js | 272 |
1 files changed, 124 insertions, 148 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 9f05809f5..94f1a1051 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -27,6 +27,7 @@ Set, SetPrototypeAdd, SetPrototypeDelete, + SetPrototypeHas, SetPrototypeValues, StringPrototypeIncludes, StringPrototypeToLowerCase, @@ -42,6 +43,8 @@ class HttpConn { #rid = 0; + #closed = false; + // This set holds resource ids of resources // that were created during lifecycle of this request. // When the connection is closed these resources should be closed @@ -62,10 +65,11 @@ let nextRequest; try { nextRequest = await core.opAsync( - "op_http_request_next", + "op_http_accept", this.#rid, ); } catch (error) { + this.close(); // A connection error seen here would cause disrupted responses to throw // a generic `BadResource` error. Instead store this error and replace // those with it. @@ -79,26 +83,31 @@ } throw error; } - if (nextRequest === null) return null; + if (nextRequest == null) { + // Work-around for servers (deno_std/http in particular) that call + // `nextRequest()` before upgrading a previous request which has a + // `connection: upgrade` header. + await null; + + this.close(); + return null; + } const [ - requestRid, - responseSenderRid, + streamRid, method, headersList, url, ] = nextRequest; + SetPrototypeAdd(this.managedResources, streamRid); /** @type {ReadableStream<Uint8Array> | undefined} */ let body = null; - if (typeof requestRid === "number") { - SetPrototypeAdd(this.managedResources, requestRid); - // There might be a body, but we don't expose it for GET/HEAD requests. - // It will be closed automatically once the request has been handled and - // the response has been sent. - if (method !== "GET" && method !== "HEAD") { - body = createRequestBodyStream(this, requestRid); - } + // There might be a body, but we don't expose it for GET/HEAD requests. + // It will be closed automatically once the request has been handled and + // the response has been sent. + if (method !== "GET" && method !== "HEAD") { + body = createRequestBodyStream(streamRid); } const innerRequest = newInnerRequest( @@ -111,22 +120,21 @@ const signal = abortSignal.newSignal(); const request = fromInnerRequest(innerRequest, signal, "immutable"); - SetPrototypeAdd(this.managedResources, responseSenderRid); - const respondWith = createRespondWith( - this, - responseSenderRid, - requestRid, - ); + const respondWith = createRespondWith(this, streamRid); return { request, respondWith }; } /** @returns {void} */ close() { - for (const rid of SetPrototypeValues(this.managedResources)) { - core.tryClose(rid); + if (!this.#closed) { + this.#closed = true; + core.close(this.#rid); + for (const rid of SetPrototypeValues(this.managedResources)) { + SetPrototypeDelete(this.managedResources, rid); + core.close(rid); + } } - core.close(this.#rid); } [SymbolAsyncIterator]() { @@ -136,97 +144,86 @@ async next() { const reqEvt = await httpConn.nextRequest(); // Change with caution, current form avoids a v8 deopt - return { value: reqEvt, done: reqEvt === null }; + return { value: reqEvt ?? undefined, done: reqEvt === null }; }, }; } } - function readRequest(requestRid, zeroCopyBuf) { - return core.opAsync( - "op_http_request_read", - requestRid, - zeroCopyBuf, - ); + function readRequest(streamRid, buf) { + return core.opAsync("op_http_read", streamRid, buf); } - function createRespondWith(httpConn, responseSenderRid, requestRid) { + function createRespondWith(httpConn, streamRid) { return async function respondWith(resp) { - if (resp instanceof Promise) { - resp = await resp; - } + try { + if (resp instanceof Promise) { + resp = await resp; + } - if (!(resp instanceof Response)) { - throw new TypeError( - "First argument to respondWith must be a Response or a promise resolving to a Response.", - ); - } + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } - const innerResp = toInnerResponse(resp); - - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ - let respBody = null; - if (innerResp.body !== null) { - if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); - if (innerResp.body.streamOrStatic instanceof ReadableStream) { - if ( - innerResp.body.length === null || - innerResp.body.source instanceof Blob - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) { + throw new TypeError("Body is unusable."); + } + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if ( + innerResp.body.length === null || + innerResp.body.source instanceof Blob + ) { + respBody = innerResp.body.stream; } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } } + } else { + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; } } else { - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; + respBody = new Uint8Array(0); } - } else { - respBody = new Uint8Array(0); - } + const isStreamingResponseBody = + !(typeof respBody === "string" || respBody instanceof Uint8Array); - SetPrototypeDelete(httpConn.managedResources, responseSenderRid); - let responseBodyRid; - try { - responseBodyRid = await core.opAsync( - "op_http_response", - [ - responseSenderRid, + try { + await core.opAsync("op_http_write_headers", [ + streamRid, innerResp.status ?? 200, innerResp.headerList, - ], - (respBody instanceof Uint8Array || typeof respBody === "string") - ? respBody - : null, - ); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - if (respBody !== null && respBody instanceof ReadableStream) { - await respBody.cancel(error); + ], isStreamingResponseBody ? null : respBody); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; } - throw error; - } - // If `respond` returns a responseBodyRid, we should stream the body - // to that resource. - if (responseBodyRid !== null) { - SetPrototypeAdd(httpConn.managedResources, responseBodyRid); - try { + if (isStreamingResponseBody) { if (respBody === null || !(respBody instanceof ReadableStream)) { throw new TypeError("Unreachable"); } @@ -239,11 +236,7 @@ break; } try { - await core.opAsync( - "op_http_response_write", - responseBodyRid, - value, - ); + await core.opAsync("op_http_write", streamRid, value); } catch (error) { const connError = httpConn[connErrorSymbol]; if (error instanceof BadResource && connError != null) { @@ -254,61 +247,55 @@ throw error; } } - } finally { - // Once all chunks are sent, and the request body is closed, we can - // close the response body. - SetPrototypeDelete(httpConn.managedResources, responseBodyRid); try { - await core.opAsync("op_http_response_close", responseBodyRid); - } catch { /* pass */ } + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } - } - const ws = resp[_ws]; - if (ws) { - if (typeof requestRid !== "number") { - throw new TypeError( - "This request can not be upgraded to a websocket connection.", + const ws = resp[_ws]; + if (ws) { + const wsRid = await core.opAsync( + "op_http_upgrade_websocket", + streamRid, ); - } + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - const wsRid = await core.opAsync( - "op_http_upgrade_websocket", - requestRid, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + httpConn.close(); - if (ws[_readyState] === WebSocket.CLOSING) { - await core.opAsync("op_ws_close", { rid: wsRid }); + if (ws[_readyState] === WebSocket.CLOSING) { + await core.opAsync("op_ws_close", { rid: wsRid }); - ws[_readyState] = WebSocket.CLOSED; + ws[_readyState] = WebSocket.CLOSED; - const errEvent = new ErrorEvent("error"); - ws.dispatchEvent(errEvent); + const errEvent = new ErrorEvent("error"); + ws.dispatchEvent(errEvent); - const event = new CloseEvent("close"); - ws.dispatchEvent(event); + const event = new CloseEvent("close"); + ws.dispatchEvent(event); - core.tryClose(wsRid); - } else { - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); + core.tryClose(wsRid); + } else { + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); - ws[_eventLoop](); + ws[_eventLoop](); + } + } + } finally { + if (SetPrototypeHas(httpConn.managedResources, streamRid)) { + SetPrototypeDelete(httpConn.managedResources, streamRid); + core.close(streamRid); } - } else if (typeof requestRid === "number") { - // Try to close "request" resource. It might have been already consumed, - // but if it hasn't been we need to close it here to avoid resource - // leak. - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.tryClose(requestRid); } }; } - function createRequestBodyStream(httpConn, requestRid) { + function createRequestBodyStream(streamRid) { return new ReadableStream({ type: "bytes", async pull(controller) { @@ -316,32 +303,21 @@ // This is the largest possible size for a single packet on a TLS // stream. const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest( - requestRid, - chunk, - ); + const read = await readRequest(streamRid, chunk); if (read > 0) { // We read some data. Enqueue it onto the stream. controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); } else { // We have reached the end of the body, so we close the stream. controller.close(); - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); } } catch (err) { // There was an error while reading a chunk of the body, so we // error. controller.error(err); controller.close(); - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); } }, - cancel() { - SetPrototypeDelete(httpConn.managedResources, requestRid); - core.close(requestRid); - }, }); } |