summaryrefslogtreecommitdiff
path: root/ext/http
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-08-17 07:52:37 -0600
committerGitHub <noreply@github.com>2023-08-17 07:52:37 -0600
commit23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch)
tree1521ffd2ac5e803224546cb349b3905925b9b5ff /ext/http
parent0960e895da1275792c1f38999f6a185c864edb3f (diff)
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work. This is a resource wrapper for `ReadableStream`, allowing us to treat all `ReadableStream` instances as resources, and remove special paths in both `fetch` and `serve`. Performance with a ReadableStream response yields ~18% improvement: ``` return new Response(new ReadableStream({ start(controller) { controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100])); controller.close(); } }) ``` This patch: ``` 12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080 Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 99.96us 100.03us 6.65ms 98.84% Req/Sec 47.73k 2.43k 51.02k 89.11% 959308 requests in 10.10s, 117.10MB read Requests/sec: 94978.71 Transfer/sec: 11.59MB ``` main: ``` Running 10s test @ http://localhost:8080 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 163.03us 685.51us 19.73ms 99.27% Req/Sec 39.50k 3.98k 66.11k 95.52% 789582 requests in 10.10s, 82.83MB read Requests/sec: 78182.65 Transfer/sec: 8.20MB ```
Diffstat (limited to 'ext/http')
-rw-r--r--ext/http/00_serve.js154
-rw-r--r--ext/http/http_next.rs48
-rw-r--r--ext/http/lib.rs1
-rw-r--r--ext/http/response_body.rs83
4 files changed, 56 insertions, 230 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 3447f48e2..265b79706 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -30,9 +30,9 @@ import {
import {
Deferred,
getReadableStreamResourceBacking,
- readableStreamClose,
readableStreamForRid,
ReadableStreamPrototype,
+ resourceForReadableStream,
} from "ext:deno_web/06_streams.js";
import { listen, TcpConn } from "ext:deno_net/01_net.js";
import { listenTls } from "ext:deno_net/02_tls.js";
@@ -41,10 +41,6 @@ const {
Error,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
- SafeSet,
- SafeSetIterator,
- SetPrototypeAdd,
- SetPrototypeDelete,
Symbol,
SymbolFor,
TypeError,
@@ -61,7 +57,6 @@ const {
op_http_set_promise_complete,
op_http_set_response_body_bytes,
op_http_set_response_body_resource,
- op_http_set_response_body_stream,
op_http_set_response_body_text,
op_http_set_response_header,
op_http_set_response_headers,
@@ -339,7 +334,6 @@ class InnerRequest {
class CallbackContext {
abortController;
- responseBodies;
scheme;
fallbackHost;
serverRid;
@@ -352,7 +346,6 @@ class CallbackContext {
{ once: true },
);
this.abortController = new AbortController();
- this.responseBodies = new SafeSet();
this.serverRid = args[0];
this.scheme = args[1];
this.fallbackHost = args[2];
@@ -379,23 +372,24 @@ class ServeHandlerInfo {
}
}
-function fastSyncResponseOrStream(req, respBody) {
+function fastSyncResponseOrStream(req, respBody, status) {
if (respBody === null || respBody === undefined) {
// Don't set the body
- return null;
+ op_http_set_promise_complete(req, status);
+ return;
}
const stream = respBody.streamOrStatic;
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
- op_http_set_response_body_bytes(req, body);
- return null;
+ op_http_set_response_body_bytes(req, body, status);
+ return;
}
if (typeof body === "string") {
- op_http_set_response_body_text(req, body);
- return null;
+ op_http_set_response_body_text(req, body, status);
+ return;
}
// At this point in the response it needs to be a stream
@@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) {
req,
resourceBacking.rid,
resourceBacking.autoClose,
+ status,
+ );
+ } else {
+ const rid = resourceForReadableStream(stream);
+ op_http_set_response_body_resource(
+ req,
+ rid,
+ true,
+ status,
);
- return null;
- }
-
- return stream;
-}
-
-async function asyncResponse(responseBodies, req, status, stream) {
- const reader = stream.getReader();
- let responseRid;
- let closed = false;
- let timeout;
-
- try {
- // IMPORTANT: We get a performance boost from this optimization, but V8 is very
- // sensitive to the order and structure. Benchmark any changes to this code.
-
- // Optimize for streams that are done in zero or one packets. We will not
- // have to allocate a resource in this case.
- const { value: value1, done: done1 } = await reader.read();
- if (done1) {
- closed = true;
- // Exit 1: no response body at all, extreme fast path
- // Reader will be closed by finally block
- return;
- }
-
- // The second value cannot block indefinitely, as someone may be waiting on a response
- // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms
- // and we race it.
- let timeoutPromise;
- timeout = setTimeout(() => {
- responseRid = op_http_set_response_body_stream(req);
- SetPrototypeAdd(responseBodies, responseRid);
- op_http_set_promise_complete(req, status);
- // TODO(mmastrac): if this promise fails before we get to the await below, it crashes
- // the process with an error:
- //
- // 'Uncaught (in promise) BadResource: failed to write'.
- //
- // To avoid this, we're going to swallow errors here and allow the code later in the
- // file to re-throw them in a way that doesn't appear to be an uncaught promise rejection.
- timeoutPromise = PromisePrototypeCatch(
- core.writeAll(responseRid, value1),
- () => null,
- );
- }, 250);
- const { value: value2, done: done2 } = await reader.read();
-
- if (timeoutPromise) {
- await timeoutPromise;
- if (done2) {
- closed = true;
- // Exit 2(a): read 2 is EOS, and timeout resolved.
- // Reader will be closed by finally block
- // Response stream will be closed by finally block.
- return;
- }
-
- // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward.
- } else {
- clearTimeout(timeout);
- timeout = undefined;
-
- if (done2) {
- // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough.
- // Reader will be closed by finally block
- // No response stream
- closed = true;
- op_http_set_response_body_bytes(req, value1);
- return;
- }
-
- responseRid = op_http_set_response_body_stream(req);
- SetPrototypeAdd(responseBodies, responseRid);
- op_http_set_promise_complete(req, status);
- // Write our first packet
- await core.writeAll(responseRid, value1);
- }
-
- await core.writeAll(responseRid, value2);
- while (true) {
- const { value, done } = await reader.read();
- if (done) {
- closed = true;
- break;
- }
- await core.writeAll(responseRid, value);
- }
- } catch (error) {
- closed = true;
- try {
- await reader.cancel(error);
- } catch {
- // Pass
- }
- } finally {
- if (!closed) {
- readableStreamClose(reader);
- }
- if (timeout !== undefined) {
- clearTimeout(timeout);
- }
- if (responseRid) {
- core.tryClose(responseRid);
- SetPrototypeDelete(responseBodies, responseRid);
- } else {
- op_http_set_promise_complete(req, status);
- }
}
}
@@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) {
* This function returns a promise that will only reject in the case of abnormal exit.
*/
function mapToCallback(context, callback, onError) {
- const responseBodies = context.responseBodies;
const signal = context.abortController.signal;
const hasCallback = callback.length > 0;
const hasOneCallback = callback.length === 1;
@@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) {
}
}
- // Attempt to respond quickly to this request, otherwise extract the stream
- const stream = fastSyncResponseOrStream(req, inner.body);
- if (stream !== null) {
- // Handle the stream asynchronously
- await asyncResponse(responseBodies, req, status, stream);
- } else {
- op_http_set_promise_complete(req, status);
- }
-
+ fastSyncResponseOrStream(req, inner.body, status);
innerRequest?.close();
};
}
@@ -755,10 +641,6 @@ function serveHttpOn(context, callback) {
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
-
- for (const streamRid of new SafeSetIterator(context.responseBodies)) {
- core.tryClose(streamRid);
- }
})();
return {
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 2e9b315ca..60ef83b0f 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,7 +10,6 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
-use crate::response_body::V8StreamHttpResponseBody;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_init;
@@ -30,6 +29,7 @@ use deno_core::task::JoinHandle;
use deno_core::v8;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
+use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
@@ -573,6 +573,7 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
fn set_response(
slab_id: SlabId,
length: Option<usize>,
+ status: u16,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
let mut http = slab_get(slab_id);
@@ -583,7 +584,14 @@ fn set_response(
length,
response.headers_mut(),
);
- response.body_mut().initialize(response_fn(compression))
+ response.body_mut().initialize(response_fn(compression));
+
+ // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
+ // will quitely ignore invalid values.
+ if let Ok(code) = StatusCode::from_u16(status) {
+ *response.status_mut() = code;
+ }
+ http.complete();
}
#[op2(fast)]
@@ -592,6 +600,7 @@ pub fn op_http_set_response_body_resource(
#[smi] slab_id: SlabId,
#[smi] stream_rid: ResourceId,
auto_close: bool,
+ status: u16,
) -> Result<(), AnyError> {
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
let resource = if auto_close {
@@ -603,6 +612,7 @@ pub fn op_http_set_response_body_resource(
set_response(
slab_id,
resource.size_hint().1.map(|s| s as usize),
+ status,
move |compression| {
ResponseBytesInner::from_resource(compression, resource, auto_close)
},
@@ -612,42 +622,34 @@ pub fn op_http_set_response_body_resource(
}
#[op2(fast)]
-#[smi]
-pub fn op_http_set_response_body_stream(
- state: &mut OpState,
- #[smi] slab_id: SlabId,
-) -> Result<ResourceId, AnyError> {
- // TODO(mmastrac): what should this channel size be?
- let (tx, rx) = tokio::sync::mpsc::channel(1);
- set_response(slab_id, None, |compression| {
- ResponseBytesInner::from_v8(compression, rx)
- });
-
- Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
-}
-
-#[op2(fast)]
pub fn op_http_set_response_body_text(
#[smi] slab_id: SlabId,
#[string] text: String,
+ status: u16,
) {
if !text.is_empty() {
- set_response(slab_id, Some(text.len()), |compression| {
+ set_response(slab_id, Some(text.len()), status, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
+ } else {
+ op_http_set_promise_complete::call(slab_id, status);
}
}
-#[op2(fast)]
+// Skipping `fast` because we prefer an owned buffer here.
+#[op2]
pub fn op_http_set_response_body_bytes(
#[smi] slab_id: SlabId,
- #[buffer] buffer: &[u8],
+ #[buffer] buffer: JsBuffer,
+ status: u16,
) {
if !buffer.is_empty() {
- set_response(slab_id, Some(buffer.len()), |compression| {
- ResponseBytesInner::from_slice(compression, buffer)
+ set_response(slab_id, Some(buffer.len()), status, |compression| {
+ ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
- };
+ } else {
+ op_http_set_promise_complete::call(slab_id, status);
+ }
}
#[op2(async)]
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 8060b5a1e..e0c5c89d0 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -115,7 +115,6 @@ deno_core::extension!(
http_next::op_http_set_promise_complete,
http_next::op_http_set_response_body_bytes,
http_next::op_http_set_response_body_resource,
- http_next::op_http_set_response_body_stream,
http_next::op_http_set_response_body_text,
http_next::op_http_set_response_header,
http_next::op_http_set_response_headers,
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 3697b2732..bd9d6f433 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -1,5 +1,4 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::io::Write;
@@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
use bytes::Bytes;
use bytes::BytesMut;
-use deno_core::error::bad_resource;
use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::futures::FutureExt;
-use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
-use deno_core::CancelHandle;
-use deno_core::CancelTryFuture;
-use deno_core::RcRef;
use deno_core::Resource;
-use deno_core::WriteOutcome;
use flate2::write::GzEncoder;
use http::HeaderMap;
use hyper1::body::Body;
@@ -126,8 +119,8 @@ pub enum Compression {
pub enum ResponseStream {
/// A resource stream, piped in fast mode.
Resource(ResourceBodyAdapter),
- /// A JS-backed stream, written in JS and transported via pipe.
- V8Stream(tokio::sync::mpsc::Receiver<BufView>),
+ #[cfg(test)]
+ TestChannel(tokio::sync::mpsc::Receiver<BufView>),
}
#[derive(Default)]
@@ -217,13 +210,6 @@ impl ResponseBytesInner {
}
}
- pub fn from_v8(
- compression: Compression,
- rx: tokio::sync::mpsc::Receiver<BufView>,
- ) -> Self {
- Self::from_stream(compression, ResponseStream::V8Stream(rx))
- }
-
pub fn from_resource(
compression: Compression,
stm: Rc<dyn Resource>,
@@ -235,12 +221,12 @@ impl ResponseBytesInner {
)
}
- pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
+ pub fn from_bufview(compression: Compression, buf: BufView) -> Self {
match compression {
Compression::GZip => {
let mut writer =
GzEncoder::new(Vec::new(), flate2::Compression::fast());
- writer.write_all(bytes).unwrap();
+ writer.write_all(&buf).unwrap();
Self::Bytes(BufView::from(writer.finish().unwrap()))
}
Compression::Brotli => {
@@ -251,11 +237,11 @@ impl ResponseBytesInner {
// (~4MB)
let mut writer =
brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22);
- writer.write_all(bytes).unwrap();
+ writer.write_all(&buf).unwrap();
writer.flush().unwrap();
Self::Bytes(BufView::from(writer.into_inner()))
}
- _ => Self::Bytes(BufView::from(bytes.to_vec())),
+ _ => Self::Bytes(buf),
}
}
@@ -368,14 +354,16 @@ impl PollFrame for ResponseStream {
) -> std::task::Poll<ResponseStreamResult> {
match &mut *self {
ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
- ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
+ #[cfg(test)]
+ ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx),
}
}
fn size_hint(&self) -> SizeHint {
match self {
ResponseStream::Resource(res) => res.size_hint(),
- ResponseStream::V8Stream(res) => res.size_hint(),
+ #[cfg(test)]
+ ResponseStream::TestChannel(_) => SizeHint::default(),
}
}
}
@@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter {
}
}
+#[cfg(test)]
impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
fn poll_frame(
mut self: Pin<&mut Self>,
@@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream {
}
}
-/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
-/// feed's hyper's HTTP response.
-pub struct V8StreamHttpResponseBody(
- AsyncRefCell<Option<tokio::sync::mpsc::Sender<BufView>>>,
- CancelHandle,
-);
-
-impl V8StreamHttpResponseBody {
- pub fn new(sender: tokio::sync::mpsc::Sender<BufView>) -> Self {
- Self(AsyncRefCell::new(Some(sender)), CancelHandle::default())
- }
-}
-
-impl Resource for V8StreamHttpResponseBody {
- fn name(&self) -> Cow<str> {
- "responseBody".into()
- }
-
- fn write(
- self: Rc<Self>,
- buf: BufView,
- ) -> AsyncResult<deno_core::WriteOutcome> {
- let cancel_handle = RcRef::map(&self, |this| &this.1);
- Box::pin(
- async move {
- let nwritten = buf.len();
-
- let res = RcRef::map(self, |this| &this.0).borrow().await;
- if let Some(tx) = res.as_ref() {
- tx.send(buf)
- .await
- .map_err(|_| bad_resource("failed to write"))?;
- Ok(WriteOutcome::Full { nwritten })
- } else {
- Err(bad_resource("failed to write"))
- }
- }
- .try_or_cancel(cancel_handle),
- )
- }
-
- fn close(self: Rc<Self>) {
- self.1.cancel();
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -892,7 +835,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
- let underlying = ResponseStream::V8Stream(rx);
+ let underlying = ResponseStream::TestChannel(rx);
let mut resp = GZipResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {
@@ -934,7 +877,7 @@ mod tests {
expected.extend(v);
}
let (tx, rx) = tokio::sync::mpsc::channel(1);
- let underlying = ResponseStream::V8Stream(rx);
+ let underlying = ResponseStream::TestChannel(rx);
let mut resp = BrotliResponseStream::new(underlying);
let handle = tokio::task::spawn(async move {
for chunk in v {