author     Matt Mastracci <matthew@mastracci.com>   2023-05-10 13:23:14 +0200
committer  GitHub <noreply@github.com>              2023-05-10 13:23:14 +0200
commit     234cef982c12a6c46aa8ba3787920f7b9a856be3
tree       5aac91fce7f5d66727884bbc1897772979346aff
parent     3dc745c881c43b9df4aa895291b9e13186be3f17
feat(ext/http): Automatic compression for Deno.serve (#19031)
`Content-Encoding: gzip` support for `Deno.serve`. This doesn't support
Brotli (`br`) yet, but it should not be difficult to add. The compression
heuristics are modelled after those in `Deno.serveHttp`.
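For illustration only (not part of the diff; the port and handler are arbitrary, and `Deno.serve` was still an unstable API at the time), here is a minimal sketch of the behaviour this adds: a response with a compressible `Content-Type` is gzipped when the client sends `Accept-Encoding: gzip`, which is observable as a dropped `Content-Length` plus a `Vary: Accept-Encoding` header:

```ts
// Sketch: a text/plain body above the 64-byte floor qualifies for gzip.
const ac = new AbortController();
const server = Deno.serve({
  handler: () =>
    new Response("hello ".repeat(1024), {
      headers: { "Content-Type": "text/plain" },
    }),
  port: 8000, // arbitrary
  signal: ac.signal,
});

const resp = await fetch("http://localhost:8000/", {
  headers: { "Accept-Encoding": "gzip" },
});
// fetch decompresses transparently; compression shows up as a missing
// Content-Length (removed when the body is gzipped) and a Vary header.
console.log(resp.headers.get("content-length")); // null
console.log(resp.headers.get("vary")); // "Accept-Encoding"
await resp.arrayBuffer();
ac.abort();
await server;
```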
Tests are provided to ensure that the gzip compression is correct. They
feed several kinds of streams (zeros, hard-to-compress data,
already-gzipped data) through the encoder, chunked in several different
ways (regular, random, large/small, small/large).
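The chunking helpers live in the new test module at the bottom of `response_body.rs`. As a rough TypeScript restatement (a hypothetical helper, only to show the idea behind the `random` strategy):

```ts
// Split a buffer at random offsets so the streaming encoder sees every
// framing pattern: tiny chunks, huge chunks, boundaries anywhere.
function randomChunks(data: Uint8Array): Uint8Array[] {
  const out: Uint8Array[] = [];
  let offset = 0;
  while (offset < data.length) {
    const size = 1 + Math.floor(Math.random() * (data.length - offset));
    out.push(data.subarray(offset, offset + size));
    offset += size;
  }
  return out;
}
```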
-rw-r--r--  Cargo.lock                    |   1
-rw-r--r--  cli/tests/unit/serve_test.ts  | 216
-rw-r--r--  ext/http/Cargo.toml           |   1
-rw-r--r--  ext/http/http_next.rs         | 178
-rw-r--r--  ext/http/response_body.rs     | 603
5 files changed, 845 insertions, 154 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 01cadaa17..4fb268627 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1046,6 +1046,7 @@ dependencies = [
  "percent-encoding",
  "phf",
  "pin-project",
+ "rand",
  "ring",
  "serde",
  "slab",
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index ce7267f58..2bd2314b7 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -1425,41 +1425,6 @@ Deno.test(
   },
 );

-Deno.test(
-  { permissions: { net: true, write: true, read: true } },
-  async function httpServerCorrectSizeResponse() {
-    const promise = deferred();
-    const listeningPromise = deferred();
-    const ac = new AbortController();
-
-    const tmpFile = await Deno.makeTempFile();
-    const file = await Deno.open(tmpFile, { write: true, read: true });
-    await file.write(new Uint8Array(70 * 1024).fill(1)); // 70kb sent in 64kb + 6kb chunks
-    file.close();
-
-    const server = Deno.serve({
-      handler: async (request) => {
-        const f = await Deno.open(tmpFile, { read: true });
-        promise.resolve();
-        return new Response(f.readable);
-      },
-      port: 4503,
-      signal: ac.signal,
-      onListen: onListen(listeningPromise),
-      onError: createOnErrorCb(ac),
-    });
-
-    await listeningPromise;
-    const resp = await fetch("http://127.0.0.1:4503/");
-    await promise;
-    const body = await resp.arrayBuffer();
-
-    assertEquals(body.byteLength, 70 * 1024);
-    ac.abort();
-    await server;
-  },
-);
-
 // https://github.com/denoland/deno/issues/12741
 // https://github.com/denoland/deno/pull/12746
 // https://github.com/denoland/deno/pull/12798
@@ -2012,38 +1977,146 @@ Deno.test(
   },
 );

-Deno.test(
-  { permissions: { net: true, write: true, read: true } },
-  async function httpServerSendFile() {
-    const promise = deferred();
-    const ac = new AbortController();
-    const listeningPromise = deferred();
-    const tmpFile = await Deno.makeTempFile();
-    const file = await Deno.open(tmpFile, { write: true, read: true });
-    const data = new Uint8Array(70 * 1024).fill(1);
-    await file.write(data);
-    file.close();
-    const server = Deno.serve({
-      handler: async () => {
-        const f = await Deno.open(tmpFile, { read: true });
-        promise.resolve();
-        return new Response(f.readable, { status: 200 });
-      },
-      port: 4503,
-      signal: ac.signal,
-      onListen: onListen(listeningPromise),
-      onError: createOnErrorCb(ac),
-    });
+function makeTempData(size: number) {
+  return new Uint8Array(size).fill(1);
+}

-    await listeningPromise;
-    const response = await fetch(`http://localhost:4503/`);
-    assertEquals(response.status, 200);
-    await promise;
-    assertEquals(new Uint8Array(await response.arrayBuffer()), data);
-    ac.abort();
-    await server;
+async function makeTempFile(size: number) {
+  const tmpFile = await Deno.makeTempFile();
+  const file = await Deno.open(tmpFile, { write: true, read: true });
+  const data = makeTempData(size);
+  await file.write(data);
+  file.close();
+
+  return await Deno.open(tmpFile, { write: true, read: true });
+}
+
+const compressionTestCases = [
+  { name: "Empty", length: 0, in: {}, out: {}, expect: null },
+  {
+    name: "EmptyAcceptGzip",
+    length: 0,
+    in: { "Accept-Encoding": "gzip" },
+    out: {},
+    expect: null,
   },
-);
+  // This technically would be compressible if not for the size, however the size_hint is not implemented
+  // for FileResource and we don't currently peek ahead on resources.
+  // {
+  //   name: "EmptyAcceptGzip2",
+  //   length: 0,
+  //   in: { "Accept-Encoding": "gzip" },
+  //   out: { "Content-Type": "text/plain" },
+  //   expect: null,
+  // },
+  { name: "Uncompressible", length: 1024, in: {}, out: {}, expect: null },
+  {
+    name: "UncompressibleAcceptGzip",
+    length: 1024,
+    in: { "Accept-Encoding": "gzip" },
+    out: {},
+    expect: null,
+  },
+  {
+    name: "UncompressibleType",
+    length: 1024,
+    in: { "Accept-Encoding": "gzip" },
+    out: { "Content-Type": "text/fake" },
+    expect: null,
+  },
+  {
+    name: "CompressibleType",
+    length: 1024,
+    in: { "Accept-Encoding": "gzip" },
+    out: { "Content-Type": "text/plain" },
+    expect: "gzip",
+  },
+  {
+    name: "CompressibleType2",
+    length: 1024,
+    in: { "Accept-Encoding": "gzip, deflate, br" },
+    out: { "Content-Type": "text/plain" },
+    expect: "gzip",
+  },
+  {
+    name: "UncompressibleRange",
+    length: 1024,
+    in: { "Accept-Encoding": "gzip" },
+    out: { "Content-Type": "text/plain", "Content-Range": "1" },
+    expect: null,
+  },
+  {
+    name: "UncompressibleCE",
+    length: 1024,
+    in: { "Accept-Encoding": "gzip" },
+    out: { "Content-Type": "text/plain", "Content-Encoding": "random" },
+    expect: null,
+  },
+  {
+    name: "UncompressibleCC",
+    length: 1024,
+    in: { "Accept-Encoding": "gzip" },
+    out: { "Content-Type": "text/plain", "Cache-Control": "no-transform" },
+    expect: null,
+  },
+];
+
+for (const testCase of compressionTestCases) {
+  const name = `httpServerCompression${testCase.name}`;
+  Deno.test(
+    { permissions: { net: true, write: true, read: true } },
+    {
+      [name]: async function () {
+        const promise = deferred();
+        const ac = new AbortController();
+        const listeningPromise = deferred();
+        const server = Deno.serve({
+          handler: async (request) => {
+            const f = await makeTempFile(testCase.length);
+            promise.resolve();
+            const headers = testCase.out as any;
+            headers["Content-Length"] = testCase.length.toString();
+            return new Response(f.readable, {
+              headers: headers as HeadersInit,
+            });
+          },
+          port: 4503,
+          signal: ac.signal,
+          onListen: onListen(listeningPromise),
+          onError: createOnErrorCb(ac),
+        });
+        try {
+          await listeningPromise;
+          const resp = await fetch("http://127.0.0.1:4503/", {
+            headers: testCase.in as HeadersInit,
+          });
+          await promise;
+          const body = await resp.arrayBuffer();
+          if (testCase.expect == null) {
+            assertEquals(body.byteLength, testCase.length);
+            assertEquals(
+              resp.headers.get("content-length"),
+              testCase.length.toString(),
+            );
+            assertEquals(
+              resp.headers.get("content-encoding"),
+              testCase.out["Content-Encoding"] || null,
+            );
+          } else if (testCase.expect == "gzip") {
+            // Note the fetch will transparently decompress this response, BUT we can detect that a response
+            // was compressed by the lack of a content length.
+            assertEquals(body.byteLength, testCase.length);
+            assertEquals(resp.headers.get("content-encoding"), null);
+            assertEquals(resp.headers.get("content-length"), null);
+          }
+        } finally {
+          ac.abort();
+          await server;
+        }
+      },
+    }[name],
+  );
+}

 Deno.test(
   { permissions: { net: true, write: true, read: true } },
@@ -2052,15 +2125,12 @@ Deno.test(
     const ac = new AbortController();
     const listeningPromise = deferred();

-    const tmpFile = await Deno.makeTempFile();
-    const file = await Deno.open(tmpFile, { write: true, read: true });
-    const data = new Uint8Array(70 * 1024).fill(1);
-    await file.write(data);
-    file.close();
-
     const server = Deno.serve({
       handler: async (request) => {
-        assertEquals(new Uint8Array(await request.arrayBuffer()), data);
+        assertEquals(
+          new Uint8Array(await request.arrayBuffer()),
+          makeTempData(70 * 1024),
+        );
         promise.resolve();
         return new Response("ok");
       },
@@ -2071,7 +2141,7 @@ Deno.test(
     });

     await listeningPromise;
-    const f = await Deno.open(tmpFile, { write: true, read: true });
+    const f = await makeTempFile(70 * 1024);
     const response = await fetch(`http://localhost:4503/`, {
       method: "POST",
       body: f.readable,
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 9691879ad..c1de81170 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -50,3 +50,4 @@ tokio-util = { workspace = true, features = ["io"] }

 [dev-dependencies]
 bencher.workspace = true
+rand.workspace = true
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index f3d37f751..080cfea6c 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -1,3 +1,4 @@
+use crate::compressible::is_content_compressible;
 // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
 use crate::extract_network_stream;
 use crate::network_buffered_stream::NetworkStreamPrefixCheck;
@@ -7,17 +8,18 @@ use crate::request_properties::HttpConnectionProperties;
 use crate::request_properties::HttpListenProperties;
 use crate::request_properties::HttpPropertyExtractor;
 use crate::response_body::CompletionHandle;
+use crate::response_body::Compression;
 use crate::response_body::ResponseBytes;
 use crate::response_body::ResponseBytesInner;
 use crate::response_body::V8StreamHttpResponseBody;
 use crate::websocket_upgrade::WebSocketUpgrade;
 use crate::LocalExecutor;
+use cache_control::CacheControl;
 use deno_core::error::AnyError;
 use deno_core::futures::TryFutureExt;
 use deno_core::op;
 use deno_core::AsyncRefCell;
 use deno_core::AsyncResult;
-use deno_core::BufView;
 use deno_core::ByteString;
 use deno_core::CancelFuture;
 use deno_core::CancelHandle;
@@ -31,7 +33,15 @@ use deno_net::ops_tls::TlsStream;
 use deno_net::raw::put_network_stream_resource;
 use deno_net::raw::NetworkStream;
 use deno_net::raw::NetworkStreamAddress;
+use fly_accept_encoding::Encoding;
+use http::header::ACCEPT_ENCODING;
+use http::header::CACHE_CONTROL;
+use http::header::CONTENT_ENCODING;
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_RANGE;
+use http::header::CONTENT_TYPE;
 use http::request::Parts;
+use http::HeaderMap;
 use hyper1::body::Incoming;
 use hyper1::header::COOKIE;
 use hyper1::http::HeaderName;
@@ -483,6 +493,131 @@ pub fn op_http_set_response_headers(
   })
 }

+fn is_request_compressible(headers: &HeaderMap) -> Compression {
+  let Some(accept_encoding) = headers.get(ACCEPT_ENCODING) else {
+    return Compression::None;
+  };
+  // Firefox and Chrome send this -- no need to parse
+  if accept_encoding == "gzip, deflate, br" {
+    return Compression::GZip;
+  }
+  if accept_encoding == "gzip" {
+    return Compression::GZip;
+  }
+  // Fall back to the expensive parser
+  let accepted = fly_accept_encoding::encodings_iter(headers).filter(|r| {
+    matches!(r, Ok((Some(Encoding::Identity | Encoding::Gzip), _)))
+  });
+  #[allow(clippy::single_match)]
+  match fly_accept_encoding::preferred(accepted) {
+    Ok(Some(fly_accept_encoding::Encoding::Gzip)) => return Compression::GZip,
+    _ => {}
+  }
+  Compression::None
+}
+
+fn is_response_compressible(headers: &HeaderMap) -> bool {
+  if let Some(content_type) = headers.get(CONTENT_TYPE) {
+    if !is_content_compressible(content_type) {
+      return false;
+    }
+  } else {
+    return false;
+  }
+  if headers.contains_key(CONTENT_ENCODING) {
+    return false;
+  }
+  if headers.contains_key(CONTENT_RANGE) {
+    return false;
+  }
+  if let Some(cache_control) = headers.get(CACHE_CONTROL) {
+    if let Ok(s) = std::str::from_utf8(cache_control.as_bytes()) {
+      if let Some(cache_control) = CacheControl::from_value(s) {
+        if cache_control.no_transform {
+          return false;
+        }
+      }
+    }
+  }
+  true
+}
+
+fn modify_compressibility_from_response(
+  compression: Compression,
+  length: Option<usize>,
+  headers: &mut HeaderMap,
+) -> Compression {
+  ensure_vary_accept_encoding(headers);
+  if let Some(length) = length {
+    // By the time we add compression headers and Accept-Encoding, it probably doesn't make sense
+    // to compress stuff that's smaller than this.
+    if length < 64 {
+      return Compression::None;
+    }
+  }
+  if compression == Compression::None {
+    return Compression::None;
+  }
+  if !is_response_compressible(headers) {
+    return Compression::None;
+  }
+  weaken_etag(headers);
+  headers.remove(CONTENT_LENGTH);
+  headers.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
+  compression
+}
+
+/// If the user provided a ETag header for uncompressed data, we need to ensure it is a
+/// weak Etag header ("W/").
+fn weaken_etag(hmap: &mut HeaderMap) {
+  if let Some(etag) = hmap.get_mut(hyper::header::ETAG) {
+    if !etag.as_bytes().starts_with(b"W/") {
+      let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
+      v.extend(b"W/");
+      v.extend(etag.as_bytes());
+      *etag = v.try_into().unwrap();
+    }
+  }
+}
+
+// Set Vary: Accept-Encoding header for direct body response.
+// Note: we set the header irrespective of whether or not we compress the data
+// to make sure cache services do not serve uncompressed data to clients that
+// support compression.
+fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
+  if let Some(v) = hmap.get_mut(hyper::header::VARY) {
+    if let Ok(s) = v.to_str() {
+      if !s.to_lowercase().contains("accept-encoding") {
+        *v = format!("Accept-Encoding, {s}").try_into().unwrap()
+      }
+      return;
+    }
+  }
+  hmap.insert(
+    hyper::header::VARY,
+    HeaderValue::from_static("Accept-Encoding"),
+  );
+}
+
+fn set_response(
+  index: u32,
+  length: Option<usize>,
+  response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
+) {
+  let compression =
+    with_req(index, |req| is_request_compressible(&req.headers));
+
+  with_resp_mut(index, move |response| {
+    let response = response.as_mut().unwrap();
+    let compression = modify_compressibility_from_response(
+      compression,
+      length,
+      response.headers_mut(),
+    );
+    response.body_mut().initialize(response_fn(compression))
+  });
+}
+
 #[op(fast)]
 pub fn op_http_set_response_body_resource(
   state: &mut OpState,
@@ -497,14 +632,13 @@ pub fn op_http_set_response_body_resource(
     state.resource_table.get_any(stream_rid)?
   };

-  with_resp_mut(index, move |response| {
-    let future = resource.clone().read(64 * 1024);
-    response
-      .as_mut()
-      .unwrap()
-      .body_mut()
-      .initialize(ResponseBytesInner::Resource(auto_close, resource, future));
-  });
+  set_response(
+    index,
+    resource.size_hint().1.map(|s| s as usize),
+    move |compression| {
+      ResponseBytesInner::from_resource(compression, resource, auto_close)
+    },
+  );

   Ok(())
 }
@@ -516,27 +650,19 @@ pub fn op_http_set_response_body_stream(
 ) -> Result<ResourceId, AnyError> {
   // TODO(mmastrac): what should this channel size be?
   let (tx, rx) = tokio::sync::mpsc::channel(1);
-  let (tx, rx) = (
-    V8StreamHttpResponseBody::new(tx),
-    ResponseBytesInner::V8Stream(rx),
-  );

-  with_resp_mut(index, move |response| {
-    response.as_mut().unwrap().body_mut().initialize(rx);
+  set_response(index, None, |compression| {
+    ResponseBytesInner::from_v8(compression, rx)
   });

-  Ok(state.resource_table.add(tx))
+  Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
 }

 #[op(fast)]
 pub fn op_http_set_response_body_text(index: u32, text: String) {
   if !text.is_empty() {
-    with_resp_mut(index, move |response| {
-      response
-        .as_mut()
-        .unwrap()
-        .body_mut()
-        .initialize(ResponseBytesInner::Bytes(BufView::from(text.into_bytes())))
+    set_response(index, Some(text.len()), |compression| {
+      ResponseBytesInner::from_vec(compression, text.into_bytes())
     });
   }
 }
@@ -544,12 +670,8 @@ pub fn op_http_set_response_body_text(index: u32, text: String) {
 #[op(fast)]
 pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) {
   if !buffer.is_empty() {
-    with_resp_mut(index, |response| {
-      response
-        .as_mut()
-        .unwrap()
-        .body_mut()
-        .initialize(ResponseBytesInner::Bytes(BufView::from(buffer.to_vec())))
+    set_response(index, Some(buffer.len()), |compression| {
+      ResponseBytesInner::from_slice(compression, buffer)
     });
   };
 }
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 0086e4d78..288d74758 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -2,12 +2,16 @@
 use std::borrow::Cow;
 use std::cell::RefCell;
 use std::future::Future;
+use std::io::Write;
 use std::pin::Pin;
 use std::rc::Rc;
 use std::task::Waker;

+use bytes::Bytes;
+use bytes::BytesMut;
 use deno_core::error::bad_resource;
 use deno_core::error::AnyError;
+use deno_core::futures::ready;
 use deno_core::futures::FutureExt;
 use deno_core::AsyncRefCell;
 use deno_core::AsyncResult;
@@ -17,9 +21,44 @@ use deno_core::CancelTryFuture;
 use deno_core::RcRef;
 use deno_core::Resource;
 use deno_core::WriteOutcome;
+use flate2::write::GzEncoder;
+use http::HeaderMap;
 use hyper1::body::Body;
 use hyper1::body::Frame;
 use hyper1::body::SizeHint;
+use pin_project::pin_project;
+
+/// Simplification for nested types we use for our streams. We provide a way to convert from
+/// this type into Hyper's body [`Frame`].
+enum ResponseStreamResult {
+  /// Stream is over.
+  EndOfStream,
+  /// Stream provided non-empty data.
+  NonEmptyBuf(BufView),
+  /// Stream is ready, but provided no data. Retry. This is a result that is like Pending, but does
+  /// not register a waker and should be called again at the lowest level of this code. Generally this
+  /// will only be returned from compression streams that require additional buffering.
+  NoData,
+  /// Stream provided trailers.
+  // TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc.
+  #[allow(unused)]
+  Trailers(HeaderMap),
+  /// Stream failed.
+  Error(AnyError),
+}
+
+impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
+  fn from(value: ResponseStreamResult) -> Self {
+    match value {
+      ResponseStreamResult::EndOfStream => None,
+      ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))),
+      ResponseStreamResult::Error(err) => Some(Err(err)),
+      ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))),
+      // This result should be handled by retrying
+      ResponseStreamResult::NoData => unimplemented!(),
+    }
+  }
+}

 #[derive(Clone, Debug, Default)]
 pub struct CompletionHandle {
@@ -62,6 +101,28 @@ impl Future for CompletionHandle {
   }
 }

+trait PollFrame: Unpin {
+  fn poll_frame(
+    self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<ResponseStreamResult>;
+
+  fn size_hint(&self) -> SizeHint;
+}
+
+#[derive(PartialEq, Eq)]
+pub enum Compression {
+  None,
+  GZip,
+}
+
+pub enum ResponseStream {
+  /// A resource stream, piped in fast mode.
+  Resource(ResourceBodyAdapter),
+  /// A JS-backed stream, written in JS and transported via pipe.
+  V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+}
+
 #[derive(Default)]
 pub enum ResponseBytesInner {
   /// An empty stream.
@@ -69,12 +130,12 @@ pub enum ResponseBytesInner {
   Empty,
   /// A completed stream.
   Done,
-  /// A static buffer of bytes, sent it one fell swoop.
+  /// A static buffer of bytes, sent in one fell swoop.
   Bytes(BufView),
-  /// A resource stream, piped in fast mode.
-  Resource(bool, Rc<dyn Resource>, AsyncResult<BufView>),
-  /// A JS-backed stream, written in JS and transported via pipe.
-  V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+  /// An uncompressed stream.
+  UncompressedStream(ResponseStream),
+  /// A GZip stream.
+  GZipStream(GZipResponseStream),
 }

 impl std::fmt::Debug for ResponseBytesInner {
@@ -83,8 +144,8 @@ impl std::fmt::Debug for ResponseBytesInner {
       Self::Done => f.write_str("Done"),
       Self::Empty => f.write_str("Empty"),
       Self::Bytes(..) => f.write_str("Bytes"),
-      Self::Resource(..) => f.write_str("Resource"),
-      Self::V8Stream(..) => f.write_str("V8Stream"),
+      Self::UncompressedStream(..) => f.write_str("Uncompressed"),
+      Self::GZipStream(..) => f.write_str("GZip"),
     }
   }
 }
@@ -122,16 +183,54 @@ impl ResponseBytesInner {
       Self::Done => SizeHint::with_exact(0),
       Self::Empty => SizeHint::with_exact(0),
       Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
-      Self::Resource(_, res, _) => {
-        let hint = res.size_hint();
-        let mut size_hint = SizeHint::new();
-        size_hint.set_lower(hint.0);
-        if let Some(upper) = hint.1 {
-          size_hint.set_upper(upper)
-        }
-        size_hint
-      }
-      Self::V8Stream(..) => SizeHint::default(),
+      Self::UncompressedStream(res) => res.size_hint(),
+      Self::GZipStream(..) => SizeHint::default(),
     }
   }
+
+  fn from_stream(compression: Compression, stream: ResponseStream) -> Self {
+    if compression == Compression::GZip {
+      Self::GZipStream(GZipResponseStream::new(stream))
+    } else {
+      Self::UncompressedStream(stream)
+    }
+  }
+
+  pub fn from_v8(
+    compression: Compression,
+    rx: tokio::sync::mpsc::Receiver<BufView>,
+  ) -> Self {
+    Self::from_stream(compression, ResponseStream::V8Stream(rx))
+  }
+
+  pub fn from_resource(
+    compression: Compression,
+    stm: Rc<dyn Resource>,
+    auto_close: bool,
+  ) -> Self {
+    Self::from_stream(
+      compression,
+      ResponseStream::Resource(ResourceBodyAdapter::new(stm, auto_close)),
+    )
+  }
+
+  pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
+    if compression == Compression::GZip {
+      let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast());
+      writer.write_all(bytes).unwrap();
+      Self::Bytes(BufView::from(writer.finish().unwrap()))
+    } else {
+      Self::Bytes(BufView::from(bytes.to_vec()))
+    }
+  }
+
+  pub fn from_vec(compression: Compression, vec: Vec<u8>) -> Self {
+    if compression == Compression::GZip {
+      let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast());
+      writer.write_all(&vec).unwrap();
+      Self::Bytes(BufView::from(writer.finish().unwrap()))
+    } else {
+      Self::Bytes(BufView::from(vec))
+    }
+  }
 }
@@ -144,48 +243,33 @@ impl Body for ResponseBytes {
     mut self: Pin<&mut Self>,
     cx: &mut std::task::Context<'_>,
   ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
-    match &mut self.0 {
-      ResponseBytesInner::Done | ResponseBytesInner::Empty => {
-        unreachable!()
-      }
-      ResponseBytesInner::Bytes(..) => {
-        if let ResponseBytesInner::Bytes(data) = self.complete(true) {
-          std::task::Poll::Ready(Some(Ok(Frame::data(data))))
-        } else {
+    let res = loop {
+      let res = match &mut self.0 {
+        ResponseBytesInner::Done | ResponseBytesInner::Empty => {
           unreachable!()
         }
-      }
-      ResponseBytesInner::Resource(auto_close, stm, ref mut future) => {
-        match future.poll_unpin(cx) {
-          std::task::Poll::Pending => std::task::Poll::Pending,
-          std::task::Poll::Ready(Err(err)) => {
-            std::task::Poll::Ready(Some(Err(err)))
-          }
-          std::task::Poll::Ready(Ok(buf)) => {
-            if buf.is_empty() {
-              if *auto_close {
-                stm.clone().close();
-              }
-              self.complete(true);
-              return std::task::Poll::Ready(None);
-            }
-            // Re-arm the future
-            *future = stm.clone().read(64 * 1024);
-            std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
-          }
+        ResponseBytesInner::Bytes(..) => {
+          let ResponseBytesInner::Bytes(data) = self.complete(true) else { unreachable!(); };
+          return std::task::Poll::Ready(Some(Ok(Frame::data(data))));
         }
-      }
-      ResponseBytesInner::V8Stream(stm) => match stm.poll_recv(cx) {
-        std::task::Poll::Pending => std::task::Poll::Pending,
-        std::task::Poll::Ready(Some(buf)) => {
-          std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
+        ResponseBytesInner::UncompressedStream(stm) => {
+          ready!(Pin::new(stm).poll_frame(cx))
         }
-        std::task::Poll::Ready(None) => {
-          self.complete(true);
-          std::task::Poll::Ready(None)
+        ResponseBytesInner::GZipStream(stm) => {
+          ready!(Pin::new(stm).poll_frame(cx))
         }
-      },
+      };
+      // This is where we retry the NoData response
+      if matches!(res, ResponseStreamResult::NoData) {
+        continue;
+      }
+      break res;
+    };
+
+    if matches!(res, ResponseStreamResult::EndOfStream) {
+      self.complete(true);
     }
+    std::task::Poll::Ready(res.into())
   }

   fn is_end_stream(&self) -> bool {
@@ -206,6 +290,243 @@ impl Drop for ResponseBytes {
   }
 }

+pub struct ResourceBodyAdapter {
+  auto_close: bool,
+  stm: Rc<dyn Resource>,
+  future: AsyncResult<BufView>,
+}
+
+impl ResourceBodyAdapter {
+  pub fn new(stm: Rc<dyn Resource>, auto_close: bool) -> Self {
+    let future = stm.clone().read(64 * 1024);
+    ResourceBodyAdapter {
+      auto_close,
+      stm,
+      future,
+    }
+  }
+}
+
+impl PollFrame for ResponseStream {
+  fn poll_frame(
+    mut self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<ResponseStreamResult> {
+    match &mut *self {
+      ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
+      ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
+    }
+  }
+
+  fn size_hint(&self) -> SizeHint {
+    match self {
+      ResponseStream::Resource(res) => res.size_hint(),
+      ResponseStream::V8Stream(res) => res.size_hint(),
+    }
+  }
+}
+
+impl PollFrame for ResourceBodyAdapter {
+  fn poll_frame(
+    mut self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<ResponseStreamResult> {
+    let res = match ready!(self.future.poll_unpin(cx)) {
+      Err(err) => ResponseStreamResult::Error(err),
+      Ok(buf) => {
+        if buf.is_empty() {
+          if self.auto_close {
+            self.stm.clone().close();
+          }
+          ResponseStreamResult::EndOfStream
+        } else {
+          // Re-arm the future
+          self.future = self.stm.clone().read(64 * 1024);
+          ResponseStreamResult::NonEmptyBuf(buf)
+        }
+      }
+    };
+    std::task::Poll::Ready(res)
+  }
+
+  fn size_hint(&self) -> SizeHint {
+    let hint = self.stm.size_hint();
+    let mut size_hint = SizeHint::new();
+    size_hint.set_lower(hint.0);
+    if let Some(upper) = hint.1 {
+      size_hint.set_upper(upper)
+    }
+    size_hint
+  }
+}
+
+impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
+  fn poll_frame(
+    mut self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<ResponseStreamResult> {
+    let res = match ready!(self.poll_recv(cx)) {
+      Some(buf) => ResponseStreamResult::NonEmptyBuf(buf),
+      None => ResponseStreamResult::EndOfStream,
+    };
+    std::task::Poll::Ready(res)
+  }
+
+  fn size_hint(&self) -> SizeHint {
+    SizeHint::default()
+  }
+}
+
+#[derive(Copy, Clone, Debug)]
+enum GZipState {
+  Header,
+  Streaming,
+  Flushing,
+  Trailer,
+  EndOfStream,
+}
+
+#[pin_project]
+pub struct GZipResponseStream {
+  stm: flate2::Compress,
+  crc: flate2::Crc,
+  next_buf: Option<BytesMut>,
+  partial: Option<BufView>,
+  #[pin]
+  underlying: ResponseStream,
+  state: GZipState,
+}
+
+impl GZipResponseStream {
+  pub fn new(underlying: ResponseStream) -> Self {
+    Self {
+      stm: flate2::Compress::new(flate2::Compression::fast(), false),
+      crc: flate2::Crc::new(),
+      next_buf: None,
+      partial: None,
+      state: GZipState::Header,
+      underlying,
+    }
+  }
+}
+
+/// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide
+/// most of the information. We're skipping header name, CRC, etc, and providing a null timestamp.
+///
+/// We're using compression level 1, as higher levels don't produce significant size differences. This
+/// is probably the reason why nginx's default gzip compression level is also 1:
+///
+/// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
+static GZIP_HEADER: Bytes =
+  Bytes::from_static(&[0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0x01, 0xff]);
+
+impl PollFrame for GZipResponseStream {
+  fn poll_frame(
+    self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<ResponseStreamResult> {
+    let this = self.get_mut();
+    let state = &mut this.state;
+    let orig_state = *state;
+    let frame = match *state {
+      GZipState::EndOfStream => {
+        return std::task::Poll::Ready(ResponseStreamResult::EndOfStream)
+      }
+      GZipState::Header => {
+        *state = GZipState::Streaming;
+        return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
+          BufView::from(GZIP_HEADER.clone()),
+        ));
+      }
+      GZipState::Trailer => {
+        *state = GZipState::EndOfStream;
+        let mut v = Vec::with_capacity(8);
+        v.extend(&this.crc.sum().to_le_bytes());
+        v.extend(&this.crc.amount().to_le_bytes());
+        return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
+          BufView::from(v),
+        ));
+      }
+      GZipState::Streaming => {
+        if let Some(partial) = this.partial.take() {
+          ResponseStreamResult::NonEmptyBuf(partial)
+        } else {
+          ready!(Pin::new(&mut this.underlying).poll_frame(cx))
+        }
+      }
+      GZipState::Flushing => ResponseStreamResult::EndOfStream,
+    };
+
+    let stm = &mut this.stm;
+
+    // Ideally we could use MaybeUninit here, but flate2 requires &[u8]. We should also try
+    // to dynamically adjust this buffer.
+    let mut buf = this
+      .next_buf
+      .take()
+      .unwrap_or_else(|| BytesMut::zeroed(64 * 1024));
+
+    let start_in = stm.total_in();
+    let start_out = stm.total_out();
+    let res = match frame {
+      // Short-circuit these and just return
+      x @ (ResponseStreamResult::NoData
+      | ResponseStreamResult::Error(..)
+      | ResponseStreamResult::Trailers(..)) => {
+        return std::task::Poll::Ready(x)
+      }
+      ResponseStreamResult::EndOfStream => {
+        *state = GZipState::Flushing;
+        stm.compress(&[], &mut buf, flate2::FlushCompress::Finish)
+      }
+      ResponseStreamResult::NonEmptyBuf(mut input) => {
+        let res = stm.compress(&input, &mut buf, flate2::FlushCompress::None);
+        let len_in = (stm.total_in() - start_in) as usize;
+        debug_assert!(len_in <= input.len());
+        this.crc.update(&input[..len_in]);
+        if len_in < input.len() {
+          input.advance_cursor(len_in);
+          this.partial = Some(input);
+        }
+        res
+      }
+    };
+    let len = stm.total_out() - start_out;
+    let res = match res {
+      Err(err) => ResponseStreamResult::Error(err.into()),
+      Ok(flate2::Status::BufError) => {
+        // This should not happen
+        unreachable!("old={orig_state:?} new={state:?} buf_len={}", buf.len());
+      }
+      Ok(flate2::Status::Ok) => {
+        if len == 0 {
+          this.next_buf = Some(buf);
+          ResponseStreamResult::NoData
+        } else {
+          buf.truncate(len as usize);
+          ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
+        }
+      }
+      Ok(flate2::Status::StreamEnd) => {
+        *state = GZipState::Trailer;
+        if len == 0 {
+          this.next_buf = Some(buf);
+          ResponseStreamResult::NoData
+        } else {
+          buf.truncate(len as usize);
+          ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
+        }
+      }
+    };
+
+    std::task::Poll::Ready(res)
+  }
+
+  fn size_hint(&self) -> SizeHint {
+    SizeHint::default()
+  }
+}
+
 /// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
 /// feed's hyper's HTTP response.
 pub struct V8StreamHttpResponseBody(
@@ -251,3 +572,179 @@ impl Resource for V8StreamHttpResponseBody {
     self.1.cancel();
   }
 }
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+  use deno_core::futures::future::poll_fn;
+  use std::hash::Hasher;
+  use std::io::Read;
+  use std::io::Write;
+
+  fn zeros() -> Vec<u8> {
+    vec![0; 1024 * 1024]
+  }
+
+  fn hard_to_gzip_data() -> Vec<u8> {
+    const SIZE: usize = 1024 * 1024;
+    let mut v = Vec::with_capacity(SIZE);
+    let mut hasher = std::collections::hash_map::DefaultHasher::new();
+    for i in 0..SIZE {
+      hasher.write_usize(i);
+      v.push(hasher.finish() as u8);
+    }
+    v
+  }
+
+  fn already_gzipped_data() -> Vec<u8> {
+    let mut v = Vec::with_capacity(1024 * 1024);
+    let mut gz =
+      flate2::GzBuilder::new().write(&mut v, flate2::Compression::best());
+    gz.write_all(&hard_to_gzip_data()).unwrap();
+    _ = gz.finish().unwrap();
+    v
+  }
+
+  fn chunk(v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+    // Chunk the data into 10k
+    let mut out = vec![];
+    for v in v.chunks(10 * 1024) {
+      out.push(v.to_vec());
+    }
+    out.into_iter()
+  }
+
+  fn random(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+    let mut out = vec![];
+    loop {
+      if v.is_empty() {
+        break;
+      }
+      let rand = (rand::random::<usize>() % v.len()) + 1;
+      let new = v.split_off(rand);
+      out.push(v);
+      v = new;
+    }
+    // Print the lengths of the vectors if we actually fail this test at some point
+    let lengths = out.iter().map(|v| v.len()).collect::<Vec<_>>();
+    eprintln!("Lengths = {:?}", lengths);
+    out.into_iter()
+  }
+
+  fn front_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+    // Chunk the data at 90%
+    let offset = (v.len() * 90) / 100;
+    let v2 = v.split_off(offset);
+    vec![v, v2].into_iter()
+  }
+
+  fn front_load_but_one(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+    let offset = v.len() - 1;
+    let v2 = v.split_off(offset);
+    vec![v, v2].into_iter()
+  }
+
+  fn back_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
+    // Chunk the data at 10%
+    let offset = (v.len() * 10) / 100;
+    let v2 = v.split_off(offset);
+    vec![v, v2].into_iter()
+  }
+
+  async fn test(i: impl Iterator<Item = Vec<u8>> + Send + 'static) {
+    let v = i.collect::<Vec<_>>();
+    let mut expected: Vec<u8> = vec![];
+    for v in &v {
+      expected.extend(v);
+    }
+    let (tx, rx) = tokio::sync::mpsc::channel(1);
+    let underlying = ResponseStream::V8Stream(rx);
+    let mut resp = GZipResponseStream::new(underlying);
+    let handle = tokio::task::spawn(async move {
+      for chunk in v {
+        tx.send(chunk.into()).await.ok().unwrap();
+      }
+    });
+    // Limit how many times we'll loop
+    const LIMIT: usize = 1000;
+    let mut v: Vec<u8> = vec![];
+    for i in 0..=LIMIT {
+      assert_ne!(i, LIMIT);
+      let frame = poll_fn(|cx| Pin::new(&mut resp).poll_frame(cx)).await;
+      if matches!(frame, ResponseStreamResult::EndOfStream) {
+        break;
+      }
+      if matches!(frame, ResponseStreamResult::NoData) {
+        continue;
+      }
+      let ResponseStreamResult::NonEmptyBuf(buf) = frame else {
+        panic!("Unexpected stream type");
+      };
+      assert_ne!(buf.len(), 0);
+      v.extend(&*buf);
+    }
+
+    let mut gz = flate2::read::GzDecoder::new(&*v);
+    let mut v = vec![];
+    gz.read_to_end(&mut v).unwrap();
+
+    assert_eq!(v, expected);
+
+    handle.await.unwrap();
+  }
+
+  #[tokio::test]
+  async fn test_simple() {
+    test(vec![b"hello world".to_vec()].into_iter()).await
+  }
+
+  #[tokio::test]
+  async fn test_empty() {
+    test(vec![].into_iter()).await
+  }
+
+  #[tokio::test]
+  async fn test_simple_zeros() {
+    test(vec![vec![0; 0x10000]].into_iter()).await
+  }
+
+  macro_rules! test {
+    ($vec:ident) => {
+      mod $vec {
+        #[tokio::test]
+        async fn chunk() {
+          let iter = super::chunk(super::$vec());
+          super::test(iter).await;
+        }
+
+        #[tokio::test]
+        async fn front_load() {
+          let iter = super::front_load(super::$vec());
+          super::test(iter).await;
+        }
+
+        #[tokio::test]
+        async fn front_load_but_one() {
+          let iter = super::front_load_but_one(super::$vec());
+          super::test(iter).await;
+        }
+
+        #[tokio::test]
+        async fn back_load() {
+          let iter = super::back_load(super::$vec());
+          super::test(iter).await;
+        }
+
+        #[tokio::test]
+        async fn random() {
+          let iter = super::random(super::$vec());
+          super::test(iter).await;
+        }
+      }
+    };
+  }
+
+  test!(zeros);
+  test!(hard_to_gzip_data);
+  test!(already_gzipped_data);
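The hand-written `GZIP_HEADER` and CRC trailer above follow the standard gzip member layout (RFC 1952). As an aside (not part of this change), the same framing can be inspected with the web `CompressionStream` API available in modern runtimes:

```ts
// Gzip member layout per RFC 1952: a 10-byte header (ID1=0x1f, ID2=0x8b,
// CM=8 for deflate, FLG=0, MTIME=0, XFL, OS), the deflate stream, then an
// 8-byte little-endian trailer: CRC32 of the input and ISIZE (len mod 2^32).
const input = new TextEncoder().encode("hello world ".repeat(100));
const gz = new Uint8Array(
  await new Response(
    new Blob([input]).stream().pipeThrough(new CompressionStream("gzip")),
  ).arrayBuffer(),
);
console.assert(gz[0] === 0x1f && gz[1] === 0x8b && gz[2] === 0x08);
const trailer = new DataView(gz.buffer, gz.byteLength - 8);
console.assert(trailer.getUint32(4, true) === input.length); // ISIZE
```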