summaryrefslogtreecommitdiff
path: root/ext/http/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/lib.rs')
-rw-r--r--ext/http/lib.rs223
1 files changed, 62 insertions, 161 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index a6f47c1c9..9c0109937 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,7 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
-use async_compression::tokio::write::BrotliEncoder;
-use async_compression::tokio::write::GzipEncoder;
+use bytes::Bytes;
use cache_control::CacheControl;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
@@ -22,6 +21,7 @@ use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::include_js_files;
use deno_core::op;
+
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
@@ -60,9 +60,7 @@ use std::task::Context;
use std::task::Poll;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
-use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local;
-use tokio_util::io::ReaderStream;
mod compressible;
@@ -77,7 +75,6 @@ pub fn init() -> Extension {
op_http_read::decl(),
op_http_write_headers::decl(),
op_http_write::decl(),
- op_http_write_resource::decl(),
op_http_shutdown::decl(),
op_http_websocket_accept_header::decl(),
op_http_upgrade_websocket::decl(),
@@ -341,7 +338,7 @@ impl Default for HttpRequestReader {
/// The write half of an HTTP stream.
enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>),
- Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
+ Body(hyper::body::Sender),
Closed,
}
@@ -548,60 +545,55 @@ async fn op_http_write_headers(
let body: Response<Body>;
let new_wr: HttpResponseWriter;
- // Set Vary: Accept-Encoding header for direct body response.
- // Note: we set the header irrespective of whether or not we compress the data
- // to make sure cache services do not serve uncompressed data to clients that
- // support compression.
- let vary_value = if let Some(value) = vary_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.to_lowercase().contains("accept-encoding") {
- format!("Accept-Encoding, {}", value_str)
- } else {
- value_str.to_string()
- }
- } else {
- // the header value wasn't valid UTF8, so it would have been a
- // problem anyways, so sending a default header.
- "Accept-Encoding".to_string()
- }
- } else {
- "Accept-Encoding".to_string()
- };
- builder = builder.header("vary", &vary_value);
-
- let accepts_compression = matches!(
- *stream.accept_encoding.borrow(),
- Encoding::Brotli | Encoding::Gzip
- );
- let should_compress = body_compressible
- && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
- && accepts_compression;
-
- if should_compress {
- // If user provided a ETag header for uncompressed data, we need to
- // ensure it is a Weak Etag header ("W/").
- if let Some(value) = etag_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.starts_with("W/") {
- builder = builder.header("etag", format!("W/{}", value_str));
+ match data {
+ Some(data) => {
+ // Set Vary: Accept-Encoding header for direct body response.
+ // Note: we set the header irrespective of whether or not we compress the
+ // data to make sure cache services do not serve uncompressed data to
+ // clients that support compression.
+ let vary_value = if let Some(value) = vary_header {
+ if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+ if !value_str.to_lowercase().contains("accept-encoding") {
+ format!("Accept-Encoding, {}", value_str)
+ } else {
+ value_str.to_string()
+ }
} else {
- builder = builder.header("etag", value.as_slice());
+ // the header value wasn't valid UTF8, so it would have been a
+ // problem anyways, so sending a default header.
+ "Accept-Encoding".to_string()
}
} else {
- builder = builder.header("etag", value.as_slice());
- }
- }
- } else if let Some(value) = etag_header {
- builder = builder.header("etag", value.as_slice());
- }
+ "Accept-Encoding".to_string()
+ };
+ builder = builder.header("vary", &vary_value);
+
+ let accepts_compression = matches!(
+ *stream.accept_encoding.borrow(),
+ Encoding::Brotli | Encoding::Gzip
+ );
+
+ let should_compress =
+ body_compressible && data.len() > 20 && accepts_compression;
- match data {
- Some(data) => {
if should_compress {
// Drop 'content-length' header. Hyper will update it using compressed body.
if let Some(headers) = builder.headers_mut() {
headers.remove("content-length");
}
+ // If user provided a ETag header for uncompressed data, we need to
+ // ensure it is a Weak Etag header ("W/").
+ if let Some(value) = etag_header {
+ if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+ if !value_str.starts_with("W/") {
+ builder = builder.header("etag", format!("W/{}", value_str));
+ } else {
+ builder = builder.header("etag", value.as_slice());
+ }
+ } else {
+ builder = builder.header("etag", value.as_slice());
+ }
+ }
match *stream.accept_encoding.borrow() {
Encoding::Brotli => {
@@ -629,6 +621,9 @@ async fn op_http_write_headers(
}
}
} else {
+ if let Some(value) = etag_header {
+ builder = builder.header("etag", value.as_slice());
+ }
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
body = builder.body(data.into_bytes().into())?;
@@ -638,35 +633,19 @@ async fn op_http_write_headers(
None => {
// If no buffer was passed, the caller will stream the response body.
- // Create a one way pipe that implements tokio's async io traits. To do
- // this we create a [tokio::io::DuplexStream], but then throw away one
- // of the directions to create a one way pipe.
- let (a, b) = tokio::io::duplex(64 * 1024);
- let (reader, _) = tokio::io::split(a);
- let (_, writer) = tokio::io::split(b);
+ // TODO(@kitsonk) had compression for streamed bodies.
- let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
-
- if should_compress {
- match *stream.accept_encoding.borrow() {
- Encoding::Brotli => {
- let writer = BrotliEncoder::new(writer);
- writer_body = Box::pin(writer);
- builder = builder.header("content-encoding", "br");
- }
- _ => {
- assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
- let writer = GzipEncoder::new(writer);
- writer_body = Box::pin(writer);
- builder = builder.header("content-encoding", "gzip");
- }
- }
- } else {
- writer_body = Box::pin(writer);
+ // Set the user provided ETag & Vary headers for a streaming response
+ if let Some(value) = etag_header {
+ builder = builder.header("etag", value.as_slice());
+ }
+ if let Some(value) = vary_header {
+ builder = builder.header("vary", value.as_slice());
}
- body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
- new_wr = HttpResponseWriter::Body(writer_body);
+ let (body_tx, body_rx) = Body::channel();
+ body = builder.body(body_rx)?;
+ new_wr = HttpResponseWriter::Body(body_tx);
}
}
@@ -686,69 +665,6 @@ async fn op_http_write_headers(
}
#[op]
-async fn op_http_write_resource(
- state: Rc<RefCell<OpState>>,
- rid: ResourceId,
- stream: ResourceId,
-) -> Result<(), AnyError> {
- let http_stream = state
- .borrow()
- .resource_table
- .get::<HttpStreamResource>(rid)?;
- let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
- let resource = state.borrow().resource_table.get_any(stream)?;
- loop {
- let body_writer = match &mut *wr {
- HttpResponseWriter::Body(body_writer) => body_writer,
- HttpResponseWriter::Headers(_) => {
- return Err(http_error("no response headers"))
- }
- HttpResponseWriter::Closed => {
- return Err(http_error("response already completed"))
- }
- };
-
- let vec = vec![0u8; 64 * 1024]; // 64KB
- let buf = ZeroCopyBuf::new_temp(vec);
- let (nread, buf) = resource.clone().read_return(buf).await?;
- if nread == 0 {
- break;
- }
-
- let mut res = body_writer.write_all(&buf).await;
- if res.is_ok() {
- res = body_writer.flush().await;
- }
- match res {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- http_stream.conn.closed().await?;
- // If there was no connection error, drop body_tx.
- *wr = HttpResponseWriter::Closed;
- }
- }
- }
-
- let wr = take(&mut *wr);
- if let HttpResponseWriter::Body(mut body_writer) = wr {
- match body_writer.shutdown().await {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- http_stream.conn.closed().await?;
- }
- }
- }
-
- Ok(())
-}
-
-#[op]
async fn op_http_write(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
@@ -761,7 +677,7 @@ async fn op_http_write(
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
loop {
- let body_writer = match &mut *wr {
+ let body_tx = match &mut *wr {
HttpResponseWriter::Body(body_tx) => body_tx,
HttpResponseWriter::Headers(_) => {
break Err(http_error("no response headers"))
@@ -771,17 +687,13 @@ async fn op_http_write(
}
};
- let mut res = body_writer.write_all(&buf).await;
- if res.is_ok() {
- res = body_writer.flush().await;
- }
-
- match res {
+ let bytes = Bytes::copy_from_slice(&buf[..]);
+ match body_tx.send_data(bytes).await {
Ok(_) => break Ok(()),
Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
+ // Don't return "channel closed", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
+ assert!(err.is_closed());
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
@@ -803,18 +715,7 @@ async fn op_http_shutdown(
.resource_table
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
- let wr = take(&mut *wr);
- if let HttpResponseWriter::Body(mut body_writer) = wr {
- match body_writer.shutdown().await {
- Ok(_) => {}
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- stream.conn.closed().await?;
- }
- }
- }
+ take(&mut *wr);
Ok(())
}