summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-06-06 04:29:55 -0600
committerGitHub <noreply@github.com>2023-06-06 12:29:55 +0200
commit42991017e9af59d6a5cb6b523228c62f1c32380e (patch)
tree7b32f2472c28b398717cb8bbb62e9583203ea052 /ext
parent2052ba343c0b222cf638e32f15622a237e423317 (diff)
feat(ext/node): Very basic node:http2 support (#19344)
This commit adds basic support for "node:http2" module. Not all APIs have been yet implemented, but this change already allows to use this module for some basic functions. The "grpc" package is still not working, but it's a good stepping stone. --------- Co-authored-by: Bartek IwaƄczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext')
-rw-r--r--ext/http/00_serve.js8
-rw-r--r--ext/node/polyfills/http2.ts398
-rw-r--r--ext/node/polyfills/net.ts34
3 files changed, 346 insertions, 94 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index fa55079e7..c5a5c0e18 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -748,4 +748,10 @@ internals.upgradeHttpRaw = upgradeHttpRaw;
internals.serveHttpOnListener = serveHttpOnListener;
internals.serveHttpOnConnection = serveHttpOnConnection;
-export { serve, upgradeHttpRaw };
+export {
+ addTrailers,
+ serve,
+ serveHttpOnConnection,
+ serveHttpOnListener,
+ upgradeHttpRaw,
+};
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts
index 90b1be1a2..a5d945efe 100644
--- a/ext/node/polyfills/http2.ts
+++ b/ext/node/polyfills/http2.ts
@@ -1,12 +1,21 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
-import { notImplemented } from "ext:deno_node/_utils.ts";
+import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "ext:deno_node/events.ts";
import { Buffer } from "ext:deno_node/buffer.ts";
-import { Socket } from "ext:deno_node/net.ts";
+import { Server, Socket, TCP } from "ext:deno_node/net.ts";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
+import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts";
import { FileHandle } from "ext:deno_node/fs/promises.ts";
+import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
+import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js";
+import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
+import { nextTick } from "ext:deno_node/_next_tick.ts";
+import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
+
+const ENCODER = new TextEncoder();
+type Http2Headers = Record<string, string | string[]>;
export class Http2Session extends EventEmitter {
constructor() {
@@ -19,11 +28,10 @@ export class Http2Session extends EventEmitter {
}
close(_callback?: () => void) {
- notImplemented("Http2Session.close");
+ warnNotImplemented("Http2Session.close");
}
get closed(): boolean {
- notImplemented("Http2Session.closed");
return false;
}
@@ -37,7 +45,6 @@ export class Http2Session extends EventEmitter {
}
get destroyed(): boolean {
- notImplemented("Http2Session.destroyed");
return false;
}
@@ -78,7 +85,7 @@ export class Http2Session extends EventEmitter {
}
ref() {
- notImplemented("Http2Session.ref");
+ warnNotImplemented("Http2Session.ref");
}
get remoteSettings(): Record<string, unknown> {
@@ -90,17 +97,15 @@ export class Http2Session extends EventEmitter {
notImplemented("Http2Session.setLocalWindowSize");
}
- setTimeout(_msecs: number, _callback: () => void) {
- notImplemented("Http2Session.setTimeout");
+ setTimeout(msecs: number, callback?: () => void) {
+ setStreamTimeout(this, msecs, callback);
}
get socket(): Socket /*| TlsSocket*/ {
- notImplemented("Http2Session.socket");
- return null;
+ return {};
}
get state(): Record<string, unknown> {
- notImplemented("Http2Session.state");
return {};
}
@@ -114,7 +119,7 @@ export class Http2Session extends EventEmitter {
}
unref() {
- notImplemented("Http2Session.unref");
+ warnNotImplemented("Http2Session.unref");
}
}
@@ -136,21 +141,131 @@ export class ServerHttp2Session extends Http2Session {
}
export class ClientHttp2Session extends Http2Session {
- constructor() {
+ constructor(
+ _authority: string | URL,
+ _options: Record<string, unknown>,
+ callback: (session: Http2Session) => void,
+ ) {
super();
+ if (callback) {
+ this.on("connect", callback);
+ }
+ nextTick(() => this.emit("connect", this));
}
request(
- _headers: Record<string, unknown>,
+ headers: Http2Headers,
_options?: Record<string, unknown>,
): ClientHttp2Stream {
- notImplemented("ClientHttp2Session.request");
- return new ClientHttp2Stream();
+ const reqHeaders: string[][] = [];
+ const controllerPromise: Deferred<
+ ReadableStreamDefaultController<Uint8Array>
+ > = deferred();
+ const body = new ReadableStream({
+ start(controller) {
+ controllerPromise.resolve(controller);
+ },
+ });
+ const request: RequestInit = { headers: reqHeaders, body };
+ let authority = null;
+ let path = null;
+ for (const [name, value] of Object.entries(headers)) {
+ if (name == constants.HTTP2_HEADER_PATH) {
+ path = String(value);
+ } else if (name == constants.HTTP2_HEADER_METHOD) {
+ request.method = String(value);
+ } else if (name == constants.HTTP2_HEADER_AUTHORITY) {
+ authority = String(value);
+ } else {
+ reqHeaders.push([name, String(value)]);
+ }
+ }
+
+ const fetchPromise = fetch(`http://${authority}${path}`, request);
+ const readerPromise = deferred();
+ const headersPromise = deferred();
+ (async () => {
+ const fetch = await fetchPromise;
+ readerPromise.resolve(fetch.body);
+
+ const headers: Http2Headers = {};
+ for (const [key, value] of fetch.headers) {
+ headers[key] = value;
+ }
+ headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status);
+
+ headersPromise.resolve(headers);
+ })();
+ return new ClientHttp2Stream(
+ this,
+ headersPromise,
+ controllerPromise,
+ readerPromise,
+ );
}
}
-export class Http2Stream {
- constructor() {
+export class Http2Stream extends EventEmitter {
+ #session: Http2Session;
+ #headers: Deferred<Http2Headers>;
+ #controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>;
+ #readerPromise: Deferred<ReadableStream<Uint8Array>>;
+ #closed: boolean;
+ _response: Response;
+
+ constructor(
+ session: Http2Session,
+ headers: Promise<Http2Headers>,
+ controllerPromise: Promise<ReadableStreamDefaultController<Uint8Array>>,
+ readerPromise: Promise<ReadableStream<Uint8Array>>,
+ ) {
+ super();
+ this.#session = session;
+ this.#headers = headers;
+ this.#controllerPromise = controllerPromise;
+ this.#readerPromise = readerPromise;
+ this.#closed = false;
+ nextTick(() => {
+ (async () => {
+ const headers = await this.#headers;
+ this.emit("headers", headers);
+ })();
+ (async () => {
+ const reader = await this.#readerPromise;
+ if (reader) {
+ for await (const data of reader) {
+ this.emit("data", new Buffer(data));
+ }
+ }
+ this.emit("end");
+ })();
+ });
+ }
+
+ // TODO(mmastrac): Implement duplex
+ end() {
+ (async () => {
+ const controller = await this.#controllerPromise;
+ controller.close();
+ })();
+ }
+
+ write(buffer, callback?: () => void) {
+ (async () => {
+ const controller = await this.#controllerPromise;
+ if (typeof buffer === "string") {
+ controller.enqueue(ENCODER.encode(buffer));
+ } else {
+ controller.enqueue(buffer);
+ }
+ callback?.();
+ })();
+ }
+
+ resume() {
+ }
+
+ pause() {
}
get aborted(): boolean {
@@ -164,16 +279,15 @@ export class Http2Stream {
}
close(_code: number, _callback: () => void) {
- notImplemented("Http2Stream.close");
+ this.#closed = true;
+ this.emit("close");
}
get closed(): boolean {
- notImplemented("Http2Stream.closed");
- return false;
+ return this.#closed;
}
get destroyed(): boolean {
- notImplemented("Http2Stream.destroyed");
return false;
}
@@ -197,7 +311,7 @@ export class Http2Stream {
}
get rstCode(): number {
- notImplemented("Http2Stream.rstCode");
+ // notImplemented("Http2Stream.rstCode");
return 0;
}
@@ -217,12 +331,11 @@ export class Http2Stream {
}
get session(): Http2Session {
- notImplemented("Http2Stream.session");
- return new Http2Session();
+ return this.#session;
}
- setTimeout(_msecs: number, _callback: () => void) {
- notImplemented("Http2Stream.setTimeout");
+ setTimeout(msecs: number, callback?: () => void) {
+ setStreamTimeout(this, msecs, callback);
}
get state(): Record<string, unknown> {
@@ -231,28 +344,52 @@ export class Http2Stream {
}
sendTrailers(_headers: Record<string, unknown>) {
- notImplemented("Http2Stream.sendTrailers");
+ addTrailers(this._response, [["grpc-status", "0"], ["grpc-message", "OK"]]);
}
}
export class ClientHttp2Stream extends Http2Stream {
- constructor() {
- super();
+ constructor(
+ session: Http2Session,
+ headers: Promise<Http2Headers>,
+ controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>,
+ readerPromise: Deferred<ReadableStream<Uint8Array>>,
+ ) {
+ super(session, headers, controllerPromise, readerPromise);
}
}
export class ServerHttp2Stream extends Http2Stream {
- constructor() {
- super();
+ _promise: Deferred<Response>;
+ #body: ReadableStream<Uint8Array>;
+ #waitForTrailers: boolean;
+ #headersSent: boolean;
+
+ constructor(
+ session: Http2Session,
+ headers: Promise<Http2Headers>,
+ controllerPromise: Promise<ReadableStreamDefaultController<Uint8Array>>,
+ reader: ReadableStream<Uint8Array>,
+ body: ReadableStream<Uint8Array>,
+ ) {
+ super(session, headers, controllerPromise, Promise.resolve(reader));
+ this._promise = new deferred();
+ this.#body = body;
}
additionalHeaders(_headers: Record<string, unknown>) {
notImplemented("ServerHttp2Stream.additionalHeaders");
}
+ end(): void {
+ super.end();
+ if (this.#waitForTrailers) {
+ this.emit("wantTrailers");
+ }
+ }
+
get headersSent(): boolean {
- notImplemented("ServerHttp2Stream.headersSent");
- return false;
+ return this.#headersSent;
}
get pushAllowed(): boolean {
@@ -269,10 +406,26 @@ export class ServerHttp2Stream extends Http2Stream {
}
respond(
- _headers: Record<string, unknown>,
- _options: Record<string, unknown>,
+ headers: Http2Headers,
+ options: Record<string, unknown>,
) {
- notImplemented("ServerHttp2Stream.respond");
+ this.#headersSent = true;
+ const response: ResponseInit = {};
+ if (headers) {
+ for (const [name, value] of Object.entries(headers)) {
+ if (name == constants.HTTP2_HEADER_STATUS) {
+ response.status = Number(value);
+ }
+ }
+ }
+ if (options?.endStream) {
+ this._promise.resolve(this._response = new Response("", response));
+ } else {
+ this.#waitForTrailers = options?.waitForTrailers;
+ this._promise.resolve(
+ this._response = new Response(this.#body, response),
+ );
+ }
}
respondWithFD(
@@ -292,56 +445,145 @@ export class ServerHttp2Stream extends Http2Stream {
}
}
-export class Http2Server {
- constructor() {
- }
-
- close(_callback?: () => unknown) {
- notImplemented("Http2Server.close");
- }
+export class Http2Server extends Server {
+ #options: Record<string, unknown> = {};
+ #abortController;
+ #server;
+ timeout = 0;
- setTimeout(_msecs: number, _callback?: () => unknown) {
- notImplemented("Http2Server.setTimeout");
- }
-
- get timeout(): number {
- notImplemented("Http2Server.timeout");
- return 0;
- }
-
- updateSettings(_settings: Record<string, unknown>) {
- notImplemented("Http2Server.updateSettings");
+ constructor(
+ options: Record<string, unknown>,
+ requestListener: () => unknown,
+ ) {
+ super(options);
+ this.#abortController = new AbortController();
+ this.on(
+ "connection",
+ (conn: Deno.Conn) => {
+ try {
+ const session = new ServerHttp2Session();
+ this.emit("session", session);
+ this.#server = serveHttpOnConnection(
+ conn,
+ this.#abortController.signal,
+ async (req: Request) => {
+ try {
+ const controllerPromise: Deferred<
+ ReadableStreamDefaultController<Uint8Array>
+ > = deferred();
+ const body = new ReadableStream({
+ start(controller) {
+ controllerPromise.resolve(controller);
+ },
+ });
+ const headers: Http2Headers = {};
+ for (const [name, value] of req.headers) {
+ headers[name] = value;
+ }
+ headers[constants.HTTP2_HEADER_PATH] =
+ new URL(req.url).pathname;
+ const stream = new ServerHttp2Stream(
+ session,
+ Promise.resolve(headers),
+ controllerPromise,
+ req.body,
+ body,
+ );
+ session.emit("stream", stream, headers);
+ this.emit("stream", stream, headers);
+ return await stream._promise;
+ } catch (e) {
+ console.log("Error in serveHttpOnConnection", e);
+ }
+ return new Response("");
+ },
+ () => {
+ console.log("error");
+ },
+ () => {},
+ );
+ } catch (e) {
+ console.log("Error in Http2Server", e);
+ }
+ },
+ );
+ this.on(
+ "newListener",
+ (event) => console.log(`Event in newListener: ${event}`),
+ );
+ this.#options = options;
+ if (typeof requestListener === "function") {
+ this.on("request", requestListener);
+ }
+ }
+
+ // Prevent the TCP server from wrapping this in a socket, since we need it to serve HTTP
+ _createSocket(clientHandle: TCP) {
+ return clientHandle[kStreamBaseField];
+ }
+
+ close(callback?: () => unknown) {
+ if (callback) {
+ this.on("close", callback);
+ }
+ this.#abortController.abort();
+ super.close();
+ }
+
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.timeout = msecs;
+ if (callback !== undefined) {
+ this.on("timeout", callback);
+ }
+ }
+
+ updateSettings(settings: Record<string, unknown>) {
+ this.#options.settings = { ...this.#options.settings, ...settings };
}
}
-export class Http2SecureServer {
- constructor() {
+export class Http2SecureServer extends Server {
+ #options: Record<string, unknown> = {};
+ timeout = 0;
+
+ constructor(
+ options: Record<string, unknown>,
+ requestListener: () => unknown,
+ ) {
+ super(options, function () {
+ notImplemented("connectionListener");
+ });
+ this.#options = options;
+ if (typeof requestListener === "function") {
+ this.on("request", requestListener);
+ }
}
close(_callback?: () => unknown) {
notImplemented("Http2SecureServer.close");
}
- setTimeout(_msecs: number, _callback?: () => unknown) {
- notImplemented("Http2SecureServer.setTimeout");
- }
-
- get timeout(): number {
- notImplemented("Http2SecureServer.timeout");
- return 0;
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.timeout = msecs;
+ if (callback !== undefined) {
+ this.on("timeout", callback);
+ }
}
- updateSettings(_settings: Record<string, unknown>) {
- notImplemented("Http2SecureServer.updateSettings");
+ updateSettings(settings: Record<string, unknown>) {
+ this.#options.settings = { ...this.#options.settings, ...settings };
}
}
export function createServer(
- _options: Record<string, unknown>,
- _onRequestHandler: () => unknown,
+ options: Record<string, unknown>,
+ onRequestHandler: () => unknown,
): Http2Server {
- notImplemented("http2.createServer");
- return new Http2Server();
+ if (typeof options === "function") {
+ onRequestHandler = options;
+ options = {};
+ }
+ return new Http2Server(options, onRequestHandler);
}
export function createSecureServer(
@@ -353,11 +595,11 @@ export function createSecureServer(
}
export function connect(
- _authority: string | URL,
- _options: Record<string, unknown>,
+ authority: string | URL,
+ options: Record<string, unknown>,
+ callback: (session: ClientHttp2Session) => void,
): ClientHttp2Session {
- notImplemented("http2.connect");
- return new ClientHttp2Session();
+ return new ClientHttp2Session(authority, options, callback);
}
export const constants = {
@@ -681,8 +923,8 @@ export class Http2ServerRequest {
return "";
}
- setTimeout(_msecs: number, _callback: () => unknown) {
- notImplemented("Http2ServerRequest.setTimeout");
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.stream.setTimeout(callback, msecs);
}
get socket(): Socket /*| TlsSocket*/ {
@@ -781,8 +1023,8 @@ export class Http2ServerResponse {
notImplemented("Http2ServerResponse.setHeader");
}
- setTimeout(_msecs: number, _callback: () => unknown) {
- notImplemented("Http2ServerResponse.setTimeout");
+ setTimeout(msecs: number, callback?: () => unknown) {
+ this.stream.setTimeout(msecs, callback);
}
get socket(): Socket /*| TlsSocket*/ {
diff --git a/ext/node/polyfills/net.ts b/ext/node/polyfills/net.ts
index 2c2f5f944..79845adb2 100644
--- a/ext/node/polyfills/net.ts
+++ b/ext/node/polyfills/net.ts
@@ -1834,21 +1834,8 @@ function _onconnection(this: any, err: number, clientHandle?: Handle) {
return;
}
- const socket = new Socket({
- handle: clientHandle,
- allowHalfOpen: self.allowHalfOpen,
- pauseOnCreate: self.pauseOnConnect,
- readable: true,
- writable: true,
- });
-
- // TODO(@bartlomieju): implement noDelay and setKeepAlive
-
- self._connections++;
- socket.server = self;
- socket._server = self;
-
- DTRACE_NET_SERVER_CONNECTION(socket);
+ const socket = self._createSocket(clientHandle);
+ this._connections++;
self.emit("connection", socket);
if (netServerSocketChannel.hasSubscribers) {
@@ -2369,6 +2356,23 @@ export class Server extends EventEmitter {
return !!this._handle;
}
+ _createSocket(clientHandle) {
+ const socket = new Socket({
+ handle: clientHandle,
+ allowHalfOpen: this.allowHalfOpen,
+ pauseOnCreate: this.pauseOnConnect,
+ readable: true,
+ writable: true,
+ });
+
+ // TODO(@bartlomieju): implement noDelay and setKeepAlive
+
+ socket.server = this;
+ socket._server = this;
+
+ DTRACE_NET_SERVER_CONNECTION(socket);
+ }
+
_listen2 = _setupListenHandle;
_emitCloseIfDrained() {