summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/http/lib.rs381
-rw-r--r--serde_v8/magic/bytestring.rs7
2 files changed, 189 insertions, 199 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 28689654e..e34d4b80f 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -39,6 +39,8 @@ use flate2::write::GzEncoder;
use flate2::Compression;
use fly_accept_encoding::Encoding;
use hyper::body::Bytes;
+use hyper::header::HeaderName;
+use hyper::header::HeaderValue;
use hyper::server::conn::Http;
use hyper::service::Service;
use hyper::Body;
@@ -500,191 +502,176 @@ async fn op_http_write_headers(
let mut builder = Response::builder().status(status);
- let mut body_compressible = false;
- let mut headers_allow_compression = true;
- let mut vary_header = None;
- let mut etag_header = None;
- let mut content_type_header = None;
-
- builder.headers_mut().unwrap().reserve(headers.len());
- for (key, value) in &headers {
- if key.eq_ignore_ascii_case(b"cache-control") {
- if let Ok(value) = std::str::from_utf8(value) {
- if let Some(cache_control) = CacheControl::from_value(value) {
- // We skip compression if the cache-control header value is set to
- // "no-transform"
- if cache_control.no_transform {
- headers_allow_compression = false;
- }
- }
- } else {
- headers_allow_compression = false;
- }
- } else if key.eq_ignore_ascii_case(b"content-range") {
- // we skip compression if the `content-range` header value is set, as it
- // indicates the contents of the body were negotiated based directly
- // with the user code and we can't compress the response
- headers_allow_compression = false;
- } else if key.eq_ignore_ascii_case(b"content-type") && !value.is_empty() {
- content_type_header = Some(value);
- } else if key.eq_ignore_ascii_case(b"content-encoding") {
- // we don't compress if a content-encoding header was provided
- headers_allow_compression = false;
- } else if key.eq_ignore_ascii_case(b"etag") && !value.is_empty() {
- // we store the values of ETag and Vary and skip adding them for now, as
- // we may need to modify or change.
- etag_header = Some(value);
- continue;
- } else if key.eq_ignore_ascii_case(b"vary") && !value.is_empty() {
- vary_header = Some(value);
- continue;
- }
- builder = builder.header(key.as_slice(), value.as_slice());
- }
+ // Add headers
+ let header_count = headers.len();
+ let headers = headers.into_iter().filter_map(|(k, v)| {
+ let v: Vec<u8> = v.into();
+ Some((
+ HeaderName::try_from(k.as_slice()).ok()?,
+ HeaderValue::try_from(v).ok()?,
+ ))
+ });
+ // Track supported encoding
+ let encoding = *stream.accept_encoding.borrow();
+
+ let hmap = builder.headers_mut().unwrap();
+ hmap.reserve(header_count + 2);
+ hmap.extend(headers);
+ ensure_vary_accept_encoding(hmap);
+
+ let accepts_compression =
+ matches!(encoding, Encoding::Brotli | Encoding::Gzip);
+ let compressing = accepts_compression
+ && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
+ && should_compress(hmap);
- if headers_allow_compression {
- body_compressible = content_type_header
- .map(compressible::is_content_compressible)
- .unwrap_or_default();
+ if compressing {
+ weaken_etag(hmap);
+ // Drop 'content-length' header. Hyper will update it using compressed body.
+ hmap.remove(hyper::header::CONTENT_LENGTH);
+ // Content-Encoding header
+ hmap.insert(
+ hyper::header::CONTENT_ENCODING,
+ HeaderValue::from_static(match encoding {
+ Encoding::Brotli => "br",
+ Encoding::Gzip => "gzip",
+ _ => unreachable!(), // Forbidden by accepts_compression
+ }),
+ );
}
- let body: Response<Body>;
- let new_wr: HttpResponseWriter;
+ let (new_wr, body) = http_response(data, compressing, encoding)?;
+ let body = builder.body(body)?;
- // Set Vary: Accept-Encoding header for direct body response.
- // Note: we set the header irrespective of whether or not we compress the data
- // to make sure cache services do not serve uncompressed data to clients that
- // support compression.
- let vary_value = if let Some(value) = vary_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.to_lowercase().contains("accept-encoding") {
- format!("Accept-Encoding, {}", value_str)
- } else {
- value_str.to_string()
- }
- } else {
- // the header value wasn't valid UTF8, so it would have been a
- // problem anyways, so sending a default header.
- "Accept-Encoding".to_string()
- }
- } else {
- "Accept-Encoding".to_string()
+ let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
+ let response_tx = match replace(&mut *old_wr, new_wr) {
+ HttpResponseWriter::Headers(response_tx) => response_tx,
+ _ => return Err(http_error("response headers already sent")),
};
- builder = builder.header("vary", &vary_value);
- let accepts_compression = matches!(
- *stream.accept_encoding.borrow(),
- Encoding::Brotli | Encoding::Gzip
- );
- let should_compress = body_compressible
- && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
- && accepts_compression;
-
- if should_compress {
- // If user provided a ETag header for uncompressed data, we need to
- // ensure it is a Weak Etag header ("W/").
- if let Some(value) = etag_header {
- if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
- if !value_str.starts_with("W/") {
- builder = builder.header("etag", format!("W/{}", value_str));
- } else {
- builder = builder.header("etag", value.as_slice());
- }
- } else {
- builder = builder.header("etag", value.as_slice());
- }
- }
- // Drop 'content-length' header. Hyper will update it using compressed body.
- if let Some(headers) = builder.headers_mut() {
- headers.remove("content-length");
+ match response_tx.send(body) {
+ Ok(_) => Ok(()),
+ Err(_) => {
+ stream.conn.closed().await?;
+ Err(http_error("connection closed while sending response"))
}
- } else if let Some(value) = etag_header {
- builder = builder.header("etag", value.as_slice());
}
+}
+fn http_response(
+ data: Option<StringOrBuffer>,
+ compressing: bool,
+ encoding: Encoding,
+) -> Result<(HttpResponseWriter, hyper::Body), AnyError> {
match data {
- Some(data) => {
- if should_compress {
- match *stream.accept_encoding.borrow() {
- Encoding::Brotli => {
- builder = builder.header("content-encoding", "br");
- // quality level 6 is based on google's nginx default value for
- // on-the-fly compression
- // https://github.com/google/ngx_brotli#brotli_comp_level
- // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
- // (~4MB)
- let mut writer =
- brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
- writer.write_all(&data)?;
- body = builder.body(writer.into_inner().into())?;
- }
- _ => {
- assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
- builder = builder.header("content-encoding", "gzip");
- // Gzip, after level 1, doesn't produce significant size difference.
- // Probably the reason why nginx's default gzip compression level is
- // 1.
- // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
- let mut writer = GzEncoder::new(Vec::new(), Compression::new(1));
- writer.write_all(&data)?;
- body = builder.body(writer.finish()?.into())?;
- }
- }
- } else {
- // If a buffer was passed, but isn't compressible, we use it to
- // construct a response body.
- body = builder.body(Bytes::copy_from_slice(&data).into())?;
+ Some(data) if compressing => match encoding {
+ Encoding::Brotli => {
+ // quality level 6 is based on google's nginx default value for
+ // on-the-fly compression
+ // https://github.com/google/ngx_brotli#brotli_comp_level
+ // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
+ // (~4MB)
+ let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
+ writer.write_all(&data)?;
+ Ok((HttpResponseWriter::Closed, writer.into_inner().into()))
+ }
+ Encoding::Gzip => {
+ // Gzip, after level 1, doesn't produce significant size difference.
+ // Probably the reason why nginx's default gzip compression level is
+ // 1.
+ // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
+ let mut writer = GzEncoder::new(Vec::new(), Compression::new(1));
+ writer.write_all(&data)?;
+ Ok((HttpResponseWriter::Closed, writer.finish()?.into()))
}
- new_wr = HttpResponseWriter::Closed;
+ _ => unreachable!(), // forbidden by accepts_compression
+ },
+ Some(data) => {
+ // If a buffer was passed, but isn't compressible, we use it to
+ // construct a response body.
+ Ok((
+ HttpResponseWriter::Closed,
+ Bytes::copy_from_slice(&data).into(),
+ ))
+ }
+ None if compressing => {
+ // Create a one way pipe that implements tokio's async io traits. To do
+ // this we create a [tokio::io::DuplexStream], but then throw away one
+ // of the directions to create a one way pipe.
+ let (a, b) = tokio::io::duplex(64 * 1024);
+ let (reader, _) = tokio::io::split(a);
+ let (_, writer) = tokio::io::split(b);
+ let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding {
+ Encoding::Brotli => Box::pin(BrotliEncoder::new(writer)),
+ Encoding::Gzip => Box::pin(GzipEncoder::new(writer)),
+ _ => unreachable!(), // forbidden by accepts_compression
+ };
+ Ok((
+ HttpResponseWriter::Body(writer),
+ Body::wrap_stream(ReaderStream::new(reader)),
+ ))
}
None => {
- // If no buffer was passed, the caller will stream the response body.
- if should_compress {
- // Create a one way pipe that implements tokio's async io traits. To do
- // this we create a [tokio::io::DuplexStream], but then throw away one
- // of the directions to create a one way pipe.
- let (a, b) = tokio::io::duplex(64 * 1024);
- let (reader, _) = tokio::io::split(a);
- let (_, writer) = tokio::io::split(b);
-
- let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
- match *stream.accept_encoding.borrow() {
- Encoding::Brotli => {
- let writer = BrotliEncoder::new(writer);
- writer_body = Box::pin(writer);
- builder = builder.header("content-encoding", "br");
- }
- _ => {
- assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
- let writer = GzipEncoder::new(writer);
- writer_body = Box::pin(writer);
- builder = builder.header("content-encoding", "gzip");
- }
- }
-
- body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
- new_wr = HttpResponseWriter::Body(writer_body);
- } else {
- let (body_tx, body_rx) = Body::channel();
- body = builder.body(body_rx)?;
- new_wr = HttpResponseWriter::BodyUncompressed(body_tx);
- }
+ let (body_tx, body_rx) = Body::channel();
+ Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx))
}
}
+}
- let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
- let response_tx = match replace(&mut *old_wr, new_wr) {
- HttpResponseWriter::Headers(response_tx) => response_tx,
- _ => return Err(http_error("response headers already sent")),
- };
+// If user provided a ETag header for uncompressed data, we need to
+// ensure it is a Weak Etag header ("W/").
+fn weaken_etag(hmap: &mut hyper::HeaderMap) {
+ if let Some(etag) = hmap.get_mut(hyper::header::ETAG) {
+ if !etag.as_bytes().starts_with(b"W/") {
+ let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
+ v.extend(b"W/");
+ v.extend(etag.as_bytes());
+ *etag = v.try_into().unwrap();
+ }
+ }
+}
- match response_tx.send(body) {
- Ok(_) => Ok(()),
- Err(_) => {
- stream.conn.closed().await?;
- Err(http_error("connection closed while sending response"))
+// Set Vary: Accept-Encoding header for direct body response.
+// Note: we set the header irrespective of whether or not we compress the data
+// to make sure cache services do not serve uncompressed data to clients that
+// support compression.
+fn ensure_vary_accept_encoding(hmap: &mut hyper::HeaderMap) {
+ if let Some(v) = hmap.get_mut(hyper::header::VARY) {
+ if let Ok(s) = v.to_str() {
+ if !s.to_lowercase().contains("accept-encoding") {
+ *v = format!("Accept-Encoding, {}", s).try_into().unwrap()
+ }
+ return;
}
}
+ hmap.insert(
+ hyper::header::VARY,
+ HeaderValue::from_static("Accept-Encoding"),
+ );
+}
+
+fn should_compress(headers: &hyper::HeaderMap) -> bool {
+ // skip compression if the cache-control header value is set to "no-transform" or not utf8
+ fn cache_control_no_transform(headers: &hyper::HeaderMap) -> Option<bool> {
+ let v = headers.get(hyper::header::CACHE_CONTROL)?;
+ let s = match std::str::from_utf8(v.as_bytes()) {
+ Ok(s) => s,
+ Err(_) => return Some(true),
+ };
+ let c = CacheControl::from_value(s)?;
+ Some(c.no_transform)
+ }
+ // we skip compression if the `content-range` header value is set, as it
+ // indicates the contents of the body were negotiated based directly
+ // with the user code and we can't compress the response
+ let content_range = headers.contains_key(hyper::header::CONTENT_RANGE);
+
+ !content_range
+ && !cache_control_no_transform(headers).unwrap_or_default()
+ && headers
+ .get(hyper::header::CONTENT_TYPE)
+ .map(compressible::is_content_compressible)
+ .unwrap_or_default()
}
#[op]
@@ -757,42 +744,38 @@ async fn op_http_write(
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
- loop {
- match &mut *wr {
- HttpResponseWriter::Headers(_) => {
- break Err(http_error("no response headers"))
+ match &mut *wr {
+ HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
+ HttpResponseWriter::Closed => Err(http_error("response already completed")),
+ HttpResponseWriter::Body(body) => {
+ let mut result = body.write_all(&buf).await;
+ if result.is_ok() {
+ result = body.flush().await;
}
- HttpResponseWriter::Closed => {
- break Err(http_error("response already completed"))
- }
- HttpResponseWriter::Body(body) => {
- let mut result = body.write_all(&buf).await;
- if result.is_ok() {
- result = body.flush().await;
- }
- match result {
- Ok(_) => break Ok(()),
- Err(err) => {
- assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
- // Don't return "broken pipe", that's an implementation detail.
- // Pull up the failure associated with the transport connection instead.
- stream.conn.closed().await?;
- // If there was no connection error, drop body_tx.
- *wr = HttpResponseWriter::Closed;
- }
+ match result {
+ Ok(_) => Ok(()),
+ Err(err) => {
+ assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+ // Don't return "broken pipe", that's an implementation detail.
+ // Pull up the failure associated with the transport connection instead.
+ stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ Err(http_error("response already completed"))
}
}
- HttpResponseWriter::BodyUncompressed(body) => {
- let bytes = Bytes::copy_from_slice(&buf[..]);
- match body.send_data(bytes).await {
- Ok(_) => break Ok(()),
- Err(err) => {
- assert!(err.is_closed());
- // Pull up the failure associated with the transport connection instead.
- stream.conn.closed().await?;
- // If there was no connection error, drop body_tx.
- *wr = HttpResponseWriter::Closed;
- }
+ }
+ HttpResponseWriter::BodyUncompressed(body) => {
+ let bytes = Bytes::copy_from_slice(&buf[..]);
+ match body.send_data(bytes).await {
+ Ok(_) => Ok(()),
+ Err(err) => {
+ assert!(err.is_closed());
+ // Pull up the failure associated with the transport connection instead.
+ stream.conn.closed().await?;
+ // If there was no connection error, drop body_tx.
+ *wr = HttpResponseWriter::Closed;
+ Err(http_error("response already completed"))
}
}
}
diff --git a/serde_v8/magic/bytestring.rs b/serde_v8/magic/bytestring.rs
index a4c664b29..d6f3cb174 100644
--- a/serde_v8/magic/bytestring.rs
+++ b/serde_v8/magic/bytestring.rs
@@ -46,3 +46,10 @@ impl FromV8 for ByteString {
Ok(buffer.into())
}
}
+
+#[allow(clippy::from_over_into)]
+impl Into<Vec<u8>> for ByteString {
+ fn into(self) -> Vec<u8> {
+ self.0
+ }
+}