summaryrefslogtreecommitdiff
path: root/ext/node/polyfills/http2.ts
diff options
context:
space:
mode:
Diffstat (limited to 'ext/node/polyfills/http2.ts')
-rw-r--r--ext/node/polyfills/http2.ts1169
1 files changed, 1062 insertions, 107 deletions
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts
index 20306584f..62dd1a501 100644
--- a/ext/node/polyfills/http2.ts
+++ b/ext/node/polyfills/http2.ts
@@ -4,79 +4,160 @@
// TODO(petamoriken): enable prefer-primordials for node polyfills
// deno-lint-ignore-file prefer-primordials
+const core = globalThis.Deno.core;
import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "node:events";
import { Buffer } from "node:buffer";
import { Server, Socket, TCP } from "node:net";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
-import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts";
+import {
+ kMaybeDestroy,
+ kUpdateTimer,
+ setStreamTimeout,
+} from "ext:deno_node/internal/stream_base_commons.ts";
import { FileHandle } from "node:fs/promises";
import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js";
import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts";
import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
+import { Duplex } from "node:stream";
+import {
+ AbortError,
+ ERR_HTTP2_CONNECT_AUTHORITY,
+ ERR_HTTP2_CONNECT_PATH,
+ ERR_HTTP2_CONNECT_SCHEME,
+ ERR_HTTP2_GOAWAY_SESSION,
+ ERR_HTTP2_INVALID_PSEUDOHEADER,
+ ERR_HTTP2_INVALID_SESSION,
+ ERR_HTTP2_INVALID_STREAM,
+ ERR_HTTP2_SESSION_ERROR,
+ ERR_HTTP2_STREAM_CANCEL,
+ ERR_HTTP2_STREAM_ERROR,
+ ERR_HTTP2_TRAILERS_ALREADY_SENT,
+ ERR_HTTP2_TRAILERS_NOT_READY,
+ ERR_INVALID_HTTP_TOKEN,
+} from "ext:deno_node/internal/errors.ts";
+import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts";
+import { TcpConn } from "ext:deno_net/01_net.js";
+import { TlsConn } from "ext:deno_net/02_tls.js";
+
+const {
+ op_http2_connect,
+} = core.ensureFastOps();
+
+const kSession = Symbol("session");
+const kAlpnProtocol = Symbol("alpnProtocol");
+const kAuthority = Symbol("authority");
+const kEncrypted = Symbol("encrypted");
+const kID = Symbol("id");
+const kInit = Symbol("init");
+const kInfoHeaders = Symbol("sent-info-headers");
+const kOrigin = Symbol("origin");
+const kPendingRequestCalls = Symbol("kPendingRequestCalls");
+const kProtocol = Symbol("protocol");
+const kSentHeaders = Symbol("sent-headers");
+const kSentTrailers = Symbol("sent-trailers");
+const kState = Symbol("state");
+const kType = Symbol("type");
+const kTimeout = Symbol("timeout");
+
+const kDenoResponse = Symbol("kDenoResponse");
+const kDenoRid = Symbol("kDenoRid");
+const kDenoClientRid = Symbol("kDenoClientRid");
+const kDenoConnRid = Symbol("kDenoConnRid");
+
+const STREAM_FLAGS_PENDING = 0x0;
+const STREAM_FLAGS_READY = 0x1;
+const STREAM_FLAGS_CLOSED = 0x2;
+const STREAM_FLAGS_HEADERS_SENT = 0x4;
+const STREAM_FLAGS_HEAD_REQUEST = 0x8;
+const STREAM_FLAGS_ABORTED = 0x10;
+const STREAM_FLAGS_HAS_TRAILERS = 0x20;
+
+const SESSION_FLAGS_PENDING = 0x0;
+const SESSION_FLAGS_READY = 0x1;
+const SESSION_FLAGS_CLOSED = 0x2;
+const SESSION_FLAGS_DESTROYED = 0x4;
const ENCODER = new TextEncoder();
type Http2Headers = Record<string, string | string[]>;
+const debugHttp2Enabled = false;
+function debugHttp2(...args) {
+ if (debugHttp2Enabled) {
+ console.log(...args);
+ }
+}
+
export class Http2Session extends EventEmitter {
- constructor() {
+ constructor(type, _options /* socket */) {
super();
- }
- get alpnProtocol(): string | undefined {
- notImplemented("Http2Session.alpnProtocol");
- return undefined;
- }
+ // TODO(bartlomieju): Handle sockets here
- close(_callback?: () => void) {
- warnNotImplemented("Http2Session.close");
- }
+ this[kState] = {
+ destroyCode: constants.NGHTTP2_NO_ERROR,
+ flags: SESSION_FLAGS_PENDING,
+ goawayCode: null,
+ goawayLastStreamID: null,
+ streams: new Map(),
+ pendingStreams: new Set(),
+ pendingAck: 0,
+ writeQueueSize: 0,
+ originSet: undefined,
+ };
- get closed(): boolean {
- return false;
+ this[kEncrypted] = undefined;
+ this[kAlpnProtocol] = undefined;
+ this[kType] = type;
+ this[kTimeout] = null;
+ // this[kProxySocket] = null;
+ // this[kSocket] = socket;
+ // this[kHandle] = undefined;
+
+ // TODO(bartlomieju): connecting via socket
}
- get connecting(): boolean {
- notImplemented("Http2Session.connecting");
- return false;
+ get encrypted(): boolean {
+ return this[kEncrypted];
}
- destroy(_error?: Error, _code?: number) {
- notImplemented("Http2Session.destroy");
+ get alpnProtocol(): string | undefined {
+ return this[kAlpnProtocol];
}
- get destroyed(): boolean {
- return false;
+ get originSet(): string[] | undefined {
+ if (!this.encrypted || this.destroyed) {
+ return undefined;
+ }
+ // TODO(bartlomieju):
+ return [];
}
- get encrypted(): boolean {
- notImplemented("Http2Session.encrypted");
- return false;
+ get connecting(): boolean {
+ return (this[kState].flags & SESSION_FLAGS_READY) === 0;
}
- goaway(
- _code: number,
- _lastStreamID: number,
- _opaqueData: Buffer | TypedArray | DataView,
- ) {
- notImplemented("Http2Session.goaway");
+ get closed(): boolean {
+ return !!(this[kState].flags & SESSION_FLAGS_CLOSED);
}
- get localSettings(): Record<string, unknown> {
- notImplemented("Http2Session.localSettings");
- return {};
+ get destroyed(): boolean {
+ return !!(this[kState].flags & SESSION_FLAGS_DESTROYED);
}
- get originSet(): string[] | undefined {
- notImplemented("Http2Session.originSet");
- return undefined;
+ [kUpdateTimer]() {
+ if (this.destroyed) {
+ return;
+ }
+ if (this[kTimeout]) {
+ this[kTimeout].refresh();
+ }
}
- get pendingSettingsAck(): boolean {
- notImplemented("Http2Session.pendingSettingsAck");
- return false;
+ setLocalWindowSize(_windowSize: number) {
+ notImplemented("Http2Session.setLocalWindowSize");
}
ping(
@@ -87,28 +168,30 @@ export class Http2Session extends EventEmitter {
return false;
}
- ref() {
- warnNotImplemented("Http2Session.ref");
+ get socket(): Socket /*| TlsSocket*/ {
+ warnNotImplemented("Http2Session.socket");
+ return {};
}
- get remoteSettings(): Record<string, unknown> {
- notImplemented("Http2Session.remoteSettings");
- return {};
+ get type(): number {
+ return this[kType];
}
- setLocalWindowSize(_windowSize: number) {
- notImplemented("Http2Session.setLocalWindowSize");
+ get pendingSettingsAck() {
+ return this[kState].pendingAck > 0;
}
- setTimeout(msecs: number, callback?: () => void) {
- setStreamTimeout(this, msecs, callback);
+ get state(): Record<string, unknown> {
+ return {};
}
- get socket(): Socket /*| TlsSocket*/ {
+ get localSettings(): Record<string, unknown> {
+ notImplemented("Http2Session.localSettings");
return {};
}
- get state(): Record<string, unknown> {
+ get remoteSettings(): Record<string, unknown> {
+ notImplemented("Http2Session.remoteSettings");
return {};
}
@@ -116,19 +199,118 @@ export class Http2Session extends EventEmitter {
notImplemented("Http2Session.settings");
}
- get type(): number {
- notImplemented("Http2Session.type");
- return 0;
+ goaway(
+ _code: number,
+ _lastStreamID: number,
+ _opaqueData: Buffer | TypedArray | DataView,
+ ) {
+ warnNotImplemented("Http2Session.goaway");
+ core.tryClose(this[kDenoConnRid]);
+ core.tryClose(this[kDenoClientRid]);
+ }
+
+ destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) {
+ if (this.destroyed) {
+ return;
+ }
+
+ if (typeof error === "number") {
+ code = error;
+ error = code !== constants.NGHTTP2_NO_ERROR
+ ? new ERR_HTTP2_SESSION_ERROR(code)
+ : undefined;
+ }
+ if (code === undefined && error != null) {
+ code = constants.NGHTTP2_INTERNAL_ERROR;
+ }
+
+ closeSession(this, code, error);
+ }
+
+ close(callback?: () => void) {
+ if (this.closed || this.destroyed) {
+ return;
+ }
+
+ this[kState].flags |= SESSION_FLAGS_CLOSED;
+ if (typeof callback === "function") {
+ this.once("close", callback);
+ }
+ this.goaway();
+ this[kMaybeDestroy]();
+ }
+
+ [kMaybeDestroy](error?: number) {
+ if (!error) {
+ const state = this[kState];
+ // Don't destroy if the session is not closed or there are pending or open
+ // streams.
+ if (
+ !this.closed || state.streams.size > 0 || state.pendingStreams.size >
+ 0
+ ) {
+ return;
+ }
+ }
+ this.destroy(error);
+ }
+
+ ref() {
+ warnNotImplemented("Http2Session.ref");
}
unref() {
warnNotImplemented("Http2Session.unref");
}
+
+ setTimeout(msecs: number, callback?: () => void) {
+ setStreamTimeout(this, msecs, callback);
+ }
+}
+
+function emitClose(session: Http2Session, error?: Error) {
+ if (error) {
+ session.emit("error", error);
+ }
+ session.emit("close");
+}
+
+function finishSessionClose(session: Http2Session, error?: Error) {
+ // TODO(bartlomieju): handle sockets
+
+ nextTick(emitClose, session, error);
+}
+
+function closeSession(session: Http2Session, code?: number, error?: Error) {
+ const state = session[kState];
+ state.flags |= SESSION_FLAGS_DESTROYED;
+ state.destroyCode = code;
+
+ session.setTimeout(0);
+ session.removeAllListeners("timeout");
+
+ // Destroy open and pending streams
+ if (state.pendingStreams.size > 0 || state.streams.size > 0) {
+ const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
+ state.pendingStreams.forEach((stream) => stream.destroy(cancel));
+ state.streams.forEach((stream) => stream.destroy(cancel));
+ }
+
+ // TODO(bartlomieju): handle sockets
+ debugHttp2(
+ ">>> closeSession",
+ session[kDenoConnRid],
+ session[kDenoClientRid],
+ );
+ core.tryClose(session[kDenoConnRid]);
+ core.tryClose(session[kDenoClientRid]);
+
+ finishSessionClose(session, error);
}
export class ServerHttp2Session extends Http2Session {
constructor() {
- super();
+ super(constants.NGHTTP2_SESSION_SERVER, {});
}
altsvc(
@@ -143,71 +325,184 @@ export class ServerHttp2Session extends Http2Session {
}
}
+function assertValidPseudoHeader(header: string) {
+ switch (header) {
+ case ":authority":
+ case ":path":
+ case ":method":
+ case ":scheme":
+ case ":status":
+ return;
+ default:
+ throw new ERR_HTTP2_INVALID_PSEUDOHEADER(header);
+ }
+}
+
export class ClientHttp2Session extends Http2Session {
+ #connectPromise: Promise<void>;
+
constructor(
- _authority: string | URL,
- _options: Record<string, unknown>,
- callback: (session: Http2Session) => void,
+ connPromise: Promise<TcpConn> | Promise<TlsConn>,
+ url: string,
+ options: Record<string, unknown>,
) {
- super();
- if (callback) {
- this.on("connect", callback);
- }
- nextTick(() => this.emit("connect", this));
+ super(constants.NGHTTP2_SESSION_CLIENT, options);
+ this[kPendingRequestCalls] = null;
+ this[kDenoClientRid] = undefined;
+ this[kDenoConnRid] = undefined;
+
+ // TODO(bartlomieju): cleanup
+ this.#connectPromise = (async () => {
+ debugHttp2(">>> before connect");
+ const conn = await connPromise;
+ const [clientRid, connRid] = await op_http2_connect(conn.rid, url);
+ debugHttp2(">>> after connect");
+ this[kDenoClientRid] = clientRid;
+ this[kDenoConnRid] = connRid;
+ // TODO(bartlomieju): save this promise, so the session can be unrefed
+ (async () => {
+ try {
+ await core.opAsync(
+ "op_http2_poll_client_connection",
+ this[kDenoConnRid],
+ );
+ } catch (e) {
+ this.emit("error", e);
+ }
+ })();
+ this.emit("connect", this, {});
+ })();
}
request(
headers: Http2Headers,
- _options?: Record<string, unknown>,
+ options?: Record<string, unknown>,
): ClientHttp2Stream {
- const reqHeaders: string[][] = [];
- const controllerPromise: Deferred<
- ReadableStreamDefaultController<Uint8Array>
- > = deferred();
- const body = new ReadableStream({
- start(controller) {
- controllerPromise.resolve(controller);
- },
- });
- const request: RequestInit = { headers: reqHeaders, body };
- let authority = null;
- let path = null;
- for (const [name, value] of Object.entries(headers)) {
- if (name == constants.HTTP2_HEADER_PATH) {
- path = String(value);
- } else if (name == constants.HTTP2_HEADER_METHOD) {
- request.method = String(value);
- } else if (name == constants.HTTP2_HEADER_AUTHORITY) {
- authority = String(value);
- } else {
- reqHeaders.push([name, String(value)]);
+ if (this.destroyed) {
+ throw new ERR_HTTP2_INVALID_SESSION();
+ }
+
+ if (this.closed) {
+ throw new ERR_HTTP2_GOAWAY_SESSION();
+ }
+
+ this[kUpdateTimer]();
+ if (headers !== null && headers !== undefined) {
+ const keys = Object.keys(headers);
+ for (let i = 0; i < keys.length; i++) {
+ const header = keys[i];
+ if (header[0] === ":") {
+ assertValidPseudoHeader(header);
+ } else if (header && !_checkIsHttpToken(header)) {
+ this.destroy(new ERR_INVALID_HTTP_TOKEN("Header name", header));
+ }
}
}
- const fetchPromise = fetch(`http://${authority}${path}`, request);
- const readerPromise = deferred();
- const headersPromise = deferred();
- (async () => {
- const fetch = await fetchPromise;
- readerPromise.resolve(fetch.body);
+ headers = Object.assign({ __proto__: null }, headers);
+ options = { ...options };
+
+ if (headers[constants.HTTP2_HEADER_METHOD] === undefined) {
+ headers[constants.HTTP2_HEADER_METHOD] = constants.HTTP2_METHOD_GET;
+ }
+
+ const connect =
+ headers[constants.HTTP2_HEADER_METHOD] === constants.HTTP2_METHOD_CONNECT;
- const headers: Http2Headers = {};
- for (const [key, value] of fetch.headers) {
- headers[key] = value;
+ if (!connect || headers[constants.HTTP2_HEADER_PROTOCOL] !== undefined) {
+ if (getAuthority(headers) === undefined) {
+ headers[constants.HTTP2_HEADER_AUTHORITY] = this[kAuthority];
+ }
+ if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) {
+ headers[constants.HTTP2_HEADER_SCHEME] = this[kProtocol].slice(0, -1);
+ }
+ if (headers[constants.HTTP2_HEADER_PATH] === undefined) {
+ headers[constants.HTTP2_HEADER_PATH] = "/";
+ }
+ } else {
+ if (headers[constants.HTTP2_HEADER_AUTHORITY] === undefined) {
+ throw new ERR_HTTP2_CONNECT_AUTHORITY();
}
- headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status);
+ if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) {
+ throw new ERR_HTTP2_CONNECT_SCHEME();
+ }
+ if (headers[constants.HTTP2_HEADER_PATH] === undefined) {
+ throw new ERR_HTTP2_CONNECT_PATH();
+ }
+ }
- headersPromise.resolve(headers);
- })();
- return new ClientHttp2Stream(
+ if (options.endStream === undefined) {
+ const method = headers[constants.HTTP2_HEADER_METHOD];
+ options.endStream = method === constants.HTTP2_METHOD_DELETE ||
+ method === constants.HTTP2_METHOD_GET ||
+ method === constants.HTTP2_METHOD_HEAD;
+ } else {
+ options.endStream = !!options.endStream;
+ }
+
+ const stream = new ClientHttp2Stream(
+ options,
this,
- headersPromise,
- controllerPromise,
- readerPromise,
+ this.#connectPromise,
+ headers,
);
+ stream[kSentHeaders] = headers;
+ stream[kOrigin] = `${headers[constants.HTTP2_HEADER_SCHEME]}://${
+ getAuthority(headers)
+ }`;
+
+ if (options.endStream) {
+ stream.end();
+ }
+
+ if (options.waitForTrailers) {
+ stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
+ }
+
+ const { signal } = options;
+ if (signal) {
+ const aborter = () => {
+ stream.destroy(new AbortError(undefined, { cause: signal.reason }));
+ };
+ if (signal.aborted) {
+ aborter();
+ } else {
+ // TODO(bartlomieju): handle this
+ // const disposable = EventEmitter.addAbortListener(signal, aborter);
+ // stream.once("close", disposable[Symbol.dispose]);
+ }
+ }
+
+ // TODO(bartlomieju): handle this
+ const onConnect = () => {};
+ if (this.connecting) {
+ if (this[kPendingRequestCalls] !== null) {
+ this[kPendingRequestCalls].push(onConnect);
+ } else {
+ this[kPendingRequestCalls] = [onConnect];
+ this.once("connect", () => {
+ this[kPendingRequestCalls].forEach((f) => f());
+ this[kPendingRequestCalls] = null;
+ });
+ }
+ } else {
+ onConnect();
+ }
+
+ return stream;
}
}
+function getAuthority(headers) {
+ if (headers[constants.HTTP2_HEADER_AUTHORITY] !== undefined) {
+ return headers[constants.HTTP2_HEADER_AUTHORITY];
+ }
+ if (headers[constants.HTTP2_HEADER_HOST] !== undefined) {
+ return headers[constants.HTTP2_HEADER_HOST];
+ }
+ return undefined;
+}
+
export class Http2Stream extends EventEmitter {
#session: Http2Session;
#headers: Deferred<Http2Headers>;
@@ -265,6 +560,8 @@ export class Http2Stream extends EventEmitter {
})();
}
+ setEncoding(_encoding) {}
+
resume() {
}
@@ -351,17 +648,566 @@ export class Http2Stream extends EventEmitter {
}
}
-export class ClientHttp2Stream extends Http2Stream {
+async function clientHttp2Request(
+ session,
+ sessionConnectPromise,
+ headers,
+ options,
+) {
+ debugHttp2(
+ ">>> waiting for connect promise",
+ sessionConnectPromise,
+ headers,
+ options,
+ );
+ await sessionConnectPromise;
+
+ const reqHeaders: string[][] = [];
+ const pseudoHeaders = {};
+
+ for (const [key, value] of Object.entries(headers)) {
+ if (key[0] === ":") {
+ pseudoHeaders[key] = value;
+ } else {
+ reqHeaders.push([key, Array.isArray(value) ? value[0] : value]);
+ }
+ }
+ debugHttp2(
+ "waited for connect promise",
+ !!options.waitForTrailers,
+ pseudoHeaders,
+ reqHeaders,
+ );
+
+ return await core.opAsync(
+ "op_http2_client_request",
+ session[kDenoClientRid],
+ pseudoHeaders,
+ reqHeaders,
+ );
+}
+
+export class ClientHttp2Stream extends Duplex {
+ #requestPromise: Promise<[number, number]>;
+ #responsePromise: Promise<void>;
+ #rid: number | undefined = undefined;
+ #encoding = "utf8";
+
constructor(
+ options: Record<string, unknown>,
session: Http2Session,
- headers: Promise<Http2Headers>,
- controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>,
- readerPromise: Deferred<ReadableStream<Uint8Array>>,
+ sessionConnectPromise: Promise<void>,
+ headers: Record<string, string>,
) {
- super(session, headers, controllerPromise, readerPromise);
+ options.allowHalfOpen = true;
+ options.decodeString = false;
+ options.autoDestroy = false;
+ super(options);
+ this.cork();
+ this[kSession] = session;
+ session[kState].pendingStreams.add(this);
+
+ this._readableState.readingMore = true;
+
+ this[kState] = {
+ didRead: false,
+ flags: STREAM_FLAGS_PENDING | STREAM_FLAGS_HEADERS_SENT,
+ rstCode: constants.NGHTTP2_NO_ERROR,
+ writeQueueSize: 0,
+ trailersReady: false,
+ endAfterHeaders: false,
+ shutdownWritableCalled: false,
+ };
+ this[kDenoResponse] = undefined;
+ this[kDenoRid] = undefined;
+
+ this.#requestPromise = clientHttp2Request(
+ session,
+ sessionConnectPromise,
+ headers,
+ options,
+ );
+ debugHttp2(">>> created clienthttp2stream");
+ // TODO(bartlomieju): save it so we can unref
+ this.#responsePromise = (async () => {
+ debugHttp2(">>> before request promise", session[kDenoClientRid]);
+ const [streamRid, streamId] = await this.#requestPromise;
+ this.#rid = streamRid;
+ this[kDenoRid] = streamRid;
+ this[kInit](streamId);
+ debugHttp2(
+ ">>> after request promise",
+ session[kDenoClientRid],
+ this.#rid,
+ );
+ const response = await core.opAsync(
+ "op_http2_client_get_response",
+ this.#rid,
+ );
+ debugHttp2(">>> after get response", response);
+ const headers = {
+ ":status": response.statusCode,
+ ...Object.fromEntries(response.headers),
+ };
+ debugHttp2(">>> emitting response", headers);
+ this.emit("response", headers, 0);
+ this[kDenoResponse] = response;
+ this.emit("ready");
+ })();
+ }
+
+ [kUpdateTimer]() {
+ if (this.destroyed) {
+ return;
+ }
+ if (this[kTimeout]) {
+ this[kTimeout].refresh();
+ }
+ if (this[kSession]) {
+ this[kSession][kUpdateTimer]();
+ }
+ }
+
+ [kInit](id) {
+ const state = this[kState];
+ state.flags |= STREAM_FLAGS_READY;
+
+ const session = this[kSession];
+ session[kState].pendingStreams.delete(this);
+ session[kState].streams.set(id, this);
+
+ // TODO(bartlomieju): handle socket handle
+
+ this[kID] = id;
+ this.uncork();
+ this.emit("ready");
+ }
+
+ get bufferSize() {
+ return this[kState].writeQueueSize + this.writableLength;
+ }
+
+ get endAfterHeaders() {
+ return this[kState].endAfterHeaders;
+ }
+
+ get sentHeaders() {
+ return this[kSentHeaders];
+ }
+
+ get sentTrailers() {
+ return this[kSentTrailers];
+ }
+
+ get sendInfoHeaders() {
+ return this[kInfoHeaders];
+ }
+
+ get pending(): boolean {
+ return this[kID] === undefined;
+ }
+
+ get id(): number | undefined {
+ return this[kID];
+ }
+
+ get session(): Http2Session {
+ return this[kSession];
+ }
+
+ _onTimeout() {
+ callTimeout(this, kSession);
+ }
+
+ get headersSent() {
+ return !!(this[kState].flags & STREAM_FLAGS_HEADERS_SENT);
+ }
+
+ get aborted() {
+ return !!(this[kState].flags & STREAM_FLAGS_ABORTED);
+ }
+
+ get headRequest() {
+ return !!(this[kState].flags & STREAM_FLAGS_HEAD_REQUEST);
+ }
+
+ get rstCode() {
+ return this[kState].rstCode;
+ }
+
+ get state(): Record<string, unknown> {
+ notImplemented("Http2Stream.state");
+ return {};
+ }
+
+ // [kAfterAsyncWrite]() {}
+
+ // [kWriteGeneric]() {}
+
+ // TODO(bartlomieju): clean up
+ _write(chunk, encoding, callback?: () => void) {
+ debugHttp2(">>> _write", callback);
+ if (typeof encoding === "function") {
+ callback = encoding;
+ encoding = "utf8";
+ }
+ let data;
+ if (typeof encoding === "string") {
+ data = ENCODER.encode(chunk);
+ } else {
+ data = chunk.buffer;
+ }
+
+ this.#requestPromise
+ .then(() => {
+ debugHttp2(">>> _write", this.#rid, data, encoding, callback);
+ return core.opAsync(
+ "op_http2_client_send_data",
+ this.#rid,
+ data,
+ );
+ })
+ .then(() => {
+ callback?.();
+ debugHttp2(
+ "this.writableFinished",
+ this.pending,
+ this.destroyed,
+ this.writableFinished,
+ );
+ })
+ .catch((e) => {
+ callback?.(e);
+ });
+ }
+
+ // TODO(bartlomieju): finish this method
+ _writev(_chunks, _callback?) {
+ notImplemented("ClientHttp2Stream._writev");
+ }
+
+ _final(cb) {
+ debugHttp2("_final", new Error());
+ if (this.pending) {
+ this.once("ready", () => this._final(cb));
+ return;
+ }
+
+ shutdownWritable(this, cb);
+ }
+
+ // TODO(bartlomieju): needs a proper cleanup
+ _read() {
+ if (this.destroyed) {
+ this.push(null);
+ return;
+ }
+
+ if (!this[kState].didRead) {
+ this._readableState.readingMore = false;
+ this[kState].didRead = true;
+ }
+ // if (!this.pending) {
+ // streamOnResume(this);
+ // } else {
+ // this.once("ready", () => streamOnResume(this));
+ // }
+
+ if (!this[kDenoResponse]) {
+ this.once("ready", this._read);
+ return;
+ }
+
+ debugHttp2(">>> read");
+
+ (async () => {
+ const [chunk, finished] = await core.opAsync(
+ "op_http2_client_get_response_body_chunk",
+ this[kDenoResponse].bodyRid,
+ );
+
+ debugHttp2(">>> chunk", chunk, finished, this[kDenoResponse].bodyRid);
+ if (chunk === null) {
+ const trailerList = await core.opAsync(
+ "op_http2_client_get_response_trailers",
+ this[kDenoResponse].bodyRid,
+ );
+ if (trailerList) {
+ const trailers = Object.fromEntries(trailerList);
+ this.emit("trailers", trailers);
+ }
+
+ debugHttp2("tryClose");
+ core.tryClose(this[kDenoResponse].bodyRid);
+ this.push(null);
+ debugHttp2(">>> read null chunk");
+ this[kMaybeDestroy]();
+ return;
+ }
+
+ let result;
+ if (this.#encoding === "utf8") {
+ result = this.push(new TextDecoder().decode(new Uint8Array(chunk)));
+ } else {
+ result = this.push(new Uint8Array(chunk));
+ }
+ debugHttp2(">>> read result", result);
+ })();
+ }
+
+ // TODO(bartlomieju):
+ priority(_options: Record<string, unknown>) {
+ notImplemented("Http2Stream.priority");
+ }
+
+ sendTrailers(trailers: Record<string, unknown>) {
+ debugHttp2("sendTrailers", trailers);
+ if (this.destroyed || this.closed) {
+ throw new ERR_HTTP2_INVALID_STREAM();
+ }
+ if (this[kSentTrailers]) {
+ throw new ERR_HTTP2_TRAILERS_ALREADY_SENT();
+ }
+ if (!this[kState].trailersReady) {
+ throw new ERR_HTTP2_TRAILERS_NOT_READY();
+ }
+
+ trailers = Object.assign({ __proto__: null }, trailers);
+ const trailerList = [];
+ for (const [key, value] of Object.entries(trailers)) {
+ trailerList.push([key, value]);
+ }
+ this[kSentTrailers] = trailers;
+
+ // deno-lint-ignore no-this-alias
+ const stream = this;
+ stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;
+ debugHttp2("sending trailers", this.#rid, trailers);
+
+ core.opAsync(
+ "op_http2_client_send_trailers",
+ this.#rid,
+ trailerList,
+ ).then(() => {
+ stream[kMaybeDestroy]();
+ core.tryClose(this.#rid);
+ }).catch((e) => {
+ debugHttp2(">>> send trailers error", e);
+ core.tryClose(this.#rid);
+ stream._destroy(e);
+ });
+ }
+
+ get closed(): boolean {
+ return !!(this[kState].flags & STREAM_FLAGS_CLOSED);
+ }
+
+ close(code: number = constants.NGHTTP2_NO_ERROR, callback?: () => void) {
+ debugHttp2(">>> close", code, this.closed, callback);
+
+ if (this.closed) {
+ return;
+ }
+ if (typeof callback !== "undefined") {
+ this.once("close", callback);
+ }
+ closeStream(this, code);
+ }
+
+ _destroy(err, callback) {
+ debugHttp2(">>> ClientHttp2Stream._destroy", err, callback);
+ const session = this[kSession];
+ const id = this[kID];
+
+ const state = this[kState];
+ const sessionState = session[kState];
+ const sessionCode = sessionState.goawayCode || sessionState.destroyCode;
+
+ let code = this.closed ? this.rstCode : sessionCode;
+ if (err != null) {
+ if (sessionCode) {
+ code = sessionCode;
+ } else if (err instanceof AbortError) {
+ code = constants.NGHTTP2_CANCEL;
+ } else {
+ code = constants.NGHTTP2_INTERNAL_ERROR;
+ }
+ }
+
+ if (!this.closed) {
+ // TODO(bartlomieju): this not handle `socket handle`
+ closeStream(this, code, kNoRstStream);
+ }
+
+ sessionState.streams.delete(id);
+ sessionState.pendingStreams.delete(this);
+
+ sessionState.writeQueueSize -= state.writeQueueSize;
+ state.writeQueueSize = 0;
+
+ const nameForErrorCode = {};
+ if (
+ err == null && code !== constants.NGHTTP2_NO_ERROR &&
+ code !== constants.NGHTTP2_CANCEL
+ ) {
+ err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
+ }
+
+ this[kSession] = undefined;
+
+ session[kMaybeDestroy]();
+ callback(err);
+ }
+
+ [kMaybeDestroy](code = constants.NGHTTP2_NO_ERROR) {
+ debugHttp2(
+ ">>> ClientHttp2Stream[kMaybeDestroy]",
+ code,
+ this.writableFinished,
+ this.readable,
+ this.closed,
+ );
+ if (code !== constants.NGHTTP2_NO_ERROR) {
+ this._destroy();
+ return;
+ }
+
+ if (this.writableFinished) {
+ if (!this.readable && this.closed) {
+ debugHttp2("going into _destroy");
+ this._destroy();
+ return;
+ }
+ }
+ }
+
+ setTimeout(msecs: number, callback?: () => void) {
+ // TODO(bartlomieju): fix this call, it's crashing on `this` being undefined;
+ // some strange transpilation quirk going on here.
+ setStreamTimeout.call(this, msecs, callback);
+ }
+}
+
+function shutdownWritable(stream, callback) {
+ debugHttp2(">>> shutdownWritable", callback);
+ const state = stream[kState];
+ if (state.shutdownWritableCalled) {
+ return callback();
+ }
+ state.shutdownWritableCalled = true;
+ onStreamTrailers(stream);
+ callback();
+ // TODO(bartlomieju): might have to add "finish" event listener here,
+ // check it.
+}
+
+function onStreamTrailers(stream) {
+ stream[kState].trailersReady = true;
+ debugHttp2(">>> onStreamTrailers", stream.destroyed, stream.closed);
+ if (stream.destroyed || stream.closed) {
+ return;
+ }
+ if (!stream.emit("wantTrailers")) {
+ debugHttp2(">>> onStreamTrailers no wantTrailers");
+ stream.sendTrailers({});
+ }
+ debugHttp2(">>> onStreamTrailers wantTrailers");
+}
+
+const kNoRstStream = 0;
+const kSubmitRstStream = 1;
+const kForceRstStream = 2;
+
+function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
+ const state = stream[kState];
+ state.flags |= STREAM_FLAGS_CLOSED;
+ state.rstCode = code;
+
+ stream.setTimeout(0);
+ stream.removeAllListeners("timeout");
+
+ const { ending } = stream._writableState;
+
+ if (!ending) {
+ if (!stream.aborted) {
+ state.flags |= STREAM_FLAGS_ABORTED;
+ stream.emit("aborted");
+ }
+
+ stream.end();
+ }
+
+ if (rstStreamStatus != kNoRstStream) {
+ debugHttp2(
+ ">>> closeStream",
+ !ending,
+ stream.writableFinished,
+ code !== constants.NGHTTP2_NO_ERROR,
+ rstStreamStatus === kForceRstStream,
+ );
+ if (
+ !ending || stream.writableFinished ||
+ code !== constants.NGHTTP2_NO_ERROR || rstStreamStatus === kForceRstStream
+ ) {
+ finishCloseStream(stream, code);
+ } else {
+ stream.once("finish", () => finishCloseStream(stream, code));
+ }
+ }
+}
+
+function finishCloseStream(stream, code) {
+ debugHttp2(">>> finishCloseStream", stream.readableEnded, code);
+ if (stream.pending) {
+ stream.push(null);
+ stream.once("ready", () => {
+ core.opAsync(
+ "op_http2_client_reset_stream",
+ stream[kDenoRid],
+ code,
+ ).then(() => {
+ debugHttp2(
+ ">>> finishCloseStream close",
+ stream[kDenoRid],
+ stream[kDenoResponse].bodyRid,
+ );
+ core.tryClose(stream[kDenoRid]);
+ core.tryClose(stream[kDenoResponse].bodyRid);
+ stream.emit("close");
+ });
+ });
+ } else {
+ stream.resume();
+ core.opAsync(
+ "op_http2_client_reset_stream",
+ stream[kDenoRid],
+ code,
+ ).then(() => {
+ debugHttp2(
+ ">>> finishCloseStream close2",
+ stream[kDenoRid],
+ stream[kDenoResponse].bodyRid,
+ );
+ core.tryClose(stream[kDenoRid]);
+ core.tryClose(stream[kDenoResponse].bodyRid);
+ stream.emit("close");
+ }).catch(() => {
+ debugHttp2(
+ ">>> finishCloseStream close2 catch",
+ stream[kDenoRid],
+ stream[kDenoResponse].bodyRid,
+ );
+ core.tryClose(stream[kDenoRid]);
+ core.tryClose(stream[kDenoResponse].bodyRid);
+ stream.emit("close");
+ });
}
}
+function callTimeout() {
+ notImplemented("callTimeout");
+}
+
export class ServerHttp2Stream extends Http2Stream {
_promise: Deferred<Response>;
#body: ReadableStream<Uint8Array>;
@@ -496,17 +1342,17 @@ export class Http2Server extends Server {
this.emit("stream", stream, headers);
return await stream._promise;
} catch (e) {
- console.log("Error in serveHttpOnConnection", e);
+ console.log(">>> Error in serveHttpOnConnection", e);
}
return new Response("");
},
() => {
- console.log("error");
+ console.log(">>> error");
},
() => {},
);
} catch (e) {
- console.log("Error in Http2Server", e);
+ console.log(">>> Error in Http2Server", e);
}
},
);
@@ -602,11 +1448,77 @@ export function connect(
options: Record<string, unknown>,
callback: (session: ClientHttp2Session) => void,
): ClientHttp2Session {
- return new ClientHttp2Session(authority, options, callback);
+ debugHttp2(">>> http2.connect", options);
+
+ if (typeof options === "function") {
+ callback = options;
+ options = undefined;
+ }
+
+ options = { ...options };
+
+ if (typeof authority === "string") {
+ authority = new URL(authority);
+ }
+
+ const protocol = authority.protocol || options.protocol || "https:";
+ let port = 0;
+
+ if (authority.port !== "") {
+ port = Number(authority.port);
+ } else if (protocol === "http:") {
+ port = 80;
+ } else {
+ port = 443;
+ }
+
+ if (port == 0) {
+ throw new Error("Invalid port");
+ }
+
+ let host = "localhost";
+
+ if (authority.hostname) {
+ host = authority.hostname;
+
+ if (host[0] === "[") {
+ host = host.slice(1, -1);
+ }
+ } else if (authority.host) {
+ host = authority.host;
+ }
+
+ // TODO(bartlomieju): handle defaults
+ if (typeof options.createConnection === "function") {
+ console.error("Not implemented: http2.connect.options.createConnection");
+ // notImplemented("http2.connect.options.createConnection");
+ }
+
+ let conn, url;
+ if (protocol == "http:") {
+ conn = Deno.connect({ port, hostname: host });
+ url = `http://${host}${port == 80 ? "" : (":" + port)}`;
+ } else if (protocol == "https:") {
+ conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] });
+ url = `http://${host}${port == 443 ? "" : (":" + port)}`;
+ } else {
+ throw new TypeError("Unexpected URL protocol");
+ }
+
+ const session = new ClientHttp2Session(conn, url, options);
+ session[kAuthority] = `${options.servername || host}:${port}`;
+ session[kProtocol] = protocol;
+
+ if (typeof callback === "function") {
+ session.once("connect", callback);
+ }
+ return session;
}
export const constants = {
NGHTTP2_ERR_FRAME_SIZE_ERROR: -522,
+ NGHTTP2_NV_FLAG_NONE: 0,
+ NGHTTP2_NV_FLAG_NO_INDEX: 1,
NGHTTP2_SESSION_SERVER: 0,
NGHTTP2_SESSION_CLIENT: 1,
NGHTTP2_STREAM_STATE_IDLE: 1,
@@ -849,6 +1761,49 @@ export const constants = {
HTTP_STATUS_NETWORK_AUTHENTICATION_REQUIRED: 511,
};
+// const kSingleValueHeaders = new Set([
+// constants.HTTP2_HEADER_STATUS,
+// constants.HTTP2_HEADER_METHOD,
+// constants.HTTP2_HEADER_AUTHORITY,
+// constants.HTTP2_HEADER_SCHEME,
+// constants.HTTP2_HEADER_PATH,
+// constants.HTTP2_HEADER_PROTOCOL,
+// constants.HTTP2_HEADER_ACCESS_CONTROL_ALLOW_CREDENTIALS,
+// constants.HTTP2_HEADER_ACCESS_CONTROL_MAX_AGE,
+// constants.HTTP2_HEADER_ACCESS_CONTROL_REQUEST_METHOD,
+// constants.HTTP2_HEADER_AGE,
+// constants.HTTP2_HEADER_AUTHORIZATION,
+// constants.HTTP2_HEADER_CONTENT_ENCODING,
+// constants.HTTP2_HEADER_CONTENT_LANGUAGE,
+// constants.HTTP2_HEADER_CONTENT_LENGTH,
+// constants.HTTP2_HEADER_CONTENT_LOCATION,
+// constants.HTTP2_HEADER_CONTENT_MD5,
+// constants.HTTP2_HEADER_CONTENT_RANGE,
+// constants.HTTP2_HEADER_CONTENT_TYPE,
+// constants.HTTP2_HEADER_DATE,
+// constants.HTTP2_HEADER_DNT,
+// constants.HTTP2_HEADER_ETAG,
+// constants.HTTP2_HEADER_EXPIRES,
+// constants.HTTP2_HEADER_FROM,
+// constants.HTTP2_HEADER_HOST,
+// constants.HTTP2_HEADER_IF_MATCH,
+// constants.HTTP2_HEADER_IF_MODIFIED_SINCE,
+// constants.HTTP2_HEADER_IF_NONE_MATCH,
+// constants.HTTP2_HEADER_IF_RANGE,
+// constants.HTTP2_HEADER_IF_UNMODIFIED_SINCE,
+// constants.HTTP2_HEADER_LAST_MODIFIED,
+// constants.HTTP2_HEADER_LOCATION,
+// constants.HTTP2_HEADER_MAX_FORWARDS,
+// constants.HTTP2_HEADER_PROXY_AUTHORIZATION,
+// constants.HTTP2_HEADER_RANGE,
+// constants.HTTP2_HEADER_REFERER,
+// constants.HTTP2_HEADER_RETRY_AFTER,
+// constants.HTTP2_HEADER_TK,
+// constants.HTTP2_HEADER_UPGRADE_INSECURE_REQUESTS,
+// constants.HTTP2_HEADER_USER_AGENT,
+// constants.HTTP2_HEADER_X_CONTENT_TYPE_OPTIONS,
+// ]);
+
export function getDefaultSettings(): Record<string, unknown> {
notImplemented("http2.getDefaultSettings");
return {};