summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-06-06 04:29:55 -0600
committerGitHub <noreply@github.com>2023-06-06 12:29:55 +0200
commit42991017e9af59d6a5cb6b523228c62f1c32380e (patch)
tree7b32f2472c28b398717cb8bbb62e9583203ea052
parent2052ba343c0b222cf638e32f15622a237e423317 (diff)
feat(ext/node): Very basic node:http2 support (#19344)
This commit adds basic support for "node:http2" module. Not all APIs have been yet implemented, but this change already allows to use this module for some basic functions. The "grpc" package is still not working, but it's a good stepping stone. --------- Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
-rw-r--r--cli/tests/integration/node_unit_tests.rs1
-rw-r--r--cli/tests/unit_node/http2_test.ts104
-rw-r--r--ext/http/00_serve.js8
-rw-r--r--ext/node/polyfills/http2.ts398
-rw-r--r--ext/node/polyfills/net.ts34
5 files changed, 451 insertions, 94 deletions
diff --git a/cli/tests/integration/node_unit_tests.rs b/cli/tests/integration/node_unit_tests.rs
index f62c8761c..363e5dfa3 100644
--- a/cli/tests/integration/node_unit_tests.rs
+++ b/cli/tests/integration/node_unit_tests.rs
@@ -53,6 +53,7 @@ util::unit_test_factory!(
crypto_sign_test = crypto / crypto_sign_test,
fs_test,
http_test,
+ http2_test,
_randomBytes_test = internal / _randomBytes_test,
_randomFill_test = internal / _randomFill_test,
_randomInt_test = internal / _randomInt_test,
diff --git a/cli/tests/unit_node/http2_test.ts b/cli/tests/unit_node/http2_test.ts
new file mode 100644
index 000000000..543543cbd
--- /dev/null
+++ b/cli/tests/unit_node/http2_test.ts
@@ -0,0 +1,104 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+import * as http2 from "node:http2";
+import * as net from "node:net";
+import { deferred } from "../../../test_util/std/async/deferred.ts";
+import { assertEquals } from "https://deno.land/std@v0.42.0/testing/asserts.ts";
+
+const {
+ HTTP2_HEADER_AUTHORITY,
+ HTTP2_HEADER_METHOD,
+ HTTP2_HEADER_PATH,
+ HTTP2_HEADER_STATUS,
+} = http2.constants;
+
+Deno.test("[node/http2 client]", async () => {
+ // Create a server to respond to the HTTP2 requests
+ const portPromise = deferred();
+ const reqPromise = deferred<Request>();
+ const ready = deferred();
+ const ac = new AbortController();
+ const server = Deno.serve({
+ port: 0,
+ signal: ac.signal,
+ onListen: ({ port }: { port: number }) => portPromise.resolve(port),
+ handler: async (req: Request) => {
+ reqPromise.resolve(req);
+ await ready;
+ return new Response("body", {
+ status: 401,
+ headers: { "resp-header-name": "resp-header-value" },
+ });
+ },
+ });
+
+ const port = await portPromise;
+
+ // Get a session
+ const sessionPromise = deferred();
+ const session = http2.connect(
+ `localhost:${port}`,
+ {},
+ sessionPromise.resolve.bind(sessionPromise),
+ );
+ const session2 = await sessionPromise;
+ assertEquals(session, session2);
+
+ // Write a request, including a body
+ const stream = session.request({
+ [HTTP2_HEADER_AUTHORITY]: `localhost:${port}`,
+ [HTTP2_HEADER_METHOD]: "POST",
+ [HTTP2_HEADER_PATH]: "/path",
+ "req-header-name": "req-header-value",
+ });
+ stream.write("body");
+ stream.end();
+
+ // Check the request
+ const req = await reqPromise;
+ assertEquals(req.headers.get("req-header-name"), "req-header-value");
+ assertEquals(await req.text(), "body");
+
+ ready.resolve();
+
+ // Read a response
+ const headerPromise = new Promise<Record<string, string | string[]>>((
+ resolve,
+ ) => stream.on("headers", resolve));
+ const headers = await headerPromise;
+ assertEquals(headers["resp-header-name"], "resp-header-value");
+ assertEquals(headers[HTTP2_HEADER_STATUS], "401");
+
+ ac.abort();
+ await server.finished;
+});
+
+Deno.test("[node/http2 server]", async () => {
+ const server = http2.createServer();
+ server.listen(0);
+ const port = (<net.AddressInfo> server.address()).port;
+ const sessionPromise = new Promise<http2.Http2Session>((resolve) =>
+ server.on("session", resolve)
+ );
+
+ const responsePromise = fetch(`http://localhost:${port}/path`, {
+ method: "POST",
+ body: "body",
+ });
+
+ const session = await sessionPromise;
+ const stream = await new Promise<http2.ServerHttp2Stream>((resolve) =>
+ session.on("stream", resolve)
+ );
+ const _headers = await new Promise((resolve) =>
+ stream.on("headers", resolve)
+ );
+ const _data = await new Promise((resolve) => stream.on("data", resolve));
+ const _end = await new Promise((resolve) => stream.on("end", resolve));
+ stream.respond();
+ stream.end();
+ const resp = await responsePromise;
+ await resp.text();
+
+ await new Promise((resolve) => server.close(resolve));
+});
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index fa55079e7..c5a5c0e18 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -748,4 +748,10 @@ internals.upgradeHttpRaw = upgradeHttpRaw;
internals.serveHttpOnListener = serveHttpOnListener;
internals.serveHttpOnConnection = serveHttpOnConnection;
-export { serve, upgradeHttpRaw };
+export {
+ addTrailers,
+ serve,
+ serveHttpOnConnection,
+ serveHttpOnListener,
+ upgradeHttpRaw,
+};
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts
index 90b1be1a2..a5d945efe 100644
--- a/ext/node/polyfills/http2.ts
+++ b/ext/node/polyfills/http2.ts
@@ -1,12 +1,21 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
-import { notImplemented } from "ext:deno_node/_utils.ts";
+import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "ext:deno_node/events.ts";
import { Buffer } from "ext:deno_node/buffer.ts";
-import { Socket } from "ext:deno_node/net.ts";
+import { Server, Socket, TCP } from "ext:deno_node/net.ts";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
+import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts";
import { FileHandle } from "ext:deno_node/fs/promises.ts";
+import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
+import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js";
+import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
+import { nextTick } from "ext:deno_node/_next_tick.ts";
+import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
+
+const ENCODER = new TextEncoder();
+type Http2Headers = Record<string, string | string[]>;
export class Http2Session extends EventEmitter {
constructor() {
@@ -19,11 +28,10 @@ export class Http2Session extends EventEmitter {
}
close(_callback?: () => void) {
- notImplemented("Http2Session.close");
+ warnNotImplemented("Http2Session.close");
}
get closed(): boolean {
- notImplemented("Http2Session.closed");
return false;
}
@@ -37,7 +45,6 @@ export class Http2Session extends EventEmitter {
}
get destroyed(): boolean {
- notImplemented("Http2Session.destroyed");
return false;
}
@@ -78,7 +85,7 @@ export class Http2Session extends EventEmitter {
}
ref() {
- notImplemented("Http2Session.ref");
+ warnNotImplemented("Http2Session.ref");
}
get remoteSettings(): Record<string, unknown> {
@@ -90,17 +97,15 @@ export class Http2Session extends EventEmitter {
notImplemented("Http2Session.setLocalWindowSize");
}
- setTimeout(_msecs: number, _callback: () => void) {
- notImplemented("Http2Session.setTimeout");
+ setTimeout(msecs: number, callback?: () => void) {
+ setStreamTimeout(this, msecs, callback);
}
get socket(): Socket /*| TlsSocket*/ {
- notImplemented("Http2Session.socket");
- return null;
+ return {};
}
get state(): Record<string, unknown> {
- notImplemented("Http2Session.state");
return {};
}
@@ -114,7 +119,7 @@ export class Http2Session extends EventEmitter {
}
unref() {
- notImplemented("Http2Session.unref");
+ warnNotImplemented("Http2Session.unref");
}
}
@@ -136,21 +141,131 @@ export class ServerHttp2Session extends Http2Session {
}
export class ClientHttp2Session extends Http2Session {
- constructor() {
+ constructor(
+ _authority: string | URL,
+ _options: Record<string, unknown>,
+ callback: (session: Http2Session) => void,
+ ) {
super();
+ if (callback) {
+ this.on("connect", callback);
+ }
+ nextTick(() => this.emit("connect", this));
}
request(
- _headers: Record<string, unknown>,
+ headers: Http2Headers,
_options?: Record<string, unknown>,
): ClientHttp2Stream {
- notImplemented("ClientHttp2Session.request");
- return new ClientHttp2Stream();
+ const reqHeaders: string[][] = [];
+ const controllerPromise: Deferred<
+ ReadableStreamDefaultController<Uint8Array>
+ > = deferred();
+ const body = new ReadableStream({
+ start(controller) {
+ controllerPromise.resolve(controller);
+ },
+ });
+ const request: RequestInit = { headers: reqHeaders, body };
+ let authority = null;
+ let path = null;
+ for (const [name, value] of Object.entries(headers)) {
+ if (name == constants.HTTP2_HEADER_PATH) {
+ path = String(value);
+ } else if (name == constants.HTTP2_HEADER_METHOD) {
+ request.method = String(value);
+ } else if (name == constants.HTTP2_HEADER_AUTHORITY) {
+ authority = String(value);
+ } else {
+ reqHeaders.push([name, String(value)]);
+ }
+ }
+
+ const fetchPromise = fetch(`http://${authority}${path}`, request);
+ const readerPromise = deferred();
+ const headersPromise = deferred();
+ (async () => {
+ const fetch = await fetchPromise;
+ readerPromise.resolve(fetch.body);
+
+ const headers: Http2Headers = {};
+ for (const [key, value] of fetch.headers) {
+ headers[key] = value;
+ }
+ headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status);
+
+ headersPromise.resolve(headers);
+ })();
+ return new ClientHttp2Stream(
+ this,
+ headersPromise,
+ controllerPromise,
+ readerPromise,
+ );
}
}
-export class Http2Stream {
- constructor() {
+export class Http2Stream extends EventEmitter {
+ #session: Http2Session;
+ #headers: Deferred<Http2Headers>;
+ #controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>;
+ #readerPromise: Deferred<ReadableStream<Uint8Array>>;
+ #closed: boolean;
+ _response: Response;
+
+ constructor(
+ session: Http2Session,
+ headers: Promise<Http2Headers>,
+ controllerPromise: Promise<ReadableStreamDefaultController<Uint8Array>>,
+ readerPromise: Promise<ReadableStream<Uint8Array>>,
+ ) {
+ super();
+ this.#session = session;
+ this.#headers = headers;
+ this.#controllerPromise = controllerPromise;
+ this.#readerPromise = readerPromise;
+ this.#closed = false;
+ nextTick(() => {
+ (async () => {
+ const headers = await this.#headers;
+ this.emit("headers", headers);
+ })();
+ (async () => {
+ const reader = await this.#readerPromise;
+ if (reader) {
+ for await (const data of reader) {
+ this.emit("data", new Buffer(data));
+ }
+ }
+ this.emit("end");
+ })();
+ });
+ }
+
+ // TODO(mmastrac): Implement duplex
+ end() {
+ (async () => {
+ const controller = await this.#controllerPromise;
+ controller.close();
+ })();
+ }
+
+ write(buffer, callback?: () => void) {
+ (async () => {
+ const controller = await this.#controllerPromise;
+ if (typeof buffer === "string") {
+ controller.enqueue(ENCODER.encode(buffer));
+ } else {
+ controller.enqueue(buffer);
+ }
+ callback?.();
+ })();
+ }
+
+ resume() {
+ }
+
+ pause() {
}
get aborted(): boolean {
@@ -164,16 +279,15 @@ export class Http2Stream {
}
close(_code: number, _callback: () => void) {
- notImplemented("Http2Stream.close");
+ this.#closed = true;
+ this.emit("close");
}
get closed(): boolean {
- notImplemented("Http2Stream.closed");
- return false;
+ return this.#closed;
}
get destroyed(): boolean {
- notImplemented("Http2Stream.destroyed");
return false;
}
@@ -197,7 +311,7 @@ export class Http2Stream {
}
get rstCode(): number {
- notImplemented("Http2Stream.rstCode");
+ // notImplemented("Http2Stream.rstCode");
return 0;
}
@@ -217,12 +331,11 @@ export class Http2Stream {
}
get session(): Http2Session {
- notImplemented("Http2Stream.session");
- return new Http2Session();
+ return this.#session;
}
- setTimeout(_msecs: number, _callback: () => void) {
- notImplemented("Http2Stream.setTimeout");
+ setTimeout(msecs: number, callback?: () => void) {
+ setStreamTimeout(this, msecs, callback);
}
get state(): Record<string, unknown> {
@@ -231,28 +344,52 @@ export class Http2Stream {
}
sendTrailers(_headers: Record<string, unknown>) {
- notImplemented("Http2Stream.sendTrailers");
+ addTrailers(this._response, [["grpc-status", "0"], ["grpc-message", "OK"]]);
}
}
export class ClientHttp2Stream extends Http2Stream {
- constructor() {
- super();
+ constructor(
+ session: Http2Session,
+ headers: Promise<Http2Headers>,
+ controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>,
+ readerPromise: Deferred<ReadableStream<Uint8Array>>,
+ ) {
+ super(session, headers, controllerPromise, readerPromise);
}
}
export class ServerHttp2Stream extends Http2Stream {
- constructor() {
- super();
+ _promise: Deferred<Response>;
+ #body: ReadableStream<Uint8Array>;
+ #waitForTrailers: boolean;
+ #headersSent: boolean;
+
+ constructor(
+ session: Http2Session,
+ headers: Promise<Http2Headers>,
+ controllerPromise: Promise<ReadableStreamDefaultController<Uint8Array>>,
+ reader: ReadableStream<Uint8Array>,
+ body: ReadableStream<Uint8Array>,
+ ) {
+ super(session, headers, controllerPromise, Promise.resolve(reader));
+ this._promise = new deferred();
+ this.#body = body;
}
additionalHeaders(_headers: Record<string, unknown>) {
notImplemented("ServerHttp2Stream.additionalHeaders");
}
+ end(): void {
+ super.end();
+ if (this.#waitForTrailers) {
+ this.emit("wantTrailers");
+ }
+ }
+
get headersSent(): boolean {
- notImplemented("ServerHttp2Stream.headersSent");
- return false;
+ return this.#headersSent;
}
get pushAllowed(): boolean {
@@ -269,10 +406,26 @@ export class ServerHttp2Stream extends Http2Stream {
}
respond(
- _headers: Record<string, unknown>,
- _options: Record<string, unknown>,
+ headers: Http2Headers,
+ options: Record<string, unknown>,
) {
- notImplemented("ServerHttp2Stream.respond");
+ this.#headersSent = true;
+ const response: ResponseInit = {};
+ if (headers) {
+ for (const [name, value] of Object.entries(headers)) {
+ if (name == constants.HTTP2_HEADER_STATUS) {
+ response.status = Number(value);
+ }
+ }
+ }
+ if (options?.endStream) {
+ this._promise.resolve(this._response = new Response("", response));
+ } else {
+ this.#waitForTrailers = options?.waitForTrailers;
+ this._promise.resolve(
+ this._response = new Response(this.#body, response),
+ );
+ }
}
respondWithFD(
@@ -292,56 +445,145 @@ export class ServerHttp2Stream extends Http2Stream {
}
}
-export class Http2Server {
- constructor() {
- }
-
- close(_callback?: () => unknown) {
- notImplemented("Http2Server.close");
- }
+export class Http2Server extends Server {
+ #options: Record<string, unknown> = {};
+ #abortController;
+ #server;
+ timeout = 0;
- setTimeout(_msecs: number, _callback?: () => unknown) {
- notImplemented("Http2Server.setTimeout");
- }
-
- get timeout(): number {
- notImplemented("Http2Server.timeout");
- return 0;
- }
-
- updateSettings(_settings: Record<string, unknown>) {
- notImplemented("Http2Server.updateSettings");
+ constructor(
+ options: Record<string, unknown>,
+ requestListener: () => unknown,
+ ) {
+ super(options);
+ this.#abortController = new AbortController();
+ this.on(
+ "connection",
+ (conn: Deno.Conn) => {
+ try {
+ const session = new ServerHttp2Session();
+ this.emit("session", session);
+ this.#server = serveHttpOnConnection(
+ conn,
+ this.#abortController.signal,
+ async (req: Request) => {
+ try {
+ const controllerPromise: Deferred<
+ ReadableStreamDefaultController<Uint8Array>
+ > = deferred();
+ const body = new ReadableStream({
+ start(controller) {
+ controllerPromise.resolve(controller);
+ },
+ });
+ const headers: Http2Headers = {};
+ for (const [name, value] of req.headers) {
+ headers[name] = value;
+ }
+ headers[constants.HTTP2_HEADER_PATH] =
+ new URL(req.url).pathname;
+ const stream = new ServerHttp2Stream(
+ session,
+ Promise.resolve(headers),
+ controllerPromise,
+ req.body,
+ body,
+ );
+ session.emit("stream", stream, headers);
+ this.emit("stream", stream, headers);
+ return await stream._promise;
+ } catch (e) {
+ console.log("Error in serveHttpOnConnection", e);
+ }
+ return new Response("");
+ },
+ () => {
+ console.log("error");
+ },
+ () => {},
+ );
+ } catch (e) {
+ console.log("Error in Http2Server", e);
+ }
+ },
+ );
+ this.on(
+ "newListener",
+ (event) => console.log(`Event in newListener: ${event}`),
+ );
+ this.#options = options;
+ if (typeof requestListener === "function") {
+ this.on("request", requestListener);
+ }
+ }
+
+ // Prevent the TCP server from wrapping this in a socket, since we need it to serve HTTP
+ _createSocket(clientHandle: TCP) {
+ return clientHandle[kStreamBaseField];
+ }
+
+ close(callback?: () => unknown) {
+ if (callback) {
+ this.on("close", callback);
+ }
+ this.#abortController.abort();
+ super.close();
+ }
+
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.timeout = msecs;
+ if (callback !== undefined) {
+ this.on("timeout", callback);
+ }
+ }
+
+ updateSettings(settings: Record<string, unknown>) {
+ this.#options.settings = { ...this.#options.settings, ...settings };
}
}
-export class Http2SecureServer {
- constructor() {
+export class Http2SecureServer extends Server {
+ #options: Record<string, unknown> = {};
+ timeout = 0;
+
+ constructor(
+ options: Record<string, unknown>,
+ requestListener: () => unknown,
+ ) {
+ super(options, function () {
+ notImplemented("connectionListener");
+ });
+ this.#options = options;
+ if (typeof requestListener === "function") {
+ this.on("request", requestListener);
+ }
}
close(_callback?: () => unknown) {
notImplemented("Http2SecureServer.close");
}
- setTimeout(_msecs: number, _callback?: () => unknown) {
- notImplemented("Http2SecureServer.setTimeout");
- }
-
- get timeout(): number {
- notImplemented("Http2SecureServer.timeout");
- return 0;
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.timeout = msecs;
+ if (callback !== undefined) {
+ this.on("timeout", callback);
+ }
}
- updateSettings(_settings: Record<string, unknown>) {
- notImplemented("Http2SecureServer.updateSettings");
+ updateSettings(settings: Record<string, unknown>) {
+ this.#options.settings = { ...this.#options.settings, ...settings };
}
}
export function createServer(
- _options: Record<string, unknown>,
- _onRequestHandler: () => unknown,
+ options: Record<string, unknown>,
+ onRequestHandler: () => unknown,
): Http2Server {
- notImplemented("http2.createServer");
- return new Http2Server();
+ if (typeof options === "function") {
+ onRequestHandler = options;
+ options = {};
+ }
+ return new Http2Server(options, onRequestHandler);
}
export function createSecureServer(
@@ -353,11 +595,11 @@ export function createSecureServer(
}
export function connect(
- _authority: string | URL,
- _options: Record<string, unknown>,
+ authority: string | URL,
+ options: Record<string, unknown>,
+ callback: (session: ClientHttp2Session) => void,
): ClientHttp2Session {
- notImplemented("http2.connect");
- return new ClientHttp2Session();
+ return new ClientHttp2Session(authority, options, callback);
}
export const constants = {
@@ -681,8 +923,8 @@ export class Http2ServerRequest {
return "";
}
- setTimeout(_msecs: number, _callback: () => unknown) {
- notImplemented("Http2ServerRequest.setTimeout");
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.stream.setTimeout(callback, msecs);
}
get socket(): Socket /*| TlsSocket*/ {
@@ -781,8 +1023,8 @@ export class Http2ServerResponse {
notImplemented("Http2ServerResponse.setHeader");
}
- setTimeout(_msecs: number, _callback: () => unknown) {
- notImplemented("Http2ServerResponse.setTimeout");
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.stream.setTimeout(msecs, callback);
}
get socket(): Socket /*| TlsSocket*/ {
diff --git a/ext/node/polyfills/net.ts b/ext/node/polyfills/net.ts
index 2c2f5f944..79845adb2 100644
--- a/ext/node/polyfills/net.ts
+++ b/ext/node/polyfills/net.ts
@@ -1834,21 +1834,8 @@ function _onconnection(this: any, err: number, clientHandle?: Handle) {
return;
}
- const socket = new Socket({
- handle: clientHandle,
- allowHalfOpen: self.allowHalfOpen,
- pauseOnCreate: self.pauseOnConnect,
- readable: true,
- writable: true,
- });
-
- // TODO(@bartlomieju): implement noDelay and setKeepAlive
-
- self._connections++;
- socket.server = self;
- socket._server = self;
-
- DTRACE_NET_SERVER_CONNECTION(socket);
+ const socket = self._createSocket(clientHandle);
+ this._connections++;
self.emit("connection", socket);
if (netServerSocketChannel.hasSubscribers) {
@@ -2369,6 +2356,23 @@ export class Server extends EventEmitter {
return !!this._handle;
}
+ _createSocket(clientHandle) {
+ const socket = new Socket({
+ handle: clientHandle,
+ allowHalfOpen: this.allowHalfOpen,
+ pauseOnCreate: this.pauseOnConnect,
+ readable: true,
+ writable: true,
+ });
+
+ // TODO(@bartlomieju): implement noDelay and setKeepAlive
+
+ socket.server = this;
+ socket._server = this;
+
+ DTRACE_NET_SERVER_CONNECTION(socket);
+ }
+
_listen2 = _setupListenHandle;
_emitCloseIfDrained() {