| author | Bartek Iwańczuk <biwanczuk@gmail.com> | 2022-04-21 02:22:55 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-04-21 02:22:55 +0200 |
| commit | 03019e778189b38938f1238f22652162de5a7434 (patch) | |
| tree | cf16b44be07c1c488ffe4f31fe77eab7f6bd8c95 /ext/http | |
| parent | aaaa877d91c5f8b88722fd1ec725791b0eb4efe0 (diff) | |
Revert various PRs related to "ext/http" (#14339)
* Revert "feat(ext/http): stream auto resp body compression (#14325)"
* Revert "core: introduce `resource.read_return` (#14331)"
* Revert "perf(http): optimize `ReadableStream`s backed by a resource (#14284)"
Diffstat (limited to 'ext/http')
| -rw-r--r-- | ext/http/01_http.js | 63 |
| -rw-r--r-- | ext/http/Cargo.toml | 1 |
| -rw-r--r-- | ext/http/lib.rs | 223 |
3 files changed, 86 insertions, 201 deletions
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index ff4b6f41f..217bfc061 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -32,8 +32,7 @@
   } = window.__bootstrap.webSocket;
   const { TcpConn, UnixConn } = window.__bootstrap.net;
   const { TlsConn } = window.__bootstrap.tls;
-  const { Deferred, getReadableStreamRid, readableStreamClose } =
-    window.__bootstrap.streams;
+  const { Deferred } = window.__bootstrap.streams;
   const {
     ArrayPrototypeIncludes,
     ArrayPrototypePush,
@@ -236,6 +235,7 @@
         typeof respBody === "string" ||
           ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
       );
+
       try {
         await core.opAsync(
           "op_http_write_headers",
@@ -269,50 +269,35 @@
       ) {
         throw new TypeError("Unreachable");
       }
-      const resourceRid = getReadableStreamRid(respBody);
-      if (resourceRid) {
-        if (respBody.locked) {
-          throw new TypeError("ReadableStream is locked.");
+      const reader = respBody.getReader();
+      while (true) {
+        const { value, done } = await reader.read();
+        if (done) break;
+        if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
+          await reader.cancel(new TypeError("Value not a Uint8Array"));
+          break;
         }
-        const _reader = respBody.getReader(); // Aquire JS lock.
-        await core.opAsync(
-          "op_http_write_resource",
-          streamRid,
-          resourceRid,
-        );
-        readableStreamClose(respBody); // Release JS lock.
-      } else {
-        const reader = respBody.getReader();
-        while (true) {
-          const { value, done } = await reader.read();
-          if (done) break;
-          if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
-            await reader.cancel(new TypeError("Value not a Uint8Array"));
-            break;
-          }
-          try {
-            await core.opAsync("op_http_write", streamRid, value);
-          } catch (error) {
-            const connError = httpConn[connErrorSymbol];
-            if (
-              ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
-              connError != null
-            ) {
-              // deno-lint-ignore no-ex-assign
-              error = new connError.constructor(connError.message);
-            }
-            await reader.cancel(error);
-            throw error;
-          }
-        }
         try {
-          await core.opAsync("op_http_shutdown", streamRid);
+          await core.opAsync("op_http_write", streamRid, value);
         } catch (error) {
+          const connError = httpConn[connErrorSymbol];
+          if (
+            ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
+            connError != null
+          ) {
+            // deno-lint-ignore no-ex-assign
+            error = new connError.constructor(connError.message);
+          }
           await reader.cancel(error);
           throw error;
         }
       }
+      try {
+        await core.opAsync("op_http_shutdown", streamRid);
+      } catch (error) {
+        await reader.cancel(error);
+        throw error;
+      }
     }

     const deferred = request[_deferred];
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index b4a208228..2bdbfdade 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -14,7 +14,6 @@ description = "HTTP server implementation for Deno"
 path = "lib.rs"

 [dependencies]
-async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] }
 base64 = "0.13.0"
 brotli = "3.3.3"
 bytes = "1"
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index a6f47c1c9..9c0109937 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,7 +1,6 @@
 // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
-use async_compression::tokio::write::BrotliEncoder;
-use async_compression::tokio::write::GzipEncoder;
+use bytes::Bytes;
 use cache_control::CacheControl;
 use deno_core::error::custom_error;
 use deno_core::error::AnyError;
@@ -22,6 +21,7 @@ use deno_core::futures::StreamExt;
 use deno_core::futures::TryFutureExt;
 use deno_core::include_js_files;
 use deno_core::op;
+
 use deno_core::AsyncRefCell;
 use deno_core::ByteString;
 use deno_core::CancelFuture;
@@ -60,9 +60,7 @@
 use std::task::Context;
 use std::task::Poll;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
-use tokio::io::AsyncWriteExt;
 use tokio::task::spawn_local;
-use tokio_util::io::ReaderStream;

 mod compressible;
@@ -77,7 +75,6 @@ pub fn init() -> Extension {
       op_http_read::decl(),
       op_http_write_headers::decl(),
       op_http_write::decl(),
-      op_http_write_resource::decl(),
       op_http_shutdown::decl(),
       op_http_websocket_accept_header::decl(),
       op_http_upgrade_websocket::decl(),
@@ -341,7 +338,7 @@ impl Default for HttpRequestReader {

 /// The write half of an HTTP stream.
 enum HttpResponseWriter {
   Headers(oneshot::Sender<Response<Body>>),
-  Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
+  Body(hyper::body::Sender),
   Closed,
 }
@@ -548,60 +545,55 @@ async fn op_http_write_headers(
   let body: Response<Body>;
   let new_wr: HttpResponseWriter;

-  // Set Vary: Accept-Encoding header for direct body response.
-  // Note: we set the header irrespective of whether or not we compress the data
-  // to make sure cache services do not serve uncompressed data to clients that
-  // support compression.
-  let vary_value = if let Some(value) = vary_header {
-    if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
-      if !value_str.to_lowercase().contains("accept-encoding") {
-        format!("Accept-Encoding, {}", value_str)
-      } else {
-        value_str.to_string()
-      }
-    } else {
-      // the header value wasn't valid UTF8, so it would have been a
-      // problem anyways, so sending a default header.
-      "Accept-Encoding".to_string()
-    }
-  } else {
-    "Accept-Encoding".to_string()
-  };
-  builder = builder.header("vary", &vary_value);
-
-  let accepts_compression = matches!(
-    *stream.accept_encoding.borrow(),
-    Encoding::Brotli | Encoding::Gzip
-  );
-  let should_compress = body_compressible
-    && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
-    && accepts_compression;
-
-  if should_compress {
-    // If user provided a ETag header for uncompressed data, we need to
-    // ensure it is a Weak Etag header ("W/").
-    if let Some(value) = etag_header {
-      if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
-        if !value_str.starts_with("W/") {
-          builder = builder.header("etag", format!("W/{}", value_str));
+  match data {
+    Some(data) => {
+      // Set Vary: Accept-Encoding header for direct body response.
+      // Note: we set the header irrespective of whether or not we compress the
+      // data to make sure cache services do not serve uncompressed data to
+      // clients that support compression.
+      let vary_value = if let Some(value) = vary_header {
+        if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+          if !value_str.to_lowercase().contains("accept-encoding") {
+            format!("Accept-Encoding, {}", value_str)
+          } else {
+            value_str.to_string()
+          }
         } else {
-          builder = builder.header("etag", value.as_slice());
+          // the header value wasn't valid UTF8, so it would have been a
+          // problem anyways, so sending a default header.
+          "Accept-Encoding".to_string()
         }
       } else {
-        builder = builder.header("etag", value.as_slice());
-      }
-    }
-  } else if let Some(value) = etag_header {
-    builder = builder.header("etag", value.as_slice());
-  }
+        "Accept-Encoding".to_string()
+      };
+      builder = builder.header("vary", &vary_value);
+
+      let accepts_compression = matches!(
+        *stream.accept_encoding.borrow(),
+        Encoding::Brotli | Encoding::Gzip
+      );
+
+      let should_compress =
+        body_compressible && data.len() > 20 && accepts_compression;

-  match data {
-    Some(data) => {
       if should_compress {
         // Drop 'content-length' header. Hyper will update it using compressed body.
         if let Some(headers) = builder.headers_mut() {
           headers.remove("content-length");
         }
+        // If user provided a ETag header for uncompressed data, we need to
+        // ensure it is a Weak Etag header ("W/").
+        if let Some(value) = etag_header {
+          if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+            if !value_str.starts_with("W/") {
+              builder = builder.header("etag", format!("W/{}", value_str));
+            } else {
+              builder = builder.header("etag", value.as_slice());
+            }
+          } else {
+            builder = builder.header("etag", value.as_slice());
+          }
+        }

         match *stream.accept_encoding.borrow() {
           Encoding::Brotli => {
@@ -629,6 +621,9 @@
           }
         }
       } else {
+        if let Some(value) = etag_header {
+          builder = builder.header("etag", value.as_slice());
+        }
         // If a buffer was passed, but isn't compressible, we use it to
         // construct a response body.
         body = builder.body(data.into_bytes().into())?;
@@ -638,35 +633,19 @@
     }
     None => {
       // If no buffer was passed, the caller will stream the response body.
-      // Create a one way pipe that implements tokio's async io traits. To do
-      // this we create a [tokio::io::DuplexStream], but then throw away one
-      // of the directions to create a one way pipe.
-      let (a, b) = tokio::io::duplex(64 * 1024);
-      let (reader, _) = tokio::io::split(a);
-      let (_, writer) = tokio::io::split(b);
+      // TODO(@kitsonk) had compression for streamed bodies.

-      let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
-
-      if should_compress {
-        match *stream.accept_encoding.borrow() {
-          Encoding::Brotli => {
-            let writer = BrotliEncoder::new(writer);
-            writer_body = Box::pin(writer);
-            builder = builder.header("content-encoding", "br");
-          }
-          _ => {
-            assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
-            let writer = GzipEncoder::new(writer);
-            writer_body = Box::pin(writer);
-            builder = builder.header("content-encoding", "gzip");
-          }
-        }
-      } else {
-        writer_body = Box::pin(writer);
+      // Set the user provided ETag & Vary headers for a streaming response
+      if let Some(value) = etag_header {
+        builder = builder.header("etag", value.as_slice());
+      }
+      if let Some(value) = vary_header {
+        builder = builder.header("vary", value.as_slice());
       }

-      body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
-      new_wr = HttpResponseWriter::Body(writer_body);
+      let (body_tx, body_rx) = Body::channel();
+      body = builder.body(body_rx)?;
+      new_wr = HttpResponseWriter::Body(body_tx);
     }
   }
@@ -686,69 +665,6 @@
 }

 #[op]
-async fn op_http_write_resource(
-  state: Rc<RefCell<OpState>>,
-  rid: ResourceId,
-  stream: ResourceId,
-) -> Result<(), AnyError> {
-  let http_stream = state
-    .borrow()
-    .resource_table
-    .get::<HttpStreamResource>(rid)?;
-  let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
-  let resource = state.borrow().resource_table.get_any(stream)?;
-  loop {
-    let body_writer = match &mut *wr {
-      HttpResponseWriter::Body(body_writer) => body_writer,
-      HttpResponseWriter::Headers(_) => {
-        return Err(http_error("no response headers"))
-      }
-      HttpResponseWriter::Closed => {
-        return Err(http_error("response already completed"))
-      }
-    };
-
-    let vec = vec![0u8; 64 * 1024]; // 64KB
-    let buf = ZeroCopyBuf::new_temp(vec);
-    let (nread, buf) = resource.clone().read_return(buf).await?;
-    if nread == 0 {
-      break;
-    }
-
-    let mut res = body_writer.write_all(&buf).await;
-    if res.is_ok() {
-      res = body_writer.flush().await;
-    }
-    match res {
-      Ok(_) => {}
-      Err(err) => {
-        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
-        // Don't return "broken pipe", that's an implementation detail.
-        // Pull up the failure associated with the transport connection instead.
-        http_stream.conn.closed().await?;
-        // If there was no connection error, drop body_tx.
-        *wr = HttpResponseWriter::Closed;
-      }
-    }
-  }
-
-  let wr = take(&mut *wr);
-  if let HttpResponseWriter::Body(mut body_writer) = wr {
-    match body_writer.shutdown().await {
-      Ok(_) => {}
-      Err(err) => {
-        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
-        // Don't return "broken pipe", that's an implementation detail.
-        // Pull up the failure associated with the transport connection instead.
-        http_stream.conn.closed().await?;
-      }
-    }
-  }
-
-  Ok(())
-}
-
-#[op]
 async fn op_http_write(
   state: Rc<RefCell<OpState>>,
   rid: ResourceId,
@@ -761,7 +677,7 @@ async fn op_http_write(
   let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;

   loop {
-    let body_writer = match &mut *wr {
+    let body_tx = match &mut *wr {
       HttpResponseWriter::Body(body_tx) => body_tx,
       HttpResponseWriter::Headers(_) => {
         break Err(http_error("no response headers"))
@@ -771,17 +687,13 @@ async fn op_http_write(
       }
     };

-    let mut res = body_writer.write_all(&buf).await;
-    if res.is_ok() {
-      res = body_writer.flush().await;
-    }
-
-    match res {
+    let bytes = Bytes::copy_from_slice(&buf[..]);
+    match body_tx.send_data(bytes).await {
       Ok(_) => break Ok(()),
       Err(err) => {
-        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
-        // Don't return "broken pipe", that's an implementation detail.
+        // Don't return "channel closed", that's an implementation detail.
         // Pull up the failure associated with the transport connection instead.
+        assert!(err.is_closed());
         stream.conn.closed().await?;
         // If there was no connection error, drop body_tx.
         *wr = HttpResponseWriter::Closed;
@@ -803,18 +715,7 @@ async fn op_http_shutdown(
     .resource_table
     .get::<HttpStreamResource>(rid)?;
   let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
-  let wr = take(&mut *wr);
-  if let HttpResponseWriter::Body(mut body_writer) = wr {
-    match body_writer.shutdown().await {
-      Ok(_) => {}
-      Err(err) => {
-        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
-        // Don't return "broken pipe", that's an implementation detail.
-        // Pull up the failure associated with the transport connection instead.
-        stream.conn.closed().await?;
-      }
-    }
-  }
+  take(&mut *wr);

   Ok(())
 }
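The header handling that `op_http_write_headers` keeps after the revert can also be read in isolation. Here is a sketch of the same `Vary` merge and weak-`ETag` downgrade logic as plain functions (`merge_vary` and `weaken_etag` are hypothetical names for illustration, not helpers in ext/http):

```rust
// Mirrors the `vary_value` computation above: always advertise
// `Accept-Encoding`, merging it with any user-supplied Vary header.
fn merge_vary(vary_header: Option<&[u8]>) -> String {
    match vary_header.map(std::str::from_utf8) {
        Some(Ok(value)) if !value.to_lowercase().contains("accept-encoding") => {
            format!("Accept-Encoding, {}", value)
        }
        Some(Ok(value)) => value.to_string(),
        // Invalid UTF-8 (or no header at all): fall back to the default.
        _ => "Accept-Encoding".to_string(),
    }
}

// When the body is compressed, a user-supplied ETag computed over the
// uncompressed bytes is downgraded to a weak validator ("W/...").
fn weaken_etag(etag: &str) -> String {
    if etag.starts_with("W/") {
        etag.to_string()
    } else {
        format!("W/{}", etag)
    }
}

fn main() {
    assert_eq!(merge_vary(Some(&b"Origin"[..])), "Accept-Encoding, Origin");
    assert_eq!(merge_vary(None), "Accept-Encoding");
    assert_eq!(weaken_etag("\"abc\""), "W/\"abc\"");
}
```

The `W/` prefix marks the ETag as semantically, but not byte-for-byte, equivalent to the representation it was computed from, which is exactly the situation once the body is re-encoded with gzip or brotli.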
