summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
authorLaurence Rowe <l@lrowe.co.uk>2023-11-13 11:17:31 -0800
committerGitHub <noreply@github.com>2023-11-13 19:17:31 +0000
commite5819777c3962079a06c3b09bced1945b1c135f7 (patch)
treec925baa676de01f8ca026cb61473e1891a6d8254 /ext/http/http_next.rs
parent0209f7b46954d1b7bf923b4191e5a356ec09622c (diff)
refactor(ext/http): Use HttpRecord as response body to track until body completion (#20822)
Use HttpRecord as response body so requests can be tracked all the way to response body completion. This allows Request properties to be accessed while the response body is streaming. Graceful shutdown now awaits a future instead of async spinning waiting for requests to finish. On the minimal benchmark this refactor improves performance an additional 2% over pooling alone for a net 3% increase over the previous deno main branch. Builds upon https://github.com/denoland/deno/pull/20809 and https://github.com/denoland/deno/pull/20770. --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs131
1 files changed, 63 insertions, 68 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 4035fe259..f42275b0e 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -8,11 +8,11 @@ use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
-use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::service::handle_request;
use crate::service::http_trace;
use crate::service::HttpRecord;
+use crate::service::HttpRecordResponse;
use crate::service::HttpRequestBodyAutocloser;
use crate::service::HttpServerState;
use crate::websocket_upgrade::WebSocketUpgrade;
@@ -68,7 +68,6 @@ use std::io;
use std::pin::Pin;
use std::ptr::null;
use std::rc::Rc;
-use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -182,7 +181,7 @@ pub fn op_http_upgrade_raw(
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
spawn(async move {
- let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
+ let mut upgrade_stream = WebSocketUpgrade::<()>::default();
// Stage 2: Extract the Upgraded connection
let mut buf = [0; 1024];
@@ -191,7 +190,8 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
- *http.response() = response;
+ let (response_parts, _) = response.into_parts();
+ *http.response_parts() = response_parts;
http.complete();
let mut upgraded = TokioIo::new(upgrade.await?);
upgraded.write_all(&bytes).await?;
@@ -250,10 +250,10 @@ pub async fn op_http_upgrade_websocket_next(
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = http.upgrade()?;
{
- let mut response = http.response();
- *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ let mut response_parts = http.response_parts();
+ response_parts.status = StatusCode::SWITCHING_PROTOCOLS;
for (name, value) in headers {
- response.headers_mut().append(
+ response_parts.headers.append(
HeaderName::from_bytes(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
@@ -274,10 +274,14 @@ pub fn op_http_set_promise_complete(external: *const c_void, status: u16) {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_set_promise_complete") };
+ set_promise_complete(http, status);
+}
+
+fn set_promise_complete(http: Rc<HttpRecord>, status: u16) {
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
- *http.response().status_mut() = code;
+ http.response_parts().status = code;
}
http.complete();
}
@@ -441,7 +445,7 @@ pub fn op_http_read_request_body(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_read_request_body") };
- let rid = if let Some(incoming) = http.take_body() {
+ let rid = if let Some(incoming) = http.take_request_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
} else {
@@ -462,8 +466,7 @@ pub fn op_http_set_response_header(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_header") };
- let mut response = http.response();
- let resp_headers = response.headers_mut();
+ let mut response_parts = http.response_parts();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = match value {
@@ -473,7 +476,7 @@ pub fn op_http_set_response_header(
HeaderValue::from_maybe_shared_unchecked(bytes::Bytes::from(bytes_vec))
},
};
- resp_headers.append(name, value);
+ response_parts.headers.append(name, value);
}
#[op2]
@@ -486,12 +489,13 @@ pub fn op_http_set_response_headers(
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_headers") };
// TODO(mmastrac): Invalid headers should be handled?
- let mut response = http.response();
- let resp_headers = response.headers_mut();
+ let mut response_parts = http.response_parts();
let len = headers.length();
let header_len = len * 2;
- resp_headers.reserve(header_len.try_into().unwrap());
+ response_parts
+ .headers
+ .reserve(header_len.try_into().unwrap());
for i in 0..len {
let item = headers.get_index(scope, i).unwrap();
@@ -505,7 +509,7 @@ pub fn op_http_set_response_headers(
let header_value =
// SAFETY: These are valid latin-1 strings
unsafe { HeaderValue::from_maybe_shared_unchecked(v8_value) };
- resp_headers.append(header_name, header_value);
+ response_parts.headers.append(header_name, header_value);
}
}
@@ -525,7 +529,7 @@ pub fn op_http_set_response_trailers(
let value = unsafe { HeaderValue::from_maybe_shared_unchecked(value) };
trailer_map.append(name, value);
}
- *http.trailers().borrow_mut() = Some(trailer_map);
+ *http.trailers() = Some(trailer_map);
}
fn is_request_compressible(
@@ -652,31 +656,28 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
- external: *const c_void,
+ http: Rc<HttpRecord>,
length: Option<usize>,
status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
- // SAFETY: external is deleted before calling this op.
- let http = unsafe { take_external!(external, "set_response") };
// The request may have been cancelled by this point and if so, there's no need for us to
// do all of this work to send the response.
if !http.cancelled() {
- let resource = http.take_resource();
let compression =
is_request_compressible(length, &http.request_parts().headers);
- let mut response = http.response();
+ let mut response_headers =
+ std::cell::RefMut::map(http.response_parts(), |this| &mut this.headers);
let compression =
- modify_compressibility_from_response(compression, response.headers_mut());
- response
- .body_mut()
- .initialize(response_fn(compression), resource);
+ modify_compressibility_from_response(compression, &mut response_headers);
+ drop(response_headers);
+ http.set_response_body(response_fn(compression));
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
- *response.status_mut() = code;
+ http.response_parts().status = code;
}
} else if force_instantiate_body {
response_fn(Compression::None).abort();
@@ -685,14 +686,20 @@ fn set_response(
http.complete();
}
-#[op2(fast)]
-pub fn op_http_set_response_body_resource(
+/// Returned promise resolves when body streaming finishes.
+/// Call [`op_http_close_after_finish`] when done with the external.
+#[op2(async)]
+pub async fn op_http_set_response_body_resource(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<(), AnyError> {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_body_resource") };
+
// IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request
// body resource so we _cannot_ hold the OpState lock longer than necessary.
@@ -710,7 +717,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
- external,
+ http.clone(),
resource.size_hint().1.map(|s| s as usize),
status,
true,
@@ -719,21 +726,34 @@ pub fn op_http_set_response_body_resource(
},
);
+ *http.needs_close_after_finish() = true;
+ http.response_body_finished().await;
Ok(())
}
#[op2(fast)]
+pub fn op_http_close_after_finish(external: *const c_void) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_close_after_finish") };
+ http.close_after_finish();
+}
+
+#[op2(fast)]
pub fn op_http_set_response_body_text(
external: *const c_void,
#[string] text: String,
status: u16,
) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_response_body_text") };
if !text.is_empty() {
- set_response(external, Some(text.len()), status, false, |compression| {
+ set_response(http, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
- op_http_set_promise_complete::call(external, status);
+ set_promise_complete(http, status);
}
}
@@ -743,45 +763,21 @@ pub fn op_http_set_response_body_bytes(
#[buffer] buffer: JsBuffer,
status: u16,
) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_response_body_bytes") };
if !buffer.is_empty() {
- set_response(external, Some(buffer.len()), status, false, |compression| {
+ set_response(http, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
- op_http_set_promise_complete::call(external, status);
- }
-}
-
-#[op2(async)]
-pub async fn op_http_track(
- state: Rc<RefCell<OpState>>,
- external: *const c_void,
- #[smi] server_rid: ResourceId,
-) -> Result<(), AnyError> {
- // SAFETY: op is called with external.
- let http = unsafe { clone_external!(external, "op_http_track") };
- let handle = http.body_promise();
-
- let join_handle = state
- .borrow_mut()
- .resource_table
- .get::<HttpJoinHandle>(server_rid)?;
-
- match handle
- .or_cancel(join_handle.connection_cancel_handle())
- .await
- {
- Ok(true) => Ok(()),
- Ok(false) => {
- Err(AnyError::msg("connection closed before message completed"))
- }
- Err(_e) => Ok(()),
+ set_promise_complete(http, status);
}
}
fn serve_http11_unconditional(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
@@ -803,7 +799,7 @@ fn serve_http11_unconditional(
fn serve_http2_unconditional(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
@@ -821,7 +817,7 @@ fn serve_http2_unconditional(
async fn serve_http2_autodetect(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
@@ -1194,14 +1190,13 @@ pub async fn op_http_close(
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
join_handle.listen_cancel_handle().cancel();
- // Async spin on the server_state while we wait for everything to drain
- while Rc::strong_count(&join_handle.server_state) > 1 {
- tokio::time::sleep(Duration::from_millis(10)).await;
- }
+ join_handle.server_state.drain().await;
} else {
// In a forceful shutdown, we close everything
join_handle.listen_cancel_handle().cancel();
join_handle.connection_cancel_handle().cancel();
+ // Give streaming responses a tick to close
+ tokio::task::yield_now().await;
}
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)