summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-07-31 07:34:53 -0600
committerGitHub <noreply@github.com>2023-07-31 07:34:53 -0600
commit990ecc99d8248a66f1915b11e734e51ba104dcb3 (patch)
tree0539e6efbd26b493b58a4e14b3cdade04fb52835
parent43877f120905e9502472452d6d88ae4663cd9d94 (diff)
feat(ext/http): Upgrade to hyper1.0-rc4 (#19987)
Includes a lightly-modified version of hyper-util's `TokioIo` utility. Hyper changes: v1.0.0-rc.4 (2023-07-10) Bug Fixes http1: http1 server graceful shutdown fix (#3261) ([f4b51300](https://github.com/hyperium/hyper/commit/f4b513009d81083081d1c60c1981847bbb17dd5d)) send error on Incoming body when connection errors (#3256) ([52f19259](https://github.com/hyperium/hyper/commit/52f192593fb9ebcf6d3894e0c85cbf710da4decd), closes https://github.com/hyperium/hyper/issues/3253) properly end chunked bodies when it was known to be empty (#3254) ([fec64cf0](https://github.com/hyperium/hyper/commit/fec64cf0abdc678e30ca5f1b310c5118b2e01999), closes https://github.com/hyperium/hyper/issues/3252) Features client: Make clients able to use non-Send executor (#3184) ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc), closes https://github.com/hyperium/hyper/issues/3017) rt: replace IO traits with hyper::rt ones (#3230) ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2), closes https://github.com/hyperium/hyper/issues/3110) add downcast on Sleep trait (#3125) ([d92d3917](https://github.com/hyperium/hyper/commit/d92d3917d950e4c61c37c2170f3ce273d2a0f7d1), closes https://github.com/hyperium/hyper/issues/3027) service: change Service::call to take &self (#3223) ([d894439e](https://github.com/hyperium/hyper/commit/d894439e009aa75103f6382a7ba98fb17da72f02), closes https://github.com/hyperium/hyper/issues/3040) Breaking Changes Any IO transport type provided must not implement hyper::rt::{Read, Write} instead of tokio::io traits. You can grab a helper type from hyper-util to wrap Tokio types, or implement the traits yourself, if it's a custom type. ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2)) client::conn::http2 types now use another generic for an Executor. Code that names Connection needs to include the additional generic parameter. ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc)) The Service::call function no longer takes a mutable reference to self. The FnMut trait bound on the service::util::service_fn function and the trait bound on the impl for the ServiceFn struct were changed from FnMut to Fn.
-rw-r--r--Cargo.lock7
-rw-r--r--ext/http/Cargo.toml2
-rw-r--r--ext/http/http_next.rs8
-rw-r--r--ext/http/hyper_util_tokioio.rs206
-rw-r--r--ext/http/lib.rs7
5 files changed, 220 insertions, 10 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8ecca48c6..349354d25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1166,7 +1166,7 @@ dependencies = [
"http",
"httparse",
"hyper 0.14.26",
- "hyper 1.0.0-rc.3",
+ "hyper 1.0.0-rc.4",
"memmem",
"mime",
"once_cell",
@@ -2530,13 +2530,12 @@ dependencies = [
[[package]]
name = "hyper"
-version = "1.0.0-rc.3"
+version = "1.0.0-rc.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092"
+checksum = "d280a71f348bcc670fc55b02b63c53a04ac0bf2daff2980795aeaf53edae10e6"
dependencies = [
"bytes",
"futures-channel",
- "futures-core",
"futures-util",
"h2",
"http",
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index cb47db6b2..6c7d6e7bb 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -36,7 +36,7 @@ fly-accept-encoding = "0.2.0"
http.workspace = true
httparse.workspace = true
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
-hyper1 = { package = "hyper", features = ["full"], version = "=1.0.0-rc.3" }
+hyper1 = { package = "hyper", features = ["full"], version = "=1.0.0-rc.4" }
memmem.workspace = true
mime = "0.3.16"
once_cell.workspace = true
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index cd63bc899..7cf088e30 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::compressible::is_content_compressible;
use crate::extract_network_stream;
+use crate::hyper_util_tokioio::TokioIo;
use crate::network_buffered_stream::NetworkStreamPrefixCheck;
use crate::request_body::HttpRequestBody;
use crate::request_properties::HttpConnectionProperties;
@@ -139,7 +140,7 @@ pub fn op_http_upgrade_raw(
let mut http = slab_get(slab_id);
*http.response() = response;
http.complete();
- let mut upgraded = upgrade.await?;
+ let mut upgraded = TokioIo::new(upgrade.await?);
upgraded.write_all(&bytes).await?;
break upgraded;
}
@@ -709,7 +710,7 @@ fn serve_http11_unconditional(
let conn = http1::Builder::new()
.keep_alive(true)
.writev(*USE_WRITEV)
- .serve_connection(io, svc);
+ .serve_connection(TokioIo::new(io), svc);
conn.with_upgrades().map_err(AnyError::from)
}
@@ -718,7 +719,8 @@ fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
) -> impl Future<Output = Result<(), AnyError>> + 'static {
- let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc);
+ let conn =
+ http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
conn.map_err(AnyError::from)
}
diff --git a/ext/http/hyper_util_tokioio.rs b/ext/http/hyper_util_tokioio.rs
new file mode 100644
index 000000000..a2d649ccc
--- /dev/null
+++ b/ext/http/hyper_util_tokioio.rs
@@ -0,0 +1,206 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+
+// Copyright 2023 Sean McArthur <sean@seanmonstar.com>
+// MIT licensed copy of unreleased hyper-util code from
+// https://raw.githubusercontent.com/hyperium/hyper-util/master/src/rt/tokio_io.rs
+
+#![allow(dead_code)]
+//! Tokio IO integration for hyper
+use hyper1 as hyper;
+use std::pin::Pin;
+use std::task::Context;
+use std::task::Poll;
+
+use pin_project::pin_project;
+
+/// A wrapping implementing hyper IO traits for a type that
+/// implements Tokio's IO traits.
+#[pin_project]
+#[derive(Debug)]
+pub struct TokioIo<T> {
+ #[pin]
+ inner: T,
+}
+
+impl<T> TokioIo<T> {
+ /// Wrap a type implementing Tokio's IO traits.
+ pub fn new(inner: T) -> Self {
+ Self { inner }
+ }
+
+ /// Borrow the inner type.
+ pub fn inner(&self) -> &T {
+ &self.inner
+ }
+
+ /// Consume this wrapper and get the inner type.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
+
+impl<T> hyper::rt::Read for TokioIo<T>
+where
+ T: tokio::io::AsyncRead,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ mut buf: hyper::rt::ReadBufCursor<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ // SAFETY: Imported code from hyper-util
+ let n = unsafe {
+ let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
+ match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf)
+ {
+ Poll::Ready(Ok(())) => tbuf.filled().len(),
+ other => return other,
+ }
+ };
+
+ // SAFETY: Imported code from hyper-util
+ unsafe {
+ buf.advance(n);
+ }
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<T> hyper::rt::Write for TokioIo<T>
+where
+ T: tokio::io::AsyncWrite,
+{
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ tokio::io::AsyncWrite::is_write_vectored(&self.inner)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
+ }
+}
+
+impl<T> tokio::io::AsyncRead for TokioIo<T>
+where
+ T: hyper::rt::Read,
+{
+ fn poll_read(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ tbuf: &mut tokio::io::ReadBuf<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ //let init = tbuf.initialized().len();
+ let filled = tbuf.filled().len();
+ // SAFETY: Imported code from hyper-util
+ let sub_filled = unsafe {
+ let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
+
+ match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled())
+ {
+ Poll::Ready(Ok(())) => buf.filled().len(),
+ other => return other,
+ }
+ };
+
+ let n_filled = filled + sub_filled;
+ // At least sub_filled bytes had to have been initialized.
+ let n_init = sub_filled;
+ // SAFETY: Imported code from hyper-util
+ unsafe {
+ tbuf.assume_init(n_init);
+ tbuf.set_filled(n_filled);
+ }
+
+ Poll::Ready(Ok(()))
+ }
+}
+
+impl<T> tokio::io::AsyncWrite for TokioIo<T>
+where
+ T: hyper::rt::Write,
+{
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ hyper::rt::Write::poll_write(self.project().inner, cx, buf)
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ hyper::rt::Write::poll_flush(self.project().inner, cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Result<(), std::io::Error>> {
+ hyper::rt::Write::poll_shutdown(self.project().inner, cx)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ hyper::rt::Write::is_write_vectored(&self.inner)
+ }
+
+ fn poll_write_vectored(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[std::io::IoSlice<'_>],
+ ) -> Poll<Result<usize, std::io::Error>> {
+ hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
+ }
+}
+
+/// A wrapping implementing Tokio IO traits for a type that
+/// implements Hyper's IO traits.
+#[pin_project]
+#[derive(Debug)]
+pub struct TokioIoForHyper<T> {
+ #[pin]
+ inner: T,
+}
+
+impl<T> TokioIoForHyper<T> {
+ /// Wrap a type implementing Tokio's IO traits.
+ pub fn new(inner: T) -> Self {
+ Self { inner }
+ }
+
+ /// Borrow the inner type.
+ pub fn inner(&self) -> &T {
+ &self.inner
+ }
+
+ /// Consume this wrapper and get the inner type.
+ pub fn into_inner(self) -> T {
+ self.inner
+ }
+}
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index c33c1d15e..5a8788f92 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -50,6 +50,7 @@ use hyper::Body;
use hyper::HeaderMap;
use hyper::Request;
use hyper::Response;
+use hyper_util_tokioio::TokioIo;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
@@ -76,6 +77,7 @@ use crate::reader_stream::ShutdownHandle;
pub mod compressible;
mod http_next;
+mod hyper_util_tokioio;
mod network_buffered_stream;
mod reader_stream;
mod request_body;
@@ -1061,8 +1063,9 @@ impl CanDowncastUpgrade for hyper1::upgrade::Upgraded {
fn downcast<T: AsyncRead + AsyncWrite + Unpin + 'static>(
self,
) -> Result<(T, Bytes), Self> {
- let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?;
- Ok((io, read_buf))
+ let hyper1::upgrade::Parts { io, read_buf, .. } =
+ self.downcast::<TokioIo<T>>()?;
+ Ok((io.into_inner(), read_buf))
}
}