author     Matt Mastracci <matthew@mastracci.com>  2023-05-10 13:23:14 +0200
committer  GitHub <noreply@github.com>             2023-05-10 13:23:14 +0200
commit     234cef982c12a6c46aa8ba3787920f7b9a856be3 (patch)
tree       5aac91fce7f5d66727884bbc1897772979346aff
parent     3dc745c881c43b9df4aa895291b9e13186be3f17 (diff)
feat(ext/http): Automatic compression for Deno.serve (#19031)
`Content-Encoding: gzip` support for `Deno.serve`. This doesn't support Brotli (`br`) yet, but it should not be difficult to add. The compression heuristics are modelled after those in `Deno.serveHttp`. Tests are provided to ensure that the gzip compression is correct. We chunk a number of different streams (zeros, hard-to-compress data, already-gzipped data) in a number of different ways (regular chunks, random chunks, front-loaded, back-loaded).
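As a quick illustration of the new behavior (a minimal sketch, not part of this patch; the port and the listening/shutdown scaffolding mirror the tests below): a `text/plain` response is gzip-encoded for a client that sends `Accept-Encoding: gzip`, and the `Content-Length` header is dropped because the compressed size isn't known up front.

```ts
const ac = new AbortController();
let ready!: () => void;
const listening = new Promise<void>((resolve) => (ready = resolve));

const server = Deno.serve({
  port: 4503, // the port the tests below use
  signal: ac.signal,
  onListen: () => ready(),
  // text/plain is a compressible content type, so a gzip-accepting
  // client receives a gzip-encoded body.
  handler: () =>
    new Response("hello ".repeat(1000), {
      headers: { "Content-Type": "text/plain" },
    }),
});

await listening;
const resp = await fetch("http://127.0.0.1:4503/", {
  headers: { "Accept-Encoding": "gzip" },
});
const body = await resp.arrayBuffer();
// fetch decompresses transparently; the dropped Content-Length is the
// tell that the body traveled compressed.
console.log(resp.headers.get("content-length")); // null
console.log(body.byteLength); // 6000 (the decompressed length)

ac.abort();
await server;
```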
-rw-r--r--  Cargo.lock                      1
-rw-r--r--  cli/tests/unit/serve_test.ts  216
-rw-r--r--  ext/http/Cargo.toml             1
-rw-r--r--  ext/http/http_next.rs         178
-rw-r--r--  ext/http/response_body.rs     603
5 files changed, 845 insertions, 154 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 01cadaa17..4fb268627 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1046,6 +1046,7 @@ dependencies = [
"percent-encoding",
"phf",
"pin-project",
+ "rand",
"ring",
"serde",
"slab",
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index ce7267f58..2bd2314b7 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -1425,41 +1425,6 @@ Deno.test(
},
);
-Deno.test(
- { permissions: { net: true, write: true, read: true } },
- async function httpServerCorrectSizeResponse() {
- const promise = deferred();
- const listeningPromise = deferred();
- const ac = new AbortController();
-
- const tmpFile = await Deno.makeTempFile();
- const file = await Deno.open(tmpFile, { write: true, read: true });
- await file.write(new Uint8Array(70 * 1024).fill(1)); // 70kb sent in 64kb + 6kb chunks
- file.close();
-
- const server = Deno.serve({
- handler: async (request) => {
- const f = await Deno.open(tmpFile, { read: true });
- promise.resolve();
- return new Response(f.readable);
- },
- port: 4503,
- signal: ac.signal,
- onListen: onListen(listeningPromise),
- onError: createOnErrorCb(ac),
- });
-
- await listeningPromise;
- const resp = await fetch("http://127.0.0.1:4503/");
- await promise;
- const body = await resp.arrayBuffer();
-
- assertEquals(body.byteLength, 70 * 1024);
- ac.abort();
- await server;
- },
-);
-
// https://github.com/denoland/deno/issues/12741
// https://github.com/denoland/deno/pull/12746
// https://github.com/denoland/deno/pull/12798
@@ -2012,38 +1977,146 @@ Deno.test(
},
);
-Deno.test(
- { permissions: { net: true, write: true, read: true } },
- async function httpServerSendFile() {
- const promise = deferred();
- const ac = new AbortController();
- const listeningPromise = deferred();
- const tmpFile = await Deno.makeTempFile();
- const file = await Deno.open(tmpFile, { write: true, read: true });
- const data = new Uint8Array(70 * 1024).fill(1);
- await file.write(data);
- file.close();
- const server = Deno.serve({
- handler: async () => {
- const f = await Deno.open(tmpFile, { read: true });
- promise.resolve();
- return new Response(f.readable, { status: 200 });
- },
- port: 4503,
- signal: ac.signal,
- onListen: onListen(listeningPromise),
- onError: createOnErrorCb(ac),
- });
+function makeTempData(size: number) {
+ return new Uint8Array(size).fill(1);
+}
- await listeningPromise;
- const response = await fetch(`http://localhost:4503/`);
- assertEquals(response.status, 200);
- await promise;
- assertEquals(new Uint8Array(await response.arrayBuffer()), data);
- ac.abort();
- await server;
+async function makeTempFile(size: number) {
+ const tmpFile = await Deno.makeTempFile();
+ const file = await Deno.open(tmpFile, { write: true, read: true });
+ const data = makeTempData(size);
+ await file.write(data);
+ file.close();
+
+ return await Deno.open(tmpFile, { write: true, read: true });
+}
+
+const compressionTestCases = [
+ { name: "Empty", length: 0, in: {}, out: {}, expect: null },
+ {
+ name: "EmptyAcceptGzip",
+ length: 0,
+ in: { "Accept-Encoding": "gzip" },
+ out: {},
+ expect: null,
},
-);
+ // This technically would be compressible if not for the size, however the size_hint is not implemented
+ // for FileResource and we don't currently peek ahead on resources.
+ // {
+ // name: "EmptyAcceptGzip2",
+ // length: 0,
+ // in: { "Accept-Encoding": "gzip" },
+ // out: { "Content-Type": "text/plain" },
+ // expect: null,
+ // },
+ { name: "Uncompressible", length: 1024, in: {}, out: {}, expect: null },
+ {
+ name: "UncompressibleAcceptGzip",
+ length: 1024,
+ in: { "Accept-Encoding": "gzip" },
+ out: {},
+ expect: null,
+ },
+ {
+ name: "UncompressibleType",
+ length: 1024,
+ in: { "Accept-Encoding": "gzip" },
+ out: { "Content-Type": "text/fake" },
+ expect: null,
+ },
+ {
+ name: "CompressibleType",
+ length: 1024,
+ in: { "Accept-Encoding": "gzip" },
+ out: { "Content-Type": "text/plain" },
+ expect: "gzip",
+ },
+ {
+ name: "CompressibleType2",
+ length: 1024,
+ in: { "Accept-Encoding": "gzip, deflate, br" },
+ out: { "Content-Type": "text/plain" },
+ expect: "gzip",
+ },
+ {
+ name: "UncompressibleRange",
+ length: 1024,
+ in: { "Accept-Encoding": "gzip" },
+ out: { "Content-Type": "text/plain", "Content-Range": "1" },
+ expect: null,
+ },
+ {
+ name: "UncompressibleCE",
+ length: 1024,
+ in: { "Accept-Encoding": "gzip" },
+ out: { "Content-Type": "text/plain", "Content-Encoding": "random" },
+ expect: null,
+ },
+ {
+ name: "UncompressibleCC",
+ length: 1024,
+ in: { "Accept-Encoding": "gzip" },
+ out: { "Content-Type": "text/plain", "Cache-Control": "no-transform" },
+ expect: null,
+ },
+];
+
+for (const testCase of compressionTestCases) {
+ const name = `httpServerCompression${testCase.name}`;
+ Deno.test(
+ { permissions: { net: true, write: true, read: true } },
+ {
+ [name]: async function () {
+ const promise = deferred();
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+ const server = Deno.serve({
+ handler: async (request) => {
+ const f = await makeTempFile(testCase.length);
+ promise.resolve();
+ const headers = testCase.out as any;
+ headers["Content-Length"] = testCase.length.toString();
+ return new Response(f.readable, {
+ headers: headers as HeadersInit,
+ });
+ },
+ port: 4503,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ onError: createOnErrorCb(ac),
+ });
+ try {
+ await listeningPromise;
+ const resp = await fetch("http://127.0.0.1:4503/", {
+ headers: testCase.in as HeadersInit,
+ });
+ await promise;
+ const body = await resp.arrayBuffer();
+ if (testCase.expect == null) {
+ assertEquals(body.byteLength, testCase.length);
+ assertEquals(
+ resp.headers.get("content-length"),
+ testCase.length.toString(),
+ );
+ assertEquals(
+ resp.headers.get("content-encoding"),
+ testCase.out["Content-Encoding"] || null,
+ );
+ } else if (testCase.expect == "gzip") {
+ // Note the fetch will transparently decompress this response, BUT we can detect that a response
+ // was compressed by the lack of a content length.
+ assertEquals(body.byteLength, testCase.length);
+ assertEquals(resp.headers.get("content-encoding"), null);
+ assertEquals(resp.headers.get("content-length"), null);
+ }
+ } finally {
+ ac.abort();
+ await server;
+ }
+ },
+ }[name],
+ );
+}
Deno.test(
{ permissions: { net: true, write: true, read: true } },
@@ -2052,15 +2125,12 @@ Deno.test(
const ac = new AbortController();
const listeningPromise = deferred();
- const tmpFile = await Deno.makeTempFile();
- const file = await Deno.open(tmpFile, { write: true, read: true });
- const data = new Uint8Array(70 * 1024).fill(1);
- await file.write(data);
- file.close();
-
const server = Deno.serve({
handler: async (request) => {
- assertEquals(new Uint8Array(await request.arrayBuffer()), data);
+ assertEquals(
+ new Uint8Array(await request.arrayBuffer()),
+ makeTempData(70 * 1024),
+ );
promise.resolve();
return new Response("ok");
},
@@ -2071,7 +2141,7 @@ Deno.test(
});
await listeningPromise;
- const f = await Deno.open(tmpFile, { write: true, read: true });
+ const f = await makeTempFile(70 * 1024);
const response = await fetch(`http://localhost:4503/`, {
method: "POST",
body: f.readable,
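Each entry in `compressionTestCases` above generates one test. As a standalone sketch of the opt-out path (hypothetical port; same scaffolding as the tests, not part of this patch), a handler can suppress compression with `Cache-Control: no-transform` even when the content type is compressible:

```ts
const ac = new AbortController();
let ready!: () => void;
const listening = new Promise<void>((resolve) => (ready = resolve));

const server = Deno.serve({
  port: 4505, // hypothetical port
  signal: ac.signal,
  onListen: () => ready(),
  handler: () =>
    new Response("y".repeat(1024), {
      headers: {
        "Content-Type": "text/plain",
        // Mirrors the "UncompressibleCC" case above: no-transform wins
        // over a compressible content type.
        "Cache-Control": "no-transform",
      },
    }),
});

await listening;
const resp = await fetch("http://127.0.0.1:4505/", {
  headers: { "Accept-Encoding": "gzip" },
});
const body = await resp.arrayBuffer();
// Left uncompressed: Content-Length survives and no Content-Encoding
// header is added.
console.log(resp.headers.get("content-length")); // "1024"
console.log(resp.headers.get("content-encoding")); // null
console.log(body.byteLength); // 1024

ac.abort();
await server;
```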
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 9691879ad..c1de81170 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -50,3 +50,4 @@ tokio-util = { workspace = true, features = ["io"] }
[dev-dependencies]
bencher.workspace = true
+rand.workspace = true
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index f3d37f751..080cfea6c 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -1,3 +1,4 @@
+use crate::compressible::is_content_compressible;
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::extract_network_stream;
use crate::network_buffered_stream::NetworkStreamPrefixCheck;
@@ -7,17 +8,18 @@ use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::CompletionHandle;
+use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::response_body::V8StreamHttpResponseBody;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
+use cache_control::CacheControl;
use deno_core::error::AnyError;
use deno_core::futures::TryFutureExt;
use deno_core::op;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
-use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -31,7 +33,15 @@ use deno_net::ops_tls::TlsStream;
use deno_net::raw::put_network_stream_resource;
use deno_net::raw::NetworkStream;
use deno_net::raw::NetworkStreamAddress;
+use fly_accept_encoding::Encoding;
+use http::header::ACCEPT_ENCODING;
+use http::header::CACHE_CONTROL;
+use http::header::CONTENT_ENCODING;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_RANGE;
+use http::header::CONTENT_TYPE;
use http::request::Parts;
+use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::header::COOKIE;
use hyper1::http::HeaderName;
@@ -483,6 +493,131 @@ pub fn op_http_set_response_headers(
})
}
+fn is_request_compressible(headers: &HeaderMap) -> Compression {
+ let Some(accept_encoding) = headers.get(ACCEPT_ENCODING) else {
+ return Compression::None;
+ };
+ // Firefox and Chrome send this -- no need to parse
+ if accept_encoding == "gzip, deflate, br" {
+ return Compression::GZip;
+ }
+ if accept_encoding == "gzip" {
+ return Compression::GZip;
+ }
+ // Fall back to the expensive parser
+ let accepted = fly_accept_encoding::encodings_iter(headers).filter(|r| {
+ matches!(r, Ok((Some(Encoding::Identity | Encoding::Gzip), _)))
+ });
+ #[allow(clippy::single_match)]
+ match fly_accept_encoding::preferred(accepted) {
+ Ok(Some(fly_accept_encoding::Encoding::Gzip)) => return Compression::GZip,
+ _ => {}
+ }
+ Compression::None
+}
+
+fn is_response_compressible(headers: &HeaderMap) -> bool {
+ if let Some(content_type) = headers.get(CONTENT_TYPE) {
+ if !is_content_compressible(content_type) {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ if headers.contains_key(CONTENT_ENCODING) {
+ return false;
+ }
+ if headers.contains_key(CONTENT_RANGE) {
+ return false;
+ }
+ if let Some(cache_control) = headers.get(CACHE_CONTROL) {
+ if let Ok(s) = std::str::from_utf8(cache_control.as_bytes()) {
+ if let Some(cache_control) = CacheControl::from_value(s) {
+ if cache_control.no_transform {
+ return false;
+ }
+ }
+ }
+ }
+ true
+}
+
+fn modify_compressibility_from_response(
+ compression: Compression,
+ length: Option<usize>,
+ headers: &mut HeaderMap,
+) -> Compression {
+ ensure_vary_accept_encoding(headers);
+ if let Some(length) = length {
+ // By the time we add compression headers and Accept-Encoding, it probably doesn't make sense
+ // to compress stuff that's smaller than this.
+ if length < 64 {
+ return Compression::None;
+ }
+ }
+ if compression == Compression::None {
+ return Compression::None;
+ }
+ if !is_response_compressible(headers) {
+ return Compression::None;
+ }
+ weaken_etag(headers);
+ headers.remove(CONTENT_LENGTH);
+ headers.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
+ compression
+}
+
+/// If the user provided a ETag header for uncompressed data, we need to ensure it is a
+/// weak Etag header ("W/").
+fn weaken_etag(hmap: &mut HeaderMap) {
+ if let Some(etag) = hmap.get_mut(hyper::header::ETAG) {
+ if !etag.as_bytes().starts_with(b"W/") {
+ let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
+ v.extend(b"W/");
+ v.extend(etag.as_bytes());
+ *etag = v.try_into().unwrap();
+ }
+ }
+}
+
+// Set Vary: Accept-Encoding header for direct body response.
+// Note: we set the header irrespective of whether or not we compress the data
+// to make sure cache services do not serve uncompressed data to clients that
+// support compression.
+fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
+ if let Some(v) = hmap.get_mut(hyper::header::VARY) {
+ if let Ok(s) = v.to_str() {
+ if !s.to_lowercase().contains("accept-encoding") {
+ *v = format!("Accept-Encoding, {s}").try_into().unwrap()
+ }
+ return;
+ }
+ }
+ hmap.insert(
+ hyper::header::VARY,
+ HeaderValue::from_static("Accept-Encoding"),
+ );
+}
+
+fn set_response(
+ index: u32,
+ length: Option<usize>,
+ response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
+) {
+ let compression =
+ with_req(index, |req| is_request_compressible(&req.headers));
+
+ with_resp_mut(index, move |response| {
+ let response = response.as_mut().unwrap();
+ let compression = modify_compressibility_from_response(
+ compression,
+ length,
+ response.headers_mut(),
+ );
+ response.body_mut().initialize(response_fn(compression))
+ });
+}
+
#[op(fast)]
pub fn op_http_set_response_body_resource(
state: &mut OpState,
@@ -497,14 +632,13 @@ pub fn op_http_set_response_body_resource(
state.resource_table.get_any(stream_rid)?
};
- with_resp_mut(index, move |response| {
- let future = resource.clone().read(64 * 1024);
- response
- .as_mut()
- .unwrap()
- .body_mut()
- .initialize(ResponseBytesInner::Resource(auto_close, resource, future));
- });
+ set_response(
+ index,
+ resource.size_hint().1.map(|s| s as usize),
+ move |compression| {
+ ResponseBytesInner::from_resource(compression, resource, auto_close)
+ },
+ );
Ok(())
}
@@ -516,27 +650,19 @@ pub fn op_http_set_response_body_stream(
) -> Result<ResourceId, AnyError> {
// TODO(mmastrac): what should this channel size be?
let (tx, rx) = tokio::sync::mpsc::channel(1);
- let (tx, rx) = (
- V8StreamHttpResponseBody::new(tx),
- ResponseBytesInner::V8Stream(rx),
- );
- with_resp_mut(index, move |response| {
- response.as_mut().unwrap().body_mut().initialize(rx);
+ set_response(index, None, |compression| {
+ ResponseBytesInner::from_v8(compression, rx)
});
- Ok(state.resource_table.add(tx))
+ Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
}
#[op(fast)]
pub fn op_http_set_response_body_text(index: u32, text: String) {
if !text.is_empty() {
- with_resp_mut(index, move |response| {
- response
- .as_mut()
- .unwrap()
- .body_mut()
- .initialize(ResponseBytesInner::Bytes(BufView::from(text.into_bytes())))
+ set_response(index, Some(text.len()), |compression| {
+ ResponseBytesInner::from_vec(compression, text.into_bytes())
});
}
}
@@ -544,12 +670,8 @@ pub fn op_http_set_response_body_text(index: u32, text: String) {
#[op(fast)]
pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) {
if !buffer.is_empty() {
- with_resp_mut(index, |response| {
- response
- .as_mut()
- .unwrap()
- .body_mut()
- .initialize(ResponseBytesInner::Bytes(BufView::from(buffer.to_vec())))
+ set_response(index, Some(buffer.len()), |compression| {
+ ResponseBytesInner::from_slice(compression, buffer)
});
};
}
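The `ensure_vary_accept_encoding` and `weaken_etag` helpers above have client-visible effects. A hedged sketch of what a caller would be expected to observe (hypothetical port and ETag value; not part of this patch):

```ts
const ac = new AbortController();
let ready!: () => void;
const listening = new Promise<void>((resolve) => (ready = resolve));

const server = Deno.serve({
  port: 4506, // hypothetical port
  signal: ac.signal,
  onListen: () => ready(),
  handler: () =>
    new Response("z".repeat(1024), {
      headers: { "Content-Type": "text/plain", "ETag": '"v1"' },
    }),
});

await listening;
const resp = await fetch("http://127.0.0.1:4506/", {
  headers: { "Accept-Encoding": "gzip" },
});
await resp.arrayBuffer();
// Vary is set whether or not the body was compressed, so caches never
// serve a gzip variant to a client that didn't ask for one.
console.log(resp.headers.get("vary")); // "Accept-Encoding"
// The handler's strong ETag is weakened: the compressed bytes are no
// longer byte-identical to the entity the ETag was computed over.
console.log(resp.headers.get("etag")); // W/"v1"

ac.abort();
await server;
```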
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 0086e4d78..288d74758 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -2,12 +2,16 @@
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
+use std::io::Write;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Waker;
+use bytes::Bytes;
+use bytes::BytesMut;
use deno_core::error::bad_resource;
use deno_core::error::AnyError;
+use deno_core::futures::ready;
use deno_core::futures::FutureExt;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
@@ -17,9 +21,44 @@ use deno_core::CancelTryFuture;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::WriteOutcome;
+use flate2::write::GzEncoder;
+use http::HeaderMap;
use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::SizeHint;
+use pin_project::pin_project;
+
+/// Simplification for nested types we use for our streams. We provide a way to convert from
+/// this type into Hyper's body [`Frame`].
+enum ResponseStreamResult {
+ /// Stream is over.
+ EndOfStream,
+ /// Stream provided non-empty data.
+ NonEmptyBuf(BufView),
+ /// Stream is ready, but provided no data. Retry. This is a result that is like Pending, but does
+ /// not register a waker and should be called again at the lowest level of this code. Generally this
+ /// will only be returned from compression streams that require additional buffering.
+ NoData,
+ /// Stream provided trailers.
+ // TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc.
+ #[allow(unused)]
+ Trailers(HeaderMap),
+ /// Stream failed.
+ Error(AnyError),
+}
+
+impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
+ fn from(value: ResponseStreamResult) -> Self {
+ match value {
+ ResponseStreamResult::EndOfStream => None,
+ ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))),
+ ResponseStreamResult::Error(err) => Some(Err(err)),
+ ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))),
+ // This result should be handled by retrying
+ ResponseStreamResult::NoData => unimplemented!(),
+ }
+ }
+}
#[derive(Clone, Debug, Default)]
pub struct CompletionHandle {
@@ -62,6 +101,28 @@ impl Future for CompletionHandle {
}
}
+trait PollFrame: Unpin {
+ fn poll_frame(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<ResponseStreamResult>;
+
+ fn size_hint(&self) -> SizeHint;
+}
+
+#[derive(PartialEq, Eq)]
+pub enum Compression {
+ None,
+ GZip,
+}
+
+pub enum ResponseStream {
+ /// A resource stream, piped in fast mode.
+ Resource(ResourceBodyAdapter),
+ /// A JS-backed stream, written in JS and transported via pipe.
+ V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+}
+
#[derive(Default)]
pub enum ResponseBytesInner {
/// An empty stream.
@@ -69,12 +130,12 @@ pub enum ResponseBytesInner {
Empty,
/// A completed stream.
Done,
- /// A static buffer of bytes, sent it one fell swoop.
+ /// A static buffer of bytes, sent in one fell swoop.
Bytes(BufView),
- /// A resource stream, piped in fast mode.
- Resource(bool, Rc<dyn Resource>, AsyncResult<BufView>),
- /// A JS-backed stream, written in JS and transported via pipe.
- V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+ /// An uncompressed stream.
+ UncompressedStream(ResponseStream),
+ /// A GZip stream.
+ GZipStream(GZipResponseStream),
}
impl std::fmt::Debug for ResponseBytesInner {
@@ -83,8 +144,8 @@ impl std::fmt::Debug for ResponseBytesInner {
Self::Done => f.write_str("Done"),
Self::Empty => f.write_str("Empty"),
Self::Bytes(..) => f.write_str("Bytes"),
- Self::Resource(..) => f.write_str("Resource"),
- Self::V8Stream(..) => f.write_str("V8Stream"),
+ Self::UncompressedStream(..) => f.write_str("Uncompressed"),
+ Self::GZipStream(..) => f.write_str("GZip"),
}
}
}
@@ -122,16 +183,54 @@ impl ResponseBytesInner {
Self::Done => SizeHint::with_exact(0),
Self::Empty => SizeHint::with_exact(0),
Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
- Self::Resource(_, res, _) => {
- let hint = res.size_hint();
- let mut size_hint = SizeHint::new();
- size_hint.set_lower(hint.0);
- if let Some(upper) = hint.1 {
- size_hint.set_upper(upper)
- }
- size_hint
- }
- Self::V8Stream(..) => SizeHint::default(),
+ Self::UncompressedStream(res) => res.size_hint(),
+ Self::GZipStream(..) => SizeHint::default(),
+ }
+ }
+
+ fn from_stream(compression: Compression, stream: ResponseStream) -> Self {
+ if compression == Compression::GZip {
+ Self::GZipStream(GZipResponseStream::new(stream))
+ } else {
+ Self::UncompressedStream(stream)
+ }
+ }
+
+ pub fn from_v8(
+ compression: Compression,
+ rx: tokio::sync::mpsc::Receiver<BufView>,
+ ) -> Self {
+ Self::from_stream(compression, ResponseStream::V8Stream(rx))
+ }
+
+ pub fn from_resource(
+ compression: Compression,
+ stm: Rc<dyn Resource>,
+ auto_close: bool,
+ ) -> Self {
+ Self::from_stream(
+ compression,
+ ResponseStream::Resource(ResourceBodyAdapter::new(stm, auto_close)),
+ )
+ }
+
+ pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
+ if compression == Compression::GZip {
+ let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast());
+ writer.write_all(bytes).unwrap();
+ Self::Bytes(BufView::from(writer.finish().unwrap()))
+ } else {
+ Self::Bytes(BufView::from(bytes.to_vec()))
+ }
+ }
+
+ pub fn from_vec(compression: Compression, vec: Vec<u8>) -> Self {
+ if compression == Compression::GZip {
+ let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast());
+ writer.write_all(&vec).unwrap();
+ Self::Bytes(BufView::from(writer.finish().unwrap()))
+ } else {
+ Self::Bytes(BufView::from(vec))
}
}
}
@@ -144,48 +243,33 @@ impl Body for ResponseBytes {
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
- match &mut self.0 {
- ResponseBytesInner::Done | ResponseBytesInner::Empty => {
- unreachable!()
- }
- ResponseBytesInner::Bytes(..) => {
- if let ResponseBytesInner::Bytes(data) = self.complete(true) {
- std::task::Poll::Ready(Some(Ok(Frame::data(data))))
- } else {
+ let res = loop {
+ let res = match &mut self.0 {
+ ResponseBytesInner::Done | ResponseBytesInner::Empty => {
unreachable!()
}
- }
- ResponseBytesInner::Resource(auto_close, stm, ref mut future) => {
- match future.poll_unpin(cx) {
- std::task::Poll::Pending => std::task::Poll::Pending,
- std::task::Poll::Ready(Err(err)) => {
- std::task::Poll::Ready(Some(Err(err)))
- }
- std::task::Poll::Ready(Ok(buf)) => {
- if buf.is_empty() {
- if *auto_close {
- stm.clone().close();
- }
- self.complete(true);
- return std::task::Poll::Ready(None);
- }
- // Re-arm the future
- *future = stm.clone().read(64 * 1024);
- std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
- }
+ ResponseBytesInner::Bytes(..) => {
+ let ResponseBytesInner::Bytes(data) = self.complete(true) else { unreachable!(); };
+ return std::task::Poll::Ready(Some(Ok(Frame::data(data))));
}
- }
- ResponseBytesInner::V8Stream(stm) => match stm.poll_recv(cx) {
- std::task::Poll::Pending => std::task::Poll::Pending,
- std::task::Poll::Ready(Some(buf)) => {
- std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
+ ResponseBytesInner::UncompressedStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
}
- std::task::Poll::Ready(None) => {
- self.complete(true);
- std::task::Poll::Ready(None)
+ ResponseBytesInner::GZipStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
}
- },
+ };
+ // This is where we retry the NoData response
+ if matches!(res, ResponseStreamResult::NoData) {
+ continue;
+ }
+ break res;
+ };
+
+ if matches!(res, ResponseStreamResult::EndOfStream) {
+ self.complete(true);
}
+ std::task::Poll::Ready(res.into())
}
fn is_end_stream(&self) -> bool {
@@ -206,6 +290,243 @@ impl Drop for ResponseBytes {
}
}
+pub struct ResourceBodyAdapter {
+ auto_close: bool,
+ stm: Rc<dyn Resource>,
+ future: AsyncResult<BufView>,
+}
+
+impl ResourceBodyAdapter {
+ pub fn new(stm: Rc<dyn Resource>, auto_close: bool) -> Self {
+ let future = stm.clone().read(64 * 1024);
+ ResourceBodyAdapter {
+ auto_close,
+ stm,
+ future,
+ }
+ }
+}
+
+impl PollFrame for ResponseStream {
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<ResponseStreamResult> {
+ match &mut *self {
+ ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
+ ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
+ }
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ match self {
+ ResponseStream::Resource(res) => res.size_hint(),
+ ResponseStream::V8Stream(res) => res.size_hint(),
+ }
+ }
+}
+
+impl PollFrame for ResourceBodyAdapter {
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<ResponseStreamResult> {
+ let res = match ready!(self.future.poll_unpin(cx)) {
+ Err(err) => ResponseStreamResult::Error(err),
+ Ok(buf) => {
+ if buf.is_empty() {
+ if self.auto_close {
+ self.stm.clone().close();
+ }
+ ResponseStreamResult::EndOfStream
+ } else {
+ // Re-arm the future
+ self.future = self.stm.clone().read(64 * 1024);
+ ResponseStreamResult::NonEmptyBuf(buf)
+ }
+ }
+ };
+ std::task::Poll::Ready(res)
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ let hint = self.stm.size_hint();
+ let mut size_hint = SizeHint::new();
+ size_hint.set_lower(hint.0);
+ if let Some(upper) = hint.1 {
+ size_hint.set_upper(upper)
+ }
+ size_hint
+ }
+}
+
+impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
+ fn poll_frame(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<ResponseStreamResult> {
+ let res = match ready!(self.poll_recv(cx)) {
+ Some(buf) => ResponseStreamResult::NonEmptyBuf(buf),
+ None => ResponseStreamResult::EndOfStream,
+ };
+ std::task::Poll::Ready(res)
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ SizeHint::default()
+ }
+}
+
+#[derive(Copy, Clone, Debug)]
+enum GZipState {
+ Header,
+ Streaming,
+ Flushing,
+ Trailer,
+ EndOfStream,
+}
+
+#[pin_project]
+pub struct GZipResponseStream {
+ stm: flate2::Compress,
+ crc: flate2::Crc,
+ next_buf: Option<BytesMut>,
+ partial: Option<BufView>,
+ #[pin]
+ underlying: ResponseStream,
+ state: GZipState,
+}
+
+impl GZipResponseStream {
+ pub fn new(underlying: ResponseStream) -> Self {
+ Self {
+ stm: flate2::Compress::new(flate2::Compression::fast(), false),
+ crc: flate2::Crc::new(),
+ next_buf: None,
+ partial: None,
+ state: GZipState::Header,
+ underlying,
+ }
+ }
+}
+
+/// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide
+/// most of the information. We're skipping header name, CRC, etc, and providing a null timestamp.
+///
+/// We're using compression level 1, as higher levels don't produce significant size differences. This
+/// is probably the reason why nginx's default gzip compression level is also 1:
+///
+/// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
+static GZIP_HEADER: Bytes =
+ Bytes::from_static(&[0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0x01, 0xff]);
+
+impl PollFrame for GZipResponseStream {
+ fn poll_frame(
+ self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<ResponseStreamResult> {
+ let this = self.get_mut();
+ let state = &mut this.state;
+ let orig_state = *state;
+ let frame = match *state {
+ GZipState::EndOfStream => {
+ return std::task::Poll::Ready(ResponseStreamResult::EndOfStream)
+ }
+ GZipState::Header => {
+ *state = GZipState::Streaming;
+ return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
+ BufView::from(GZIP_HEADER.clone()),
+ ));
+ }
+ GZipState::Trailer => {
+ *state = GZipState::EndOfStream;
+ let mut v = Vec::with_capacity(8);
+ v.extend(&this.crc.sum().to_le_bytes());
+ v.extend(&this.crc.amount().to_le_bytes());
+ return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
+ BufView::from(v),
+ ));
+ }
+ GZipState::Streaming => {
+ if let Some(partial) = this.partial.take() {
+ ResponseStreamResult::NonEmptyBuf(partial)
+ } else {
+ ready!(Pin::new(&mut this.underlying).poll_frame(cx))
+ }
+ }
+ GZipState::Flushing => ResponseStreamResult::EndOfStream,
+ };
+
+ let stm = &mut this.stm;
+
+ // Ideally we could use MaybeUninit here, but flate2 requires &[u8]. We should also try
+ // to dynamically adjust this buffer.
+ let mut buf = this
+ .next_buf
+ .take()
+ .unwrap_or_else(|| BytesMut::zeroed(64 * 1024));
+
+ let start_in = stm.total_in();
+ let start_out = stm.total_out();
+ let res = match frame {
+ // Short-circuit these and just return
+ x @ (ResponseStreamResult::NoData
+ | ResponseStreamResult::Error(..)
+ | ResponseStreamResult::Trailers(..)) => {
+ return std::task::Poll::Ready(x)
+ }
+ ResponseStreamResult::EndOfStream => {
+ *state = GZipState::Flushing;
+ stm.compress(&[], &mut buf, flate2::FlushCompress::Finish)
+ }
+ ResponseStreamResult::NonEmptyBuf(mut input) => {
+ let res = stm.compress(&input, &mut buf, flate2::FlushCompress::None);
+ let len_in = (stm.total_in() - start_in) as usize;
+ debug_assert!(len_in <= input.len());
+ this.crc.update(&input[..len_in]);
+ if len_in < input.len() {
+ input.advance_cursor(len_in);
+ this.partial = Some(input);
+ }
+ res
+ }
+ };
+ let len = stm.total_out() - start_out;
+ let res = match res {
+ Err(err) => ResponseStreamResult::Error(err.into()),
+ Ok(flate2::Status::BufError) => {
+ // This should not happen
+ unreachable!("old={orig_state:?} new={state:?} buf_len={}", buf.len());
+ }
+ Ok(flate2::Status::Ok) => {
+ if len == 0 {
+ this.next_buf = Some(buf);
+ ResponseStreamResult::NoData
+ } else {
+ buf.truncate(len as usize);
+ ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
+ }
+ }
+ Ok(flate2::Status::StreamEnd) => {
+ *state = GZipState::Trailer;
+ if len == 0 {
+ this.next_buf = Some(buf);
+ ResponseStreamResult::NoData
+ } else {
+ buf.truncate(len as usize);
+ ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
+ }
+ }
+ };
+
+ std::task::Poll::Ready(res)
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ SizeHint::default()
+ }
+}
+
/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
/// feed's hyper's HTTP response.
pub struct V8StreamHttpResponseBody(
@@ -251,3 +572,179 @@ impl Resource for V8StreamHttpResponseBody {
self.1.cancel();
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use deno_core::futures::future::poll_fn;
+ use std::hash::Hasher;
+ use std::io::Read;
+ use std::io::Write;
+
+ fn zeros() -> Vec<u8> {
+ vec![0; 1024 * 1024]
+ }
+
+ fn hard_to_gzip_data() -> Vec<u8> {
+ const SIZE: usize = 1024 * 1024;
+ let mut v = Vec::with_capacity(SIZE);
+ let mut hasher = std::collections::hash_map::DefaultHasher::new();
+ for i in 0..SIZE {
+ hasher.write_usize(i);
+ v.push(hasher.finish() as u8);
+ }
+ v
+ }
+
+ fn already_gzipped_data() -> Vec<u8> {
+ let mut v = Vec::with_capacity(1024 * 1024);
+ let mut gz =
+ flate2::GzBuilder::new().write(&mut v, flate2::Compression::best());
+ gz.write_all(&hard_to_gzip_data()).unwrap();
+ _ = gz.finish().unwrap();
+ v
+ }
+
+ fn chunk(v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+ // Chunk the data into 10k
+ let mut out = vec![];
+ for v in v.chunks(10 * 1024) {
+ out.push(v.to_vec());
+ }
+ out.into_iter()
+ }
+
+ fn random(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+ let mut out = vec![];
+ loop {
+ if v.is_empty() {
+ break;
+ }
+ let rand = (rand::random::<usize>() % v.len()) + 1;
+ let new = v.split_off(rand);
+ out.push(v);
+ v = new;
+ }
+ // Print the lengths of the vectors if we actually fail this test at some point
+ let lengths = out.iter().map(|v| v.len()).collect::<Vec<_>>();
+ eprintln!("Lengths = {:?}", lengths);
+ out.into_iter()
+ }
+
+ fn front_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+ // Chunk the data at 90%
+ let offset = (v.len() * 90) / 100;
+ let v2 = v.split_off(offset);
+ vec![v, v2].into_iter()
+ }
+
+ fn front_load_but_one(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+ let offset = v.len() - 1;
+ let v2 = v.split_off(offset);
+ vec![v, v2].into_iter()
+ }
+
+ fn back_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+ // Chunk the data at 10%
+ let offset = (v.len() * 10) / 100;
+ let v2 = v.split_off(offset);
+ vec![v, v2].into_iter()
+ }
+
+ async fn test(i: impl Iterator<Item = Vec<u8>> + Send + 'static) {
+ let v = i.collect::<Vec<_>>();
+ let mut expected: Vec<u8> = vec![];
+ for v in &v {
+ expected.extend(v);
+ }
+ let (tx, rx) = tokio::sync::mpsc::channel(1);
+ let underlying = ResponseStream::V8Stream(rx);
+ let mut resp = GZipResponseStream::new(underlying);
+ let handle = tokio::task::spawn(async move {
+ for chunk in v {
+ tx.send(chunk.into()).await.ok().unwrap();
+ }
+ });
+ // Limit how many times we'll loop
+ const LIMIT: usize = 1000;
+ let mut v: Vec<u8> = vec![];
+ for i in 0..=LIMIT {
+ assert_ne!(i, LIMIT);
+ let frame = poll_fn(|cx| Pin::new(&mut resp).poll_frame(cx)).await;
+ if matches!(frame, ResponseStreamResult::EndOfStream) {
+ break;
+ }
+ if matches!(frame, ResponseStreamResult::NoData) {
+ continue;
+ }
+ let ResponseStreamResult::NonEmptyBuf(buf) = frame else {
+ panic!("Unexpected stream type");
+ };
+ assert_ne!(buf.len(), 0);
+ v.extend(&*buf);
+ }
+
+ let mut gz = flate2::read::GzDecoder::new(&*v);
+ let mut v = vec![];
+ gz.read_to_end(&mut v).unwrap();
+
+ assert_eq!(v, expected);
+
+ handle.await.unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_simple() {
+ test(vec![b"hello world".to_vec()].into_iter()).await
+ }
+
+ #[tokio::test]
+ async fn test_empty() {
+ test(vec![].into_iter()).await
+ }
+
+ #[tokio::test]
+ async fn test_simple_zeros() {
+ test(vec![vec![0; 0x10000]].into_iter()).await
+ }
+
+ macro_rules! test {
+ ($vec:ident) => {
+ mod $vec {
+ #[tokio::test]
+ async fn chunk() {
+ let iter = super::chunk(super::$vec());
+ super::test(iter).await;
+ }
+
+ #[tokio::test]
+ async fn front_load() {
+ let iter = super::front_load(super::$vec());
+ super::test(iter).await;
+ }
+
+ #[tokio::test]
+ async fn front_load_but_one() {
+ let iter = super::front_load_but_one(super::$vec());
+ super::test(iter).await;
+ }
+
+ #[tokio::test]
+ async fn back_load() {
+ let iter = super::back_load(super::$vec());
+ super::test(iter).await;
+ }
+
+ #[tokio::test]
+ async fn random() {
+ let iter = super::random(super::$vec());
+ super::test(iter).await;
+ }
+ }
+ };
+ }
+
+ test!(zeros);
+ test!(hard_to_gzip_data);
+ test!(already_gzipped_data);
+}