diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-08-17 07:52:37 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-17 07:52:37 -0600 |
commit | 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch) | |
tree | 1521ffd2ac5e803224546cb349b3905925b9b5ff /ext/http/response_body.rs | |
parent | 0960e895da1275792c1f38999f6a185c864edb3f (diff) |
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work.
This is a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources, and remove special paths in
both `fetch` and `serve`.
Performance with a ReadableStream response yields ~18% improvement:
```
return new Response(new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
controller.close();
}
})
```
This patch:
```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 99.96us 100.03us 6.65ms 98.84%
Req/Sec 47.73k 2.43k 51.02k 89.11%
959308 requests in 10.10s, 117.10MB read
Requests/sec: 94978.71
Transfer/sec: 11.59MB
```
main:
```
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 163.03us 685.51us 19.73ms 99.27%
Req/Sec 39.50k 3.98k 66.11k 95.52%
789582 requests in 10.10s, 82.83MB read
Requests/sec: 78182.65
Transfer/sec: 8.20MB
```
Diffstat (limited to 'ext/http/response_body.rs')
-rw-r--r-- | ext/http/response_body.rs | 83 |
1 files changed, 13 insertions, 70 deletions
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 3697b2732..bd9d6f433 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -1,5 +1,4 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use std::borrow::Cow; use std::cell::RefCell; use std::future::Future; use std::io::Write; @@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter; use brotli::ffi::compressor::BrotliEncoderState; use bytes::Bytes; use bytes::BytesMut; -use deno_core::error::bad_resource; use deno_core::error::AnyError; use deno_core::futures::ready; use deno_core::futures::FutureExt; -use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::RcRef; use deno_core::Resource; -use deno_core::WriteOutcome; use flate2::write::GzEncoder; use http::HeaderMap; use hyper1::body::Body; @@ -126,8 +119,8 @@ pub enum Compression { pub enum ResponseStream { /// A resource stream, piped in fast mode. Resource(ResourceBodyAdapter), - /// A JS-backed stream, written in JS and transported via pipe. - V8Stream(tokio::sync::mpsc::Receiver<BufView>), + #[cfg(test)] + TestChannel(tokio::sync::mpsc::Receiver<BufView>), } #[derive(Default)] @@ -217,13 +210,6 @@ impl ResponseBytesInner { } } - pub fn from_v8( - compression: Compression, - rx: tokio::sync::mpsc::Receiver<BufView>, - ) -> Self { - Self::from_stream(compression, ResponseStream::V8Stream(rx)) - } - pub fn from_resource( compression: Compression, stm: Rc<dyn Resource>, @@ -235,12 +221,12 @@ impl ResponseBytesInner { ) } - pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self { + pub fn from_bufview(compression: Compression, buf: BufView) -> Self { match compression { Compression::GZip => { let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast()); - writer.write_all(bytes).unwrap(); + writer.write_all(&buf).unwrap(); Self::Bytes(BufView::from(writer.finish().unwrap())) } Compression::Brotli => { @@ -251,11 +237,11 @@ impl ResponseBytesInner { // (~4MB) let mut writer = brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22); - writer.write_all(bytes).unwrap(); + writer.write_all(&buf).unwrap(); writer.flush().unwrap(); Self::Bytes(BufView::from(writer.into_inner())) } - _ => Self::Bytes(BufView::from(bytes.to_vec())), + _ => Self::Bytes(buf), } } @@ -368,14 +354,16 @@ impl PollFrame for ResponseStream { ) -> std::task::Poll<ResponseStreamResult> { match &mut *self { ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx), - ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx), + #[cfg(test)] + ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx), } } fn size_hint(&self) -> SizeHint { match self { ResponseStream::Resource(res) => res.size_hint(), - ResponseStream::V8Stream(res) => res.size_hint(), + #[cfg(test)] + ResponseStream::TestChannel(_) => SizeHint::default(), } } } @@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter { } } +#[cfg(test)] impl PollFrame for tokio::sync::mpsc::Receiver<BufView> { fn poll_frame( mut self: Pin<&mut Self>, @@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream { } } -/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which -/// feed's hyper's HTTP response. -pub struct V8StreamHttpResponseBody( - AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>, - CancelHandle, -); - -impl V8StreamHttpResponseBody { - pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self { - Self(AsyncRefCell::new(Some(sender)), CancelHandle::default()) - } -} - -impl Resource for V8StreamHttpResponseBody { - fn name(&self) -> Cow<str> { - "responseBody".into() - } - - fn write( - self: Rc<Self>, - buf: BufView, - ) -> AsyncResult<deno_core::WriteOutcome> { - let cancel_handle = RcRef::map(&self, |this| &this.1); - Box::pin( - async move { - let nwritten = buf.len(); - - let res = RcRef::map(self, |this| &this.0).borrow().await; - if let Some(tx) = res.as_ref() { - tx.send(buf) - .await - .map_err(|_| bad_resource("failed to write"))?; - Ok(WriteOutcome::Full { nwritten }) - } else { - Err(bad_resource("failed to write")) - } - } - .try_or_cancel(cancel_handle), - ) - } - - fn close(self: Rc<Self>) { - self.1.cancel(); - } -} - #[cfg(test)] mod tests { use super::*; @@ -892,7 +835,7 @@ mod tests { expected.extend(v); } let (tx, rx) = tokio::sync::mpsc::channel(1); - let underlying = ResponseStream::V8Stream(rx); + let underlying = ResponseStream::TestChannel(rx); let mut resp = GZipResponseStream::new(underlying); let handle = tokio::task::spawn(async move { for chunk in v { @@ -934,7 +877,7 @@ mod tests { expected.extend(v); } let (tx, rx) = tokio::sync::mpsc::channel(1); - let underlying = ResponseStream::V8Stream(rx); + let underlying = ResponseStream::TestChannel(rx); let mut resp = BrotliResponseStream::new(underlying); let handle = tokio::task::spawn(async move { for chunk in v { |