diff options
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r-- | ext/http/lib.rs | 168 |
1 files changed, 101 insertions, 67 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9c0109937..9c1b48fff 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1,6 +1,7 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. -use bytes::Bytes; +use async_compression::tokio::write::BrotliEncoder; +use async_compression::tokio::write::GzipEncoder; use cache_control::CacheControl; use deno_core::error::custom_error; use deno_core::error::AnyError; @@ -21,7 +22,6 @@ use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op; - use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; @@ -60,7 +60,9 @@ use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; +use tokio_util::io::ReaderStream; mod compressible; @@ -338,7 +340,7 @@ impl Default for HttpRequestReader { /// The write half of an HTTP stream. enum HttpResponseWriter { Headers(oneshot::Sender<Response<Body>>), - Body(hyper::body::Sender), + Body(Pin<Box<dyn tokio::io::AsyncWrite>>), Closed, } @@ -545,56 +547,60 @@ async fn op_http_write_headers( let body: Response<Body>; let new_wr: HttpResponseWriter; - match data { - Some(data) => { - // Set Vary: Accept-Encoding header for direct body response. - // Note: we set the header irrespective of whether or not we compress the - // data to make sure cache services do not serve uncompressed data to - // clients that support compression. - let vary_value = if let Some(value) = vary_header { - if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { - if !value_str.to_lowercase().contains("accept-encoding") { - format!("Accept-Encoding, {}", value_str) - } else { - value_str.to_string() - } + // Set Vary: Accept-Encoding header for direct body response. + // Note: we set the header irrespective of whether or not we compress the data + // to make sure cache services do not serve uncompressed data to clients that + // support compression. + let vary_value = if let Some(value) = vary_header { + if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { + if !value_str.to_lowercase().contains("accept-encoding") { + format!("Accept-Encoding, {}", value_str) + } else { + value_str.to_string() + } + } else { + // the header value wasn't valid UTF8, so it would have been a + // problem anyways, so sending a default header. + "Accept-Encoding".to_string() + } + } else { + "Accept-Encoding".to_string() + }; + builder = builder.header("vary", &vary_value); + + let accepts_compression = matches!( + *stream.accept_encoding.borrow(), + Encoding::Brotli | Encoding::Gzip + ); + let should_compress = body_compressible + && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none()) + && accepts_compression; + + if should_compress { + // If user provided a ETag header for uncompressed data, we need to + // ensure it is a Weak Etag header ("W/"). + if let Some(value) = etag_header { + if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { + if !value_str.starts_with("W/") { + builder = builder.header("etag", format!("W/{}", value_str)); } else { - // the header value wasn't valid UTF8, so it would have been a - // problem anyways, so sending a default header. - "Accept-Encoding".to_string() + builder = builder.header("etag", value.as_slice()); } } else { - "Accept-Encoding".to_string() - }; - builder = builder.header("vary", &vary_value); - - let accepts_compression = matches!( - *stream.accept_encoding.borrow(), - Encoding::Brotli | Encoding::Gzip - ); - - let should_compress = - body_compressible && data.len() > 20 && accepts_compression; + builder = builder.header("etag", value.as_slice()); + } + } + // Drop 'content-length' header. Hyper will update it using compressed body. + if let Some(headers) = builder.headers_mut() { + headers.remove("content-length"); + } + } else if let Some(value) = etag_header { + builder = builder.header("etag", value.as_slice()); + } + match data { + Some(data) => { if should_compress { - // Drop 'content-length' header. Hyper will update it using compressed body. - if let Some(headers) = builder.headers_mut() { - headers.remove("content-length"); - } - // If user provided a ETag header for uncompressed data, we need to - // ensure it is a Weak Etag header ("W/"). - if let Some(value) = etag_header { - if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { - if !value_str.starts_with("W/") { - builder = builder.header("etag", format!("W/{}", value_str)); - } else { - builder = builder.header("etag", value.as_slice()); - } - } else { - builder = builder.header("etag", value.as_slice()); - } - } - match *stream.accept_encoding.borrow() { Encoding::Brotli => { builder = builder.header("content-encoding", "br"); @@ -621,9 +627,6 @@ async fn op_http_write_headers( } } } else { - if let Some(value) = etag_header { - builder = builder.header("etag", value.as_slice()); - } // If a buffer was passed, but isn't compressible, we use it to // construct a response body. body = builder.body(data.into_bytes().into())?; @@ -633,19 +636,35 @@ async fn op_http_write_headers( None => { // If no buffer was passed, the caller will stream the response body. - // TODO(@kitsonk) had compression for streamed bodies. + // Create a one way pipe that implements tokio's async io traits. To do + // this we create a [tokio::io::DuplexStream], but then throw away one + // of the directions to create a one way pipe. + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, writer) = tokio::io::split(b); - // Set the user provided ETag & Vary headers for a streaming response - if let Some(value) = etag_header { - builder = builder.header("etag", value.as_slice()); - } - if let Some(value) = vary_header { - builder = builder.header("vary", value.as_slice()); + let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>; + + if should_compress { + match *stream.accept_encoding.borrow() { + Encoding::Brotli => { + let writer = BrotliEncoder::new(writer); + writer_body = Box::pin(writer); + builder = builder.header("content-encoding", "br"); + } + _ => { + assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip); + let writer = GzipEncoder::new(writer); + writer_body = Box::pin(writer); + builder = builder.header("content-encoding", "gzip"); + } + } + } else { + writer_body = Box::pin(writer); } - let (body_tx, body_rx) = Body::channel(); - body = builder.body(body_rx)?; - new_wr = HttpResponseWriter::Body(body_tx); + body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?; + new_wr = HttpResponseWriter::Body(writer_body); } } @@ -677,7 +696,7 @@ async fn op_http_write( let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; loop { - let body_tx = match &mut *wr { + let body_writer = match &mut *wr { HttpResponseWriter::Body(body_tx) => body_tx, HttpResponseWriter::Headers(_) => { break Err(http_error("no response headers")) @@ -687,13 +706,17 @@ async fn op_http_write( } }; - let bytes = Bytes::copy_from_slice(&buf[..]); - match body_tx.send_data(bytes).await { + let mut res = body_writer.write_all(&buf).await; + if res.is_ok() { + res = body_writer.flush().await; + } + + match res { Ok(_) => break Ok(()), Err(err) => { - // Don't return "channel closed", that's an implementation detail. + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. // Pull up the failure associated with the transport connection instead. - assert!(err.is_closed()); stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; @@ -715,7 +738,18 @@ async fn op_http_shutdown( .resource_table .get::<HttpStreamResource>(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - take(&mut *wr); + let wr = take(&mut *wr); + if let HttpResponseWriter::Body(mut body_writer) = wr { + match body_writer.shutdown().await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + } + } + } Ok(()) } |