diff options
author | Luca Casonato <hello@lcas.dev> | 2021-11-09 12:10:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-09 12:10:21 +0100 |
commit | 75793baae83123f890442c5d32e3dd38eb18ce1c (patch) | |
tree | 0d14bd5edbe28c3beebe9f0944437a89e0e3f724 /ext/http/01_http.js | |
parent | 31fde9deba6d4ca55293d60a030babd8d4ce12af (diff) |
Revert "refactor(ext/http): rewrite hyper integration and fix bug (#12332)" (#12704)
This reverts commit 5b1e537446454f6332de44adbeb6a15ff072c2fa.
Diffstat (limited to 'ext/http/01_http.js')
-rw-r--r-- | ext/http/01_http.js | 267 |
1 files changed, 148 insertions, 119 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js index d06be2142..9f05809f5 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -27,7 +27,6 @@ Set, SetPrototypeAdd, SetPrototypeDelete, - SetPrototypeHas, SetPrototypeValues, StringPrototypeIncludes, StringPrototypeToLowerCase, @@ -43,8 +42,6 @@ class HttpConn { #rid = 0; - #closed = false; - // This set holds resource ids of resources // that were created during lifecycle of this request. // When the connection is closed these resources should be closed @@ -65,11 +62,10 @@ let nextRequest; try { nextRequest = await core.opAsync( - "op_http_accept", + "op_http_request_next", this.#rid, ); } catch (error) { - this.close(); // A connection error seen here would cause disrupted responses to throw // a generic `BadResource` error. Instead store this error and replace // those with it. @@ -83,26 +79,26 @@ } throw error; } - if (nextRequest == null) { - this.close(); - return null; - } + if (nextRequest === null) return null; const [ - streamRid, + requestRid, + responseSenderRid, method, headersList, url, ] = nextRequest; - SetPrototypeAdd(this.managedResources, streamRid); /** @type {ReadableStream<Uint8Array> | undefined} */ let body = null; - // There might be a body, but we don't expose it for GET/HEAD requests. - // It will be closed automatically once the request has been handled and - // the response has been sent. - if (method !== "GET" && method !== "HEAD") { - body = createRequestBodyStream(streamRid); + if (typeof requestRid === "number") { + SetPrototypeAdd(this.managedResources, requestRid); + // There might be a body, but we don't expose it for GET/HEAD requests. + // It will be closed automatically once the request has been handled and + // the response has been sent. + if (method !== "GET" && method !== "HEAD") { + body = createRequestBodyStream(this, requestRid); + } } const innerRequest = newInnerRequest( @@ -115,21 +111,22 @@ const signal = abortSignal.newSignal(); const request = fromInnerRequest(innerRequest, signal, "immutable"); - const respondWith = createRespondWith(this, streamRid); + SetPrototypeAdd(this.managedResources, responseSenderRid); + const respondWith = createRespondWith( + this, + responseSenderRid, + requestRid, + ); return { request, respondWith }; } /** @returns {void} */ close() { - if (!this.#closed) { - this.#closed = true; - core.close(this.#rid); - for (const rid of SetPrototypeValues(this.managedResources)) { - SetPrototypeDelete(this.managedResources, rid); - core.close(rid); - } + for (const rid of SetPrototypeValues(this.managedResources)) { + core.tryClose(rid); } + core.close(this.#rid); } [SymbolAsyncIterator]() { @@ -139,86 +136,97 @@ async next() { const reqEvt = await httpConn.nextRequest(); // Change with caution, current form avoids a v8 deopt - return { value: reqEvt ?? undefined, done: reqEvt === null }; + return { value: reqEvt, done: reqEvt === null }; }, }; } } - function readRequest(streamRid, buf) { - return core.opAsync("op_http_read", streamRid, buf); + function readRequest(requestRid, zeroCopyBuf) { + return core.opAsync( + "op_http_request_read", + requestRid, + zeroCopyBuf, + ); } - function createRespondWith(httpConn, streamRid) { + function createRespondWith(httpConn, responseSenderRid, requestRid) { return async function respondWith(resp) { - try { - if (resp instanceof Promise) { - resp = await resp; - } - - if (!(resp instanceof Response)) { - throw new TypeError( - "First argument to respondWith must be a Response or a promise resolving to a Response.", - ); - } + if (resp instanceof Promise) { + resp = await resp; + } - const innerResp = toInnerResponse(resp); + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ - let respBody = null; - if (innerResp.body !== null) { - if (innerResp.body.unusable()) { - throw new TypeError("Body is unusable."); - } - if (innerResp.body.streamOrStatic instanceof ReadableStream) { - if ( - innerResp.body.length === null || - innerResp.body.source instanceof Blob - ) { - respBody = innerResp.body.stream; + const innerResp = toInnerResponse(resp); + + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if ( + innerResp.body.length === null || + innerResp.body.source instanceof Blob + ) { + respBody = innerResp.body.stream; + } else { + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); - } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); } - } else { - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; } } else { - respBody = new Uint8Array(0); + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; } - const isStreamingResponseBody = - !(typeof respBody === "string" || respBody instanceof Uint8Array); + } else { + respBody = new Uint8Array(0); + } - try { - await core.opAsync("op_http_write_headers", [ - streamRid, + SetPrototypeDelete(httpConn.managedResources, responseSenderRid); + let responseBodyRid; + try { + responseBodyRid = await core.opAsync( + "op_http_response", + [ + responseSenderRid, innerResp.status ?? 200, innerResp.headerList, - ], isStreamingResponseBody ? null : respBody); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - if (respBody !== null && respBody instanceof ReadableStream) { - await respBody.cancel(error); - } - throw error; + ], + (respBody instanceof Uint8Array || typeof respBody === "string") + ? respBody + : null, + ); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; + } - if (isStreamingResponseBody) { + // If `respond` returns a responseBodyRid, we should stream the body + // to that resource. + if (responseBodyRid !== null) { + SetPrototypeAdd(httpConn.managedResources, responseBodyRid); + try { if (respBody === null || !(respBody instanceof ReadableStream)) { throw new TypeError("Unreachable"); } @@ -231,7 +239,11 @@ break; } try { - await core.opAsync("op_http_write", streamRid, value); + await core.opAsync( + "op_http_response_write", + responseBodyRid, + value, + ); } catch (error) { const connError = httpConn[connErrorSymbol]; if (error instanceof BadResource && connError != null) { @@ -242,55 +254,61 @@ throw error; } } + } finally { + // Once all chunks are sent, and the request body is closed, we can + // close the response body. + SetPrototypeDelete(httpConn.managedResources, responseBodyRid); try { - await core.opAsync("op_http_shutdown", streamRid); - } catch (error) { - await reader.cancel(error); - throw error; - } + await core.opAsync("op_http_response_close", responseBodyRid); + } catch { /* pass */ } } + } - const ws = resp[_ws]; - if (ws) { - const wsRid = await core.opAsync( - "op_http_upgrade_websocket", - streamRid, + const ws = resp[_ws]; + if (ws) { + if (typeof requestRid !== "number") { + throw new TypeError( + "This request can not be upgraded to a websocket connection.", ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + } - httpConn.close(); + const wsRid = await core.opAsync( + "op_http_upgrade_websocket", + requestRid, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - if (ws[_readyState] === WebSocket.CLOSING) { - await core.opAsync("op_ws_close", { rid: wsRid }); + if (ws[_readyState] === WebSocket.CLOSING) { + await core.opAsync("op_ws_close", { rid: wsRid }); - ws[_readyState] = WebSocket.CLOSED; + ws[_readyState] = WebSocket.CLOSED; - const errEvent = new ErrorEvent("error"); - ws.dispatchEvent(errEvent); + const errEvent = new ErrorEvent("error"); + ws.dispatchEvent(errEvent); - const event = new CloseEvent("close"); - ws.dispatchEvent(event); + const event = new CloseEvent("close"); + ws.dispatchEvent(event); - core.tryClose(wsRid); - } else { - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); + core.tryClose(wsRid); + } else { + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); - ws[_eventLoop](); - } - } - } finally { - if (SetPrototypeHas(httpConn.managedResources, streamRid)) { - SetPrototypeDelete(httpConn.managedResources, streamRid); - core.close(streamRid); + ws[_eventLoop](); } + } else if (typeof requestRid === "number") { + // Try to close "request" resource. It might have been already consumed, + // but if it hasn't been we need to close it here to avoid resource + // leak. + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.tryClose(requestRid); } }; } - function createRequestBodyStream(streamRid) { + function createRequestBodyStream(httpConn, requestRid) { return new ReadableStream({ type: "bytes", async pull(controller) { @@ -298,21 +316,32 @@ // This is the largest possible size for a single packet on a TLS // stream. const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest(streamRid, chunk); + const read = await readRequest( + requestRid, + chunk, + ); if (read > 0) { // We read some data. Enqueue it onto the stream. controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); } else { // We have reached the end of the body, so we close the stream. controller.close(); + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.close(requestRid); } } catch (err) { // There was an error while reading a chunk of the body, so we // error. controller.error(err); controller.close(); + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.close(requestRid); } }, + cancel() { + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.close(requestRid); + }, }); } |