author    Luca Casonato <hello@lcas.dev>  2022-04-20 22:53:56 +0200
committer GitHub <noreply@github.com>     2022-04-20 22:53:56 +0200
commit    8b258070542a81d217226fe832b26d81cf20113d (patch)
tree      5650bbc0ef9967430b45471b4ffdad9159725f9d
parent    2a93c134dc93d70b5f6ea9d417c88207661884d5 (diff)
feat(ext/http): stream auto resp body compression (#14325)
This commit adds support for automatic compression of streaming response bodies.
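Mechanically (per the ext/http/lib.rs changes below), a streamed body is now written through an async-compression encoder into a one-way in-memory pipe, and the pipe's read end is served as the response body. A minimal round-trip sketch of that core idea, assuming tokio = "1" (features = ["full"]) and async-compression = "0.3" (features = ["tokio", "gzip"]):

```rust
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // One-way pipe: a DuplexStream with one direction of each end dropped,
    // the same trick op_http_write_headers uses below.
    let (a, b) = tokio::io::duplex(64 * 1024);
    let (reader, _) = tokio::io::split(a);
    let (_, writer) = tokio::io::split(b);

    // Chunks written here come out of `reader` gzip-compressed.
    let mut encoder = GzipEncoder::new(writer);
    encoder.write_all(b"hello deno").await?;
    encoder.shutdown().await?; // finalize the gzip stream

    // Round-trip: decompressing the pipe's output recovers the chunks.
    let mut decoder = GzipDecoder::new(BufReader::new(reader));
    let mut out = String::new();
    decoder.read_to_string(&mut out).await?;
    assert_eq!(out, "hello deno");
    Ok(())
}
```

For clients that prefer Brotli, the patch swaps in BrotliEncoder the same way.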
-rw-r--r--  Cargo.lock                     1
-rw-r--r--  cli/tests/unit/http_test.ts  161
-rw-r--r--  ext/http/Cargo.toml            1
-rw-r--r--  ext/http/lib.rs              189
-rw-r--r--  serde_v8/magic/buffer.rs       8
5 files changed, 256 insertions, 104 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 76f8ca028..794c7d875 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -964,6 +964,7 @@ dependencies = [
name = "deno_http"
version = "0.41.0"
dependencies = [
+ "async-compression",
"base64 0.13.0",
"brotli",
"bytes",
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index f48f314db..5fabd40fe 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -1224,26 +1224,25 @@ Deno.test(
const decoder = new TextDecoder();
Deno.test({
- name: "http server compresses body",
+ name: "http server compresses body - check headers",
permissions: { net: true, run: true },
async fn() {
const hostname = "localhost";
const port = 4501;
+ const listener = Deno.listen({ hostname, port });
+
+ const data = { hello: "deno", now: "with", compressed: "body" };
async function server() {
- const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest();
assert(e);
const { request, respondWith } = e;
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
- const response = new Response(
- JSON.stringify({ hello: "deno", now: "with", compressed: "body" }),
- {
- headers: { "content-type": "application/json" },
- },
- );
+ const response = new Response(JSON.stringify(data), {
+ headers: { "content-type": "application/json" },
+ });
await respondWith(response);
httpConn.close();
listener.close();
@@ -1275,6 +1274,60 @@ Deno.test({
});
Deno.test({
+ name: "http server compresses body - check body",
+ permissions: { net: true, run: true },
+ async fn() {
+ const hostname = "localhost";
+ const port = 4501;
+ const listener = Deno.listen({ hostname, port });
+
+ const data = { hello: "deno", now: "with", compressed: "body" };
+
+ async function server() {
+ const tcpConn = await listener.accept();
+ const httpConn = Deno.serveHttp(tcpConn);
+ const e = await httpConn.nextRequest();
+ assert(e);
+ const { request, respondWith } = e;
+ assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
+ const response = new Response(JSON.stringify(data), {
+ headers: { "content-type": "application/json" },
+ });
+ await respondWith(response);
+ httpConn.close();
+ listener.close();
+ }
+
+ async function client() {
+ const url = `http://${hostname}:${port}/`;
+ const cmd = [
+ "curl",
+ "--request",
+ "GET",
+ "--url",
+ url,
+ "--header",
+ "Accept-Encoding: gzip, deflate, br",
+ ];
+ const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
+ const status = await proc.status();
+ assert(status.success);
+ const stdout = proc.stdout!.readable
+ .pipeThrough(new DecompressionStream("gzip"))
+ .pipeThrough(new TextDecoderStream());
+ let body = "";
+ for await (const chunk of stdout) {
+ body += chunk;
+ }
+ assertEquals(JSON.parse(body), data);
+ proc.close();
+ }
+
+ await Promise.all([server(), client()]);
+ },
+});
+
+Deno.test({
name: "http server doesn't compress small body",
permissions: { net: true, run: true },
async fn() {
@@ -1653,15 +1706,18 @@ Deno.test({
});
Deno.test({
- name: "http server doesn't compress streamed bodies",
+ name: "http server compresses streamed bodies - check headers",
permissions: { net: true, run: true },
async fn() {
const hostname = "localhost";
const port = 4501;
+ const encoder = new TextEncoder();
+ const listener = Deno.listen({ hostname, port });
+
+ const data = { hello: "deno", now: "with", compressed: "body" };
+
async function server() {
- const encoder = new TextEncoder();
- const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest();
@@ -1670,23 +1726,13 @@ Deno.test({
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
const bodyInit = new ReadableStream({
start(controller) {
- controller.enqueue(
- encoder.encode(
- JSON.stringify({
- hello: "deno",
- now: "with",
- compressed: "body",
- }),
- ),
- );
+ controller.enqueue(encoder.encode(JSON.stringify(data)));
controller.close();
},
});
const response = new Response(
bodyInit,
- {
- headers: { "content-type": "application/json", vary: "Accept" },
- },
+ { headers: { "content-type": "application/json" } },
);
await respondWith(response);
httpConn.close();
@@ -1709,8 +1755,71 @@ Deno.test({
const status = await proc.status();
assert(status.success);
const output = decoder.decode(await proc.output());
- assert(output.includes("vary: Accept\r\n"));
- assert(!output.includes("content-encoding: "));
+ assert(output.includes("vary: Accept-Encoding\r\n"));
+ assert(output.includes("content-encoding: gzip\r\n"));
+ proc.close();
+ }
+
+ await Promise.all([server(), client()]);
+ },
+});
+
+Deno.test({
+ name: "http server compresses streamed bodies - check body",
+ permissions: { net: true, run: true },
+ async fn() {
+ const hostname = "localhost";
+ const port = 4501;
+
+ const encoder = new TextEncoder();
+ const listener = Deno.listen({ hostname, port });
+
+ const data = { hello: "deno", now: "with", compressed: "body" };
+
+ async function server() {
+ const tcpConn = await listener.accept();
+ const httpConn = Deno.serveHttp(tcpConn);
+ const e = await httpConn.nextRequest();
+ assert(e);
+ const { request, respondWith } = e;
+ assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
+ const bodyInit = new ReadableStream({
+ start(controller) {
+ controller.enqueue(encoder.encode(JSON.stringify(data)));
+ controller.close();
+ },
+ });
+ const response = new Response(
+ bodyInit,
+ { headers: { "content-type": "application/json" } },
+ );
+ await respondWith(response);
+ httpConn.close();
+ listener.close();
+ }
+
+ async function client() {
+ const url = `http://${hostname}:${port}/`;
+ const cmd = [
+ "curl",
+ "--request",
+ "GET",
+ "--url",
+ url,
+ "--header",
+ "Accept-Encoding: gzip, deflate, br",
+ ];
+ const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
+ const status = await proc.status();
+ assert(status.success);
+ const stdout = proc.stdout.readable
+ .pipeThrough(new DecompressionStream("gzip"))
+ .pipeThrough(new TextDecoderStream());
+ let body = "";
+ for await (const chunk of stdout) {
+ body += chunk;
+ }
+ assertEquals(JSON.parse(body), data);
proc.close();
}
@@ -1775,8 +1884,6 @@ Deno.test({
// Ensure the content-length header is updated.
assert(!output.includes(`content-length: ${contentLength}\r\n`));
assert(output.includes("content-length: 72\r\n"));
- console.log(output);
-
proc.close();
}
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 2bdbfdade..b4a208228 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -14,6 +14,7 @@ description = "HTTP server implementation for Deno"
path = "lib.rs"
[dependencies]
+async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] }
base64 = "0.13.0"
brotli = "3.3.3"
bytes = "1"
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index b85dcc473..a6f47c1c9 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,6 +1,7 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
-use bytes::Bytes;
+use async_compression::tokio::write::BrotliEncoder;
+use async_compression::tokio::write::GzipEncoder;
use cache_control::CacheControl;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
@@ -21,7 +22,6 @@ use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op;
-
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
@@ -60,7 +60,9 @@ use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local;
+use tokio_util::io::ReaderStream;
mod compressible;
@@ -339,7 +341,7 @@ impl Default for HttpRequestReader {
/// The write half of an HTTP stream.
enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>),
- Body(hyper::body::Sender),
+ Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
Closed,
}
@@ -546,55 +548,60 @@ async fn op_http_write_headers(
let body: Response<Body>;
let new_wr: HttpResponseWriter;
- match data {
- Some(data) => {
- // Set Vary: Accept-Encoding header for direct body response.
- // Note: we set the header irrespective of whether or not we compress the
- // data to make sure cache services do not serve uncompressed data to
- // clients that support compression.
- let vary_value = if let Some(value) = vary_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.to_lowercase().contains("accept-encoding") {
- format!("Accept-Encoding, {}", value_str)
- } else {
- value_str.to_string()
- }
+ // Set the Vary: Accept-Encoding header on every response.
+ // Note: we set the header irrespective of whether or not we compress the data
+ // to make sure cache services do not serve uncompressed data to clients that
+ // support compression.
+ let vary_value = if let Some(value) = vary_header {
+ if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+ if !value_str.to_lowercase().contains("accept-encoding") {
+ format!("Accept-Encoding, {}", value_str)
+ } else {
+ value_str.to_string()
+ }
+ } else {
+ // The header value isn't valid UTF-8, so it can't be inspected;
+ // fall back to sending the default header.
+ "Accept-Encoding".to_string()
+ }
+ } else {
+ "Accept-Encoding".to_string()
+ };
+ builder = builder.header("vary", &vary_value);
+
+ let accepts_compression = matches!(
+ *stream.accept_encoding.borrow(),
+ Encoding::Brotli | Encoding::Gzip
+ );
+ let should_compress = body_compressible
+ && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
+ && accepts_compression;
+
+ if should_compress {
+ // If the user provided an ETag header for the uncompressed data, we
+ // need to ensure it is a weak ETag header ("W/").
+ if let Some(value) = etag_header {
+ if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+ if !value_str.starts_with("W/") {
+ builder = builder.header("etag", format!("W/{}", value_str));
} else {
- // the header value wasn't valid UTF8, so it would have been a
- // problem anyways, so sending a default header.
- "Accept-Encoding".to_string()
+ builder = builder.header("etag", value.as_slice());
}
} else {
- "Accept-Encoding".to_string()
- };
- builder = builder.header("vary", &vary_value);
-
- let accepts_compression = matches!(
- *stream.accept_encoding.borrow(),
- Encoding::Brotli | Encoding::Gzip
- );
-
- let should_compress =
- body_compressible && data.len() > 20 && accepts_compression;
+ builder = builder.header("etag", value.as_slice());
+ }
+ }
+ } else if let Some(value) = etag_header {
+ builder = builder.header("etag", value.as_slice());
+ }
+ match data {
+ Some(data) => {
if should_compress {
// Drop 'content-length' header. Hyper will update it using compressed body.
if let Some(headers) = builder.headers_mut() {
headers.remove("content-length");
}
- // If user provided a ETag header for uncompressed data, we need to
- // ensure it is a Weak Etag header ("W/").
- if let Some(value) = etag_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.starts_with("W/") {
- builder = builder.header("etag", format!("W/{}", value_str));
- } else {
- builder = builder.header("etag", value.as_slice());
- }
- } else {
- builder = builder.header("etag", value.as_slice());
- }
- }
match *stream.accept_encoding.borrow() {
Encoding::Brotli => {
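The Vary and weak-ETag handling is hoisted out of the `Some(data)` arm above so that it also applies to streaming responses. Read in isolation, the two header rules reduce to a pair of small pure functions; a sketch (the helper names are illustrative, not part of the patch):

```rust
/// Merge "Accept-Encoding" into an existing Vary value, mirroring the
/// logic in the hunk above.
fn merge_vary(existing: Option<&[u8]>) -> String {
    match existing.map(std::str::from_utf8) {
        Some(Ok(s)) if !s.to_lowercase().contains("accept-encoding") => {
            format!("Accept-Encoding, {}", s)
        }
        Some(Ok(s)) => s.to_string(),
        // No header, or a value that isn't valid UTF-8: use the default.
        _ => "Accept-Encoding".to_string(),
    }
}

/// A user-supplied ETag describes the uncompressed bytes, so once the body
/// is re-encoded it can only be a weak ("W/") validator.
fn weaken_etag(etag: &str) -> String {
    if etag.starts_with("W/") {
        etag.to_string()
    } else {
        format!("W/{}", etag)
    }
}

fn main() {
    assert_eq!(merge_vary(None), "Accept-Encoding");
    assert_eq!(merge_vary(Some(&b"Accept"[..])), "Accept-Encoding, Accept");
    assert_eq!(merge_vary(Some(&b"accept-encoding"[..])), "accept-encoding");
    assert_eq!(weaken_etag("\"abc\""), "W/\"abc\"");
    assert_eq!(weaken_etag("W/\"abc\""), "W/\"abc\"");
}
```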
@@ -622,9 +629,6 @@ async fn op_http_write_headers(
}
}
} else {
- if let Some(value) = etag_header {
- builder = builder.header("etag", value.as_slice());
- }
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
body = builder.body(data.into_bytes().into())?;
@@ -634,19 +638,35 @@ async fn op_http_write_headers(
None => {
// If no buffer was passed, the caller will stream the response body.
- // TODO(@kitsonk) had compression for streamed bodies.
+ // Create a one-way pipe that implements tokio's async I/O traits: we
+ // create a [tokio::io::DuplexStream], then throw away one direction of
+ // each end, leaving a single writer -> reader path.
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, writer) = tokio::io::split(b);
- // Set the user provided ETag & Vary headers for a streaming response
- if let Some(value) = etag_header {
- builder = builder.header("etag", value.as_slice());
- }
- if let Some(value) = vary_header {
- builder = builder.header("vary", value.as_slice());
+ let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
+
+ if should_compress {
+ match *stream.accept_encoding.borrow() {
+ Encoding::Brotli => {
+ let writer = BrotliEncoder::new(writer);
+ writer_body = Box::pin(writer);
+ builder = builder.header("content-encoding", "br");
+ }
+ _ => {
+ assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
+ let writer = GzipEncoder::new(writer);
+ writer_body = Box::pin(writer);
+ builder = builder.header("content-encoding", "gzip");
+ }
+ }
+ } else {
+ writer_body = Box::pin(writer);
}
- let (body_tx, body_rx) = Body::channel();
- body = builder.body(body_rx)?;
- new_wr = HttpResponseWriter::Body(body_tx);
+ body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
+ new_wr = HttpResponseWriter::Body(writer_body);
}
}
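The pipe's read half becomes the hyper response body through exactly two calls, `ReaderStream::new` and `Body::wrap_stream`, as in the hunk above. A standalone sketch of that wiring, assuming hyper = "0.14" with the "stream" feature, tokio-util = "0.7" with the "io" feature, and tokio = "1" (features = ["full"]):

```rust
use hyper::body::HttpBody; // for Body::data()
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

#[tokio::main]
async fn main() {
    // One-way pipe, as in the patch: keep a's read half and b's write half.
    let (a, b) = tokio::io::duplex(64 * 1024);
    let (reader, _) = tokio::io::split(a);
    let (_, mut writer) = tokio::io::split(b);

    // The read half, viewed as a stream of Bytes, becomes the response body.
    let mut body = hyper::Body::wrap_stream(ReaderStream::new(reader));

    writer.write_all(b"chunk").await.unwrap();
    writer.shutdown().await.unwrap();

    // Whatever was written into the pipe comes out as body data.
    let data = body.data().await.unwrap().unwrap();
    assert_eq!(&data[..], &b"chunk"[..]);
}
```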
@@ -678,8 +698,8 @@ async fn op_http_write_resource(
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
let resource = state.borrow().resource_table.get_any(stream)?;
loop {
- let body_tx = match &mut *wr {
- HttpResponseWriter::Body(body_tx) => body_tx,
+ let body_writer = match &mut *wr {
+ HttpResponseWriter::Body(body_writer) => body_writer,
HttpResponseWriter::Headers(_) => {
return Err(http_error("no response headers"))
}
@@ -694,13 +714,17 @@ async fn op_http_write_resource(
if nread == 0 {
break;
}
- let bytes = Bytes::from(buf.to_temp());
- match body_tx.send_data(bytes).await {
+
+ let mut res = body_writer.write_all(&buf).await;
+ if res.is_ok() {
+ res = body_writer.flush().await;
+ }
+ match res {
Ok(_) => {}
Err(err) => {
- // Don't return "channel closed", that's an implementation detail.
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
- assert!(err.is_closed());
http_stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
@@ -708,7 +732,19 @@ async fn op_http_write_resource(
}
}
- take(&mut *wr);
+ let wr = take(&mut *wr);
+ if let HttpResponseWriter::Body(mut body_writer) = wr {
+ match body_writer.shutdown().await {
+ Ok(_) => {}
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ http_stream.conn.closed().await?;
+ }
+ }
+ }
+
Ok(())
}
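The `assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe)` above leans on a documented DuplexStream behavior: once the peer end is dropped, writes fail with BrokenPipe, which the op then translates into the transport-level error. A quick check of that assumption:

```rust
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() {
    let (a, b) = tokio::io::duplex(1024);
    drop(a); // the reader goes away, e.g. the client disconnected
    let (_, mut writer) = tokio::io::split(b);

    // Writing into a pipe whose peer was dropped fails with BrokenPipe,
    // the exact error kind the patch asserts on before consulting the
    // connection for the real failure.
    let err = writer.write_all(b"data").await.unwrap_err();
    assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
}
```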
@@ -725,7 +761,7 @@ async fn op_http_write(
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
loop {
- let body_tx = match &mut *wr {
+ let body_writer = match &mut *wr {
HttpResponseWriter::Body(body_tx) => body_tx,
HttpResponseWriter::Headers(_) => {
break Err(http_error("no response headers"))
@@ -735,13 +771,17 @@ async fn op_http_write(
}
};
- let bytes = Bytes::copy_from_slice(&buf[..]);
- match body_tx.send_data(bytes).await {
+ let mut res = body_writer.write_all(&buf).await;
+ if res.is_ok() {
+ res = body_writer.flush().await;
+ }
+
+ match res {
Ok(_) => break Ok(()),
Err(err) => {
- // Don't return "channel closed", that's an implementation detail.
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
- assert!(err.is_closed());
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
@@ -763,7 +803,18 @@ async fn op_http_shutdown(
.resource_table
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
- take(&mut *wr);
+ let wr = take(&mut *wr);
+ if let HttpResponseWriter::Body(mut body_writer) = wr {
+ match body_writer.shutdown().await {
+ Ok(_) => {}
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ stream.conn.closed().await?;
+ }
+ }
+ }
Ok(())
}
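Note why both shutdown paths call `shutdown()` on the body writer rather than just flushing: for a write-side encoder, shutdown is what finalizes the compressed stream (for gzip, the CRC32/length trailer), so skipping it would truncate the body. A sketch under the same assumed crate versions (and assuming tokio's `AsyncWrite` impl for `Vec<u8>`):

```rust
use async_compression::tokio::write::GzipEncoder;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut enc = GzipEncoder::new(Vec::new());
    enc.write_all(b"hello deno").await?;

    enc.flush().await?;
    let after_flush = enc.get_ref().len();

    enc.shutdown().await?; // emits the gzip trailer
    let after_shutdown = enc.get_ref().len();

    // The output only becomes a complete gzip member after shutdown.
    assert!(after_shutdown > after_flush);
    Ok(())
}
```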
diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs
index 3a8c9499b..a0a1c974b 100644
--- a/serde_v8/magic/buffer.rs
+++ b/serde_v8/magic/buffer.rs
@@ -29,14 +29,6 @@ impl MagicBuffer {
pub fn new_temp(vec: Vec<u8>) -> Self {
MagicBuffer::Temp(vec)
}
-
- // TODO(@littledivy): Temporary, this needs a refactor.
- pub fn to_temp(self) -> Vec<u8> {
- match self {
- MagicBuffer::Temp(vec) => vec,
- _ => unreachable!(),
- }
- }
}
impl Clone for MagicBuffer {