summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/http2.ts
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/http2.ts')
-rw-r--r--ext/node/polyfills/http2.ts144
1 files changed, 115 insertions, 29 deletions
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts
index 62dd1a501..9ebdabb79 100644
--- a/ext/node/polyfills/http2.ts
+++ b/ext/node/polyfills/http2.ts
@@ -8,9 +8,11 @@ const core = globalThis.Deno.core;
import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "node:events";
import { Buffer } from "node:buffer";
-import { Server, Socket, TCP } from "node:net";
+import { connect as netConnect, Server, Socket, TCP } from "node:net";
+import { connect as tlsConnect } from "node:tls";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
import {
+ kHandle,
kMaybeDestroy,
kUpdateTimer,
setStreamTimeout,
@@ -36,11 +38,11 @@ import {
ERR_HTTP2_STREAM_ERROR,
ERR_HTTP2_TRAILERS_ALREADY_SENT,
ERR_HTTP2_TRAILERS_NOT_READY,
+ ERR_HTTP2_UNSUPPORTED_PROTOCOL,
ERR_INVALID_HTTP_TOKEN,
+ ERR_SOCKET_CLOSED,
} from "ext:deno_node/internal/errors.ts";
import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts";
-import { TcpConn } from "ext:deno_net/01_net.js";
-import { TlsConn } from "ext:deno_net/02_tls.js";
const {
op_http2_connect,
@@ -66,6 +68,7 @@ const kDenoResponse = Symbol("kDenoResponse");
const kDenoRid = Symbol("kDenoRid");
const kDenoClientRid = Symbol("kDenoClientRid");
const kDenoConnRid = Symbol("kDenoConnRid");
+const kPollConnPromiseId = Symbol("kPollConnPromiseId");
const STREAM_FLAGS_PENDING = 0x0;
const STREAM_FLAGS_READY = 0x1;
@@ -205,8 +208,12 @@ export class Http2Session extends EventEmitter {
_opaqueData: Buffer | TypedArray | DataView,
) {
warnNotImplemented("Http2Session.goaway");
- core.tryClose(this[kDenoConnRid]);
- core.tryClose(this[kDenoClientRid]);
+ if (this[kDenoConnRid]) {
+ core.tryClose(this[kDenoConnRid]);
+ }
+ if (this[kDenoClientRid]) {
+ core.tryClose(this[kDenoClientRid]);
+ }
}
destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) {
@@ -264,7 +271,7 @@ export class Http2Session extends EventEmitter {
}
setTimeout(msecs: number, callback?: () => void) {
- setStreamTimeout(this, msecs, callback);
+ setStreamTimeout.call(this, msecs, callback);
}
}
@@ -302,8 +309,13 @@ function closeSession(session: Http2Session, code?: number, error?: Error) {
session[kDenoConnRid],
session[kDenoClientRid],
);
- core.tryClose(session[kDenoConnRid]);
- core.tryClose(session[kDenoClientRid]);
+ console.table(Deno.resources());
+ if (session[kDenoConnRid]) {
+ core.tryClose(session[kDenoConnRid]);
+ }
+ if (session[kDenoClientRid]) {
+ core.tryClose(session[kDenoClientRid]);
+ }
finishSessionClose(session, error);
}
@@ -340,9 +352,11 @@ function assertValidPseudoHeader(header: string) {
export class ClientHttp2Session extends Http2Session {
#connectPromise: Promise<void>;
+ #refed = true;
constructor(
- connPromise: Promise<TcpConn> | Promise<TlsConn>,
+ // deno-lint-ignore no-explicit-any
+ socket: any,
url: string,
options: Record<string, unknown>,
) {
@@ -350,22 +364,42 @@ export class ClientHttp2Session extends Http2Session {
this[kPendingRequestCalls] = null;
this[kDenoClientRid] = undefined;
this[kDenoConnRid] = undefined;
+ this[kPollConnPromiseId] = undefined;
+
+ socket.on("error", socketOnError);
+ socket.on("close", socketOnClose);
+ const connPromise = new Promise((resolve) => {
+ const eventName = url.startsWith("https") ? "secureConnect" : "connect";
+ socket.once(eventName, () => {
+ const rid = socket[kHandle][kStreamBaseField].rid;
+ nextTick(() => {
+ resolve(rid);
+ });
+ });
+ });
+ socket[kSession] = this;
// TODO(bartlomieju): cleanup
this.#connectPromise = (async () => {
debugHttp2(">>> before connect");
- const conn = await connPromise;
- const [clientRid, connRid] = await op_http2_connect(conn.rid, url);
- debugHttp2(">>> after connect");
+ const connRid_ = await connPromise;
+ // console.log(">>>> awaited connRid", connRid_, url);
+ const [clientRid, connRid] = await op_http2_connect(connRid_, url);
+ debugHttp2(">>> after connect", clientRid, connRid);
this[kDenoClientRid] = clientRid;
this[kDenoConnRid] = connRid;
- // TODO(bartlomieju): save this promise, so the session can be unrefed
(async () => {
try {
- await core.opAsync(
+ const promise = core.opAsync(
"op_http2_poll_client_connection",
this[kDenoConnRid],
);
+ this[kPollConnPromiseId] =
+ promise[Symbol.for("Deno.core.internalPromiseId")];
+ if (!this.#refed) {
+ this.unref();
+ }
+ await promise;
} catch (e) {
this.emit("error", e);
}
@@ -374,6 +408,20 @@ export class ClientHttp2Session extends Http2Session {
})();
}
+ ref() {
+ this.#refed = true;
+ if (this[kPollConnPromiseId]) {
+ core.refOp(this[kPollConnPromiseId]);
+ }
+ }
+
+ unref() {
+ this.#refed = false;
+ if (this[kPollConnPromiseId]) {
+ core.unrefOp(this[kPollConnPromiseId]);
+ }
+ }
+
request(
headers: Http2Headers,
options?: Record<string, unknown>,
@@ -1190,7 +1238,9 @@ function finishCloseStream(stream, code) {
);
core.tryClose(stream[kDenoRid]);
core.tryClose(stream[kDenoResponse].bodyRid);
- stream.emit("close");
+ nextTick(() => {
+ stream.emit("close");
+ });
}).catch(() => {
debugHttp2(
">>> finishCloseStream close2 catch",
@@ -1199,7 +1249,9 @@ function finishCloseStream(stream, code) {
);
core.tryClose(stream[kDenoRid]);
core.tryClose(stream[kDenoResponse].bodyRid);
- stream.emit("close");
+ nextTick(() => {
+ stream.emit("close");
+ });
});
}
}
@@ -1488,24 +1540,32 @@ export function connect(
host = authority.host;
}
- // TODO(bartlomieju): handle defaults
- if (typeof options.createConnection === "function") {
- console.error("Not implemented: http2.connect.options.createConnection");
- // notImplemented("http2.connect.options.createConnection");
- }
+ let url, socket;
- let conn, url;
- if (protocol == "http:") {
- conn = Deno.connect({ port, hostname: host });
+ if (typeof options.createConnection === "function") {
url = `http://${host}${port == 80 ? "" : (":" + port)}`;
- } else if (protocol == "https:") {
- conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] });
- url = `http://${host}${port == 443 ? "" : (":" + port)}`;
+ socket = options.createConnection(host, options);
} else {
- throw new TypeError("Unexpected URL protocol");
+ switch (protocol) {
+ case "http:":
+ url = `http://${host}${port == 80 ? "" : (":" + port)}`;
+ socket = netConnect({ port, host, ...options, pauseOnCreate: true });
+ break;
+ case "https:":
+ // TODO(bartlomieju): handle `initializeTLSOptions` here
+ url = `https://${host}${port == 443 ? "" : (":" + port)}`;
+ socket = tlsConnect(port, host, { manualStart: true });
+ break;
+ default:
+ throw new ERR_HTTP2_UNSUPPORTED_PROTOCOL(protocol);
+ }
}
- const session = new ClientHttp2Session(conn, url, options);
+ // Pause so no "socket.read()" starts in the background that would
+ // prevent us from taking ownership of the socket in `ClientHttp2Session`
+ socket.pause();
+ const session = new ClientHttp2Session(socket, url, options);
+
session[kAuthority] = `${options.servername || host}:${port}`;
session[kProtocol] = protocol;
@@ -1515,6 +1575,32 @@ export function connect(
return session;
}
+function socketOnError(error) {
+ const session = this[kSession];
+ if (session !== undefined) {
+ if (error.code === "ECONNRESET" && session[kState].goawayCode !== null) {
+ return session.destroy();
+ }
+ debugHttp2(">>>> socket error", error);
+ session.destroy(error);
+ }
+}
+
+function socketOnClose() {
+ const session = this[kSession];
+ if (session !== undefined) {
+ debugHttp2(">>>> socket closed");
+ const err = session.connecting ? new ERR_SOCKET_CLOSED() : null;
+ const state = session[kState];
+ state.streams.forEach((stream) => stream.close(constants.NGHTTP2_CANCEL));
+ state.pendingStreams.forEach((stream) =>
+ stream.close(constants.NGHTTP2_CANCEL)
+ );
+ session.close();
+ session[kMaybeDestroy](err);
+ }
+}
+
export const constants = {
NGHTTP2_ERR_FRAME_SIZE_ERROR: -522,
NGHTTP2_NV_FLAG_NONE: 0,