diff options
Diffstat (limited to 'ext/node/polyfills/http2.ts')
-rw-r--r-- | ext/node/polyfills/http2.ts | 1169 |
1 files changed, 1062 insertions, 107 deletions
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 20306584f..62dd1a501 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -4,79 +4,160 @@ // TODO(petamoriken): enable prefer-primordials for node polyfills // deno-lint-ignore-file prefer-primordials +const core = globalThis.Deno.core; import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter } from "node:events"; import { Buffer } from "node:buffer"; import { Server, Socket, TCP } from "node:net"; import { TypedArray } from "ext:deno_node/internal/util/types.ts"; -import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts"; +import { + kMaybeDestroy, + kUpdateTimer, + setStreamTimeout, +} from "ext:deno_node/internal/stream_base_commons.ts"; import { FileHandle } from "node:fs/promises"; import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js"; import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts"; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; +import { Duplex } from "node:stream"; +import { + AbortError, + ERR_HTTP2_CONNECT_AUTHORITY, + ERR_HTTP2_CONNECT_PATH, + ERR_HTTP2_CONNECT_SCHEME, + ERR_HTTP2_GOAWAY_SESSION, + ERR_HTTP2_INVALID_PSEUDOHEADER, + ERR_HTTP2_INVALID_SESSION, + ERR_HTTP2_INVALID_STREAM, + ERR_HTTP2_SESSION_ERROR, + ERR_HTTP2_STREAM_CANCEL, + ERR_HTTP2_STREAM_ERROR, + ERR_HTTP2_TRAILERS_ALREADY_SENT, + ERR_HTTP2_TRAILERS_NOT_READY, + ERR_INVALID_HTTP_TOKEN, +} from "ext:deno_node/internal/errors.ts"; +import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts"; +import { TcpConn } from "ext:deno_net/01_net.js"; +import { TlsConn } from "ext:deno_net/02_tls.js"; + +const { + op_http2_connect, +} = core.ensureFastOps(); + +const kSession = Symbol("session"); +const kAlpnProtocol = Symbol("alpnProtocol"); +const kAuthority = Symbol("authority"); +const kEncrypted = Symbol("encrypted"); +const kID = Symbol("id"); +const kInit = Symbol("init"); +const kInfoHeaders = Symbol("sent-info-headers"); +const kOrigin = Symbol("origin"); +const kPendingRequestCalls = Symbol("kPendingRequestCalls"); +const kProtocol = Symbol("protocol"); +const kSentHeaders = Symbol("sent-headers"); +const kSentTrailers = Symbol("sent-trailers"); +const kState = Symbol("state"); +const kType = Symbol("type"); +const kTimeout = Symbol("timeout"); + +const kDenoResponse = Symbol("kDenoResponse"); +const kDenoRid = Symbol("kDenoRid"); +const kDenoClientRid = Symbol("kDenoClientRid"); +const kDenoConnRid = Symbol("kDenoConnRid"); + +const STREAM_FLAGS_PENDING = 0x0; +const STREAM_FLAGS_READY = 0x1; +const STREAM_FLAGS_CLOSED = 0x2; +const STREAM_FLAGS_HEADERS_SENT = 0x4; +const STREAM_FLAGS_HEAD_REQUEST = 0x8; +const STREAM_FLAGS_ABORTED = 0x10; +const STREAM_FLAGS_HAS_TRAILERS = 0x20; + +const SESSION_FLAGS_PENDING = 0x0; +const SESSION_FLAGS_READY = 0x1; +const SESSION_FLAGS_CLOSED = 0x2; +const SESSION_FLAGS_DESTROYED = 0x4; const ENCODER = new TextEncoder(); type Http2Headers = Record<string, string | string[]>; +const debugHttp2Enabled = false; +function debugHttp2(...args) { + if (debugHttp2Enabled) { + console.log(...args); + } +} + export class Http2Session extends EventEmitter { - constructor() { + constructor(type, _options /* socket */) { super(); - } - get alpnProtocol(): string | undefined { - notImplemented("Http2Session.alpnProtocol"); - return undefined; - } + // TODO(bartlomieju): Handle sockets here - close(_callback?: () => void) { - warnNotImplemented("Http2Session.close"); - } + this[kState] = { + destroyCode: constants.NGHTTP2_NO_ERROR, + flags: SESSION_FLAGS_PENDING, + goawayCode: null, + goawayLastStreamID: null, + streams: new Map(), + pendingStreams: new Set(), + pendingAck: 0, + writeQueueSize: 0, + originSet: undefined, + }; - get closed(): boolean { - return false; + this[kEncrypted] = undefined; + this[kAlpnProtocol] = undefined; + this[kType] = type; + this[kTimeout] = null; + // this[kProxySocket] = null; + // this[kSocket] = socket; + // this[kHandle] = undefined; + + // TODO(bartlomieju): connecting via socket } - get connecting(): boolean { - notImplemented("Http2Session.connecting"); - return false; + get encrypted(): boolean { + return this[kEncrypted]; } - destroy(_error?: Error, _code?: number) { - notImplemented("Http2Session.destroy"); + get alpnProtocol(): string | undefined { + return this[kAlpnProtocol]; } - get destroyed(): boolean { - return false; + get originSet(): string[] | undefined { + if (!this.encrypted || this.destroyed) { + return undefined; + } + // TODO(bartlomieju): + return []; } - get encrypted(): boolean { - notImplemented("Http2Session.encrypted"); - return false; + get connecting(): boolean { + return (this[kState].flags & SESSION_FLAGS_READY) === 0; } - goaway( - _code: number, - _lastStreamID: number, - _opaqueData: Buffer | TypedArray | DataView, - ) { - notImplemented("Http2Session.goaway"); + get closed(): boolean { + return !!(this[kState].flags & SESSION_FLAGS_CLOSED); } - get localSettings(): Record<string, unknown> { - notImplemented("Http2Session.localSettings"); - return {}; + get destroyed(): boolean { + return !!(this[kState].flags & SESSION_FLAGS_DESTROYED); } - get originSet(): string[] | undefined { - notImplemented("Http2Session.originSet"); - return undefined; + [kUpdateTimer]() { + if (this.destroyed) { + return; + } + if (this[kTimeout]) { + this[kTimeout].refresh(); + } } - get pendingSettingsAck(): boolean { - notImplemented("Http2Session.pendingSettingsAck"); - return false; + setLocalWindowSize(_windowSize: number) { + notImplemented("Http2Session.setLocalWindowSize"); } ping( @@ -87,28 +168,30 @@ export class Http2Session extends EventEmitter { return false; } - ref() { - warnNotImplemented("Http2Session.ref"); + get socket(): Socket /*| TlsSocket*/ { + warnNotImplemented("Http2Session.socket"); + return {}; } - get remoteSettings(): Record<string, unknown> { - notImplemented("Http2Session.remoteSettings"); - return {}; + get type(): number { + return this[kType]; } - setLocalWindowSize(_windowSize: number) { - notImplemented("Http2Session.setLocalWindowSize"); + get pendingSettingsAck() { + return this[kState].pendingAck > 0; } - setTimeout(msecs: number, callback?: () => void) { - setStreamTimeout(this, msecs, callback); + get state(): Record<string, unknown> { + return {}; } - get socket(): Socket /*| TlsSocket*/ { + get localSettings(): Record<string, unknown> { + notImplemented("Http2Session.localSettings"); return {}; } - get state(): Record<string, unknown> { + get remoteSettings(): Record<string, unknown> { + notImplemented("Http2Session.remoteSettings"); return {}; } @@ -116,19 +199,118 @@ export class Http2Session extends EventEmitter { notImplemented("Http2Session.settings"); } - get type(): number { - notImplemented("Http2Session.type"); - return 0; + goaway( + _code: number, + _lastStreamID: number, + _opaqueData: Buffer | TypedArray | DataView, + ) { + warnNotImplemented("Http2Session.goaway"); + core.tryClose(this[kDenoConnRid]); + core.tryClose(this[kDenoClientRid]); + } + + destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) { + if (this.destroyed) { + return; + } + + if (typeof error === "number") { + code = error; + error = code !== constants.NGHTTP2_NO_ERROR + ? new ERR_HTTP2_SESSION_ERROR(code) + : undefined; + } + if (code === undefined && error != null) { + code = constants.NGHTTP2_INTERNAL_ERROR; + } + + closeSession(this, code, error); + } + + close(callback?: () => void) { + if (this.closed || this.destroyed) { + return; + } + + this[kState].flags |= SESSION_FLAGS_CLOSED; + if (typeof callback === "function") { + this.once("close", callback); + } + this.goaway(); + this[kMaybeDestroy](); + } + + [kMaybeDestroy](error?: number) { + if (!error) { + const state = this[kState]; + // Don't destroy if the session is not closed or there are pending or open + // streams. + if ( + !this.closed || state.streams.size > 0 || state.pendingStreams.size > + 0 + ) { + return; + } + } + this.destroy(error); + } + + ref() { + warnNotImplemented("Http2Session.ref"); } unref() { warnNotImplemented("Http2Session.unref"); } + + setTimeout(msecs: number, callback?: () => void) { + setStreamTimeout(this, msecs, callback); + } +} + +function emitClose(session: Http2Session, error?: Error) { + if (error) { + session.emit("error", error); + } + session.emit("close"); +} + +function finishSessionClose(session: Http2Session, error?: Error) { + // TODO(bartlomieju): handle sockets + + nextTick(emitClose, session, error); +} + +function closeSession(session: Http2Session, code?: number, error?: Error) { + const state = session[kState]; + state.flags |= SESSION_FLAGS_DESTROYED; + state.destroyCode = code; + + session.setTimeout(0); + session.removeAllListeners("timeout"); + + // Destroy open and pending streams + if (state.pendingStreams.size > 0 || state.streams.size > 0) { + const cancel = new ERR_HTTP2_STREAM_CANCEL(error); + state.pendingStreams.forEach((stream) => stream.destroy(cancel)); + state.streams.forEach((stream) => stream.destroy(cancel)); + } + + // TODO(bartlomieju): handle sockets + debugHttp2( + ">>> closeSession", + session[kDenoConnRid], + session[kDenoClientRid], + ); + core.tryClose(session[kDenoConnRid]); + core.tryClose(session[kDenoClientRid]); + + finishSessionClose(session, error); } export class ServerHttp2Session extends Http2Session { constructor() { - super(); + super(constants.NGHTTP2_SESSION_SERVER, {}); } altsvc( @@ -143,71 +325,184 @@ export class ServerHttp2Session extends Http2Session { } } +function assertValidPseudoHeader(header: string) { + switch (header) { + case ":authority": + case ":path": + case ":method": + case ":scheme": + case ":status": + return; + default: + throw new ERR_HTTP2_INVALID_PSEUDOHEADER(header); + } +} + export class ClientHttp2Session extends Http2Session { + #connectPromise: Promise<void>; + constructor( - _authority: string | URL, - _options: Record<string, unknown>, - callback: (session: Http2Session) => void, + connPromise: Promise<TcpConn> | Promise<TlsConn>, + url: string, + options: Record<string, unknown>, ) { - super(); - if (callback) { - this.on("connect", callback); - } - nextTick(() => this.emit("connect", this)); + super(constants.NGHTTP2_SESSION_CLIENT, options); + this[kPendingRequestCalls] = null; + this[kDenoClientRid] = undefined; + this[kDenoConnRid] = undefined; + + // TODO(bartlomieju): cleanup + this.#connectPromise = (async () => { + debugHttp2(">>> before connect"); + const conn = await connPromise; + const [clientRid, connRid] = await op_http2_connect(conn.rid, url); + debugHttp2(">>> after connect"); + this[kDenoClientRid] = clientRid; + this[kDenoConnRid] = connRid; + // TODO(bartlomieju): save this promise, so the session can be unrefed + (async () => { + try { + await core.opAsync( + "op_http2_poll_client_connection", + this[kDenoConnRid], + ); + } catch (e) { + this.emit("error", e); + } + })(); + this.emit("connect", this, {}); + })(); } request( headers: Http2Headers, - _options?: Record<string, unknown>, + options?: Record<string, unknown>, ): ClientHttp2Stream { - const reqHeaders: string[][] = []; - const controllerPromise: Deferred< - ReadableStreamDefaultController<Uint8Array> - > = deferred(); - const body = new ReadableStream({ - start(controller) { - controllerPromise.resolve(controller); - }, - }); - const request: RequestInit = { headers: reqHeaders, body }; - let authority = null; - let path = null; - for (const [name, value] of Object.entries(headers)) { - if (name == constants.HTTP2_HEADER_PATH) { - path = String(value); - } else if (name == constants.HTTP2_HEADER_METHOD) { - request.method = String(value); - } else if (name == constants.HTTP2_HEADER_AUTHORITY) { - authority = String(value); - } else { - reqHeaders.push([name, String(value)]); + if (this.destroyed) { + throw new ERR_HTTP2_INVALID_SESSION(); + } + + if (this.closed) { + throw new ERR_HTTP2_GOAWAY_SESSION(); + } + + this[kUpdateTimer](); + if (headers !== null && headers !== undefined) { + const keys = Object.keys(headers); + for (let i = 0; i < keys.length; i++) { + const header = keys[i]; + if (header[0] === ":") { + assertValidPseudoHeader(header); + } else if (header && !_checkIsHttpToken(header)) { + this.destroy(new ERR_INVALID_HTTP_TOKEN("Header name", header)); + } } } - const fetchPromise = fetch(`http://${authority}${path}`, request); - const readerPromise = deferred(); - const headersPromise = deferred(); - (async () => { - const fetch = await fetchPromise; - readerPromise.resolve(fetch.body); + headers = Object.assign({ __proto__: null }, headers); + options = { ...options }; + + if (headers[constants.HTTP2_HEADER_METHOD] === undefined) { + headers[constants.HTTP2_HEADER_METHOD] = constants.HTTP2_METHOD_GET; + } + + const connect = + headers[constants.HTTP2_HEADER_METHOD] === constants.HTTP2_METHOD_CONNECT; - const headers: Http2Headers = {}; - for (const [key, value] of fetch.headers) { - headers[key] = value; + if (!connect || headers[constants.HTTP2_HEADER_PROTOCOL] !== undefined) { + if (getAuthority(headers) === undefined) { + headers[constants.HTTP2_HEADER_AUTHORITY] = this[kAuthority]; + } + if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) { + headers[constants.HTTP2_HEADER_SCHEME] = this[kProtocol].slice(0, -1); + } + if (headers[constants.HTTP2_HEADER_PATH] === undefined) { + headers[constants.HTTP2_HEADER_PATH] = "/"; + } + } else { + if (headers[constants.HTTP2_HEADER_AUTHORITY] === undefined) { + throw new ERR_HTTP2_CONNECT_AUTHORITY(); } - headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status); + if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) { + throw new ERR_HTTP2_CONNECT_SCHEME(); + } + if (headers[constants.HTTP2_HEADER_PATH] === undefined) { + throw new ERR_HTTP2_CONNECT_PATH(); + } + } - headersPromise.resolve(headers); - })(); - return new ClientHttp2Stream( + if (options.endStream === undefined) { + const method = headers[constants.HTTP2_HEADER_METHOD]; + options.endStream = method === constants.HTTP2_METHOD_DELETE || + method === constants.HTTP2_METHOD_GET || + method === constants.HTTP2_METHOD_HEAD; + } else { + options.endStream = !!options.endStream; + } + + const stream = new ClientHttp2Stream( + options, this, - headersPromise, - controllerPromise, - readerPromise, + this.#connectPromise, + headers, ); + stream[kSentHeaders] = headers; + stream[kOrigin] = `${headers[constants.HTTP2_HEADER_SCHEME]}://${ + getAuthority(headers) + }`; + + if (options.endStream) { + stream.end(); + } + + if (options.waitForTrailers) { + stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + } + + const { signal } = options; + if (signal) { + const aborter = () => { + stream.destroy(new AbortError(undefined, { cause: signal.reason })); + }; + if (signal.aborted) { + aborter(); + } else { + // TODO(bartlomieju): handle this + // const disposable = EventEmitter.addAbortListener(signal, aborter); + // stream.once("close", disposable[Symbol.dispose]); + } + } + + // TODO(bartlomieju): handle this + const onConnect = () => {}; + if (this.connecting) { + if (this[kPendingRequestCalls] !== null) { + this[kPendingRequestCalls].push(onConnect); + } else { + this[kPendingRequestCalls] = [onConnect]; + this.once("connect", () => { + this[kPendingRequestCalls].forEach((f) => f()); + this[kPendingRequestCalls] = null; + }); + } + } else { + onConnect(); + } + + return stream; } } +function getAuthority(headers) { + if (headers[constants.HTTP2_HEADER_AUTHORITY] !== undefined) { + return headers[constants.HTTP2_HEADER_AUTHORITY]; + } + if (headers[constants.HTTP2_HEADER_HOST] !== undefined) { + return headers[constants.HTTP2_HEADER_HOST]; + } + return undefined; +} + export class Http2Stream extends EventEmitter { #session: Http2Session; #headers: Deferred<Http2Headers>; @@ -265,6 +560,8 @@ export class Http2Stream extends EventEmitter { })(); } + setEncoding(_encoding) {} + resume() { } @@ -351,17 +648,566 @@ export class Http2Stream extends EventEmitter { } } -export class ClientHttp2Stream extends Http2Stream { +async function clientHttp2Request( + session, + sessionConnectPromise, + headers, + options, +) { + debugHttp2( + ">>> waiting for connect promise", + sessionConnectPromise, + headers, + options, + ); + await sessionConnectPromise; + + const reqHeaders: string[][] = []; + const pseudoHeaders = {}; + + for (const [key, value] of Object.entries(headers)) { + if (key[0] === ":") { + pseudoHeaders[key] = value; + } else { + reqHeaders.push([key, Array.isArray(value) ? value[0] : value]); + } + } + debugHttp2( + "waited for connect promise", + !!options.waitForTrailers, + pseudoHeaders, + reqHeaders, + ); + + return await core.opAsync( + "op_http2_client_request", + session[kDenoClientRid], + pseudoHeaders, + reqHeaders, + ); +} + +export class ClientHttp2Stream extends Duplex { + #requestPromise: Promise<[number, number]>; + #responsePromise: Promise<void>; + #rid: number | undefined = undefined; + #encoding = "utf8"; + constructor( + options: Record<string, unknown>, session: Http2Session, - headers: Promise<Http2Headers>, - controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>, - readerPromise: Deferred<ReadableStream<Uint8Array>>, + sessionConnectPromise: Promise<void>, + headers: Record<string, string>, ) { - super(session, headers, controllerPromise, readerPromise); + options.allowHalfOpen = true; + options.decodeString = false; + options.autoDestroy = false; + super(options); + this.cork(); + this[kSession] = session; + session[kState].pendingStreams.add(this); + + this._readableState.readingMore = true; + + this[kState] = { + didRead: false, + flags: STREAM_FLAGS_PENDING | STREAM_FLAGS_HEADERS_SENT, + rstCode: constants.NGHTTP2_NO_ERROR, + writeQueueSize: 0, + trailersReady: false, + endAfterHeaders: false, + shutdownWritableCalled: false, + }; + this[kDenoResponse] = undefined; + this[kDenoRid] = undefined; + + this.#requestPromise = clientHttp2Request( + session, + sessionConnectPromise, + headers, + options, + ); + debugHttp2(">>> created clienthttp2stream"); + // TODO(bartlomieju): save it so we can unref + this.#responsePromise = (async () => { + debugHttp2(">>> before request promise", session[kDenoClientRid]); + const [streamRid, streamId] = await this.#requestPromise; + this.#rid = streamRid; + this[kDenoRid] = streamRid; + this[kInit](streamId); + debugHttp2( + ">>> after request promise", + session[kDenoClientRid], + this.#rid, + ); + const response = await core.opAsync( + "op_http2_client_get_response", + this.#rid, + ); + debugHttp2(">>> after get response", response); + const headers = { + ":status": response.statusCode, + ...Object.fromEntries(response.headers), + }; + debugHttp2(">>> emitting response", headers); + this.emit("response", headers, 0); + this[kDenoResponse] = response; + this.emit("ready"); + })(); + } + + [kUpdateTimer]() { + if (this.destroyed) { + return; + } + if (this[kTimeout]) { + this[kTimeout].refresh(); + } + if (this[kSession]) { + this[kSession][kUpdateTimer](); + } + } + + [kInit](id) { + const state = this[kState]; + state.flags |= STREAM_FLAGS_READY; + + const session = this[kSession]; + session[kState].pendingStreams.delete(this); + session[kState].streams.set(id, this); + + // TODO(bartlomieju): handle socket handle + + this[kID] = id; + this.uncork(); + this.emit("ready"); + } + + get bufferSize() { + return this[kState].writeQueueSize + this.writableLength; + } + + get endAfterHeaders() { + return this[kState].endAfterHeaders; + } + + get sentHeaders() { + return this[kSentHeaders]; + } + + get sentTrailers() { + return this[kSentTrailers]; + } + + get sendInfoHeaders() { + return this[kInfoHeaders]; + } + + get pending(): boolean { + return this[kID] === undefined; + } + + get id(): number | undefined { + return this[kID]; + } + + get session(): Http2Session { + return this[kSession]; + } + + _onTimeout() { + callTimeout(this, kSession); + } + + get headersSent() { + return !!(this[kState].flags & STREAM_FLAGS_HEADERS_SENT); + } + + get aborted() { + return !!(this[kState].flags & STREAM_FLAGS_ABORTED); + } + + get headRequest() { + return !!(this[kState].flags & STREAM_FLAGS_HEAD_REQUEST); + } + + get rstCode() { + return this[kState].rstCode; + } + + get state(): Record<string, unknown> { + notImplemented("Http2Stream.state"); + return {}; + } + + // [kAfterAsyncWrite]() {} + + // [kWriteGeneric]() {} + + // TODO(bartlomieju): clean up + _write(chunk, encoding, callback?: () => void) { + debugHttp2(">>> _write", callback); + if (typeof encoding === "function") { + callback = encoding; + encoding = "utf8"; + } + let data; + if (typeof encoding === "string") { + data = ENCODER.encode(chunk); + } else { + data = chunk.buffer; + } + + this.#requestPromise + .then(() => { + debugHttp2(">>> _write", this.#rid, data, encoding, callback); + return core.opAsync( + "op_http2_client_send_data", + this.#rid, + data, + ); + }) + .then(() => { + callback?.(); + debugHttp2( + "this.writableFinished", + this.pending, + this.destroyed, + this.writableFinished, + ); + }) + .catch((e) => { + callback?.(e); + }); + } + + // TODO(bartlomieju): finish this method + _writev(_chunks, _callback?) { + notImplemented("ClientHttp2Stream._writev"); + } + + _final(cb) { + debugHttp2("_final", new Error()); + if (this.pending) { + this.once("ready", () => this._final(cb)); + return; + } + + shutdownWritable(this, cb); + } + + // TODO(bartlomieju): needs a proper cleanup + _read() { + if (this.destroyed) { + this.push(null); + return; + } + + if (!this[kState].didRead) { + this._readableState.readingMore = false; + this[kState].didRead = true; + } + // if (!this.pending) { + // streamOnResume(this); + // } else { + // this.once("ready", () => streamOnResume(this)); + // } + + if (!this[kDenoResponse]) { + this.once("ready", this._read); + return; + } + + debugHttp2(">>> read"); + + (async () => { + const [chunk, finished] = await core.opAsync( + "op_http2_client_get_response_body_chunk", + this[kDenoResponse].bodyRid, + ); + + debugHttp2(">>> chunk", chunk, finished, this[kDenoResponse].bodyRid); + if (chunk === null) { + const trailerList = await core.opAsync( + "op_http2_client_get_response_trailers", + this[kDenoResponse].bodyRid, + ); + if (trailerList) { + const trailers = Object.fromEntries(trailerList); + this.emit("trailers", trailers); + } + + debugHttp2("tryClose"); + core.tryClose(this[kDenoResponse].bodyRid); + this.push(null); + debugHttp2(">>> read null chunk"); + this[kMaybeDestroy](); + return; + } + + let result; + if (this.#encoding === "utf8") { + result = this.push(new TextDecoder().decode(new Uint8Array(chunk))); + } else { + result = this.push(new Uint8Array(chunk)); + } + debugHttp2(">>> read result", result); + })(); + } + + // TODO(bartlomieju): + priority(_options: Record<string, unknown>) { + notImplemented("Http2Stream.priority"); + } + + sendTrailers(trailers: Record<string, unknown>) { + debugHttp2("sendTrailers", trailers); + if (this.destroyed || this.closed) { + throw new ERR_HTTP2_INVALID_STREAM(); + } + if (this[kSentTrailers]) { + throw new ERR_HTTP2_TRAILERS_ALREADY_SENT(); + } + if (!this[kState].trailersReady) { + throw new ERR_HTTP2_TRAILERS_NOT_READY(); + } + + trailers = Object.assign({ __proto__: null }, trailers); + const trailerList = []; + for (const [key, value] of Object.entries(trailers)) { + trailerList.push([key, value]); + } + this[kSentTrailers] = trailers; + + // deno-lint-ignore no-this-alias + const stream = this; + stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; + debugHttp2("sending trailers", this.#rid, trailers); + + core.opAsync( + "op_http2_client_send_trailers", + this.#rid, + trailerList, + ).then(() => { + stream[kMaybeDestroy](); + core.tryClose(this.#rid); + }).catch((e) => { + debugHttp2(">>> send trailers error", e); + core.tryClose(this.#rid); + stream._destroy(e); + }); + } + + get closed(): boolean { + return !!(this[kState].flags & STREAM_FLAGS_CLOSED); + } + + close(code: number = constants.NGHTTP2_NO_ERROR, callback?: () => void) { + debugHttp2(">>> close", code, this.closed, callback); + + if (this.closed) { + return; + } + if (typeof callback !== "undefined") { + this.once("close", callback); + } + closeStream(this, code); + } + + _destroy(err, callback) { + debugHttp2(">>> ClientHttp2Stream._destroy", err, callback); + const session = this[kSession]; + const id = this[kID]; + + const state = this[kState]; + const sessionState = session[kState]; + const sessionCode = sessionState.goawayCode || sessionState.destroyCode; + + let code = this.closed ? this.rstCode : sessionCode; + if (err != null) { + if (sessionCode) { + code = sessionCode; + } else if (err instanceof AbortError) { + code = constants.NGHTTP2_CANCEL; + } else { + code = constants.NGHTTP2_INTERNAL_ERROR; + } + } + + if (!this.closed) { + // TODO(bartlomieju): this not handle `socket handle` + closeStream(this, code, kNoRstStream); + } + + sessionState.streams.delete(id); + sessionState.pendingStreams.delete(this); + + sessionState.writeQueueSize -= state.writeQueueSize; + state.writeQueueSize = 0; + + const nameForErrorCode = {}; + if ( + err == null && code !== constants.NGHTTP2_NO_ERROR && + code !== constants.NGHTTP2_CANCEL + ) { + err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code); + } + + this[kSession] = undefined; + + session[kMaybeDestroy](); + callback(err); + } + + [kMaybeDestroy](code = constants.NGHTTP2_NO_ERROR) { + debugHttp2( + ">>> ClientHttp2Stream[kMaybeDestroy]", + code, + this.writableFinished, + this.readable, + this.closed, + ); + if (code !== constants.NGHTTP2_NO_ERROR) { + this._destroy(); + return; + } + + if (this.writableFinished) { + if (!this.readable && this.closed) { + debugHttp2("going into _destroy"); + this._destroy(); + return; + } + } + } + + setTimeout(msecs: number, callback?: () => void) { + // TODO(bartlomieju): fix this call, it's crashing on `this` being undefined; + // some strange transpilation quirk going on here. + setStreamTimeout.call(this, msecs, callback); + } +} + +function shutdownWritable(stream, callback) { + debugHttp2(">>> shutdownWritable", callback); + const state = stream[kState]; + if (state.shutdownWritableCalled) { + return callback(); + } + state.shutdownWritableCalled = true; + onStreamTrailers(stream); + callback(); + // TODO(bartlomieju): might have to add "finish" event listener here, + // check it. +} + +function onStreamTrailers(stream) { + stream[kState].trailersReady = true; + debugHttp2(">>> onStreamTrailers", stream.destroyed, stream.closed); + if (stream.destroyed || stream.closed) { + return; + } + if (!stream.emit("wantTrailers")) { + debugHttp2(">>> onStreamTrailers no wantTrailers"); + stream.sendTrailers({}); + } + debugHttp2(">>> onStreamTrailers wantTrailers"); +} + +const kNoRstStream = 0; +const kSubmitRstStream = 1; +const kForceRstStream = 2; + +function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) { + const state = stream[kState]; + state.flags |= STREAM_FLAGS_CLOSED; + state.rstCode = code; + + stream.setTimeout(0); + stream.removeAllListeners("timeout"); + + const { ending } = stream._writableState; + + if (!ending) { + if (!stream.aborted) { + state.flags |= STREAM_FLAGS_ABORTED; + stream.emit("aborted"); + } + + stream.end(); + } + + if (rstStreamStatus != kNoRstStream) { + debugHttp2( + ">>> closeStream", + !ending, + stream.writableFinished, + code !== constants.NGHTTP2_NO_ERROR, + rstStreamStatus === kForceRstStream, + ); + if ( + !ending || stream.writableFinished || + code !== constants.NGHTTP2_NO_ERROR || rstStreamStatus === kForceRstStream + ) { + finishCloseStream(stream, code); + } else { + stream.once("finish", () => finishCloseStream(stream, code)); + } + } +} + +function finishCloseStream(stream, code) { + debugHttp2(">>> finishCloseStream", stream.readableEnded, code); + if (stream.pending) { + stream.push(null); + stream.once("ready", () => { + core.opAsync( + "op_http2_client_reset_stream", + stream[kDenoRid], + code, + ).then(() => { + debugHttp2( + ">>> finishCloseStream close", + stream[kDenoRid], + stream[kDenoResponse].bodyRid, + ); + core.tryClose(stream[kDenoRid]); + core.tryClose(stream[kDenoResponse].bodyRid); + stream.emit("close"); + }); + }); + } else { + stream.resume(); + core.opAsync( + "op_http2_client_reset_stream", + stream[kDenoRid], + code, + ).then(() => { + debugHttp2( + ">>> finishCloseStream close2", + stream[kDenoRid], + stream[kDenoResponse].bodyRid, + ); + core.tryClose(stream[kDenoRid]); + core.tryClose(stream[kDenoResponse].bodyRid); + stream.emit("close"); + }).catch(() => { + debugHttp2( + ">>> finishCloseStream close2 catch", + stream[kDenoRid], + stream[kDenoResponse].bodyRid, + ); + core.tryClose(stream[kDenoRid]); + core.tryClose(stream[kDenoResponse].bodyRid); + stream.emit("close"); + }); } } +function callTimeout() { + notImplemented("callTimeout"); +} + export class ServerHttp2Stream extends Http2Stream { _promise: Deferred<Response>; #body: ReadableStream<Uint8Array>; @@ -496,17 +1342,17 @@ export class Http2Server extends Server { this.emit("stream", stream, headers); return await stream._promise; } catch (e) { - console.log("Error in serveHttpOnConnection", e); + console.log(">>> Error in serveHttpOnConnection", e); } return new Response(""); }, () => { - console.log("error"); + console.log(">>> error"); }, () => {}, ); } catch (e) { - console.log("Error in Http2Server", e); + console.log(">>> Error in Http2Server", e); } }, ); @@ -602,11 +1448,77 @@ export function connect( options: Record<string, unknown>, callback: (session: ClientHttp2Session) => void, ): ClientHttp2Session { - return new ClientHttp2Session(authority, options, callback); + debugHttp2(">>> http2.connect", options); + + if (typeof options === "function") { + callback = options; + options = undefined; + } + + options = { ...options }; + + if (typeof authority === "string") { + authority = new URL(authority); + } + + const protocol = authority.protocol || options.protocol || "https:"; + let port = 0; + + if (authority.port !== "") { + port = Number(authority.port); + } else if (protocol === "http:") { + port = 80; + } else { + port = 443; + } + + if (port == 0) { + throw new Error("Invalid port"); + } + + let host = "localhost"; + + if (authority.hostname) { + host = authority.hostname; + + if (host[0] === "[") { + host = host.slice(1, -1); + } + } else if (authority.host) { + host = authority.host; + } + + // TODO(bartlomieju): handle defaults + if (typeof options.createConnection === "function") { + console.error("Not implemented: http2.connect.options.createConnection"); + // notImplemented("http2.connect.options.createConnection"); + } + + let conn, url; + if (protocol == "http:") { + conn = Deno.connect({ port, hostname: host }); + url = `http://${host}${port == 80 ? "" : (":" + port)}`; + } else if (protocol == "https:") { + conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] }); + url = `http://${host}${port == 443 ? "" : (":" + port)}`; + } else { + throw new TypeError("Unexpected URL protocol"); + } + + const session = new ClientHttp2Session(conn, url, options); + session[kAuthority] = `${options.servername || host}:${port}`; + session[kProtocol] = protocol; + + if (typeof callback === "function") { + session.once("connect", callback); + } + return session; } export const constants = { NGHTTP2_ERR_FRAME_SIZE_ERROR: -522, + NGHTTP2_NV_FLAG_NONE: 0, + NGHTTP2_NV_FLAG_NO_INDEX: 1, NGHTTP2_SESSION_SERVER: 0, NGHTTP2_SESSION_CLIENT: 1, NGHTTP2_STREAM_STATE_IDLE: 1, @@ -849,6 +1761,49 @@ export const constants = { HTTP_STATUS_NETWORK_AUTHENTICATION_REQUIRED: 511, }; +// const kSingleValueHeaders = new Set([ +// constants.HTTP2_HEADER_STATUS, +// constants.HTTP2_HEADER_METHOD, +// constants.HTTP2_HEADER_AUTHORITY, +// constants.HTTP2_HEADER_SCHEME, +// constants.HTTP2_HEADER_PATH, +// constants.HTTP2_HEADER_PROTOCOL, +// constants.HTTP2_HEADER_ACCESS_CONTROL_ALLOW_CREDENTIALS, +// constants.HTTP2_HEADER_ACCESS_CONTROL_MAX_AGE, +// constants.HTTP2_HEADER_ACCESS_CONTROL_REQUEST_METHOD, +// constants.HTTP2_HEADER_AGE, +// constants.HTTP2_HEADER_AUTHORIZATION, +// constants.HTTP2_HEADER_CONTENT_ENCODING, +// constants.HTTP2_HEADER_CONTENT_LANGUAGE, +// constants.HTTP2_HEADER_CONTENT_LENGTH, +// constants.HTTP2_HEADER_CONTENT_LOCATION, +// constants.HTTP2_HEADER_CONTENT_MD5, +// constants.HTTP2_HEADER_CONTENT_RANGE, +// constants.HTTP2_HEADER_CONTENT_TYPE, +// constants.HTTP2_HEADER_DATE, +// constants.HTTP2_HEADER_DNT, +// constants.HTTP2_HEADER_ETAG, +// constants.HTTP2_HEADER_EXPIRES, +// constants.HTTP2_HEADER_FROM, +// constants.HTTP2_HEADER_HOST, +// constants.HTTP2_HEADER_IF_MATCH, +// constants.HTTP2_HEADER_IF_MODIFIED_SINCE, +// constants.HTTP2_HEADER_IF_NONE_MATCH, +// constants.HTTP2_HEADER_IF_RANGE, +// constants.HTTP2_HEADER_IF_UNMODIFIED_SINCE, +// constants.HTTP2_HEADER_LAST_MODIFIED, +// constants.HTTP2_HEADER_LOCATION, +// constants.HTTP2_HEADER_MAX_FORWARDS, +// constants.HTTP2_HEADER_PROXY_AUTHORIZATION, +// constants.HTTP2_HEADER_RANGE, +// constants.HTTP2_HEADER_REFERER, +// constants.HTTP2_HEADER_RETRY_AFTER, +// constants.HTTP2_HEADER_TK, +// constants.HTTP2_HEADER_UPGRADE_INSECURE_REQUESTS, +// constants.HTTP2_HEADER_USER_AGENT, +// constants.HTTP2_HEADER_X_CONTENT_TYPE_OPTIONS, +// ]); + export function getDefaultSettings(): Record<string, unknown> { notImplemented("http2.getDefaultSettings"); return {}; |