| field | value | date |
|---|---|---|
| author | Matt Mastracci <matthew@mastracci.com> | 2023-08-17 07:52:37 -0600 |
| committer | GitHub <noreply@github.com> | 2023-08-17 07:52:37 -0600 |
| commit | 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f | |
| tree | 1521ffd2ac5e803224546cb349b3905925b9b5ff /ext/http | |
| parent | 0960e895da1275792c1f38999f6a185c864edb3f | |
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work.
This is a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources, and remove special paths in
both `fetch` and `serve`.
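As a user-level illustration (a sketch, not part of this patch): any JS-constructed `ReadableStream` body, even one that produces chunks asynchronously, can be returned as-is; with this change the runtime wraps it in a resource via `resourceForReadableStream` and sends it down the same `op_http_set_response_body_resource` path used for already-resource-backed bodies. The port, chunk contents, and delay below are arbitrary:
```
// Sketch: a multi-chunk, asynchronous ReadableStream served without any special path.
// With this change, Deno wraps the stream in a resource internally
// (resourceForReadableStream) and streams it out via op_http_set_response_body_resource.
const encoder = new TextEncoder();

Deno.serve({ port: 8080 }, () => {
  const body = new ReadableStream<Uint8Array>({
    async start(controller) {
      for (const chunk of ["hello", " ", "world"]) {
        controller.enqueue(encoder.encode(chunk));
        // Simulate an asynchronous producer.
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
      controller.close();
    },
  });
  return new Response(body, { headers: { "content-type": "text/plain" } });
});
```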
Serving a `ReadableStream` response body shows an ~18% improvement with this change:
```
return new Response(new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
    controller.close();
  }
}));
```
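For reference, a self-contained version of that handler (a sketch assuming `Deno.serve` on port 8080, the address targeted by the `wrk` runs below):
```
// Complete benchmark server: serves "hello world" from a one-chunk ReadableStream.
Deno.serve({ port: 8080 }, () =>
  new Response(
    new ReadableStream({
      start(controller) {
        // "hello world" as UTF-8 bytes
        controller.enqueue(
          new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]),
        );
        controller.close();
      },
    }),
  ));
```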
This patch:
```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    99.96us  100.03us   6.65ms   98.84%
    Req/Sec    47.73k     2.43k   51.02k    89.11%
  959308 requests in 10.10s, 117.10MB read
Requests/sec:  94978.71
Transfer/sec:     11.59MB
```
main:
```
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   163.03us  685.51us  19.73ms   99.27%
    Req/Sec    39.50k     3.98k   66.11k    95.52%
  789582 requests in 10.10s, 82.83MB read
Requests/sec:  78182.65
Transfer/sec:      8.20MB
```
Diffstat (limited to 'ext/http')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | ext/http/00_serve.js | 154 |
| -rw-r--r-- | ext/http/http_next.rs | 48 |
| -rw-r--r-- | ext/http/lib.rs | 1 |
| -rw-r--r-- | ext/http/response_body.rs | 83 |
4 files changed, 56 insertions, 230 deletions
```
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 3447f48e2..265b79706 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -30,9 +30,9 @@ import {
 import {
   Deferred,
   getReadableStreamResourceBacking,
-  readableStreamClose,
   readableStreamForRid,
   ReadableStreamPrototype,
+  resourceForReadableStream,
 } from "ext:deno_web/06_streams.js";
 import { listen, TcpConn } from "ext:deno_net/01_net.js";
 import { listenTls } from "ext:deno_net/02_tls.js";
@@ -41,10 +41,6 @@ const {
   Error,
   ObjectPrototypeIsPrototypeOf,
   PromisePrototypeCatch,
-  SafeSet,
-  SafeSetIterator,
-  SetPrototypeAdd,
-  SetPrototypeDelete,
   Symbol,
   SymbolFor,
   TypeError,
@@ -61,7 +57,6 @@ const {
   op_http_set_promise_complete,
   op_http_set_response_body_bytes,
   op_http_set_response_body_resource,
-  op_http_set_response_body_stream,
   op_http_set_response_body_text,
   op_http_set_response_header,
   op_http_set_response_headers,
@@ -339,7 +334,6 @@ class InnerRequest {
 
 class CallbackContext {
   abortController;
-  responseBodies;
   scheme;
   fallbackHost;
   serverRid;
@@ -352,7 +346,6 @@ class CallbackContext {
       { once: true },
     );
     this.abortController = new AbortController();
-    this.responseBodies = new SafeSet();
     this.serverRid = args[0];
     this.scheme = args[1];
     this.fallbackHost = args[2];
@@ -379,23 +372,24 @@ class ServeHandlerInfo {
   }
 }
 
-function fastSyncResponseOrStream(req, respBody) {
+function fastSyncResponseOrStream(req, respBody, status) {
   if (respBody === null || respBody === undefined) {
     // Don't set the body
-    return null;
+    op_http_set_promise_complete(req, status);
+    return;
   }
 
   const stream = respBody.streamOrStatic;
   const body = stream.body;
 
   if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
-    op_http_set_response_body_bytes(req, body);
-    return null;
+    op_http_set_response_body_bytes(req, body, status);
+    return;
   }
 
   if (typeof body === "string") {
-    op_http_set_response_body_text(req, body);
-    return null;
+    op_http_set_response_body_text(req, body, status);
+    return;
   }
 
   // At this point in the response it needs to be a stream
@@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) {
       req,
       resourceBacking.rid,
       resourceBacking.autoClose,
+      status,
+    );
+  } else {
+    const rid = resourceForReadableStream(stream);
+    op_http_set_response_body_resource(
+      req,
+      rid,
+      true,
+      status,
     );
-    return null;
-  }
-
-  return stream;
-}
-
-async function asyncResponse(responseBodies, req, status, stream) {
-  const reader = stream.getReader();
-  let responseRid;
-  let closed = false;
-  let timeout;
-
-  try {
-    // IMPORTANT: We get a performance boost from this optimization, but V8 is very
-    // sensitive to the order and structure. Benchmark any changes to this code.
-
-    // Optimize for streams that are done in zero or one packets. We will not
-    // have to allocate a resource in this case.
-    const { value: value1, done: done1 } = await reader.read();
-    if (done1) {
-      closed = true;
-      // Exit 1: no response body at all, extreme fast path
-      // Reader will be closed by finally block
-      return;
-    }
-
-    // The second value cannot block indefinitely, as someone may be waiting on a response
-    // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms
-    // and we race it.
-    let timeoutPromise;
-    timeout = setTimeout(() => {
-      responseRid = op_http_set_response_body_stream(req);
-      SetPrototypeAdd(responseBodies, responseRid);
-      op_http_set_promise_complete(req, status);
-      // TODO(mmastrac): if this promise fails before we get to the await below, it crashes
-      // the process with an error:
-      //
-      // 'Uncaught (in promise) BadResource: failed to write'.
-      //
-      // To avoid this, we're going to swallow errors here and allow the code later in the
-      // file to re-throw them in a way that doesn't appear to be an uncaught promise rejection.
-      timeoutPromise = PromisePrototypeCatch(
-        core.writeAll(responseRid, value1),
-        () => null,
-      );
-    }, 250);
-    const { value: value2, done: done2 } = await reader.read();
-
-    if (timeoutPromise) {
-      await timeoutPromise;
-      if (done2) {
-        closed = true;
-        // Exit 2(a): read 2 is EOS, and timeout resolved.
-        // Reader will be closed by finally block
-        // Response stream will be closed by finally block.
-        return;
-      }
-
-      // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward.
-    } else {
-      clearTimeout(timeout);
-      timeout = undefined;
-
-      if (done2) {
-        // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough.
-        // Reader will be closed by finally block
-        // No response stream
-        closed = true;
-        op_http_set_response_body_bytes(req, value1);
-        return;
-      }
-
-      responseRid = op_http_set_response_body_stream(req);
-      SetPrototypeAdd(responseBodies, responseRid);
-      op_http_set_promise_complete(req, status);
-      // Write our first packet
-      await core.writeAll(responseRid, value1);
-    }
-
-    await core.writeAll(responseRid, value2);
-    while (true) {
-      const { value, done } = await reader.read();
-      if (done) {
-        closed = true;
-        break;
-      }
-      await core.writeAll(responseRid, value);
-    }
-  } catch (error) {
-    closed = true;
-    try {
-      await reader.cancel(error);
-    } catch {
-      // Pass
-    }
-  } finally {
-    if (!closed) {
-      readableStreamClose(reader);
-    }
-    if (timeout !== undefined) {
-      clearTimeout(timeout);
-    }
-    if (responseRid) {
-      core.tryClose(responseRid);
-      SetPrototypeDelete(responseBodies, responseRid);
-    } else {
-      op_http_set_promise_complete(req, status);
-    }
   }
 }
 
@@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) {
  * This function returns a promise that will only reject in the case of abnormal exit.
  */
 function mapToCallback(context, callback, onError) {
-  const responseBodies = context.responseBodies;
   const signal = context.abortController.signal;
   const hasCallback = callback.length > 0;
   const hasOneCallback = callback.length === 1;
@@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) {
       }
     }
 
-    // Attempt to respond quickly to this request, otherwise extract the stream
-    const stream = fastSyncResponseOrStream(req, inner.body);
-    if (stream !== null) {
-      // Handle the stream asynchronously
-      await asyncResponse(responseBodies, req, status, stream);
-    } else {
-      op_http_set_promise_complete(req, status);
-    }
-
+    fastSyncResponseOrStream(req, inner.body, status);
     innerRequest?.close();
   };
 }
@@ -755,10 +641,6 @@ function serveHttpOn(context, callback) {
       }
       PromisePrototypeCatch(callback(req), promiseErrorHandler);
     }
-
-    for (const streamRid of new SafeSetIterator(context.responseBodies)) {
-      core.tryClose(streamRid);
-    }
   })();
 
   return {
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 2e9b315ca..60ef83b0f 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,7 +10,6 @@ use crate::request_properties::HttpPropertyExtractor;
 use crate::response_body::Compression;
 use crate::response_body::ResponseBytes;
 use crate::response_body::ResponseBytesInner;
-use crate::response_body::V8StreamHttpResponseBody;
 use crate::slab::slab_drop;
 use crate::slab::slab_get;
 use crate::slab::slab_init;
@@ -30,6 +29,7 @@ use deno_core::task::JoinHandle;
 use deno_core::v8;
 use deno_core::AsyncRefCell;
 use deno_core::AsyncResult;
+use deno_core::BufView;
 use deno_core::ByteString;
 use deno_core::CancelFuture;
 use deno_core::CancelHandle;
@@ -573,6 +573,7 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
 fn set_response(
   slab_id: SlabId,
   length: Option<usize>,
+  status: u16,
   response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
 ) {
   let mut http = slab_get(slab_id);
@@ -583,7 +584,14 @@ fn set_response(
     length,
     response.headers_mut(),
   );
-  response.body_mut().initialize(response_fn(compression))
+  response.body_mut().initialize(response_fn(compression));
+
+  // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
+  // will quitely ignore invalid values.
+  if let Ok(code) = StatusCode::from_u16(status) {
+    *response.status_mut() = code;
+  }
+  http.complete();
 }
 
 #[op2(fast)]
@@ -592,6 +600,7 @@ pub fn op_http_set_response_body_resource(
   #[smi] slab_id: SlabId,
   #[smi] stream_rid: ResourceId,
   auto_close: bool,
+  status: u16,
 ) -> Result<(), AnyError> {
   // If the stream is auto_close, we will hold the last ref to it until the response is complete.
   let resource = if auto_close {
@@ -603,6 +612,7 @@ pub fn op_http_set_response_body_resource(
   set_response(
     slab_id,
     resource.size_hint().1.map(|s| s as usize),
+    status,
     move |compression| {
       ResponseBytesInner::from_resource(compression, resource, auto_close)
     },
@@ -612,42 +622,34 @@
 }
 
 #[op2(fast)]
-#[smi]
-pub fn op_http_set_response_body_stream(
-  state: &mut OpState,
-  #[smi] slab_id: SlabId,
-) -> Result<ResourceId, AnyError> {
-  // TODO(mmastrac): what should this channel size be?
-  let (tx, rx) = tokio::sync::mpsc::channel(1);
-  set_response(slab_id, None, |compression| {
-    ResponseBytesInner::from_v8(compression, rx)
-  });
-
-  Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
-}
-
-#[op2(fast)]
 pub fn op_http_set_response_body_text(
   #[smi] slab_id: SlabId,
   #[string] text: String,
+  status: u16,
 ) {
   if !text.is_empty() {
-    set_response(slab_id, Some(text.len()), |compression| {
+    set_response(slab_id, Some(text.len()), status, |compression| {
       ResponseBytesInner::from_vec(compression, text.into_bytes())
     });
+  } else {
+    op_http_set_promise_complete::call(slab_id, status);
   }
 }
 
-#[op2(fast)]
+// Skipping `fast` because we prefer an owned buffer here.
+#[op2]
 pub fn op_http_set_response_body_bytes(
   #[smi] slab_id: SlabId,
-  #[buffer] buffer: &[u8],
+  #[buffer] buffer: JsBuffer,
+  status: u16,
 ) {
   if !buffer.is_empty() {
-    set_response(slab_id, Some(buffer.len()), |compression| {
-      ResponseBytesInner::from_slice(compression, buffer)
+    set_response(slab_id, Some(buffer.len()), status, |compression| {
+      ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
     });
-  };
+  } else {
+    op_http_set_promise_complete::call(slab_id, status);
+  }
 }
 
 #[op2(async)]
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 8060b5a1e..e0c5c89d0 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -115,7 +115,6 @@ deno_core::extension!(
     http_next::op_http_set_promise_complete,
     http_next::op_http_set_response_body_bytes,
     http_next::op_http_set_response_body_resource,
-    http_next::op_http_set_response_body_stream,
     http_next::op_http_set_response_body_text,
     http_next::op_http_set_response_header,
     http_next::op_http_set_response_headers,
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 3697b2732..bd9d6f433 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -1,5 +1,4 @@
 // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use std::borrow::Cow;
 use std::cell::RefCell;
 use std::future::Future;
 use std::io::Write;
@@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter;
 use brotli::ffi::compressor::BrotliEncoderState;
 use bytes::Bytes;
 use bytes::BytesMut;
-use deno_core::error::bad_resource;
 use deno_core::error::AnyError;
 use deno_core::futures::ready;
 use deno_core::futures::FutureExt;
-use deno_core::AsyncRefCell;
 use deno_core::AsyncResult;
 use deno_core::BufView;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::RcRef;
 use deno_core::Resource;
-use deno_core::WriteOutcome;
 use flate2::write::GzEncoder;
 use http::HeaderMap;
 use hyper1::body::Body;
@@ -126,8 +119,8 @@ pub enum Compression {
 pub enum ResponseStream {
   /// A resource stream, piped in fast mode.
   Resource(ResourceBodyAdapter),
-  /// A JS-backed stream, written in JS and transported via pipe.
-  V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+  #[cfg(test)]
+  TestChannel(tokio::sync::mpsc::Receiver<BufView>),
 }
 
 #[derive(Default)]
@@ -217,13 +210,6 @@ impl ResponseBytesInner {
     }
   }
 
-  pub fn from_v8(
-    compression: Compression,
-    rx: tokio::sync::mpsc::Receiver<BufView>,
-  ) -> Self {
-    Self::from_stream(compression, ResponseStream::V8Stream(rx))
-  }
-
   pub fn from_resource(
     compression: Compression,
     stm: Rc<dyn Resource>,
@@ -235,12 +221,12 @@ impl ResponseBytesInner {
     )
   }
 
-  pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
+  pub fn from_bufview(compression: Compression, buf: BufView) -> Self {
     match compression {
       Compression::GZip => {
        let mut writer =
          GzEncoder::new(Vec::new(), flate2::Compression::fast());
-        writer.write_all(bytes).unwrap();
+        writer.write_all(&buf).unwrap();
         Self::Bytes(BufView::from(writer.finish().unwrap()))
       }
       Compression::Brotli => {
@@ -251,11 +237,11 @@ impl ResponseBytesInner {
         // (~4MB)
         let mut writer =
           brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22);
-        writer.write_all(bytes).unwrap();
+        writer.write_all(&buf).unwrap();
         writer.flush().unwrap();
         Self::Bytes(BufView::from(writer.into_inner()))
       }
-      _ => Self::Bytes(BufView::from(bytes.to_vec())),
+      _ => Self::Bytes(buf),
     }
   }
 
@@ -368,14 +354,16 @@ impl PollFrame for ResponseStream {
   ) -> std::task::Poll<ResponseStreamResult> {
     match &mut *self {
       ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
-      ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
+      #[cfg(test)]
+      ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx),
     }
   }
 
   fn size_hint(&self) -> SizeHint {
     match self {
       ResponseStream::Resource(res) => res.size_hint(),
-      ResponseStream::V8Stream(res) => res.size_hint(),
+      #[cfg(test)]
+      ResponseStream::TestChannel(_) => SizeHint::default(),
     }
   }
 }
@@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter {
   }
 }
 
+#[cfg(test)]
 impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
   fn poll_frame(
     mut self: Pin<&mut Self>,
@@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream {
   }
 }
 
-/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
-/// feed's hyper's HTTP response.
-pub struct V8StreamHttpResponseBody(
-  AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
-  CancelHandle,
-);
-
-impl V8StreamHttpResponseBody {
-  pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
-    Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
-  }
-}
-
-impl Resource for V8StreamHttpResponseBody {
-  fn name(&self) -> Cow<str> {
-    "responseBody".into()
-  }
-
-  fn write(
-    self: Rc<Self>,
-    buf: BufView,
-  ) -> AsyncResult<deno_core::WriteOutcome> {
-    let cancel_handle = RcRef::map(&self, |this| &this.1);
-    Box::pin(
-      async move {
-        let nwritten = buf.len();
-
-        let res = RcRef::map(self, |this| &this.0).borrow().await;
-        if let Some(tx) = res.as_ref() {
-          tx.send(buf)
-            .await
-            .map_err(|_| bad_resource("failed to write"))?;
-          Ok(WriteOutcome::Full { nwritten })
-        } else {
-          Err(bad_resource("failed to write"))
-        }
-      }
-      .try_or_cancel(cancel_handle),
-    )
-  }
-
-  fn close(self: Rc<Self>) {
-    self.1.cancel();
-  }
-}
-
 #[cfg(test)]
 mod tests {
   use super::*;
@@ -892,7 +835,7 @@ mod tests {
       expected.extend(v);
     }
     let (tx, rx) = tokio::sync::mpsc::channel(1);
-    let underlying = ResponseStream::V8Stream(rx);
+    let underlying = ResponseStream::TestChannel(rx);
     let mut resp = GZipResponseStream::new(underlying);
     let handle = tokio::task::spawn(async move {
      for chunk in v {
@@ -934,7 +877,7 @@ mod tests {
      expected.extend(v);
    }
    let (tx, rx) = tokio::sync::mpsc::channel(1);
-    let underlying = ResponseStream::V8Stream(rx);
+    let underlying = ResponseStream::TestChannel(rx);
    let mut resp = BrotliResponseStream::new(underlying);
    let handle = tokio::task::spawn(async move {
      for chunk in v {
```
