summaryrefslogtreecommitdiff
path: root/ext/node/polyfills
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills')
-rw-r--r--ext/node/polyfills/_http_outgoing.ts50
-rw-r--r--ext/node/polyfills/http.ts230
2 files changed, 108 insertions, 172 deletions
diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts
index c4b88ae2f..ab6a78038 100644
--- a/ext/node/polyfills/_http_outgoing.ts
+++ b/ext/node/polyfills/_http_outgoing.ts
@@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
+const core = globalThis.__bootstrap.core;
import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs";
import assert from "ext:deno_node/internal/assert.mjs";
import EE from "ext:deno_node/events.ts";
@@ -137,12 +138,6 @@ export class OutgoingMessage extends Stream {
this._keepAliveTimeout = 0;
this._onPendingData = nop;
-
- this.stream = new ReadableStream({
- start: (controller) => {
- this.controller = controller;
- },
- });
}
get writableFinished() {
@@ -374,21 +369,30 @@ export class OutgoingMessage extends Stream {
return headers;
}
- controller: ReadableStreamDefaultController;
write(
chunk: string | Uint8Array | Buffer,
encoding: string | null,
- // TODO(crowlKats): use callback
- _callback: () => void,
+ callback: () => void,
): boolean {
- if (typeof chunk === "string") {
- chunk = Buffer.from(chunk, encoding);
- }
- if (chunk instanceof Buffer) {
- chunk = new Uint8Array(chunk.buffer);
- }
+ if (
+ (typeof chunk === "string" && chunk.length > 0) ||
+ ((chunk instanceof Buffer || chunk instanceof Uint8Array) &&
+ chunk.buffer.byteLength > 0)
+ ) {
+ if (typeof chunk === "string") {
+ chunk = Buffer.from(chunk, encoding);
+ }
+ if (chunk instanceof Buffer) {
+ chunk = new Uint8Array(chunk.buffer);
+ }
- this.controller.enqueue(chunk);
+ core.writeAll(this._bodyWriteRid, chunk).then(() => {
+ callback?.();
+ this.emit("drain");
+ }).catch((e) => {
+ this._requestSendError = e;
+ });
+ }
return false;
}
@@ -400,18 +404,8 @@ export class OutgoingMessage extends Stream {
}
// deno-lint-ignore no-explicit-any
- end(chunk: any, encoding: any, _callback: any) {
- if (typeof chunk === "function") {
- callback = chunk;
- chunk = null;
- encoding = null;
- } else if (typeof encoding === "function") {
- callback = encoding;
- encoding = null;
- }
- // TODO(crowlKats): finish
-
- return this;
+ end(_chunk: any, _encoding: any, _callback: any) {
+ notImplemented("OutgoingMessage.end");
}
flushHeaders() {
diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts
index 93c802d37..4e72b80f8 100644
--- a/ext/node/polyfills/http.ts
+++ b/ext/node/polyfills/http.ts
@@ -38,6 +38,7 @@ import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs";
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
+import { notImplemented } from "ext:deno_node/_utils.ts";
import {
connResetException,
ERR_HTTP_HEADERS_SENT,
@@ -500,6 +501,14 @@ class ClientRequest extends OutgoingMessage {
delete optsWithoutSignal.signal;
}
+ if (options!.createConnection) {
+ notImplemented("ClientRequest.options.createConnection");
+ }
+
+ if (options!.lookup) {
+ notImplemented("ClientRequest.options.lookup");
+ }
+
// initiate connection
// TODO(crowlKats): finish this
/*if (this.agent) {
@@ -547,61 +556,14 @@ class ClientRequest extends OutgoingMessage {
const client = this._getClient() ?? createHttpClient({ http2: false });
this._client = client;
- const req = core.ops.op_node_http_request(
+ this._req = core.ops.op_node_http_request(
this.method,
url,
headers,
client.rid,
this.method === "POST" || this.method === "PATCH",
);
-
- this._req = req;
-
- if (req.requestBodyRid !== null) {
- const reader = this.stream.getReader();
- (async () => {
- let done = false;
- while (!done) {
- let val;
- try {
- const res = await reader.read();
- done = res.done;
- val = res.value;
- } catch (err) {
- //if (terminator.aborted) break;
- // TODO(lucacasonato): propagate error into response body stream
- this._requestSendError = err;
- this._requestSendErrorSet = true;
- break;
- }
- if (done) break;
- try {
- await core.writeAll(req.requestBodyRid, val);
- } catch (err) {
- //if (terminator.aborted) break;
- await reader.cancel(err);
- // TODO(lucacasonato): propagate error into response body stream
- this._requestSendError = err;
- this._requestSendErrorSet = true;
- break;
- }
- }
- if (done /*&& !terminator.aborted*/) {
- try {
- await core.shutdown(req.requestBodyRid);
- } catch (err) {
- // TODO(bartlomieju): fix this conditional
- // deno-lint-ignore no-constant-condition
- if (true) {
- this._requestSendError = err;
- this._requestSendErrorSet = true;
- }
- }
- }
- //WeakMapPrototypeDelete(requestBodyReaders, req);
- core.tryClose(req.requestBodyRid);
- })();
- }
+ this._bodyWriteRid = this._req.requestBodyRid;
}
_getClient(): Deno.HttpClient | undefined {
@@ -645,112 +607,92 @@ class ClientRequest extends OutgoingMessage {
}
}
- // TODO(bartlomieju): use callback here
// deno-lint-ignore no-explicit-any
- end(chunk?: any, encoding?: any, _cb?: any): this {
+ end(chunk?: any, encoding?: any, cb?: any): this {
this.finished = true;
-
- if (chunk !== undefined) {
+ if (chunk !== undefined && chunk !== null) {
this.write(chunk, encoding);
}
- this.controller.close();
- core.opAsync("op_fetch_send", this._req.requestRid).then((res) => {
- if (this._timeout) {
- this._timeout.onabort = null;
- }
- this._client.close();
- const incoming = new IncomingMessageForClient(this.socket);
-
- // TODO(@crowlKats):
- // incoming.httpVersionMajor = versionMajor;
- // incoming.httpVersionMinor = versionMinor;
- // incoming.httpVersion = `${versionMajor}.${versionMinor}`;
- // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders ||
- // parser.joinDuplicateHeaders;
-
- incoming.url = res.url;
- incoming.statusCode = res.status;
- incoming.statusMessage = res.statusText;
-
- incoming._addHeaderLines(
- res.headers,
- Object.entries(res.headers).flat().length,
- );
- incoming._bodyRid = res.responseRid;
+ (async () => {
+ try {
+ const [res, _] = await Promise.all([
+ core.opAsync("op_fetch_send", this._req.requestRid),
+ (async () => {
+ if (this._bodyWriteRid) {
+ try {
+ await core.shutdown(this._bodyWriteRid);
+ } catch (err) {
+ this._requestSendError = err;
+ }
+
+ core.tryClose(this._bodyWriteRid);
+
+ try {
+ cb?.();
+ } catch (_) {
+ //
+ }
+ }
+ })(),
+ ]);
+ if (this._timeout) {
+ this._timeout.onabort = null;
+ }
+ this._client.close();
+ const incoming = new IncomingMessageForClient(this.socket);
+
+ // TODO(@crowlKats):
+ // incoming.httpVersionMajor = versionMajor;
+ // incoming.httpVersionMinor = versionMinor;
+ // incoming.httpVersion = `${versionMajor}.${versionMinor}`;
+ // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders ||
+ // parser.joinDuplicateHeaders;
+
+ incoming.url = res.url;
+ incoming.statusCode = res.status;
+ incoming.statusMessage = res.statusText;
+
+ incoming._addHeaderLines(
+ res.headers,
+ Object.entries(res.headers).flat().length,
+ );
+ incoming._bodyRid = res.responseRid;
- if (this._req.cancelHandleRid !== null) {
- core.tryClose(this._req.cancelHandleRid);
- }
+ if (this._req.cancelHandleRid !== null) {
+ core.tryClose(this._req.cancelHandleRid);
+ }
- this.emit("response", incoming);
- }).catch((err) => {
- if (this._req.cancelHandleRid !== null) {
- core.tryClose(this._req.cancelHandleRid);
- }
+ this.emit("response", incoming);
+ } catch (err) {
+ if (this._req.cancelHandleRid !== null) {
+ core.tryClose(this._req.cancelHandleRid);
+ }
- if (this._requestSendErrorSet) {
- // if the request body stream errored, we want to propagate that error
- // instead of the original error from opFetchSend
- throw new TypeError("Failed to fetch: request body stream errored", {
- cause: this._requestSendError,
- });
- }
+ if (this._requestSendError !== undefined) {
+ // if the request body stream errored, we want to propagate that error
+ // instead of the original error from opFetchSend
+ throw new TypeError(
+ "Failed to fetch: request body stream errored",
+ {
+ cause: this._requestSendError,
+ },
+ );
+ }
- if (err.message.includes("connection closed before message completed")) {
- // Node.js seems ignoring this error
- } else if (err.message.includes("The signal has been aborted")) {
- // Remap this error
- this.emit("error", connResetException("socket hang up"));
- } else {
- this.emit("error", err);
+ if (
+ err.message.includes("connection closed before message completed")
+ ) {
+ // Node.js seems ignoring this error
+ } else if (err.message.includes("The signal has been aborted")) {
+ // Remap this error
+ this.emit("error", connResetException("socket hang up"));
+ } else {
+ this.emit("error", err);
+ }
}
- });
+ })();
}
- /*
- override async _final() {
- if (this.controller) {
- this.controller.close();
- }
-
- const body = await this._createBody(this.body, this.opts);
- const client = await this._createCustomClient();
- const opts = {
- body,
- method: this.opts.method,
- client,
- headers: this.opts.headers,
- signal: this.opts.signal ?? undefined,
- };
- const mayResponse = fetch(this._createUrlStrFromOptions(this.opts), opts)
- .catch((e) => {
- if (e.message.includes("connection closed before message completed")) {
- // Node.js seems ignoring this error
- } else if (e.message.includes("The signal has been aborted")) {
- // Remap this error
- this.emit("error", connResetException("socket hang up"));
- } else {
- this.emit("error", e);
- }
- return undefined;
- });
-
- const res = new IncomingMessageForClient(
- await mayResponse,
- this._createSocket(),
- );
- this.emit("response", res);
- if (client) {
- res.on("end", () => {
- client.close();
- });
- }
- if (this.opts.timeout != undefined) {
- clearTimeout(this.opts.timeout);
- this.opts.timeout = undefined;
- }
- this.cb?.(res);
- }*/
abort() {
if (this.aborted) {