summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2023-09-15 21:51:25 +0200
committerGitHub <noreply@github.com>2023-09-15 21:51:25 +0200
commit5a1505db67d0326bf37b765c8a566584b44a2c1a (patch)
tree4f1cadc6bc85f8538cc0c21ebc7354e404d48bcf
parent11f0ccf8052065e37f92d8ba43e9624fcd3b9814 (diff)
feat(ext/node): http2.connect() API (#19671)
This commit improves compatibility of "node:http2" module by polyfilling "connect" method and "ClientHttp2Session" class. Basic operations like streaming, header and trailer handling are working correctly. Refing/unrefing is still a TODO and "npm:grpc-js/grpc" is not yet working correctly. --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
-rw-r--r--Cargo.lock19
-rw-r--r--Cargo.toml1
-rw-r--r--cli/tests/unit/websocket_test.ts21
-rw-r--r--cli/tests/unit_node/http2_test.ts115
-rw-r--r--ext/node/Cargo.toml5
-rw-r--r--ext/node/lib.rs13
-rw-r--r--ext/node/ops/http2.rs550
-rw-r--r--ext/node/ops/mod.rs1
-rw-r--r--ext/node/polyfills/http2.ts1169
-rw-r--r--test_util/Cargo.toml2
-rw-r--r--test_util/src/lib.rs131
11 files changed, 1831 insertions, 196 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 60897e504..fbf9ee8f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -484,9 +484,9 @@ dependencies = [
[[package]]
name = "bumpalo"
-version = "3.13.0"
+version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1"
+checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
[[package]]
name = "byteorder"
@@ -533,9 +533,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
-version = "0.4.30"
+version = "0.4.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877"
+checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
dependencies = [
"android-tzdata",
"iana-time-zone",
@@ -1386,12 +1386,14 @@ dependencies = [
"aead-gcm-stream",
"aes",
"brotli",
+ "bytes",
"cbc",
"data-encoding",
"deno_core",
"deno_fetch",
"deno_fs",
"deno_media_type",
+ "deno_net",
"deno_npm",
"deno_semver",
"digest 0.10.7",
@@ -1399,8 +1401,10 @@ dependencies = [
"ecb",
"elliptic-curve 0.13.5",
"errno 0.2.8",
+ "h2",
"hex",
"hkdf",
+ "http",
"idna 0.3.0",
"indexmap 2.0.0",
"lazy-regex",
@@ -1432,6 +1436,7 @@ dependencies = [
"signature 1.6.4",
"tokio",
"typenum",
+ "url",
"whoami",
"winapi",
"x25519-dalek",
@@ -5445,11 +5450,13 @@ dependencies = [
"anyhow",
"async-stream",
"base64 0.13.1",
+ "bytes",
"console_static_text",
"fastwebsockets",
"flate2",
"futures",
"glob",
+ "h2",
"hyper 0.14.27",
"lazy-regex",
"libc",
@@ -5887,9 +5894,9 @@ checksum = "0685c84d5d54d1c26f7d3eb96cd41550adb97baed141a761cf335d3d33bcd0ae"
[[package]]
name = "typenum"
-version = "1.16.0"
+version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
+checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "unic-char-property"
diff --git a/Cargo.toml b/Cargo.toml
index 5ca7d1d96..b601edcc2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,6 +96,7 @@ futures = "0.3.21"
glob = "0.3.1"
hex = "0.4"
http = "0.2.9"
+h2 = "0.3.17"
httparse = "1.8.0"
hyper = { version = "0.14.26", features = ["runtime", "http1"] }
# TODO(mmastrac): indexmap 2.0 will require multiple synchronized changes
diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts
index ac33f9d69..11f0fd7dc 100644
--- a/cli/tests/unit/websocket_test.ts
+++ b/cli/tests/unit/websocket_test.ts
@@ -7,6 +7,9 @@ import {
fail,
} from "./test_util.ts";
+const servePort = 4248;
+const serveUrl = `ws://localhost:${servePort}/`;
+
Deno.test({ permissions: "none" }, function websocketPermissionless() {
assertThrows(
() => new WebSocket("ws://localhost"),
@@ -81,13 +84,13 @@ Deno.test(
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
- port: 4246,
+ port: servePort,
});
await listeningPromise;
const promise = deferred();
- const ws = new WebSocket("ws://localhost:4246/");
- assertEquals(ws.url, "ws://localhost:4246/");
+ const ws = new WebSocket(serveUrl);
+ assertEquals(ws.url, serveUrl);
ws.onerror = () => fail();
ws.onmessage = (e) => {
assertEquals(e.data, "Hello");
@@ -133,13 +136,13 @@ Deno.test({
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
- port: 4247,
+ port: servePort,
});
await listeningPromise;
- const ws = new WebSocket("ws://localhost:4247/");
- assertEquals(ws.url, "ws://localhost:4247/");
+ const ws = new WebSocket(serveUrl);
+ assertEquals(ws.url, serveUrl);
ws.onerror = () => fail();
ws.onmessage = () => ws.send("bye");
ws.onclose = () => {
@@ -173,13 +176,13 @@ Deno.test({
signal: ac.signal,
onListen: () => listeningPromise.resolve(),
hostname: "localhost",
- port: 4247,
+ port: servePort,
});
await listeningPromise;
- const ws = new WebSocket("ws://localhost:4247/");
- assertEquals(ws.url, "ws://localhost:4247/");
+ const ws = new WebSocket(serveUrl);
+ assertEquals(ws.url, serveUrl);
let seenBye = false;
ws.onerror = () => fail();
ws.onmessage = ({ data }) => {
diff --git a/cli/tests/unit_node/http2_test.ts b/cli/tests/unit_node/http2_test.ts
index 64824f3e8..8e7b261ae 100644
--- a/cli/tests/unit_node/http2_test.ts
+++ b/cli/tests/unit_node/http2_test.ts
@@ -5,75 +5,66 @@ import * as net from "node:net";
import { deferred } from "../../../test_util/std/async/deferred.ts";
import { assertEquals } from "../../../test_util/std/testing/asserts.ts";
-const {
- HTTP2_HEADER_AUTHORITY,
- HTTP2_HEADER_METHOD,
- HTTP2_HEADER_PATH,
- HTTP2_HEADER_STATUS,
-} = http2.constants;
-
-Deno.test("[node/http2 client]", async () => {
- // Create a server to respond to the HTTP2 requests
- const portPromise = deferred();
- const reqPromise = deferred<Request>();
- const ready = deferred();
- const ac = new AbortController();
- const server = Deno.serve({
- port: 0,
- signal: ac.signal,
- onListen: ({ port }: { port: number }) => portPromise.resolve(port),
- handler: async (req: Request) => {
- reqPromise.resolve(req);
- await ready;
- return new Response("body", {
- status: 401,
- headers: { "resp-header-name": "resp-header-value" },
- });
- },
- });
+for (const url of ["http://127.0.0.1:4246", "https://127.0.0.1:4247"]) {
+ Deno.test(`[node/http2 client] ${url}`, {
+ ignore: Deno.build.os === "windows",
+ }, async () => {
+ // Create a server to respond to the HTTP2 requests
+ const client = http2.connect(url, {});
+ client.on("error", (err) => console.error(err));
- const port = await portPromise;
+ const req = client.request({ ":method": "POST", ":path": "/" }, {
+ waitForTrailers: true,
+ });
- // Get a session
- const sessionPromise = deferred();
- const session = http2.connect(
- `localhost:${port}`,
- {},
- sessionPromise.resolve.bind(sessionPromise),
- );
- const session2 = await sessionPromise;
- assertEquals(session, session2);
-
- // Write a request, including a body
- const stream = session.request({
- [HTTP2_HEADER_AUTHORITY]: `localhost:${port}`,
- [HTTP2_HEADER_METHOD]: "POST",
- [HTTP2_HEADER_PATH]: "/path",
- "req-header-name": "req-header-value",
- });
- stream.write("body");
- stream.end();
+ let receivedTrailers;
+ let receivedHeaders;
+ let receivedData = "";
- // Check the request
- const req = await reqPromise;
- assertEquals(req.headers.get("req-header-name"), "req-header-value");
- assertEquals(await req.text(), "body");
+ req.on("response", (headers, _flags) => {
+ receivedHeaders = headers;
+ });
- ready.resolve();
+ req.write("hello");
+ req.setEncoding("utf8");
- // Read a response
- const headerPromise = new Promise<Record<string, string | string[]>>((
- resolve,
- ) => stream.on("headers", resolve));
- const headers = await headerPromise;
- assertEquals(headers["resp-header-name"], "resp-header-value");
- assertEquals(headers[HTTP2_HEADER_STATUS], "401");
+ req.on("wantTrailers", () => {
+ req.sendTrailers({ foo: "bar" });
+ });
- ac.abort();
- await server.finished;
-});
+ req.on("trailers", (trailers, _flags) => {
+ receivedTrailers = trailers;
+ });
+
+ req.on("data", (chunk) => {
+ receivedData += chunk;
+ });
+ req.end();
+
+ const endPromise = deferred();
+ setTimeout(() => {
+ try {
+ client.close();
+ } catch (_) {
+ // pass
+ }
+ endPromise.resolve();
+ }, 2000);
+
+ await endPromise;
+ assertEquals(receivedHeaders, { ":status": 200 });
+ assertEquals(receivedData, "hello world\n");
+
+ assertEquals(receivedTrailers, {
+ "abc": "def",
+ "opr": "stv",
+ "foo": "bar",
+ });
+ });
+}
-Deno.test("[node/http2 server]", async () => {
+// TODO(bartlomieju): reenable sanitizers
+Deno.test("[node/http2 server]", { sanitizeOps: false }, async () => {
const server = http2.createServer();
server.listen(0);
const port = (<net.AddressInfo> server.address()).port;
diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml
index 07c2b2da5..44b56978e 100644
--- a/ext/node/Cargo.toml
+++ b/ext/node/Cargo.toml
@@ -17,12 +17,14 @@ path = "lib.rs"
aead-gcm-stream = "0.1"
aes.workspace = true
brotli.workspace = true
+bytes.workspace = true
cbc.workspace = true
data-encoding = "2.3.3"
deno_core.workspace = true
deno_fetch.workspace = true
deno_fs.workspace = true
deno_media_type.workspace = true
+deno_net.workspace = true
deno_npm.workspace = true
deno_semver.workspace = true
digest = { version = "0.10.5", features = ["core-api", "std"] }
@@ -30,8 +32,10 @@ dsa = "0.6.1"
ecb.workspace = true
elliptic-curve.workspace = true
errno = "0.2.8"
+h2.workspace = true
hex.workspace = true
hkdf.workspace = true
+http.workspace = true
idna = "0.3.0"
indexmap.workspace = true
lazy-regex.workspace = true
@@ -63,6 +67,7 @@ sha2.workspace = true
signature.workspace = true
tokio.workspace = true
typenum = "1.15.0"
+url.workspace = true
whoami = "1.4.0"
winapi.workspace = true
# https://github.com/dalek-cryptography/x25519-dalek/pull/89
diff --git a/ext/node/lib.rs b/ext/node/lib.rs
index c01785b95..fbc1c9ffd 100644
--- a/ext/node/lib.rs
+++ b/ext/node/lib.rs
@@ -243,6 +243,19 @@ deno_core::extension!(deno_node,
ops::zlib::brotli::op_brotli_decompress_stream,
ops::zlib::brotli::op_brotli_decompress_stream_end,
ops::http::op_node_http_request<P>,
+ ops::http2::op_http2_connect,
+ ops::http2::op_http2_poll_client_connection,
+ ops::http2::op_http2_client_request,
+ ops::http2::op_http2_client_get_response,
+ ops::http2::op_http2_client_get_response_body_chunk,
+ ops::http2::op_http2_client_send_data,
+ ops::http2::op_http2_client_end_stream,
+ ops::http2::op_http2_client_reset_stream,
+ ops::http2::op_http2_client_send_trailers,
+ ops::http2::op_http2_client_get_response_trailers,
+ ops::http2::op_http2_accept,
+ ops::http2::op_http2_listen,
+ ops::http2::op_http2_send_response,
ops::os::op_node_os_get_priority<P>,
ops::os::op_node_os_set_priority<P>,
ops::os::op_node_os_username<P>,
diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs
new file mode 100644
index 000000000..3e9fb5586
--- /dev/null
+++ b/ext/node/ops/http2.rs
@@ -0,0 +1,550 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::collections::HashMap;
+use std::rc::Rc;
+use std::task::Poll;
+
+use bytes::Bytes;
+use deno_core::error::AnyError;
+use deno_core::futures::future::poll_fn;
+use deno_core::op;
+use deno_core::serde::Serialize;
+use deno_core::AsyncRefCell;
+use deno_core::ByteString;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::JsBuffer;
+use deno_core::OpState;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use deno_net::raw::take_network_stream_resource;
+use deno_net::raw::NetworkStream;
+use h2;
+use h2::RecvStream;
+use http;
+use http::request::Parts;
+use http::HeaderMap;
+use http::Response;
+use http::StatusCode;
+use reqwest::header::HeaderName;
+use reqwest::header::HeaderValue;
+use url::Url;
+
+pub struct Http2Client {
+ pub client: AsyncRefCell<h2::client::SendRequest<Bytes>>,
+ pub url: Url,
+}
+
+impl Resource for Http2Client {
+ fn name(&self) -> Cow<str> {
+ "http2Client".into()
+ }
+}
+
+#[derive(Debug)]
+pub struct Http2ClientConn {
+ pub conn: AsyncRefCell<h2::client::Connection<NetworkStream>>,
+ cancel_handle: CancelHandle,
+}
+
+impl Resource for Http2ClientConn {
+ fn name(&self) -> Cow<str> {
+ "http2ClientConnection".into()
+ }
+
+ fn close(self: Rc<Self>) {
+ self.cancel_handle.cancel()
+ }
+}
+
+#[derive(Debug)]
+pub struct Http2ClientStream {
+ pub response: AsyncRefCell<h2::client::ResponseFuture>,
+ pub stream: AsyncRefCell<h2::SendStream<Bytes>>,
+}
+
+impl Resource for Http2ClientStream {
+ fn name(&self) -> Cow<str> {
+ "http2ClientStream".into()
+ }
+}
+
+#[derive(Debug)]
+pub struct Http2ClientResponseBody {
+ pub body: AsyncRefCell<h2::RecvStream>,
+ pub trailers_rx:
+ AsyncRefCell<Option<tokio::sync::oneshot::Receiver<Option<HeaderMap>>>>,
+ pub trailers_tx:
+ AsyncRefCell<Option<tokio::sync::oneshot::Sender<Option<HeaderMap>>>>,
+}
+
+impl Resource for Http2ClientResponseBody {
+ fn name(&self) -> Cow<str> {
+ "http2ClientResponseBody".into()
+ }
+}
+
+#[derive(Debug)]
+pub struct Http2ServerConnection {
+ pub conn: AsyncRefCell<h2::server::Connection<NetworkStream, Bytes>>,
+}
+
+impl Resource for Http2ServerConnection {
+ fn name(&self) -> Cow<str> {
+ "http2ServerConnection".into()
+ }
+}
+
+pub struct Http2ServerSendResponse {
+ pub send_response: AsyncRefCell<h2::server::SendResponse<Bytes>>,
+}
+
+impl Resource for Http2ServerSendResponse {
+ fn name(&self) -> Cow<str> {
+ "http2ServerSendResponse".into()
+ }
+}
+
+#[op]
+pub async fn op_http2_connect(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ url: String,
+) -> Result<(ResourceId, ResourceId), AnyError> {
+ // No permission check necessary because we're using an existing connection
+ let network_stream = {
+ let mut state = state.borrow_mut();
+ take_network_stream_resource(&mut state.resource_table, rid)?
+ };
+
+ let url = Url::parse(&url)?;
+
+ let (client, conn) = h2::client::handshake(network_stream).await?;
+ let mut state = state.borrow_mut();
+ let client_rid = state.resource_table.add(Http2Client {
+ client: AsyncRefCell::new(client),
+ url,
+ });
+ let conn_rid = state.resource_table.add(Http2ClientConn {
+ conn: AsyncRefCell::new(conn),
+ cancel_handle: CancelHandle::new(),
+ });
+ Ok((client_rid, conn_rid))
+}
+
+#[op]
+pub async fn op_http2_listen(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+) -> Result<ResourceId, AnyError> {
+ let stream =
+ take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?;
+
+ let conn = h2::server::handshake(stream).await?;
+ Ok(
+ state
+ .borrow_mut()
+ .resource_table
+ .add(Http2ServerConnection {
+ conn: AsyncRefCell::new(conn),
+ }),
+ )
+}
+
+#[op]
+pub async fn op_http2_accept(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+) -> Result<
+ Option<(Vec<(ByteString, ByteString)>, ResourceId, ResourceId)>,
+ AnyError,
+> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ServerConnection>(rid)?;
+ let mut conn = RcRef::map(&resource, |r| &r.conn).borrow_mut().await;
+ if let Some(res) = conn.accept().await {
+ let (req, resp) = res?;
+ let (parts, body) = req.into_parts();
+ let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
+ let stm = state
+ .borrow_mut()
+ .resource_table
+ .add(Http2ClientResponseBody {
+ body: AsyncRefCell::new(body),
+ trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
+ trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
+ });
+
+ let Parts {
+ uri,
+ method,
+ headers,
+ ..
+ } = parts;
+ let mut req_headers = Vec::with_capacity(headers.len() + 4);
+ req_headers.push((
+ ByteString::from(":method"),
+ ByteString::from(method.as_str()),
+ ));
+ req_headers.push((
+ ByteString::from(":scheme"),
+ ByteString::from(uri.scheme().map(|s| s.as_str()).unwrap_or("http")),
+ ));
+ req_headers.push((
+ ByteString::from(":path"),
+ ByteString::from(uri.path_and_query().map(|p| p.as_str()).unwrap_or("")),
+ ));
+ req_headers.push((
+ ByteString::from(":authority"),
+ ByteString::from(uri.authority().map(|a| a.as_str()).unwrap_or("")),
+ ));
+ for (key, val) in headers.iter() {
+ req_headers.push((key.as_str().into(), val.as_bytes().into()));
+ }
+
+ let resp = state
+ .borrow_mut()
+ .resource_table
+ .add(Http2ServerSendResponse {
+ send_response: AsyncRefCell::new(resp),
+ });
+
+ Ok(Some((req_headers, stm, resp)))
+ } else {
+ Ok(None)
+ }
+}
+
+#[op]
+pub async fn op_http2_send_response(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+ status: u16,
+ headers: Vec<(ByteString, ByteString)>,
+) -> Result<(ResourceId, u32), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ServerSendResponse>(rid)?;
+ let mut send_response = RcRef::map(resource, |r| &r.send_response)
+ .borrow_mut()
+ .await;
+ let mut response = Response::new(());
+ if let Ok(status) = StatusCode::from_u16(status) {
+ *response.status_mut() = status;
+ }
+ for (name, value) in headers {
+ response.headers_mut().append(
+ HeaderName::from_lowercase(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
+
+ let stream = send_response.send_response(response, false)?;
+ let stream_id = stream.stream_id();
+
+ Ok((rid, stream_id.into()))
+}
+
+#[op]
+pub async fn op_http2_poll_client_connection(
+ state: Rc<RefCell<OpState>>,
+ rid: ResourceId,
+) -> Result<(), AnyError> {
+ let resource = state.borrow().resource_table.get::<Http2ClientConn>(rid)?;
+
+ let cancel_handle = RcRef::map(resource.clone(), |this| &this.cancel_handle);
+ let mut conn = RcRef::map(resource, |this| &this.conn).borrow_mut().await;
+
+ match (&mut *conn).or_cancel(cancel_handle).await {
+ Ok(result) => result?,
+ Err(_) => {
+ // TODO(bartlomieju): probably need a better mechanism for closing the connection
+
+ // cancelled
+ }
+ }
+
+ Ok(())
+}
+
+#[op]
+pub async fn op_http2_client_request(
+ state: Rc<RefCell<OpState>>,
+ client_rid: ResourceId,
+ // TODO(bartlomieju): maybe use a vector with fixed layout to save sending
+ // 4 strings of keys?
+ mut pseudo_headers: HashMap<String, String>,
+ headers: Vec<(ByteString, ByteString)>,
+) -> Result<(ResourceId, u32), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2Client>(client_rid)?;
+
+ let url = resource.url.clone();
+
+ let pseudo_path = pseudo_headers.remove(":path").unwrap_or("/".to_string());
+ let pseudo_method = pseudo_headers
+ .remove(":method")
+ .unwrap_or("GET".to_string());
+ // TODO(bartlomieju): handle all pseudo-headers (:authority, :scheme)
+ let _pseudo_authority = pseudo_headers
+ .remove(":authority")
+ .unwrap_or("/".to_string());
+ let _pseudo_scheme = pseudo_headers
+ .remove(":scheme")
+ .unwrap_or("http".to_string());
+
+ let url = url.join(&pseudo_path)?;
+
+ let mut req = http::Request::builder()
+ .uri(url.as_str())
+ .method(pseudo_method.as_str());
+
+ for (name, value) in headers {
+ req.headers_mut().unwrap().append(
+ HeaderName::from_lowercase(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
+
+ let request = req.body(()).unwrap();
+
+ let resource = {
+ let state = state.borrow();
+ state.resource_table.get::<Http2Client>(client_rid)?
+ };
+ let mut client = RcRef::map(&resource, |r| &r.client).borrow_mut().await;
+ poll_fn(|cx| client.poll_ready(cx)).await?;
+ let (response, stream) = client.send_request(request, false).unwrap();
+ let stream_id = stream.stream_id();
+ let stream_rid = state.borrow_mut().resource_table.add(Http2ClientStream {
+ response: AsyncRefCell::new(response),
+ stream: AsyncRefCell::new(stream),
+ });
+ Ok((stream_rid, stream_id.into()))
+}
+
+#[op]
+pub async fn op_http2_client_send_data(
+ state: Rc<RefCell<OpState>>,
+ stream_rid: ResourceId,
+ data: JsBuffer,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ClientStream>(stream_rid)?;
+ let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
+
+ // TODO(bartlomieju): handle end of stream
+ stream.send_data(bytes::Bytes::from(data), false)?;
+ Ok(())
+}
+
+#[op]
+pub async fn op_http2_client_end_stream(
+ state: Rc<RefCell<OpState>>,
+ stream_rid: ResourceId,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ClientStream>(stream_rid)?;
+ let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
+
+ // TODO(bartlomieju): handle end of stream
+ stream.send_data(bytes::Bytes::from(vec![]), true)?;
+ Ok(())
+}
+
+#[op]
+pub async fn op_http2_client_reset_stream(
+ state: Rc<RefCell<OpState>>,
+ stream_rid: ResourceId,
+ code: u32,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ClientStream>(stream_rid)?;
+ let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
+ stream.send_reset(h2::Reason::from(code));
+ Ok(())
+}
+
+#[op]
+pub async fn op_http2_client_send_trailers(
+ state: Rc<RefCell<OpState>>,
+ stream_rid: ResourceId,
+ trailers: Vec<(ByteString, ByteString)>,
+) -> Result<(), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ClientStream>(stream_rid)?;
+ let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await;
+
+ let mut trailers_map = http::HeaderMap::new();
+ for (name, value) in trailers {
+ trailers_map.insert(
+ HeaderName::from_bytes(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
+
+ stream.send_trailers(trailers_map)?;
+ Ok(())
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct Http2ClientResponse {
+ headers: Vec<(ByteString, ByteString)>,
+ body_rid: ResourceId,
+ status_code: u16,
+}
+
+#[op]
+pub async fn op_http2_client_get_response(
+ state: Rc<RefCell<OpState>>,
+ stream_rid: ResourceId,
+) -> Result<Http2ClientResponse, AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ClientStream>(stream_rid)?;
+ let mut response_future =
+ RcRef::map(&resource, |r| &r.response).borrow_mut().await;
+
+ let response = (&mut *response_future).await?;
+
+ let (parts, body) = response.into_parts();
+ let status = parts.status;
+ let mut res_headers = Vec::new();
+ for (key, val) in parts.headers.iter() {
+ res_headers.push((key.as_str().into(), val.as_bytes().into()));
+ }
+
+ let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel();
+ let body_rid =
+ state
+ .borrow_mut()
+ .resource_table
+ .add(Http2ClientResponseBody {
+ body: AsyncRefCell::new(body),
+ trailers_rx: AsyncRefCell::new(Some(trailers_rx)),
+ trailers_tx: AsyncRefCell::new(Some(trailers_tx)),
+ });
+ Ok(Http2ClientResponse {
+ headers: res_headers,
+ body_rid,
+ status_code: status.into(),
+ })
+}
+
+enum DataOrTrailers {
+ Data(Bytes),
+ Trailers(HeaderMap),
+ Eof,
+}
+
+fn poll_data_or_trailers(
+ cx: &mut std::task::Context,
+ body: &mut RecvStream,
+) -> Poll<Result<DataOrTrailers, h2::Error>> {
+ loop {
+ if let Poll::Ready(trailers) = body.poll_trailers(cx) {
+ if let Some(trailers) = trailers? {
+ return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers)));
+ } else {
+ return Poll::Ready(Ok(DataOrTrailers::Eof));
+ }
+ }
+ if let Poll::Ready(data) = body.poll_data(cx) {
+ if let Some(data) = data {
+ return Poll::Ready(Ok(DataOrTrailers::Data(data?)));
+ }
+ // If data is None, loop one more time to check for trailers
+ continue;
+ }
+ // Return pending here as poll_data will keep the waker
+ return Poll::Pending;
+ }
+}
+
+#[op]
+pub async fn op_http2_client_get_response_body_chunk(
+ state: Rc<RefCell<OpState>>,
+ body_rid: ResourceId,
+) -> Result<(Option<Vec<u8>>, bool), AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ClientResponseBody>(body_rid)?;
+ let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await;
+
+ loop {
+ match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await? {
+ DataOrTrailers::Data(data) => {
+ return Ok((Some(data.to_vec()), false));
+ }
+ DataOrTrailers::Trailers(trailers) => {
+ println!("{trailers:?}");
+ if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx)
+ .borrow_mut()
+ .await
+ .take()
+ {
+ _ = trailers_tx.send(Some(trailers));
+ };
+
+ continue;
+ }
+ DataOrTrailers::Eof => {
+ RcRef::map(&resource, |r| &r.trailers_tx)
+ .borrow_mut()
+ .await
+ .take();
+ return Ok((None, true));
+ }
+ };
+ }
+}
+
+#[op]
+pub async fn op_http2_client_get_response_trailers(
+ state: Rc<RefCell<OpState>>,
+ body_rid: ResourceId,
+) -> Result<Option<Vec<(ByteString, ByteString)>>, AnyError> {
+ let resource = state
+ .borrow()
+ .resource_table
+ .get::<Http2ClientResponseBody>(body_rid)?;
+ let trailers = RcRef::map(&resource, |r| &r.trailers_rx)
+ .borrow_mut()
+ .await
+ .take();
+ if let Some(trailers) = trailers {
+ if let Ok(Some(trailers)) = trailers.await {
+ let mut v = Vec::with_capacity(trailers.len());
+ for (key, value) in trailers.iter() {
+ v.push((
+ ByteString::from(key.as_str()),
+ ByteString::from(value.as_bytes()),
+ ));
+ }
+ Ok(Some(v))
+ } else {
+ Ok(None)
+ }
+ } else {
+ Ok(None)
+ }
+}
diff --git a/ext/node/ops/mod.rs b/ext/node/ops/mod.rs
index 22ad546e8..cf4abf3dd 100644
--- a/ext/node/ops/mod.rs
+++ b/ext/node/ops/mod.rs
@@ -2,6 +2,7 @@
pub mod crypto;
pub mod http;
+pub mod http2;
pub mod idna;
pub mod os;
pub mod require;
diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts
index 20306584f..62dd1a501 100644
--- a/ext/node/polyfills/http2.ts
+++ b/ext/node/polyfills/http2.ts
@@ -4,79 +4,160 @@
// TODO(petamoriken): enable prefer-primordials for node polyfills
// deno-lint-ignore-file prefer-primordials
+const core = globalThis.Deno.core;
import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "node:events";
import { Buffer } from "node:buffer";
import { Server, Socket, TCP } from "node:net";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
-import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts";
+import {
+ kMaybeDestroy,
+ kUpdateTimer,
+ setStreamTimeout,
+} from "ext:deno_node/internal/stream_base_commons.ts";
import { FileHandle } from "node:fs/promises";
import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js";
import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts";
import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
+import { Duplex } from "node:stream";
+import {
+ AbortError,
+ ERR_HTTP2_CONNECT_AUTHORITY,
+ ERR_HTTP2_CONNECT_PATH,
+ ERR_HTTP2_CONNECT_SCHEME,
+ ERR_HTTP2_GOAWAY_SESSION,
+ ERR_HTTP2_INVALID_PSEUDOHEADER,
+ ERR_HTTP2_INVALID_SESSION,
+ ERR_HTTP2_INVALID_STREAM,
+ ERR_HTTP2_SESSION_ERROR,
+ ERR_HTTP2_STREAM_CANCEL,
+ ERR_HTTP2_STREAM_ERROR,
+ ERR_HTTP2_TRAILERS_ALREADY_SENT,
+ ERR_HTTP2_TRAILERS_NOT_READY,
+ ERR_INVALID_HTTP_TOKEN,
+} from "ext:deno_node/internal/errors.ts";
+import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts";
+import { TcpConn } from "ext:deno_net/01_net.js";
+import { TlsConn } from "ext:deno_net/02_tls.js";
+
+const {
+ op_http2_connect,
+} = core.ensureFastOps();
+
+const kSession = Symbol("session");
+const kAlpnProtocol = Symbol("alpnProtocol");
+const kAuthority = Symbol("authority");
+const kEncrypted = Symbol("encrypted");
+const kID = Symbol("id");
+const kInit = Symbol("init");
+const kInfoHeaders = Symbol("sent-info-headers");
+const kOrigin = Symbol("origin");
+const kPendingRequestCalls = Symbol("kPendingRequestCalls");
+const kProtocol = Symbol("protocol");
+const kSentHeaders = Symbol("sent-headers");
+const kSentTrailers = Symbol("sent-trailers");
+const kState = Symbol("state");
+const kType = Symbol("type");
+const kTimeout = Symbol("timeout");
+
+const kDenoResponse = Symbol("kDenoResponse");
+const kDenoRid = Symbol("kDenoRid");
+const kDenoClientRid = Symbol("kDenoClientRid");
+const kDenoConnRid = Symbol("kDenoConnRid");
+
+const STREAM_FLAGS_PENDING = 0x0;
+const STREAM_FLAGS_READY = 0x1;
+const STREAM_FLAGS_CLOSED = 0x2;
+const STREAM_FLAGS_HEADERS_SENT = 0x4;
+const STREAM_FLAGS_HEAD_REQUEST = 0x8;
+const STREAM_FLAGS_ABORTED = 0x10;
+const STREAM_FLAGS_HAS_TRAILERS = 0x20;
+
+const SESSION_FLAGS_PENDING = 0x0;
+const SESSION_FLAGS_READY = 0x1;
+const SESSION_FLAGS_CLOSED = 0x2;
+const SESSION_FLAGS_DESTROYED = 0x4;
const ENCODER = new TextEncoder();
type Http2Headers = Record<string, string | string[]>;
+const debugHttp2Enabled = false;
+function debugHttp2(...args) {
+ if (debugHttp2Enabled) {
+ console.log(...args);
+ }
+}
+
export class Http2Session extends EventEmitter {
- constructor() {
+ constructor(type, _options /* socket */) {
super();
- }
- get alpnProtocol(): string | undefined {
- notImplemented("Http2Session.alpnProtocol");
- return undefined;
- }
+ // TODO(bartlomieju): Handle sockets here
- close(_callback?: () => void) {
- warnNotImplemented("Http2Session.close");
- }
+ this[kState] = {
+ destroyCode: constants.NGHTTP2_NO_ERROR,
+ flags: SESSION_FLAGS_PENDING,
+ goawayCode: null,
+ goawayLastStreamID: null,
+ streams: new Map(),
+ pendingStreams: new Set(),
+ pendingAck: 0,
+ writeQueueSize: 0,
+ originSet: undefined,
+ };
- get closed(): boolean {
- return false;
+ this[kEncrypted] = undefined;
+ this[kAlpnProtocol] = undefined;
+ this[kType] = type;
+ this[kTimeout] = null;
+ // this[kProxySocket] = null;
+ // this[kSocket] = socket;
+ // this[kHandle] = undefined;
+
+ // TODO(bartlomieju): connecting via socket
}
- get connecting(): boolean {
- notImplemented("Http2Session.connecting");
- return false;
+ get encrypted(): boolean {
+ return this[kEncrypted];
}
- destroy(_error?: Error, _code?: number) {
- notImplemented("Http2Session.destroy");
+ get alpnProtocol(): string | undefined {
+ return this[kAlpnProtocol];
}
- get destroyed(): boolean {
- return false;
+ get originSet(): string[] | undefined {
+ if (!this.encrypted || this.destroyed) {
+ return undefined;
+ }
+ // TODO(bartlomieju):
+ return [];
}
- get encrypted(): boolean {
- notImplemented("Http2Session.encrypted");
- return false;
+ get connecting(): boolean {
+ return (this[kState].flags & SESSION_FLAGS_READY) === 0;
}
- goaway(
- _code: number,
- _lastStreamID: number,
- _opaqueData: Buffer | TypedArray | DataView,
- ) {
- notImplemented("Http2Session.goaway");
+ get closed(): boolean {
+ return !!(this[kState].flags & SESSION_FLAGS_CLOSED);
}
- get localSettings(): Record<string, unknown> {
- notImplemented("Http2Session.localSettings");
- return {};
+ get destroyed(): boolean {
+ return !!(this[kState].flags & SESSION_FLAGS_DESTROYED);
}
- get originSet(): string[] | undefined {
- notImplemented("Http2Session.originSet");
- return undefined;
+ [kUpdateTimer]() {
+ if (this.destroyed) {
+ return;
+ }
+ if (this[kTimeout]) {
+ this[kTimeout].refresh();
+ }
}
- get pendingSettingsAck(): boolean {
- notImplemented("Http2Session.pendingSettingsAck");
- return false;
+ setLocalWindowSize(_windowSize: number) {
+ notImplemented("Http2Session.setLocalWindowSize");
}
ping(
@@ -87,28 +168,30 @@ export class Http2Session extends EventEmitter {
return false;
}
- ref() {
- warnNotImplemented("Http2Session.ref");
+ get socket(): Socket /*| TlsSocket*/ {
+ warnNotImplemented("Http2Session.socket");
+ return {};
}
- get remoteSettings(): Record<string, unknown> {
- notImplemented("Http2Session.remoteSettings");
- return {};
+ get type(): number {
+ return this[kType];
}
- setLocalWindowSize(_windowSize: number) {
- notImplemented("Http2Session.setLocalWindowSize");
+ get pendingSettingsAck() {
+ return this[kState].pendingAck > 0;
}
- setTimeout(msecs: number, callback?: () => void) {
- setStreamTimeout(this, msecs, callback);
+ get state(): Record<string, unknown> {
+ return {};
}
- get socket(): Socket /*| TlsSocket*/ {
+ get localSettings(): Record<string, unknown> {
+ notImplemented("Http2Session.localSettings");
return {};
}
- get state(): Record<string, unknown> {
+ get remoteSettings(): Record<string, unknown> {
+ notImplemented("Http2Session.remoteSettings");
return {};
}
@@ -116,19 +199,118 @@ export class Http2Session extends EventEmitter {
notImplemented("Http2Session.settings");
}
- get type(): number {
- notImplemented("Http2Session.type");
- return 0;
+ goaway(
+ _code: number,
+ _lastStreamID: number,
+ _opaqueData: Buffer | TypedArray | DataView,
+ ) {
+ warnNotImplemented("Http2Session.goaway");
+ core.tryClose(this[kDenoConnRid]);
+ core.tryClose(this[kDenoClientRid]);
+ }
+
+ destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) {
+ if (this.destroyed) {
+ return;
+ }
+
+ if (typeof error === "number") {
+ code = error;
+ error = code !== constants.NGHTTP2_NO_ERROR
+ ? new ERR_HTTP2_SESSION_ERROR(code)
+ : undefined;
+ }
+ if (code === undefined && error != null) {
+ code = constants.NGHTTP2_INTERNAL_ERROR;
+ }
+
+ closeSession(this, code, error);
+ }
+
+ close(callback?: () => void) {
+ if (this.closed || this.destroyed) {
+ return;
+ }
+
+ this[kState].flags |= SESSION_FLAGS_CLOSED;
+ if (typeof callback === "function") {
+ this.once("close", callback);
+ }
+ this.goaway();
+ this[kMaybeDestroy]();
+ }
+
+ [kMaybeDestroy](error?: number) {
+ if (!error) {
+ const state = this[kState];
+ // Don't destroy if the session is not closed or there are pending or open
+ // streams.
+ if (
+ !this.closed || state.streams.size > 0 || state.pendingStreams.size >
+ 0
+ ) {
+ return;
+ }
+ }
+ this.destroy(error);
+ }
+
+ ref() {
+ warnNotImplemented("Http2Session.ref");
}
unref() {
warnNotImplemented("Http2Session.unref");
}
+
+ setTimeout(msecs: number, callback?: () => void) {
+ setStreamTimeout(this, msecs, callback);
+ }
+}
+
+function emitClose(session: Http2Session, error?: Error) {
+ if (error) {
+ session.emit("error", error);
+ }
+ session.emit("close");
+}
+
+function finishSessionClose(session: Http2Session, error?: Error) {
+ // TODO(bartlomieju): handle sockets
+
+ nextTick(emitClose, session, error);
+}
+
+function closeSession(session: Http2Session, code?: number, error?: Error) {
+ const state = session[kState];
+ state.flags |= SESSION_FLAGS_DESTROYED;
+ state.destroyCode = code;
+
+ session.setTimeout(0);
+ session.removeAllListeners("timeout");
+
+ // Destroy open and pending streams
+ if (state.pendingStreams.size > 0 || state.streams.size > 0) {
+ const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
+ state.pendingStreams.forEach((stream) => stream.destroy(cancel));
+ state.streams.forEach((stream) => stream.destroy(cancel));
+ }
+
+ // TODO(bartlomieju): handle sockets
+ debugHttp2(
+ ">>> closeSession",
+ session[kDenoConnRid],
+ session[kDenoClientRid],
+ );
+ core.tryClose(session[kDenoConnRid]);
+ core.tryClose(session[kDenoClientRid]);
+
+ finishSessionClose(session, error);
}
export class ServerHttp2Session extends Http2Session {
constructor() {
- super();
+ super(constants.NGHTTP2_SESSION_SERVER, {});
}
altsvc(
@@ -143,71 +325,184 @@ export class ServerHttp2Session extends Http2Session {
}
}
+function assertValidPseudoHeader(header: string) {
+ switch (header) {
+ case ":authority":
+ case ":path":
+ case ":method":
+ case ":scheme":
+ case ":status":
+ return;
+ default:
+ throw new ERR_HTTP2_INVALID_PSEUDOHEADER(header);
+ }
+}
+
export class ClientHttp2Session extends Http2Session {
+ #connectPromise: Promise<void>;
+
constructor(
- _authority: string | URL,
- _options: Record<string, unknown>,
- callback: (session: Http2Session) => void,
+ connPromise: Promise<TcpConn> | Promise<TlsConn>,
+ url: string,
+ options: Record<string, unknown>,
) {
- super();
- if (callback) {
- this.on("connect", callback);
- }
- nextTick(() => this.emit("connect", this));
+ super(constants.NGHTTP2_SESSION_CLIENT, options);
+ this[kPendingRequestCalls] = null;
+ this[kDenoClientRid] = undefined;
+ this[kDenoConnRid] = undefined;
+
+ // TODO(bartlomieju): cleanup
+ this.#connectPromise = (async () => {
+ debugHttp2(">>> before connect");
+ const conn = await connPromise;
+ const [clientRid, connRid] = await op_http2_connect(conn.rid, url);
+ debugHttp2(">>> after connect");
+ this[kDenoClientRid] = clientRid;
+ this[kDenoConnRid] = connRid;
+ // TODO(bartlomieju): save this promise, so the session can be unrefed
+ (async () => {
+ try {
+ await core.opAsync(
+ "op_http2_poll_client_connection",
+ this[kDenoConnRid],
+ );
+ } catch (e) {
+ this.emit("error", e);
+ }
+ })();
+ this.emit("connect", this, {});
+ })();
}
request(
headers: Http2Headers,
- _options?: Record<string, unknown>,
+ options?: Record<string, unknown>,
): ClientHttp2Stream {
- const reqHeaders: string[][] = [];
- const controllerPromise: Deferred<
- ReadableStreamDefaultController<Uint8Array>
- > = deferred();
- const body = new ReadableStream({
- start(controller) {
- controllerPromise.resolve(controller);
- },
- });
- const request: RequestInit = { headers: reqHeaders, body };
- let authority = null;
- let path = null;
- for (const [name, value] of Object.entries(headers)) {
- if (name == constants.HTTP2_HEADER_PATH) {
- path = String(value);
- } else if (name == constants.HTTP2_HEADER_METHOD) {
- request.method = String(value);
- } else if (name == constants.HTTP2_HEADER_AUTHORITY) {
- authority = String(value);
- } else {
- reqHeaders.push([name, String(value)]);
+ if (this.destroyed) {
+ throw new ERR_HTTP2_INVALID_SESSION();
+ }
+
+ if (this.closed) {
+ throw new ERR_HTTP2_GOAWAY_SESSION();
+ }
+
+ this[kUpdateTimer]();
+ if (headers !== null && headers !== undefined) {
+ const keys = Object.keys(headers);
+ for (let i = 0; i < keys.length; i++) {
+ const header = keys[i];
+ if (header[0] === ":") {
+ assertValidPseudoHeader(header);
+ } else if (header && !_checkIsHttpToken(header)) {
+ this.destroy(new ERR_INVALID_HTTP_TOKEN("Header name", header));
+ }
}
}
- const fetchPromise = fetch(`http://${authority}${path}`, request);
- const readerPromise = deferred();
- const headersPromise = deferred();
- (async () => {
- const fetch = await fetchPromise;
- readerPromise.resolve(fetch.body);
+ headers = Object.assign({ __proto__: null }, headers);
+ options = { ...options };
+
+ if (headers[constants.HTTP2_HEADER_METHOD] === undefined) {
+ headers[constants.HTTP2_HEADER_METHOD] = constants.HTTP2_METHOD_GET;
+ }
+
+ const connect =
+ headers[constants.HTTP2_HEADER_METHOD] === constants.HTTP2_METHOD_CONNECT;
- const headers: Http2Headers = {};
- for (const [key, value] of fetch.headers) {
- headers[key] = value;
+ if (!connect || headers[constants.HTTP2_HEADER_PROTOCOL] !== undefined) {
+ if (getAuthority(headers) === undefined) {
+ headers[constants.HTTP2_HEADER_AUTHORITY] = this[kAuthority];
+ }
+ if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) {
+ headers[constants.HTTP2_HEADER_SCHEME] = this[kProtocol].slice(0, -1);
+ }
+ if (headers[constants.HTTP2_HEADER_PATH] === undefined) {
+ headers[constants.HTTP2_HEADER_PATH] = "/";
+ }
+ } else {
+ if (headers[constants.HTTP2_HEADER_AUTHORITY] === undefined) {
+ throw new ERR_HTTP2_CONNECT_AUTHORITY();
}
- headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status);
+ if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) {
+ throw new ERR_HTTP2_CONNECT_SCHEME();
+ }
+ if (headers[constants.HTTP2_HEADER_PATH] === undefined) {
+ throw new ERR_HTTP2_CONNECT_PATH();
+ }
+ }
- headersPromise.resolve(headers);
- })();
- return new ClientHttp2Stream(
+ if (options.endStream === undefined) {
+ const method = headers[constants.HTTP2_HEADER_METHOD];
+ options.endStream = method === constants.HTTP2_METHOD_DELETE ||
+ method === constants.HTTP2_METHOD_GET ||
+ method === constants.HTTP2_METHOD_HEAD;
+ } else {
+ options.endStream = !!options.endStream;
+ }
+
+ const stream = new ClientHttp2Stream(
+ options,
this,
- headersPromise,
- controllerPromise,
- readerPromise,
+ this.#connectPromise,
+ headers,
);
+ stream[kSentHeaders] = headers;
+ stream[kOrigin] = `${headers[constants.HTTP2_HEADER_SCHEME]}://${
+ getAuthority(headers)
+ }`;
+
+ if (options.endStream) {
+ stream.end();
+ }
+
+ if (options.waitForTrailers) {
+ stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS;
+ }
+
+ const { signal } = options;
+ if (signal) {
+ const aborter = () => {
+ stream.destroy(new AbortError(undefined, { cause: signal.reason }));
+ };
+ if (signal.aborted) {
+ aborter();
+ } else {
+ // TODO(bartlomieju): handle this
+ // const disposable = EventEmitter.addAbortListener(signal, aborter);
+ // stream.once("close", disposable[Symbol.dispose]);
+ }
+ }
+
+ // TODO(bartlomieju): handle this
+ const onConnect = () => {};
+ if (this.connecting) {
+ if (this[kPendingRequestCalls] !== null) {
+ this[kPendingRequestCalls].push(onConnect);
+ } else {
+ this[kPendingRequestCalls] = [onConnect];
+ this.once("connect", () => {
+ this[kPendingRequestCalls].forEach((f) => f());
+ this[kPendingRequestCalls] = null;
+ });
+ }
+ } else {
+ onConnect();
+ }
+
+ return stream;
}
}
+function getAuthority(headers) {
+ if (headers[constants.HTTP2_HEADER_AUTHORITY] !== undefined) {
+ return headers[constants.HTTP2_HEADER_AUTHORITY];
+ }
+ if (headers[constants.HTTP2_HEADER_HOST] !== undefined) {
+ return headers[constants.HTTP2_HEADER_HOST];
+ }
+ return undefined;
+}
+
export class Http2Stream extends EventEmitter {
#session: Http2Session;
#headers: Deferred<Http2Headers>;
@@ -265,6 +560,8 @@ export class Http2Stream extends EventEmitter {
})();
}
+ setEncoding(_encoding) {}
+
resume() {
}
@@ -351,17 +648,566 @@ export class Http2Stream extends EventEmitter {
}
}
-export class ClientHttp2Stream extends Http2Stream {
+async function clientHttp2Request(
+ session,
+ sessionConnectPromise,
+ headers,
+ options,
+) {
+ debugHttp2(
+ ">>> waiting for connect promise",
+ sessionConnectPromise,
+ headers,
+ options,
+ );
+ await sessionConnectPromise;
+
+ const reqHeaders: string[][] = [];
+ const pseudoHeaders = {};
+
+ for (const [key, value] of Object.entries(headers)) {
+ if (key[0] === ":") {
+ pseudoHeaders[key] = value;
+ } else {
+ reqHeaders.push([key, Array.isArray(value) ? value[0] : value]);
+ }
+ }
+ debugHttp2(
+ "waited for connect promise",
+ !!options.waitForTrailers,
+ pseudoHeaders,
+ reqHeaders,
+ );
+
+ return await core.opAsync(
+ "op_http2_client_request",
+ session[kDenoClientRid],
+ pseudoHeaders,
+ reqHeaders,
+ );
+}
+
+export class ClientHttp2Stream extends Duplex {
+ #requestPromise: Promise<[number, number]>;
+ #responsePromise: Promise<void>;
+ #rid: number | undefined = undefined;
+ #encoding = "utf8";
+
constructor(
+ options: Record<string, unknown>,
session: Http2Session,
- headers: Promise<Http2Headers>,
- controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>,
- readerPromise: Deferred<ReadableStream<Uint8Array>>,
+ sessionConnectPromise: Promise<void>,
+ headers: Record<string, string>,
) {
- super(session, headers, controllerPromise, readerPromise);
+ options.allowHalfOpen = true;
+ options.decodeString = false;
+ options.autoDestroy = false;
+ super(options);
+ this.cork();
+ this[kSession] = session;
+ session[kState].pendingStreams.add(this);
+
+ this._readableState.readingMore = true;
+
+ this[kState] = {
+ didRead: false,
+ flags: STREAM_FLAGS_PENDING | STREAM_FLAGS_HEADERS_SENT,
+ rstCode: constants.NGHTTP2_NO_ERROR,
+ writeQueueSize: 0,
+ trailersReady: false,
+ endAfterHeaders: false,
+ shutdownWritableCalled: false,
+ };
+ this[kDenoResponse] = undefined;
+ this[kDenoRid] = undefined;
+
+ this.#requestPromise = clientHttp2Request(
+ session,
+ sessionConnectPromise,
+ headers,
+ options,
+ );
+ debugHttp2(">>> created clienthttp2stream");
+ // TODO(bartlomieju): save it so we can unref
+ this.#responsePromise = (async () => {
+ debugHttp2(">>> before request promise", session[kDenoClientRid]);
+ const [streamRid, streamId] = await this.#requestPromise;
+ this.#rid = streamRid;
+ this[kDenoRid] = streamRid;
+ this[kInit](streamId);
+ debugHttp2(
+ ">>> after request promise",
+ session[kDenoClientRid],
+ this.#rid,
+ );
+ const response = await core.opAsync(
+ "op_http2_client_get_response",
+ this.#rid,
+ );
+ debugHttp2(">>> after get response", response);
+ const headers = {
+ ":status": response.statusCode,
+ ...Object.fromEntries(response.headers),
+ };
+ debugHttp2(">>> emitting response", headers);
+ this.emit("response", headers, 0);
+ this[kDenoResponse] = response;
+ this.emit("ready");
+ })();
+ }
+
+ [kUpdateTimer]() {
+ if (this.destroyed) {
+ return;
+ }
+ if (this[kTimeout]) {
+ this[kTimeout].refresh();
+ }
+ if (this[kSession]) {
+ this[kSession][kUpdateTimer]();
+ }
+ }
+
+ [kInit](id) {
+ const state = this[kState];
+ state.flags |= STREAM_FLAGS_READY;
+
+ const session = this[kSession];
+ session[kState].pendingStreams.delete(this);
+ session[kState].streams.set(id, this);
+
+ // TODO(bartlomieju): handle socket handle
+
+ this[kID] = id;
+ this.uncork();
+ this.emit("ready");
+ }
+
+ get bufferSize() {
+ return this[kState].writeQueueSize + this.writableLength;
+ }
+
+ get endAfterHeaders() {
+ return this[kState].endAfterHeaders;
+ }
+
+ get sentHeaders() {
+ return this[kSentHeaders];
+ }
+
+ get sentTrailers() {
+ return this[kSentTrailers];
+ }
+
+ get sendInfoHeaders() {
+ return this[kInfoHeaders];
+ }
+
+ get pending(): boolean {
+ return this[kID] === undefined;
+ }
+
+ get id(): number | undefined {
+ return this[kID];
+ }
+
+ get session(): Http2Session {
+ return this[kSession];
+ }
+
+ _onTimeout() {
+ callTimeout(this, kSession);
+ }
+
+ get headersSent() {
+ return !!(this[kState].flags & STREAM_FLAGS_HEADERS_SENT);
+ }
+
+ get aborted() {
+ return !!(this[kState].flags & STREAM_FLAGS_ABORTED);
+ }
+
+ get headRequest() {
+ return !!(this[kState].flags & STREAM_FLAGS_HEAD_REQUEST);
+ }
+
+ get rstCode() {
+ return this[kState].rstCode;
+ }
+
+ get state(): Record<string, unknown> {
+ notImplemented("Http2Stream.state");
+ return {};
+ }
+
+ // [kAfterAsyncWrite]() {}
+
+ // [kWriteGeneric]() {}
+
+ // TODO(bartlomieju): clean up
+ _write(chunk, encoding, callback?: () => void) {
+ debugHttp2(">>> _write", callback);
+ if (typeof encoding === "function") {
+ callback = encoding;
+ encoding = "utf8";
+ }
+ let data;
+ if (typeof encoding === "string") {
+ data = ENCODER.encode(chunk);
+ } else {
+ data = chunk.buffer;
+ }
+
+ this.#requestPromise
+ .then(() => {
+ debugHttp2(">>> _write", this.#rid, data, encoding, callback);
+ return core.opAsync(
+ "op_http2_client_send_data",
+ this.#rid,
+ data,
+ );
+ })
+ .then(() => {
+ callback?.();
+ debugHttp2(
+ "this.writableFinished",
+ this.pending,
+ this.destroyed,
+ this.writableFinished,
+ );
+ })
+ .catch((e) => {
+ callback?.(e);
+ });
+ }
+
+ // TODO(bartlomieju): finish this method
+ _writev(_chunks, _callback?) {
+ notImplemented("ClientHttp2Stream._writev");
+ }
+
+ _final(cb) {
+ debugHttp2("_final", new Error());
+ if (this.pending) {
+ this.once("ready", () => this._final(cb));
+ return;
+ }
+
+ shutdownWritable(this, cb);
+ }
+
+ // TODO(bartlomieju): needs a proper cleanup
+ _read() {
+ if (this.destroyed) {
+ this.push(null);
+ return;
+ }
+
+ if (!this[kState].didRead) {
+ this._readableState.readingMore = false;
+ this[kState].didRead = true;
+ }
+ // if (!this.pending) {
+ // streamOnResume(this);
+ // } else {
+ // this.once("ready", () => streamOnResume(this));
+ // }
+
+ if (!this[kDenoResponse]) {
+ this.once("ready", this._read);
+ return;
+ }
+
+ debugHttp2(">>> read");
+
+ (async () => {
+ const [chunk, finished] = await core.opAsync(
+ "op_http2_client_get_response_body_chunk",
+ this[kDenoResponse].bodyRid,
+ );
+
+ debugHttp2(">>> chunk", chunk, finished, this[kDenoResponse].bodyRid);
+ if (chunk === null) {
+ const trailerList = await core.opAsync(
+ "op_http2_client_get_response_trailers",
+ this[kDenoResponse].bodyRid,
+ );
+ if (trailerList) {
+ const trailers = Object.fromEntries(trailerList);
+ this.emit("trailers", trailers);
+ }
+
+ debugHttp2("tryClose");
+ core.tryClose(this[kDenoResponse].bodyRid);
+ this.push(null);
+ debugHttp2(">>> read null chunk");
+ this[kMaybeDestroy]();
+ return;
+ }
+
+ let result;
+ if (this.#encoding === "utf8") {
+ result = this.push(new TextDecoder().decode(new Uint8Array(chunk)));
+ } else {
+ result = this.push(new Uint8Array(chunk));
+ }
+ debugHttp2(">>> read result", result);
+ })();
+ }
+
+ // TODO(bartlomieju):
+ priority(_options: Record<string, unknown>) {
+ notImplemented("Http2Stream.priority");
+ }
+
+ sendTrailers(trailers: Record<string, unknown>) {
+ debugHttp2("sendTrailers", trailers);
+ if (this.destroyed || this.closed) {
+ throw new ERR_HTTP2_INVALID_STREAM();
+ }
+ if (this[kSentTrailers]) {
+ throw new ERR_HTTP2_TRAILERS_ALREADY_SENT();
+ }
+ if (!this[kState].trailersReady) {
+ throw new ERR_HTTP2_TRAILERS_NOT_READY();
+ }
+
+ trailers = Object.assign({ __proto__: null }, trailers);
+ const trailerList = [];
+ for (const [key, value] of Object.entries(trailers)) {
+ trailerList.push([key, value]);
+ }
+ this[kSentTrailers] = trailers;
+
+ // deno-lint-ignore no-this-alias
+ const stream = this;
+ stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS;
+ debugHttp2("sending trailers", this.#rid, trailers);
+
+ core.opAsync(
+ "op_http2_client_send_trailers",
+ this.#rid,
+ trailerList,
+ ).then(() => {
+ stream[kMaybeDestroy]();
+ core.tryClose(this.#rid);
+ }).catch((e) => {
+ debugHttp2(">>> send trailers error", e);
+ core.tryClose(this.#rid);
+ stream._destroy(e);
+ });
+ }
+
+ get closed(): boolean {
+ return !!(this[kState].flags & STREAM_FLAGS_CLOSED);
+ }
+
+ close(code: number = constants.NGHTTP2_NO_ERROR, callback?: () => void) {
+ debugHttp2(">>> close", code, this.closed, callback);
+
+ if (this.closed) {
+ return;
+ }
+ if (typeof callback !== "undefined") {
+ this.once("close", callback);
+ }
+ closeStream(this, code);
+ }
+
+ _destroy(err, callback) {
+ debugHttp2(">>> ClientHttp2Stream._destroy", err, callback);
+ const session = this[kSession];
+ const id = this[kID];
+
+ const state = this[kState];
+ const sessionState = session[kState];
+ const sessionCode = sessionState.goawayCode || sessionState.destroyCode;
+
+ let code = this.closed ? this.rstCode : sessionCode;
+ if (err != null) {
+ if (sessionCode) {
+ code = sessionCode;
+ } else if (err instanceof AbortError) {
+ code = constants.NGHTTP2_CANCEL;
+ } else {
+ code = constants.NGHTTP2_INTERNAL_ERROR;
+ }
+ }
+
+ if (!this.closed) {
+ // TODO(bartlomieju): this not handle `socket handle`
+ closeStream(this, code, kNoRstStream);
+ }
+
+ sessionState.streams.delete(id);
+ sessionState.pendingStreams.delete(this);
+
+ sessionState.writeQueueSize -= state.writeQueueSize;
+ state.writeQueueSize = 0;
+
+ const nameForErrorCode = {};
+ if (
+ err == null && code !== constants.NGHTTP2_NO_ERROR &&
+ code !== constants.NGHTTP2_CANCEL
+ ) {
+ err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code);
+ }
+
+ this[kSession] = undefined;
+
+ session[kMaybeDestroy]();
+ callback(err);
+ }
+
+ [kMaybeDestroy](code = constants.NGHTTP2_NO_ERROR) {
+ debugHttp2(
+ ">>> ClientHttp2Stream[kMaybeDestroy]",
+ code,
+ this.writableFinished,
+ this.readable,
+ this.closed,
+ );
+ if (code !== constants.NGHTTP2_NO_ERROR) {
+ this._destroy();
+ return;
+ }
+
+ if (this.writableFinished) {
+ if (!this.readable && this.closed) {
+ debugHttp2("going into _destroy");
+ this._destroy();
+ return;
+ }
+ }
+ }
+
+ setTimeout(msecs: number, callback?: () => void) {
+ // TODO(bartlomieju): fix this call, it's crashing on `this` being undefined;
+ // some strange transpilation quirk going on here.
+ setStreamTimeout.call(this, msecs, callback);
+ }
+}
+
+function shutdownWritable(stream, callback) {
+ debugHttp2(">>> shutdownWritable", callback);
+ const state = stream[kState];
+ if (state.shutdownWritableCalled) {
+ return callback();
+ }
+ state.shutdownWritableCalled = true;
+ onStreamTrailers(stream);
+ callback();
+ // TODO(bartlomieju): might have to add "finish" event listener here,
+ // check it.
+}
+
+function onStreamTrailers(stream) {
+ stream[kState].trailersReady = true;
+ debugHttp2(">>> onStreamTrailers", stream.destroyed, stream.closed);
+ if (stream.destroyed || stream.closed) {
+ return;
+ }
+ if (!stream.emit("wantTrailers")) {
+ debugHttp2(">>> onStreamTrailers no wantTrailers");
+ stream.sendTrailers({});
+ }
+ debugHttp2(">>> onStreamTrailers wantTrailers");
+}
+
+const kNoRstStream = 0;
+const kSubmitRstStream = 1;
+const kForceRstStream = 2;
+
+function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
+ const state = stream[kState];
+ state.flags |= STREAM_FLAGS_CLOSED;
+ state.rstCode = code;
+
+ stream.setTimeout(0);
+ stream.removeAllListeners("timeout");
+
+ const { ending } = stream._writableState;
+
+ if (!ending) {
+ if (!stream.aborted) {
+ state.flags |= STREAM_FLAGS_ABORTED;
+ stream.emit("aborted");
+ }
+
+ stream.end();
+ }
+
+ if (rstStreamStatus != kNoRstStream) {
+ debugHttp2(
+ ">>> closeStream",
+ !ending,
+ stream.writableFinished,
+ code !== constants.NGHTTP2_NO_ERROR,
+ rstStreamStatus === kForceRstStream,
+ );
+ if (
+ !ending || stream.writableFinished ||
+ code !== constants.NGHTTP2_NO_ERROR || rstStreamStatus === kForceRstStream
+ ) {
+ finishCloseStream(stream, code);
+ } else {
+ stream.once("finish", () => finishCloseStream(stream, code));
+ }
+ }
+}
+
+function finishCloseStream(stream, code) {
+ debugHttp2(">>> finishCloseStream", stream.readableEnded, code);
+ if (stream.pending) {
+ stream.push(null);
+ stream.once("ready", () => {
+ core.opAsync(
+ "op_http2_client_reset_stream",
+ stream[kDenoRid],
+ code,
+ ).then(() => {
+ debugHttp2(
+ ">>> finishCloseStream close",
+ stream[kDenoRid],
+ stream[kDenoResponse].bodyRid,
+ );
+ core.tryClose(stream[kDenoRid]);
+ core.tryClose(stream[kDenoResponse].bodyRid);
+ stream.emit("close");
+ });
+ });
+ } else {
+ stream.resume();
+ core.opAsync(
+ "op_http2_client_reset_stream",
+ stream[kDenoRid],
+ code,
+ ).then(() => {
+ debugHttp2(
+ ">>> finishCloseStream close2",
+ stream[kDenoRid],
+ stream[kDenoResponse].bodyRid,
+ );
+ core.tryClose(stream[kDenoRid]);
+ core.tryClose(stream[kDenoResponse].bodyRid);
+ stream.emit("close");
+ }).catch(() => {
+ debugHttp2(
+ ">>> finishCloseStream close2 catch",
+ stream[kDenoRid],
+ stream[kDenoResponse].bodyRid,
+ );
+ core.tryClose(stream[kDenoRid]);
+ core.tryClose(stream[kDenoResponse].bodyRid);
+ stream.emit("close");
+ });
}
}
+function callTimeout() {
+ notImplemented("callTimeout");
+}
+
export class ServerHttp2Stream extends Http2Stream {
_promise: Deferred<Response>;
#body: ReadableStream<Uint8Array>;
@@ -496,17 +1342,17 @@ export class Http2Server extends Server {
this.emit("stream", stream, headers);
return await stream._promise;
} catch (e) {
- console.log("Error in serveHttpOnConnection", e);
+ console.log(">>> Error in serveHttpOnConnection", e);
}
return new Response("");
},
() => {
- console.log("error");
+ console.log(">>> error");
},
() => {},
);
} catch (e) {
- console.log("Error in Http2Server", e);
+ console.log(">>> Error in Http2Server", e);
}
},
);
@@ -602,11 +1448,77 @@ export function connect(
options: Record<string, unknown>,
callback: (session: ClientHttp2Session) => void,
): ClientHttp2Session {
- return new ClientHttp2Session(authority, options, callback);
+ debugHttp2(">>> http2.connect", options);
+
+ if (typeof options === "function") {
+ callback = options;
+ options = undefined;
+ }
+
+ options = { ...options };
+
+ if (typeof authority === "string") {
+ authority = new URL(authority);
+ }
+
+ const protocol = authority.protocol || options.protocol || "https:";
+ let port = 0;
+
+ if (authority.port !== "") {
+ port = Number(authority.port);
+ } else if (protocol === "http:") {
+ port = 80;
+ } else {
+ port = 443;
+ }
+
+ if (port == 0) {
+ throw new Error("Invalid port");
+ }
+
+ let host = "localhost";
+
+ if (authority.hostname) {
+ host = authority.hostname;
+
+ if (host[0] === "[") {
+ host = host.slice(1, -1);
+ }
+ } else if (authority.host) {
+ host = authority.host;
+ }
+
+ // TODO(bartlomieju): handle defaults
+ if (typeof options.createConnection === "function") {
+ console.error("Not implemented: http2.connect.options.createConnection");
+ // notImplemented("http2.connect.options.createConnection");
+ }
+
+ let conn, url;
+ if (protocol == "http:") {
+ conn = Deno.connect({ port, hostname: host });
+ url = `http://${host}${port == 80 ? "" : (":" + port)}`;
+ } else if (protocol == "https:") {
+ conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] });
+ url = `http://${host}${port == 443 ? "" : (":" + port)}`;
+ } else {
+ throw new TypeError("Unexpected URL protocol");
+ }
+
+ const session = new ClientHttp2Session(conn, url, options);
+ session[kAuthority] = `${options.servername || host}:${port}`;
+ session[kProtocol] = protocol;
+
+ if (typeof callback === "function") {
+ session.once("connect", callback);
+ }
+ return session;
}
export const constants = {
NGHTTP2_ERR_FRAME_SIZE_ERROR: -522,
+ NGHTTP2_NV_FLAG_NONE: 0,
+ NGHTTP2_NV_FLAG_NO_INDEX: 1,
NGHTTP2_SESSION_SERVER: 0,
NGHTTP2_SESSION_CLIENT: 1,
NGHTTP2_STREAM_STATE_IDLE: 1,
@@ -849,6 +1761,49 @@ export const constants = {
HTTP_STATUS_NETWORK_AUTHENTICATION_REQUIRED: 511,
};
+// const kSingleValueHeaders = new Set([
+// constants.HTTP2_HEADER_STATUS,
+// constants.HTTP2_HEADER_METHOD,
+// constants.HTTP2_HEADER_AUTHORITY,
+// constants.HTTP2_HEADER_SCHEME,
+// constants.HTTP2_HEADER_PATH,
+// constants.HTTP2_HEADER_PROTOCOL,
+// constants.HTTP2_HEADER_ACCESS_CONTROL_ALLOW_CREDENTIALS,
+// constants.HTTP2_HEADER_ACCESS_CONTROL_MAX_AGE,
+// constants.HTTP2_HEADER_ACCESS_CONTROL_REQUEST_METHOD,
+// constants.HTTP2_HEADER_AGE,
+// constants.HTTP2_HEADER_AUTHORIZATION,
+// constants.HTTP2_HEADER_CONTENT_ENCODING,
+// constants.HTTP2_HEADER_CONTENT_LANGUAGE,
+// constants.HTTP2_HEADER_CONTENT_LENGTH,
+// constants.HTTP2_HEADER_CONTENT_LOCATION,
+// constants.HTTP2_HEADER_CONTENT_MD5,
+// constants.HTTP2_HEADER_CONTENT_RANGE,
+// constants.HTTP2_HEADER_CONTENT_TYPE,
+// constants.HTTP2_HEADER_DATE,
+// constants.HTTP2_HEADER_DNT,
+// constants.HTTP2_HEADER_ETAG,
+// constants.HTTP2_HEADER_EXPIRES,
+// constants.HTTP2_HEADER_FROM,
+// constants.HTTP2_HEADER_HOST,
+// constants.HTTP2_HEADER_IF_MATCH,
+// constants.HTTP2_HEADER_IF_MODIFIED_SINCE,
+// constants.HTTP2_HEADER_IF_NONE_MATCH,
+// constants.HTTP2_HEADER_IF_RANGE,
+// constants.HTTP2_HEADER_IF_UNMODIFIED_SINCE,
+// constants.HTTP2_HEADER_LAST_MODIFIED,
+// constants.HTTP2_HEADER_LOCATION,
+// constants.HTTP2_HEADER_MAX_FORWARDS,
+// constants.HTTP2_HEADER_PROXY_AUTHORIZATION,
+// constants.HTTP2_HEADER_RANGE,
+// constants.HTTP2_HEADER_REFERER,
+// constants.HTTP2_HEADER_RETRY_AFTER,
+// constants.HTTP2_HEADER_TK,
+// constants.HTTP2_HEADER_UPGRADE_INSECURE_REQUESTS,
+// constants.HTTP2_HEADER_USER_AGENT,
+// constants.HTTP2_HEADER_X_CONTENT_TYPE_OPTIONS,
+// ]);
+
export function getDefaultSettings(): Record<string, unknown> {
notImplemented("http2.getDefaultSettings");
return {};
diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml
index 47f7d1d52..f48fce0b3 100644
--- a/test_util/Cargo.toml
+++ b/test_util/Cargo.toml
@@ -17,11 +17,13 @@ path = "src/test_server.rs"
anyhow.workspace = true
async-stream = "0.3.3"
base64.workspace = true
+bytes.workspace = true
console_static_text.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] }
flate2.workspace = true
futures.workspace = true
glob.workspace = true
+h2.workspace = true
hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] }
lazy-regex.workspace = true
libc.workspace = true
diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs
index 0600867d7..635520b44 100644
--- a/test_util/src/lib.rs
+++ b/test_util/src/lib.rs
@@ -7,6 +7,7 @@ use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use hyper::header::HeaderValue;
+use hyper::http;
use hyper::server::Server;
use hyper::service::make_service_fn;
use hyper::service::service_fn;
@@ -57,6 +58,7 @@ use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_rustls::rustls;
+use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
use url::Url;
@@ -102,6 +104,8 @@ const WS_PORT: u16 = 4242;
const WSS_PORT: u16 = 4243;
const WS_CLOSE_PORT: u16 = 4244;
const WS_PING_PORT: u16 = 4245;
+const H2_GRPC_PORT: u16 = 4246;
+const H2S_GRPC_PORT: u16 = 4247;
pub const PERMISSION_VARIANTS: [&str; 5] =
["read", "write", "env", "net", "run"];
@@ -1664,17 +1668,7 @@ async fn wrap_https_h1_only_tls_server() {
async fn wrap_https_h2_only_tls_server() {
let main_server_https_addr =
SocketAddr::from(([127, 0, 0, 1], H2_ONLY_TLS_PORT));
- let cert_file = "tls/localhost.crt";
- let key_file = "tls/localhost.key";
- let ca_cert_file = "tls/RootCA.pem";
- let tls_config = get_tls_config(
- cert_file,
- key_file,
- ca_cert_file,
- SupportedHttpVersions::Http2Only,
- )
- .await
- .unwrap();
+ let tls_config = create_tls_server_config().await;
loop {
let tcp = TcpListener::bind(&main_server_https_addr)
.await
@@ -1707,6 +1701,20 @@ async fn wrap_https_h2_only_tls_server() {
}
}
+async fn create_tls_server_config() -> Arc<rustls::ServerConfig> {
+ let cert_file = "tls/localhost.crt";
+ let key_file = "tls/localhost.key";
+ let ca_cert_file = "tls/RootCA.pem";
+ get_tls_config(
+ cert_file,
+ key_file,
+ ca_cert_file,
+ SupportedHttpVersions::Http2Only,
+ )
+ .await
+ .unwrap()
+}
+
async fn wrap_https_h1_only_server() {
let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], H1_ONLY_PORT));
@@ -1729,6 +1737,103 @@ async fn wrap_https_h2_only_server() {
let _ = main_server_http.await;
}
+async fn h2_grpc_server() {
+ let addr = SocketAddr::from(([127, 0, 0, 1], H2_GRPC_PORT));
+ let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
+
+ let addr_tls = SocketAddr::from(([127, 0, 0, 1], H2S_GRPC_PORT));
+ let listener_tls = tokio::net::TcpListener::bind(addr_tls).await.unwrap();
+ let tls_config = create_tls_server_config().await;
+
+ async fn serve(socket: TcpStream) -> Result<(), anyhow::Error> {
+ let mut connection = h2::server::handshake(socket).await?;
+
+ while let Some(result) = connection.accept().await {
+ let (request, respond) = result?;
+ tokio::spawn(async move {
+ let _ = handle_request(request, respond).await;
+ });
+ }
+
+ Ok(())
+ }
+
+ async fn serve_tls(
+ socket: TlsStream<TcpStream>,
+ ) -> Result<(), anyhow::Error> {
+ let mut connection = h2::server::handshake(socket).await?;
+
+ while let Some(result) = connection.accept().await {
+ let (request, respond) = result?;
+ tokio::spawn(async move {
+ let _ = handle_request(request, respond).await;
+ });
+ }
+
+ Ok(())
+ }
+
+ async fn handle_request(
+ mut request: http::Request<h2::RecvStream>,
+ mut respond: h2::server::SendResponse<bytes::Bytes>,
+ ) -> Result<(), anyhow::Error> {
+ let body = request.body_mut();
+ while let Some(data) = body.data().await {
+ let data = data?;
+ let _ = body.flow_control().release_capacity(data.len());
+ }
+
+ let maybe_recv_trailers = body.trailers().await?;
+
+ let response = http::Response::new(());
+ let mut send = respond.send_response(response, false)?;
+ send.send_data(bytes::Bytes::from_static(b"hello "), false)?;
+ send.send_data(bytes::Bytes::from_static(b"world\n"), false)?;
+ let mut trailers = http::HeaderMap::new();
+ trailers.insert(
+ http::HeaderName::from_static("abc"),
+ HeaderValue::from_static("def"),
+ );
+ trailers.insert(
+ http::HeaderName::from_static("opr"),
+ HeaderValue::from_static("stv"),
+ );
+ if let Some(recv_trailers) = maybe_recv_trailers {
+ for (key, value) in recv_trailers {
+ trailers.insert(key.unwrap(), value);
+ }
+ }
+ send.send_trailers(trailers)?;
+
+ Ok(())
+ }
+
+ let http = tokio::spawn(async move {
+ loop {
+ if let Ok((socket, _peer_addr)) = listener.accept().await {
+ tokio::spawn(async move {
+ let _ = serve(socket).await;
+ });
+ }
+ }
+ });
+
+ let https = tokio::spawn(async move {
+ loop {
+ if let Ok((socket, _peer_addr)) = listener_tls.accept().await {
+ let tls_acceptor = TlsAcceptor::from(tls_config.clone());
+ let tls = tls_acceptor.accept(socket).await.unwrap();
+ tokio::spawn(async move {
+ let _ = serve_tls(tls).await;
+ });
+ }
+ }
+ });
+
+ http.await.unwrap();
+ https.await.unwrap();
+}
+
async fn wrap_client_auth_https_server() {
let main_server_https_addr =
SocketAddr::from(([127, 0, 0, 1], HTTPS_CLIENT_AUTH_PORT));
@@ -1821,6 +1926,7 @@ pub async fn run_all_servers() {
let h2_only_server_tls_fut = wrap_https_h2_only_tls_server();
let h1_only_server_fut = wrap_https_h1_only_server();
let h2_only_server_fut = wrap_https_h2_only_server();
+ let h2_grpc_server_fut = h2_grpc_server();
let mut server_fut = async {
futures::join!(
@@ -1843,7 +1949,8 @@ pub async fn run_all_servers() {
h1_only_server_tls_fut,
h2_only_server_tls_fut,
h1_only_server_fut,
- h2_only_server_fut
+ h2_only_server_fut,
+ h2_grpc_server_fut,
)
}
.boxed();