summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ext/http/00_serve.js35
-rw-r--r--ext/http/http_next.rs131
-rw-r--r--ext/http/lib.rs2
-rw-r--r--ext/http/response_body.rs171
-rw-r--r--ext/http/service.rs338
5 files changed, 341 insertions, 336 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 05e0bb5c3..adb03a22c 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -43,6 +43,7 @@ const {
ObjectHasOwn,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch,
+ PromisePrototypeThen,
Symbol,
TypeError,
Uint8Array,
@@ -50,6 +51,7 @@ const {
} = primordials;
const {
+ op_http_close_after_finish,
op_http_get_request_headers,
op_http_get_request_method_and_url,
op_http_read_request_body,
@@ -386,9 +388,10 @@ class ServeHandlerInfo {
}
}
-function fastSyncResponseOrStream(req, respBody, status) {
+function fastSyncResponseOrStream(req, respBody, status, innerRequest) {
if (respBody === null || respBody === undefined) {
// Don't set the body
+ innerRequest?.close();
op_http_set_promise_complete(req, status);
return;
}
@@ -397,36 +400,43 @@ function fastSyncResponseOrStream(req, respBody, status) {
const body = stream.body;
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) {
+ innerRequest?.close();
op_http_set_response_body_bytes(req, body, status);
return;
}
if (typeof body === "string") {
+ innerRequest?.close();
op_http_set_response_body_text(req, body, status);
return;
}
// At this point in the response it needs to be a stream
if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) {
+ innerRequest?.close();
throw TypeError("invalid response");
}
const resourceBacking = getReadableStreamResourceBacking(stream);
+ let rid, autoClose;
if (resourceBacking) {
- op_http_set_response_body_resource(
- req,
- resourceBacking.rid,
- resourceBacking.autoClose,
- status,
- );
+ rid = resourceBacking.rid;
+ autoClose = resourceBacking.autoClose;
} else {
- const rid = resourceForReadableStream(stream);
+ rid = resourceForReadableStream(stream);
+ autoClose = true;
+ }
+ PromisePrototypeThen(
op_http_set_response_body_resource(
req,
rid,
- true,
+ autoClose,
status,
- );
- }
+ ),
+ () => {
+ innerRequest?.close();
+ op_http_close_after_finish(req);
+ },
+ );
}
/**
@@ -499,8 +509,7 @@ function mapToCallback(context, callback, onError) {
}
}
- innerRequest?.close();
- fastSyncResponseOrStream(req, inner.body, status);
+ fastSyncResponseOrStream(req, inner.body, status, innerRequest);
};
}
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 4035fe259..f42275b0e 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -8,11 +8,11 @@ use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
-use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::service::handle_request;
use crate::service::http_trace;
use crate::service::HttpRecord;
+use crate::service::HttpRecordResponse;
use crate::service::HttpRequestBodyAutocloser;
use crate::service::HttpServerState;
use crate::websocket_upgrade::WebSocketUpgrade;
@@ -68,7 +68,6 @@ use std::io;
use std::pin::Pin;
use std::ptr::null;
use std::rc::Rc;
-use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -182,7 +181,7 @@ pub fn op_http_upgrade_raw(
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
spawn(async move {
- let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
+ let mut upgrade_stream = WebSocketUpgrade::<()>::default();
// Stage 2: Extract the Upgraded connection
let mut buf = [0; 1024];
@@ -191,7 +190,8 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
- *http.response() = response;
+ let (response_parts, _) = response.into_parts();
+ *http.response_parts() = response_parts;
http.complete();
let mut upgraded = TokioIo::new(upgrade.await?);
upgraded.write_all(&bytes).await?;
@@ -250,10 +250,10 @@ pub async fn op_http_upgrade_websocket_next(
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = http.upgrade()?;
{
- let mut response = http.response();
- *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ let mut response_parts = http.response_parts();
+ response_parts.status = StatusCode::SWITCHING_PROTOCOLS;
for (name, value) in headers {
- response.headers_mut().append(
+ response_parts.headers.append(
HeaderName::from_bytes(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
@@ -274,10 +274,14 @@ pub fn op_http_set_promise_complete(external: *const c_void, status: u16) {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_set_promise_complete") };
+ set_promise_complete(http, status);
+}
+
+fn set_promise_complete(http: Rc<HttpRecord>, status: u16) {
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
- *http.response().status_mut() = code;
+ http.response_parts().status = code;
}
http.complete();
}
@@ -441,7 +445,7 @@ pub fn op_http_read_request_body(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_read_request_body") };
- let rid = if let Some(incoming) = http.take_body() {
+ let rid = if let Some(incoming) = http.take_request_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
} else {
@@ -462,8 +466,7 @@ pub fn op_http_set_response_header(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_header") };
- let mut response = http.response();
- let resp_headers = response.headers_mut();
+ let mut response_parts = http.response_parts();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = match value {
@@ -473,7 +476,7 @@ pub fn op_http_set_response_header(
HeaderValue::from_maybe_shared_unchecked(bytes::Bytes::from(bytes_vec))
},
};
- resp_headers.append(name, value);
+ response_parts.headers.append(name, value);
}
#[op2]
@@ -486,12 +489,13 @@ pub fn op_http_set_response_headers(
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_headers") };
// TODO(mmastrac): Invalid headers should be handled?
- let mut response = http.response();
- let resp_headers = response.headers_mut();
+ let mut response_parts = http.response_parts();
let len = headers.length();
let header_len = len * 2;
- resp_headers.reserve(header_len.try_into().unwrap());
+ response_parts
+ .headers
+ .reserve(header_len.try_into().unwrap());
for i in 0..len {
let item = headers.get_index(scope, i).unwrap();
@@ -505,7 +509,7 @@ pub fn op_http_set_response_headers(
let header_value =
// SAFETY: These are valid latin-1 strings
unsafe { HeaderValue::from_maybe_shared_unchecked(v8_value) };
- resp_headers.append(header_name, header_value);
+ response_parts.headers.append(header_name, header_value);
}
}
@@ -525,7 +529,7 @@ pub fn op_http_set_response_trailers(
let value = unsafe { HeaderValue::from_maybe_shared_unchecked(value) };
trailer_map.append(name, value);
}
- *http.trailers().borrow_mut() = Some(trailer_map);
+ *http.trailers() = Some(trailer_map);
}
fn is_request_compressible(
@@ -652,31 +656,28 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
- external: *const c_void,
+ http: Rc<HttpRecord>,
length: Option<usize>,
status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
- // SAFETY: external is deleted before calling this op.
- let http = unsafe { take_external!(external, "set_response") };
// The request may have been cancelled by this point and if so, there's no need for us to
// do all of this work to send the response.
if !http.cancelled() {
- let resource = http.take_resource();
let compression =
is_request_compressible(length, &http.request_parts().headers);
- let mut response = http.response();
+ let mut response_headers =
+ std::cell::RefMut::map(http.response_parts(), |this| &mut this.headers);
let compression =
- modify_compressibility_from_response(compression, response.headers_mut());
- response
- .body_mut()
- .initialize(response_fn(compression), resource);
+ modify_compressibility_from_response(compression, &mut response_headers);
+ drop(response_headers);
+ http.set_response_body(response_fn(compression));
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
- *response.status_mut() = code;
+ http.response_parts().status = code;
}
} else if force_instantiate_body {
response_fn(Compression::None).abort();
@@ -685,14 +686,20 @@ fn set_response(
http.complete();
}
-#[op2(fast)]
-pub fn op_http_set_response_body_resource(
+/// Returned promise resolves when body streaming finishes.
+/// Call [`op_http_close_after_finish`] when done with the external.
+#[op2(async)]
+pub async fn op_http_set_response_body_resource(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<(), AnyError> {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_body_resource") };
+
// IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request
// body resource so we _cannot_ hold the OpState lock longer than necessary.
@@ -710,7 +717,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
- external,
+ http.clone(),
resource.size_hint().1.map(|s| s as usize),
status,
true,
@@ -719,21 +726,34 @@ pub fn op_http_set_response_body_resource(
},
);
+ *http.needs_close_after_finish() = true;
+ http.response_body_finished().await;
Ok(())
}
#[op2(fast)]
+pub fn op_http_close_after_finish(external: *const c_void) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_close_after_finish") };
+ http.close_after_finish();
+}
+
+#[op2(fast)]
pub fn op_http_set_response_body_text(
external: *const c_void,
#[string] text: String,
status: u16,
) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_response_body_text") };
if !text.is_empty() {
- set_response(external, Some(text.len()), status, false, |compression| {
+ set_response(http, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
- op_http_set_promise_complete::call(external, status);
+ set_promise_complete(http, status);
}
}
@@ -743,45 +763,21 @@ pub fn op_http_set_response_body_bytes(
#[buffer] buffer: JsBuffer,
status: u16,
) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_response_body_bytes") };
if !buffer.is_empty() {
- set_response(external, Some(buffer.len()), status, false, |compression| {
+ set_response(http, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
- op_http_set_promise_complete::call(external, status);
- }
-}
-
-#[op2(async)]
-pub async fn op_http_track(
- state: Rc<RefCell<OpState>>,
- external: *const c_void,
- #[smi] server_rid: ResourceId,
-) -> Result<(), AnyError> {
- // SAFETY: op is called with external.
- let http = unsafe { clone_external!(external, "op_http_track") };
- let handle = http.body_promise();
-
- let join_handle = state
- .borrow_mut()
- .resource_table
- .get::<HttpJoinHandle>(server_rid)?;
-
- match handle
- .or_cancel(join_handle.connection_cancel_handle())
- .await
- {
- Ok(true) => Ok(()),
- Ok(false) => {
- Err(AnyError::msg("connection closed before message completed"))
- }
- Err(_e) => Ok(()),
+ set_promise_complete(http, status);
}
}
fn serve_http11_unconditional(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
@@ -803,7 +799,7 @@ fn serve_http11_unconditional(
fn serve_http2_unconditional(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
@@ -821,7 +817,7 @@ fn serve_http2_unconditional(
async fn serve_http2_autodetect(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
@@ -1194,14 +1190,13 @@ pub async fn op_http_close(
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
join_handle.listen_cancel_handle().cancel();
- // Async spin on the server_state while we wait for everything to drain
- while Rc::strong_count(&join_handle.server_state) > 1 {
- tokio::time::sleep(Duration::from_millis(10)).await;
- }
+ join_handle.server_state.drain().await;
} else {
// In a forceful shutdown, we close everything
join_handle.listen_cancel_handle().cancel();
join_handle.connection_cancel_handle().cancel();
+ // Give streaming responses a tick to close
+ tokio::task::yield_now().await;
}
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 0460a3707..af0f0f0c9 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -108,6 +108,7 @@ deno_core::extension!(
op_http_write_headers,
op_http_write_resource,
op_http_write,
+ http_next::op_http_close_after_finish,
http_next::op_http_get_request_header,
http_next::op_http_get_request_headers,
http_next::op_http_get_request_method_and_url<HTTP>,
@@ -121,7 +122,6 @@ deno_core::extension!(
http_next::op_http_set_response_header,
http_next::op_http_set_response_headers,
http_next::op_http_set_response_trailers,
- http_next::op_http_track,
http_next::op_http_upgrade_websocket_next,
http_next::op_http_upgrade_raw,
http_next::op_raw_write_vectored,
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 7d91dce6b..7201855cc 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -1,10 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use std::cell::RefCell;
-use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::rc::Rc;
-use std::task::Waker;
use brotli::enc::encode::BrotliEncoderParameter;
use brotli::ffi::compressor::BrotliEncoderState;
@@ -18,16 +15,13 @@ use deno_core::BufView;
use deno_core::Resource;
use flate2::write::GzEncoder;
use http::HeaderMap;
-use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::SizeHint;
use pin_project::pin_project;
-use crate::service::HttpRequestBodyAutocloser;
-
/// Simplification for nested types we use for our streams. We provide a way to convert from
/// this type into Hyper's body [`Frame`].
-enum ResponseStreamResult {
+pub enum ResponseStreamResult {
/// Stream is over.
EndOfStream,
/// Stream provided non-empty data.
@@ -57,53 +51,7 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
}
}
-#[derive(Clone, Debug, Default)]
-pub struct CompletionHandle {
- inner: Rc<RefCell<CompletionHandleInner>>,
-}
-
-#[derive(Debug, Default)]
-struct CompletionHandleInner {
- complete: bool,
- success: bool,
- waker: Option<Waker>,
-}
-
-impl CompletionHandle {
- pub fn complete(&self, success: bool) {
- let mut mut_self = self.inner.borrow_mut();
- mut_self.complete = true;
- mut_self.success = success;
- if let Some(waker) = mut_self.waker.take() {
- drop(mut_self);
- waker.wake();
- }
- }
-
- #[allow(dead_code)]
- pub fn is_completed(&self) -> bool {
- self.inner.borrow().complete
- }
-}
-
-impl Future for CompletionHandle {
- type Output = bool;
-
- fn poll(
- self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Self::Output> {
- let mut mut_self = self.inner.borrow_mut();
- if mut_self.complete {
- return std::task::Poll::Ready(mut_self.success);
- }
-
- mut_self.waker = Some(cx.waker().clone());
- std::task::Poll::Pending
- }
-}
-
-trait PollFrame: Unpin {
+pub trait PollFrame: Unpin {
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
@@ -166,52 +114,6 @@ impl std::fmt::Debug for ResponseBytesInner {
}
}
-/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface
-/// required by hyper. As the API requires information about request completion (including a success/fail
-/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
-#[derive(Default)]
-pub struct ResponseBytes {
- inner: ResponseBytesInner,
- completion_handle: CompletionHandle,
- headers: Rc<RefCell<Option<HeaderMap>>>,
- res: Option<HttpRequestBodyAutocloser>,
-}
-
-impl ResponseBytes {
- pub fn initialize(
- &mut self,
- inner: ResponseBytesInner,
- req_body_resource: Option<HttpRequestBodyAutocloser>,
- ) {
- debug_assert!(matches!(self.inner, ResponseBytesInner::Empty));
- self.inner = inner;
- self.res = req_body_resource;
- }
-
- pub fn completion_handle(&self) -> CompletionHandle {
- self.completion_handle.clone()
- }
-
- pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
- self.headers.clone()
- }
-
- fn complete(&mut self, success: bool) -> ResponseBytesInner {
- if matches!(self.inner, ResponseBytesInner::Done) {
- return ResponseBytesInner::Done;
- }
-
- let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
- self.completion_handle.complete(success);
- if success {
- current
- } else {
- current.abort();
- ResponseBytesInner::Done
- }
- }
-}
-
impl ResponseBytesInner {
pub fn abort(self) {
match self {
@@ -298,75 +200,6 @@ impl ResponseBytesInner {
}
}
-impl Body for ResponseBytes {
- type Data = BufView;
- type Error = AnyError;
-
- fn poll_frame(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
- let res = loop {
- let res = match &mut self.inner {
- ResponseBytesInner::Done | ResponseBytesInner::Empty => {
- if let Some(trailers) = self.headers.borrow_mut().take() {
- return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
- }
- unreachable!()
- }
- ResponseBytesInner::Bytes(..) => {
- let ResponseBytesInner::Bytes(data) = self.complete(true) else {
- unreachable!();
- };
- return std::task::Poll::Ready(Some(Ok(Frame::data(data))));
- }
- ResponseBytesInner::UncompressedStream(stm) => {
- ready!(Pin::new(stm).poll_frame(cx))
- }
- ResponseBytesInner::GZipStream(stm) => {
- ready!(Pin::new(stm).poll_frame(cx))
- }
- ResponseBytesInner::BrotliStream(stm) => {
- ready!(Pin::new(stm).poll_frame(cx))
- }
- };
- // This is where we retry the NoData response
- if matches!(res, ResponseStreamResult::NoData) {
- continue;
- }
- break res;
- };
-
- if matches!(res, ResponseStreamResult::EndOfStream) {
- if let Some(trailers) = self.headers.borrow_mut().take() {
- return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
- }
- self.complete(true);
- }
- std::task::Poll::Ready(res.into())
- }
-
- fn is_end_stream(&self) -> bool {
- matches!(
- self.inner,
- ResponseBytesInner::Done | ResponseBytesInner::Empty
- ) && self.headers.borrow_mut().is_none()
- }
-
- fn size_hint(&self) -> SizeHint {
- // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
- // anyways just in case hyper needs it.
- self.inner.size_hint()
- }
-}
-
-impl Drop for ResponseBytes {
- fn drop(&mut self) {
- // We won't actually poll_frame for Empty responses so this is where we return success
- self.complete(matches!(self.inner, ResponseBytesInner::Empty));
- }
-}
-
pub struct ResourceBodyAdapter {
auto_close: bool,
stm: Rc<dyn Resource>,
diff --git a/ext/http/service.rs b/ext/http/service.rs
index 87b308755..fbd533cac 100644
--- a/ext/http/service.rs
+++ b/ext/http/service.rs
@@ -1,13 +1,18 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
-use crate::response_body::CompletionHandle;
-use crate::response_body::ResponseBytes;
+use crate::response_body::ResponseBytesInner;
+use crate::response_body::ResponseStreamResult;
use deno_core::error::AnyError;
+use deno_core::futures::ready;
+use deno_core::BufView;
use deno_core::OpState;
use deno_core::ResourceId;
use http::request::Parts;
use http::HeaderMap;
+use hyper1::body::Body;
+use hyper1::body::Frame;
use hyper1::body::Incoming;
+use hyper1::body::SizeHint;
use hyper1::upgrade::OnUpgrade;
use scopeguard::guard;
@@ -16,10 +21,15 @@ use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
use std::future::Future;
+use std::mem::ManuallyDrop;
+use std::pin::Pin;
use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use std::task::Waker;
pub type Request = hyper1::Request<Incoming>;
-pub type Response = hyper1::Response<ResponseBytes>;
+pub type Response = hyper1::Response<HttpRecordResponse>;
macro_rules! http_trace {
($record:expr $(, $args:expr)*) => {
@@ -39,6 +49,7 @@ pub(crate) use http_trace;
struct HttpServerStateInner {
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
+ drain_waker: Option<Waker>,
}
pub struct HttpServerState(RefCell<HttpServerStateInner>);
@@ -47,8 +58,32 @@ impl HttpServerState {
pub fn new() -> Rc<Self> {
Rc::new(Self(RefCell::new(HttpServerStateInner {
pool: Vec::new(),
+ drain_waker: None,
})))
}
+
+ pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a {
+ struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>);
+
+ impl<'a> Future for HttpServerStateDrain<'a> {
+ type Output = ();
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ let server_state = self.0;
+ http_trace!(server_state, "HttpServerState::drain poll");
+ if Rc::strong_count(server_state) <= 1 {
+ return Poll::Ready(());
+ }
+ server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+
+ HttpServerStateDrain(self)
+ }
}
enum RequestBodyState {
@@ -102,25 +137,28 @@ pub async fn handle_request(
http_trace!(*guarded_record, "handle_request response_ready.await");
guarded_record.response_ready().await;
- // Defuse the guard. Must not await after the point.
+ // Defuse the guard. Must not await after this point.
let record = ScopeGuard::into_inner(guarded_record);
http_trace!(record, "handle_request complete");
- let response = record.take_response();
- record.recycle();
+ let response = record.into_response();
Ok(response)
}
struct HttpRecordInner {
server_state: Rc<HttpServerState>,
request_info: HttpConnectionProperties,
- request_parts: Parts,
+ request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
- /// The response may get taken before we tear this down
- response: Option<Response>,
+ response_parts: Option<http::response::Parts>,
response_ready: bool,
- response_waker: Option<std::task::Waker>,
- trailers: Rc<RefCell<Option<HeaderMap>>>,
+ response_waker: Option<Waker>,
+ response_body: ResponseBytesInner,
+ response_body_finished: bool,
+ response_body_waker: Option<Waker>,
+ trailers: Option<HeaderMap>,
been_dropped: bool,
+ finished: bool,
+ needs_close_after_finish: bool,
}
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
@@ -147,45 +185,72 @@ impl HttpRecord {
server_state: Rc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
- let body = ResponseBytes::default();
- let trailers = body.trailers();
let request_body = Some(request_body.into());
- let mut response = Response::new(body);
- let reuse_record =
+ let (mut response_parts, _) = http::Response::new(()).into_parts();
+ let record =
if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
- *response.headers_mut() = headers;
- Some(record)
+ response_parts.headers = headers;
+ http_trace!(record, "HttpRecord::reuse");
+ record
} else {
- None
+ #[cfg(feature = "__http_tracing")]
+ {
+ RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ }
+
+ #[allow(clippy::let_and_return)]
+ let record = Rc::new(Self(RefCell::new(None)));
+ http_trace!(record, "HttpRecord::new");
+ record
};
- let inner = HttpRecordInner {
+ *record.0.borrow_mut() = Some(HttpRecordInner {
server_state,
request_info,
request_parts,
request_body,
- response: Some(response),
+ response_parts: Some(response_parts),
response_ready: false,
response_waker: None,
- trailers,
+ response_body: ResponseBytesInner::Empty,
+ response_body_finished: false,
+ response_body_waker: None,
+ trailers: None,
been_dropped: false,
- };
- if let Some(record) = reuse_record {
- *record.0.borrow_mut() = Some(inner);
- http_trace!(record, "HttpRecord::reuse");
- record
- } else {
- #[cfg(feature = "__http_tracing")]
- {
- RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
- }
+ finished: false,
+ needs_close_after_finish: false,
+ });
+ record
+ }
+
+ fn finish(self: Rc<Self>) {
+ http_trace!(self, "HttpRecord::finish");
+ let mut inner = self.self_mut();
+ inner.response_body_finished = true;
+ let response_body_waker = inner.response_body_waker.take();
+ let needs_close_after_finish = inner.needs_close_after_finish;
+ drop(inner);
+ if let Some(waker) = response_body_waker {
+ waker.wake();
+ }
+ if !needs_close_after_finish {
+ self.recycle();
+ }
+ }
- #[allow(clippy::let_and_return)]
- let record = Rc::new(Self(RefCell::new(Some(inner))));
- http_trace!(record, "HttpRecord::new");
- record
+ pub fn close_after_finish(self: Rc<Self>) {
+ debug_assert!(self.self_ref().needs_close_after_finish);
+ let mut inner = self.self_mut();
+ inner.needs_close_after_finish = false;
+ if !inner.finished {
+ drop(inner);
+ self.recycle();
}
}
+ pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> {
+ RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
+ }
+
fn recycle(self: Rc<Self>) {
assert!(
Rc::strong_count(&self) == 1,
@@ -196,12 +261,19 @@ impl HttpRecord {
request_parts: Parts { mut headers, .. },
..
} = self.0.borrow_mut().take().unwrap();
+
let mut server_state_mut = server_state.0.borrow_mut();
let inflight = Rc::strong_count(&server_state);
http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
- // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling
- // the to-drop objects off to another thread.
+ // Server is shutting down so wake the drain future.
+ if let Some(waker) = server_state_mut.drain_waker.take() {
+ drop(server_state_mut);
+ drop(server_state);
+ http_trace!(self, "HttpRecord::recycle wake");
+ waker.wake();
+ return;
+ }
// Keep a buffer of allocations on hand to be reused by incoming requests.
// Estimated target size is 16 + 1/8 the number of inflight requests.
@@ -235,7 +307,7 @@ impl HttpRecord {
}
/// Take the Hyper body from this record.
- pub fn take_body(&self) -> Option<Incoming> {
+ pub fn take_request_body(&self) -> Option<Incoming> {
let body_holder = &mut self.self_mut().request_body;
let body = body_holder.take();
match body {
@@ -247,18 +319,6 @@ impl HttpRecord {
}
}
- pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> {
- let body_holder = &mut self.self_mut().request_body;
- let body = body_holder.take();
- match body {
- Some(RequestBodyState::Resource(res)) => Some(res),
- x => {
- *body_holder = x;
- None
- }
- }
- }
-
/// Replace the request body with a resource ID and the OpState we'll need to shut it down.
/// We cannot keep just the resource itself, as JS code might be reading from the resource ID
/// to generate the response data (requiring us to keep it in the resource table).
@@ -273,7 +333,7 @@ impl HttpRecord {
if inner.response_ready {
// Future dropped between wake() and async fn resuming.
drop(inner);
- self.recycle();
+ self.finish();
return;
}
inner.been_dropped = true;
@@ -291,7 +351,7 @@ impl HttpRecord {
);
if inner.been_dropped {
drop(inner);
- self.recycle();
+ self.finish();
return;
}
inner.response_ready = true;
@@ -301,25 +361,44 @@ impl HttpRecord {
}
}
+ fn take_response_body(&self) -> ResponseBytesInner {
+ let mut inner = self.self_mut();
+ debug_assert!(
+ !matches!(inner.response_body, ResponseBytesInner::Done),
+ "HTTP state error: response body already complete"
+ );
+ std::mem::replace(&mut inner.response_body, ResponseBytesInner::Done)
+ }
+
/// Has the future for this record been dropped? ie, has the underlying TCP connection
/// been closed?
pub fn cancelled(&self) -> bool {
self.self_ref().been_dropped
}
- /// Get a mutable reference to the response.
- pub fn response(&self) -> RefMut<'_, Response> {
- RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap())
+ /// Get a mutable reference to the response status and headers.
+ pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> {
+ RefMut::map(self.self_mut(), |inner| {
+ inner.response_parts.as_mut().unwrap()
+ })
}
/// Get a mutable reference to the trailers.
- pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> {
- Ref::map(self.self_ref(), |inner| &inner.trailers)
+ pub fn trailers(&self) -> RefMut<'_, Option<HeaderMap>> {
+ RefMut::map(self.self_mut(), |inner| &mut inner.trailers)
+ }
+
+ pub fn set_response_body(&self, response_body: ResponseBytesInner) {
+ let mut inner = self.self_mut();
+ debug_assert!(matches!(inner.response_body, ResponseBytesInner::Empty));
+ inner.response_body = response_body;
}
/// Take the response.
- fn take_response(&self) -> Response {
- self.self_mut().response.take().unwrap()
+ fn into_response(self: Rc<Self>) -> Response {
+ let parts = self.self_mut().response_parts.take().unwrap();
+ let body = HttpRecordResponse(ManuallyDrop::new(self));
+ http::Response::from_parts(parts, body)
}
/// Get a reference to the connection properties.
@@ -332,38 +411,131 @@ impl HttpRecord {
Ref::map(self.self_ref(), |inner| &inner.request_parts)
}
- /// Get a reference to the completion handle.
+ /// Resolves when response head is ready.
fn response_ready(&self) -> impl Future<Output = ()> + '_ {
- struct HttpRecordComplete<'a>(&'a HttpRecord);
+ struct HttpRecordReady<'a>(&'a HttpRecord);
- impl<'a> Future for HttpRecordComplete<'a> {
+ impl<'a> Future for HttpRecordReady<'a> {
type Output = ();
fn poll(
- self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Self::Output> {
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
let mut mut_self = self.0.self_mut();
if mut_self.response_ready {
- return std::task::Poll::Ready(());
+ return Poll::Ready(());
}
mut_self.response_waker = Some(cx.waker().clone());
- std::task::Poll::Pending
+ Poll::Pending
}
}
- HttpRecordComplete(self)
+ HttpRecordReady(self)
}
- /// Get a reference to the response body completion handle.
- pub fn body_promise(&self) -> CompletionHandle {
- self
- .self_ref()
- .response
- .as_ref()
- .unwrap()
- .body()
- .completion_handle()
+ /// Resolves when response body has finished streaming.
+ pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ {
+ struct HttpRecordFinished<'a>(&'a HttpRecord);
+
+ impl<'a> Future for HttpRecordFinished<'a> {
+ type Output = ();
+
+ fn poll(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Self::Output> {
+ let mut mut_self = self.0.self_mut();
+ if mut_self.response_body_finished {
+ return Poll::Ready(());
+ }
+ mut_self.response_body_waker = Some(cx.waker().clone());
+ Poll::Pending
+ }
+ }
+
+ HttpRecordFinished(self)
+ }
+}
+
+#[repr(transparent)]
+pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>);
+
+impl Body for HttpRecordResponse {
+ type Data = BufView;
+ type Error = AnyError;
+
+ fn poll_frame(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+ use crate::response_body::PollFrame;
+ let record = &self.0;
+
+ let res = loop {
+ let mut inner = record.self_mut();
+ let res = match &mut inner.response_body {
+ ResponseBytesInner::Done | ResponseBytesInner::Empty => {
+ if let Some(trailers) = inner.trailers.take() {
+ return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
+ }
+ unreachable!()
+ }
+ ResponseBytesInner::Bytes(..) => {
+ drop(inner);
+ let ResponseBytesInner::Bytes(data) = record.take_response_body()
+ else {
+ unreachable!();
+ };
+ return Poll::Ready(Some(Ok(Frame::data(data))));
+ }
+ ResponseBytesInner::UncompressedStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
+ }
+ ResponseBytesInner::GZipStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
+ }
+ ResponseBytesInner::BrotliStream(stm) => {
+ ready!(Pin::new(stm).poll_frame(cx))
+ }
+ };
+ // This is where we retry the NoData response
+ if matches!(res, ResponseStreamResult::NoData) {
+ continue;
+ }
+ break res;
+ };
+
+ if matches!(res, ResponseStreamResult::EndOfStream) {
+ if let Some(trailers) = record.self_mut().trailers.take() {
+ return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
+ }
+ record.take_response_body();
+ }
+ Poll::Ready(res.into())
+ }
+
+ fn is_end_stream(&self) -> bool {
+ let inner = self.0.self_ref();
+ matches!(
+ inner.response_body,
+ ResponseBytesInner::Done | ResponseBytesInner::Empty
+ ) && inner.trailers.is_none()
+ }
+
+ fn size_hint(&self) -> SizeHint {
+ // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through
+ // anyways just in case hyper needs it.
+ self.0.self_ref().response_body.size_hint()
+ }
+}
+
+impl Drop for HttpRecordResponse {
+ fn drop(&mut self) {
+ // SAFETY: this ManuallyDrop is not used again.
+ let record = unsafe { ManuallyDrop::take(&mut self.0) };
+ http_trace!(record, "HttpRecordResponse::drop");
+ record.finish();
}
}
@@ -442,14 +614,10 @@ mod tests {
async move {
// JavaScript handler produces response
let record = rx.recv().await.unwrap();
- let resource = record.take_resource();
- record.response().body_mut().initialize(
- ResponseBytesInner::from_vec(
- Compression::None,
- b"hello world".to_vec(),
- ),
- resource,
- );
+ record.set_response_body(ResponseBytesInner::from_vec(
+ Compression::None,
+ b"hello world".to_vec(),
+ ));
record.complete();
Ok(())
},