diff options
author | Leo Kettmeir <crowlkats@toaxl.com> | 2023-05-27 15:42:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-27 15:42:20 +0200 |
commit | be59e93220e24a2e66ae2843a136e61eab9d8ac3 (patch) | |
tree | 9c49d56516d1f126234d4e11e276750c6028b42a /ext/node/polyfills/http.ts | |
parent | d0c5ff42f4b5fa9b848e6ed5af2e480d12f15bda (diff) |
refactor(node/http): don't use readablestream for writing to request (#19282)
Refactors the internal usage of a readablestream to write to the
resource directly
---------
Co-authored-by: Bartek IwaĆczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/node/polyfills/http.ts')
-rw-r--r-- | ext/node/polyfills/http.ts | 230 |
1 files changed, 86 insertions, 144 deletions
diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 93c802d37..4e72b80f8 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -38,6 +38,7 @@ import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; +import { notImplemented } from "ext:deno_node/_utils.ts"; import { connResetException, ERR_HTTP_HEADERS_SENT, @@ -500,6 +501,14 @@ class ClientRequest extends OutgoingMessage { delete optsWithoutSignal.signal; } + if (options!.createConnection) { + notImplemented("ClientRequest.options.createConnection"); + } + + if (options!.lookup) { + notImplemented("ClientRequest.options.lookup"); + } + // initiate connection // TODO(crowlKats): finish this /*if (this.agent) { @@ -547,61 +556,14 @@ class ClientRequest extends OutgoingMessage { const client = this._getClient() ?? createHttpClient({ http2: false }); this._client = client; - const req = core.ops.op_node_http_request( + this._req = core.ops.op_node_http_request( this.method, url, headers, client.rid, this.method === "POST" || this.method === "PATCH", ); - - this._req = req; - - if (req.requestBodyRid !== null) { - const reader = this.stream.getReader(); - (async () => { - let done = false; - while (!done) { - let val; - try { - const res = await reader.read(); - done = res.done; - val = res.value; - } catch (err) { - //if (terminator.aborted) break; - // TODO(lucacasonato): propagate error into response body stream - this._requestSendError = err; - this._requestSendErrorSet = true; - break; - } - if (done) break; - try { - await core.writeAll(req.requestBodyRid, val); - } catch (err) { - //if (terminator.aborted) break; - await reader.cancel(err); - // TODO(lucacasonato): propagate error into response body stream - this._requestSendError = err; - this._requestSendErrorSet = true; - break; - } - } - if (done /*&& !terminator.aborted*/) { - try { - await core.shutdown(req.requestBodyRid); - } catch (err) { - // TODO(bartlomieju): fix this conditional - // deno-lint-ignore no-constant-condition - if (true) { - this._requestSendError = err; - this._requestSendErrorSet = true; - } - } - } - //WeakMapPrototypeDelete(requestBodyReaders, req); - core.tryClose(req.requestBodyRid); - })(); - } + this._bodyWriteRid = this._req.requestBodyRid; } _getClient(): Deno.HttpClient | undefined { @@ -645,112 +607,92 @@ class ClientRequest extends OutgoingMessage { } } - // TODO(bartlomieju): use callback here // deno-lint-ignore no-explicit-any - end(chunk?: any, encoding?: any, _cb?: any): this { + end(chunk?: any, encoding?: any, cb?: any): this { this.finished = true; - - if (chunk !== undefined) { + if (chunk !== undefined && chunk !== null) { this.write(chunk, encoding); } - this.controller.close(); - core.opAsync("op_fetch_send", this._req.requestRid).then((res) => { - if (this._timeout) { - this._timeout.onabort = null; - } - this._client.close(); - const incoming = new IncomingMessageForClient(this.socket); - - // TODO(@crowlKats): - // incoming.httpVersionMajor = versionMajor; - // incoming.httpVersionMinor = versionMinor; - // incoming.httpVersion = `${versionMajor}.${versionMinor}`; - // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders || - // parser.joinDuplicateHeaders; - - incoming.url = res.url; - incoming.statusCode = res.status; - incoming.statusMessage = res.statusText; - - incoming._addHeaderLines( - res.headers, - Object.entries(res.headers).flat().length, - ); - incoming._bodyRid = res.responseRid; + (async () => { + try { + const [res, _] = await Promise.all([ + core.opAsync("op_fetch_send", this._req.requestRid), + (async () => { + if (this._bodyWriteRid) { + try { + await core.shutdown(this._bodyWriteRid); + } catch (err) { + this._requestSendError = err; + } + + core.tryClose(this._bodyWriteRid); + + try { + cb?.(); + } catch (_) { + // + } + } + })(), + ]); + if (this._timeout) { + this._timeout.onabort = null; + } + this._client.close(); + const incoming = new IncomingMessageForClient(this.socket); + + // TODO(@crowlKats): + // incoming.httpVersionMajor = versionMajor; + // incoming.httpVersionMinor = versionMinor; + // incoming.httpVersion = `${versionMajor}.${versionMinor}`; + // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders || + // parser.joinDuplicateHeaders; + + incoming.url = res.url; + incoming.statusCode = res.status; + incoming.statusMessage = res.statusText; + + incoming._addHeaderLines( + res.headers, + Object.entries(res.headers).flat().length, + ); + incoming._bodyRid = res.responseRid; - if (this._req.cancelHandleRid !== null) { - core.tryClose(this._req.cancelHandleRid); - } + if (this._req.cancelHandleRid !== null) { + core.tryClose(this._req.cancelHandleRid); + } - this.emit("response", incoming); - }).catch((err) => { - if (this._req.cancelHandleRid !== null) { - core.tryClose(this._req.cancelHandleRid); - } + this.emit("response", incoming); + } catch (err) { + if (this._req.cancelHandleRid !== null) { + core.tryClose(this._req.cancelHandleRid); + } - if (this._requestSendErrorSet) { - // if the request body stream errored, we want to propagate that error - // instead of the original error from opFetchSend - throw new TypeError("Failed to fetch: request body stream errored", { - cause: this._requestSendError, - }); - } + if (this._requestSendError !== undefined) { + // if the request body stream errored, we want to propagate that error + // instead of the original error from opFetchSend + throw new TypeError( + "Failed to fetch: request body stream errored", + { + cause: this._requestSendError, + }, + ); + } - if (err.message.includes("connection closed before message completed")) { - // Node.js seems ignoring this error - } else if (err.message.includes("The signal has been aborted")) { - // Remap this error - this.emit("error", connResetException("socket hang up")); - } else { - this.emit("error", err); + if ( + err.message.includes("connection closed before message completed") + ) { + // Node.js seems ignoring this error + } else if (err.message.includes("The signal has been aborted")) { + // Remap this error + this.emit("error", connResetException("socket hang up")); + } else { + this.emit("error", err); + } } - }); + })(); } - /* - override async _final() { - if (this.controller) { - this.controller.close(); - } - - const body = await this._createBody(this.body, this.opts); - const client = await this._createCustomClient(); - const opts = { - body, - method: this.opts.method, - client, - headers: this.opts.headers, - signal: this.opts.signal ?? undefined, - }; - const mayResponse = fetch(this._createUrlStrFromOptions(this.opts), opts) - .catch((e) => { - if (e.message.includes("connection closed before message completed")) { - // Node.js seems ignoring this error - } else if (e.message.includes("The signal has been aborted")) { - // Remap this error - this.emit("error", connResetException("socket hang up")); - } else { - this.emit("error", e); - } - return undefined; - }); - - const res = new IncomingMessageForClient( - await mayResponse, - this._createSocket(), - ); - this.emit("response", res); - if (client) { - res.on("end", () => { - client.close(); - }); - } - if (this.opts.timeout != undefined) { - clearTimeout(this.opts.timeout); - this.opts.timeout = undefined; - } - this.cb?.(res); - }*/ abort() { if (this.aborted) { |