summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock4
-rw-r--r--cli/tests/unit_node/http2_test.ts46
-rw-r--r--ext/node/polyfills/http2.ts144
3 files changed, 163 insertions, 31 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 51192db2f..8afda07c2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6301,9 +6301,9 @@ dependencies = [
[[package]]
name = "v8"
-version = "0.79.1"
+version = "0.79.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b87d5248d1a7e321a264d21dc7839675fc0bb456e489102272a55b44047869f0"
+checksum = "b15561535230812a1db89a696f1f16a12ae6c2c370c6b2241c68d4cb33963faf"
dependencies = [
"bitflags 1.3.2",
"fslock",
diff --git a/cli/tests/unit_node/http2_test.ts b/cli/tests/unit_node/http2_test.ts
index 8e7b261ae..d0e0de43f 100644
--- a/cli/tests/unit_node/http2_test.ts
+++ b/cli/tests/unit_node/http2_test.ts
@@ -63,6 +63,52 @@ for (const url of ["http://127.0.0.1:4246", "https://127.0.0.1:4247"]) {
});
}
+Deno.test(`[node/http2 client createConnection]`, {
+ ignore: Deno.build.os === "windows",
+}, async () => {
+ const url = "http://127.0.0.1:4246";
+ const createConnPromise = deferred();
+ // Create a server to respond to the HTTP2 requests
+ const client = http2.connect(url, {
+ createConnection() {
+ const socket = net.connect({ host: "127.0.0.1", port: 4246 });
+
+ socket.on("connect", () => {
+ createConnPromise.resolve();
+ });
+
+ return socket;
+ },
+ });
+ client.on("error", (err) => console.error(err));
+
+ const req = client.request({ ":method": "POST", ":path": "/" });
+
+ let receivedData = "";
+
+ req.write("hello");
+ req.setEncoding("utf8");
+
+ req.on("data", (chunk) => {
+ receivedData += chunk;
+ });
+ req.end();
+
+ const endPromise = deferred();
+ setTimeout(() => {
+ try {
+ client.close();
+ } catch (_) {
+ // pass
+ }
+ endPromise.resolve();
+ }, 2000);
+
+ await createConnPromise;
+ await endPromise;
+ assertEquals(receivedData, "hello world\n");
+});
+
// TODO(bartlomieju): reenable sanitizers
Deno.test("[node/http2 server]", { sanitizeOps: false }, async () => {
const server = http2.createServer();
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts
index 62dd1a501..9ebdabb79 100644
--- a/ext/node/polyfills/http2.ts
+++ b/ext/node/polyfills/http2.ts
@@ -8,9 +8,11 @@ const core = globalThis.Deno.core;
import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "node:events";
import { Buffer } from "node:buffer";
-import { Server, Socket, TCP } from "node:net";
+import { connect as netConnect, Server, Socket, TCP } from "node:net";
+import { connect as tlsConnect } from "node:tls";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
import {
+ kHandle,
kMaybeDestroy,
kUpdateTimer,
setStreamTimeout,
@@ -36,11 +38,11 @@ import {
ERR_HTTP2_STREAM_ERROR,
ERR_HTTP2_TRAILERS_ALREADY_SENT,
ERR_HTTP2_TRAILERS_NOT_READY,
+ ERR_HTTP2_UNSUPPORTED_PROTOCOL,
ERR_INVALID_HTTP_TOKEN,
+ ERR_SOCKET_CLOSED,
} from "ext:deno_node/internal/errors.ts";
import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts";
-import { TcpConn } from "ext:deno_net/01_net.js";
-import { TlsConn } from "ext:deno_net/02_tls.js";
const {
op_http2_connect,
@@ -66,6 +68,7 @@ const kDenoResponse = Symbol("kDenoResponse");
const kDenoRid = Symbol("kDenoRid");
const kDenoClientRid = Symbol("kDenoClientRid");
const kDenoConnRid = Symbol("kDenoConnRid");
+const kPollConnPromiseId = Symbol("kPollConnPromiseId");
const STREAM_FLAGS_PENDING = 0x0;
const STREAM_FLAGS_READY = 0x1;
@@ -205,8 +208,12 @@ export class Http2Session extends EventEmitter {
_opaqueData: Buffer | TypedArray | DataView,
) {
warnNotImplemented("Http2Session.goaway");
- core.tryClose(this[kDenoConnRid]);
- core.tryClose(this[kDenoClientRid]);
+ if (this[kDenoConnRid]) {
+ core.tryClose(this[kDenoConnRid]);
+ }
+ if (this[kDenoClientRid]) {
+ core.tryClose(this[kDenoClientRid]);
+ }
}
destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) {
@@ -264,7 +271,7 @@ export class Http2Session extends EventEmitter {
}
setTimeout(msecs: number, callback?: () => void) {
- setStreamTimeout(this, msecs, callback);
+ setStreamTimeout.call(this, msecs, callback);
}
}
@@ -302,8 +309,13 @@ function closeSession(session: Http2Session, code?: number, error?: Error) {
session[kDenoConnRid],
session[kDenoClientRid],
);
- core.tryClose(session[kDenoConnRid]);
- core.tryClose(session[kDenoClientRid]);
+ console.table(Deno.resources());
+ if (session[kDenoConnRid]) {
+ core.tryClose(session[kDenoConnRid]);
+ }
+ if (session[kDenoClientRid]) {
+ core.tryClose(session[kDenoClientRid]);
+ }
finishSessionClose(session, error);
}
@@ -340,9 +352,11 @@ function assertValidPseudoHeader(header: string) {
export class ClientHttp2Session extends Http2Session {
#connectPromise: Promise<void>;
+ #refed = true;
constructor(
- connPromise: Promise<TcpConn> | Promise<TlsConn>,
+ // deno-lint-ignore no-explicit-any
+ socket: any,
url: string,
options: Record<string, unknown>,
) {
@@ -350,22 +364,42 @@ export class ClientHttp2Session extends Http2Session {
this[kPendingRequestCalls] = null;
this[kDenoClientRid] = undefined;
this[kDenoConnRid] = undefined;
+ this[kPollConnPromiseId] = undefined;
+
+ socket.on("error", socketOnError);
+ socket.on("close", socketOnClose);
+ const connPromise = new Promise((resolve) => {
+ const eventName = url.startsWith("https") ? "secureConnect" : "connect";
+ socket.once(eventName, () => {
+ const rid = socket[kHandle][kStreamBaseField].rid;
+ nextTick(() => {
+ resolve(rid);
+ });
+ });
+ });
+ socket[kSession] = this;
// TODO(bartlomieju): cleanup
this.#connectPromise = (async () => {
debugHttp2(">>> before connect");
- const conn = await connPromise;
- const [clientRid, connRid] = await op_http2_connect(conn.rid, url);
- debugHttp2(">>> after connect");
+ const connRid_ = await connPromise;
+ // console.log(">>>> awaited connRid", connRid_, url);
+ const [clientRid, connRid] = await op_http2_connect(connRid_, url);
+ debugHttp2(">>> after connect", clientRid, connRid);
this[kDenoClientRid] = clientRid;
this[kDenoConnRid] = connRid;
- // TODO(bartlomieju): save this promise, so the session can be unrefed
(async () => {
try {
- await core.opAsync(
+ const promise = core.opAsync(
"op_http2_poll_client_connection",
this[kDenoConnRid],
);
+ this[kPollConnPromiseId] =
+ promise[Symbol.for("Deno.core.internalPromiseId")];
+ if (!this.#refed) {
+ this.unref();
+ }
+ await promise;
} catch (e) {
this.emit("error", e);
}
@@ -374,6 +408,20 @@ export class ClientHttp2Session extends Http2Session {
})();
}
+ ref() {
+ this.#refed = true;
+ if (this[kPollConnPromiseId]) {
+ core.refOp(this[kPollConnPromiseId]);
+ }
+ }
+
+ unref() {
+ this.#refed = false;
+ if (this[kPollConnPromiseId]) {
+ core.unrefOp(this[kPollConnPromiseId]);
+ }
+ }
+
request(
headers: Http2Headers,
options?: Record<string, unknown>,
@@ -1190,7 +1238,9 @@ function finishCloseStream(stream, code) {
);
core.tryClose(stream[kDenoRid]);
core.tryClose(stream[kDenoResponse].bodyRid);
- stream.emit("close");
+ nextTick(() => {
+ stream.emit("close");
+ });
}).catch(() => {
debugHttp2(
">>> finishCloseStream close2 catch",
@@ -1199,7 +1249,9 @@ function finishCloseStream(stream, code) {
);
core.tryClose(stream[kDenoRid]);
core.tryClose(stream[kDenoResponse].bodyRid);
- stream.emit("close");
+ nextTick(() => {
+ stream.emit("close");
+ });
});
}
}
@@ -1488,24 +1540,32 @@ export function connect(
host = authority.host;
}
- // TODO(bartlomieju): handle defaults
- if (typeof options.createConnection === "function") {
- console.error("Not implemented: http2.connect.options.createConnection");
- // notImplemented("http2.connect.options.createConnection");
- }
+ let url, socket;
- let conn, url;
- if (protocol == "http:") {
- conn = Deno.connect({ port, hostname: host });
+ if (typeof options.createConnection === "function") {
url = `http://${host}${port == 80 ? "" : (":" + port)}`;
- } else if (protocol == "https:") {
- conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] });
- url = `http://${host}${port == 443 ? "" : (":" + port)}`;
+ socket = options.createConnection(host, options);
} else {
- throw new TypeError("Unexpected URL protocol");
+ switch (protocol) {
+ case "http:":
+ url = `http://${host}${port == 80 ? "" : (":" + port)}`;
+ socket = netConnect({ port, host, ...options, pauseOnCreate: true });
+ break;
+ case "https:":
+ // TODO(bartlomieju): handle `initializeTLSOptions` here
+ url = `https://${host}${port == 443 ? "" : (":" + port)}`;
+ socket = tlsConnect(port, host, { manualStart: true });
+ break;
+ default:
+ throw new ERR_HTTP2_UNSUPPORTED_PROTOCOL(protocol);
+ }
}
- const session = new ClientHttp2Session(conn, url, options);
+ // Pause so no "socket.read()" starts in the background that would
+ // prevent us from taking ownership of the socket in `ClientHttp2Session`
+ socket.pause();
+ const session = new ClientHttp2Session(socket, url, options);
+
session[kAuthority] = `${options.servername || host}:${port}`;
session[kProtocol] = protocol;
@@ -1515,6 +1575,32 @@ export function connect(
return session;
}
+function socketOnError(error) {
+ const session = this[kSession];
+ if (session !== undefined) {
+ if (error.code === "ECONNRESET" && session[kState].goawayCode !== null) {
+ return session.destroy();
+ }
+ debugHttp2(">>>> socket error", error);
+ session.destroy(error);
+ }
+}
+
+function socketOnClose() {
+ const session = this[kSession];
+ if (session !== undefined) {
+ debugHttp2(">>>> socket closed");
+ const err = session.connecting ? new ERR_SOCKET_CLOSED() : null;
+ const state = session[kState];
+ state.streams.forEach((stream) => stream.close(constants.NGHTTP2_CANCEL));
+ state.pendingStreams.forEach((stream) =>
+ stream.close(constants.NGHTTP2_CANCEL)
+ );
+ session.close();
+ session[kMaybeDestroy](err);
+ }
+}
+
export const constants = {
NGHTTP2_ERR_FRAME_SIZE_ERROR: -522,
NGHTTP2_NV_FLAG_NONE: 0,