diff options
-rw-r--r-- | ext/http/00_serve.js | 35 | ||||
-rw-r--r-- | ext/http/http_next.rs | 131 | ||||
-rw-r--r-- | ext/http/lib.rs | 2 | ||||
-rw-r--r-- | ext/http/response_body.rs | 171 | ||||
-rw-r--r-- | ext/http/service.rs | 338 |
5 files changed, 341 insertions, 336 deletions
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 05e0bb5c3..adb03a22c 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -43,6 +43,7 @@ const { ObjectHasOwn, ObjectPrototypeIsPrototypeOf, PromisePrototypeCatch, + PromisePrototypeThen, Symbol, TypeError, Uint8Array, @@ -50,6 +51,7 @@ const { } = primordials; const { + op_http_close_after_finish, op_http_get_request_headers, op_http_get_request_method_and_url, op_http_read_request_body, @@ -386,9 +388,10 @@ class ServeHandlerInfo { } } -function fastSyncResponseOrStream(req, respBody, status) { +function fastSyncResponseOrStream(req, respBody, status, innerRequest) { if (respBody === null || respBody === undefined) { // Don't set the body + innerRequest?.close(); op_http_set_promise_complete(req, status); return; } @@ -397,36 +400,43 @@ function fastSyncResponseOrStream(req, respBody, status) { const body = stream.body; if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { + innerRequest?.close(); op_http_set_response_body_bytes(req, body, status); return; } if (typeof body === "string") { + innerRequest?.close(); op_http_set_response_body_text(req, body, status); return; } // At this point in the response it needs to be a stream if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { + innerRequest?.close(); throw TypeError("invalid response"); } const resourceBacking = getReadableStreamResourceBacking(stream); + let rid, autoClose; if (resourceBacking) { - op_http_set_response_body_resource( - req, - resourceBacking.rid, - resourceBacking.autoClose, - status, - ); + rid = resourceBacking.rid; + autoClose = resourceBacking.autoClose; } else { - const rid = resourceForReadableStream(stream); + rid = resourceForReadableStream(stream); + autoClose = true; + } + PromisePrototypeThen( op_http_set_response_body_resource( req, rid, - true, + autoClose, status, - ); - } + ), + () => { + innerRequest?.close(); + op_http_close_after_finish(req); + }, + ); } /** @@ -499,8 +509,7 @@ function mapToCallback(context, callback, onError) { } } - innerRequest?.close(); - fastSyncResponseOrStream(req, inner.body, status); + fastSyncResponseOrStream(req, inner.body, status, innerRequest); }; } diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 4035fe259..f42275b0e 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -8,11 +8,11 @@ use crate::request_properties::HttpConnectionProperties; use crate::request_properties::HttpListenProperties; use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; -use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; use crate::service::handle_request; use crate::service::http_trace; use crate::service::HttpRecord; +use crate::service::HttpRecordResponse; use crate::service::HttpRequestBodyAutocloser; use crate::service::HttpServerState; use crate::websocket_upgrade::WebSocketUpgrade; @@ -68,7 +68,6 @@ use std::io; use std::pin::Pin; use std::ptr::null; use std::rc::Rc; -use std::time::Duration; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -182,7 +181,7 @@ pub fn op_http_upgrade_raw( let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); spawn(async move { - let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default(); + let mut upgrade_stream = WebSocketUpgrade::<()>::default(); // Stage 2: Extract the Upgraded connection let mut buf = [0; 1024]; @@ -191,7 +190,8 @@ pub fn op_http_upgrade_raw( match upgrade_stream.write(&buf[..read]) { Ok(None) => continue, Ok(Some((response, bytes))) => { - *http.response() = response; + let (response_parts, _) = response.into_parts(); + *http.response_parts() = response_parts; http.complete(); let mut upgraded = TokioIo::new(upgrade.await?); upgraded.write_all(&bytes).await?; @@ -250,10 +250,10 @@ pub async fn op_http_upgrade_websocket_next( // Stage 1: set the response to 101 Switching Protocols and send it let upgrade = http.upgrade()?; { - let mut response = http.response(); - *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + let mut response_parts = http.response_parts(); + response_parts.status = StatusCode::SWITCHING_PROTOCOLS; for (name, value) in headers { - response.headers_mut().append( + response_parts.headers.append( HeaderName::from_bytes(&name).unwrap(), HeaderValue::from_bytes(&value).unwrap(), ); @@ -274,10 +274,14 @@ pub fn op_http_set_promise_complete(external: *const c_void, status: u16) { let http = // SAFETY: external is deleted before calling this op. unsafe { take_external!(external, "op_http_set_promise_complete") }; + set_promise_complete(http, status); +} + +fn set_promise_complete(http: Rc<HttpRecord>, status: u16) { // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we // will quitely ignore invalid values. if let Ok(code) = StatusCode::from_u16(status) { - *http.response().status_mut() = code; + http.response_parts().status = code; } http.complete(); } @@ -441,7 +445,7 @@ pub fn op_http_read_request_body( let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_read_request_body") }; - let rid = if let Some(incoming) = http.take_body() { + let rid = if let Some(incoming) = http.take_request_body() { let body_resource = Rc::new(HttpRequestBody::new(incoming)); state.borrow_mut().resource_table.add_rc(body_resource) } else { @@ -462,8 +466,7 @@ pub fn op_http_set_response_header( let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_header") }; - let mut response = http.response(); - let resp_headers = response.headers_mut(); + let mut response_parts = http.response_parts(); // These are valid latin-1 strings let name = HeaderName::from_bytes(&name).unwrap(); let value = match value { @@ -473,7 +476,7 @@ pub fn op_http_set_response_header( HeaderValue::from_maybe_shared_unchecked(bytes::Bytes::from(bytes_vec)) }, }; - resp_headers.append(name, value); + response_parts.headers.append(name, value); } #[op2] @@ -486,12 +489,13 @@ pub fn op_http_set_response_headers( // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_headers") }; // TODO(mmastrac): Invalid headers should be handled? - let mut response = http.response(); - let resp_headers = response.headers_mut(); + let mut response_parts = http.response_parts(); let len = headers.length(); let header_len = len * 2; - resp_headers.reserve(header_len.try_into().unwrap()); + response_parts + .headers + .reserve(header_len.try_into().unwrap()); for i in 0..len { let item = headers.get_index(scope, i).unwrap(); @@ -505,7 +509,7 @@ pub fn op_http_set_response_headers( let header_value = // SAFETY: These are valid latin-1 strings unsafe { HeaderValue::from_maybe_shared_unchecked(v8_value) }; - resp_headers.append(header_name, header_value); + response_parts.headers.append(header_name, header_value); } } @@ -525,7 +529,7 @@ pub fn op_http_set_response_trailers( let value = unsafe { HeaderValue::from_maybe_shared_unchecked(value) }; trailer_map.append(name, value); } - *http.trailers().borrow_mut() = Some(trailer_map); + *http.trailers() = Some(trailer_map); } fn is_request_compressible( @@ -652,31 +656,28 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { /// Sets the appropriate response body. Use `force_instantiate_body` if you need /// to ensure that the response is cleaned up correctly (eg: for resources). fn set_response( - external: *const c_void, + http: Rc<HttpRecord>, length: Option<usize>, status: u16, force_instantiate_body: bool, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { - // SAFETY: external is deleted before calling this op. - let http = unsafe { take_external!(external, "set_response") }; // The request may have been cancelled by this point and if so, there's no need for us to // do all of this work to send the response. if !http.cancelled() { - let resource = http.take_resource(); let compression = is_request_compressible(length, &http.request_parts().headers); - let mut response = http.response(); + let mut response_headers = + std::cell::RefMut::map(http.response_parts(), |this| &mut this.headers); let compression = - modify_compressibility_from_response(compression, response.headers_mut()); - response - .body_mut() - .initialize(response_fn(compression), resource); + modify_compressibility_from_response(compression, &mut response_headers); + drop(response_headers); + http.set_response_body(response_fn(compression)); // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we // will quitely ignore invalid values. if let Ok(code) = StatusCode::from_u16(status) { - *response.status_mut() = code; + http.response_parts().status = code; } } else if force_instantiate_body { response_fn(Compression::None).abort(); @@ -685,14 +686,20 @@ fn set_response( http.complete(); } -#[op2(fast)] -pub fn op_http_set_response_body_resource( +/// Returned promise resolves when body streaming finishes. +/// Call [`op_http_close_after_finish`] when done with the external. +#[op2(async)] +pub async fn op_http_set_response_body_resource( state: Rc<RefCell<OpState>>, external: *const c_void, #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, ) -> Result<(), AnyError> { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_body_resource") }; + // IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request // body resource so we _cannot_ hold the OpState lock longer than necessary. @@ -710,7 +717,7 @@ pub fn op_http_set_response_body_resource( }; set_response( - external, + http.clone(), resource.size_hint().1.map(|s| s as usize), status, true, @@ -719,21 +726,34 @@ pub fn op_http_set_response_body_resource( }, ); + *http.needs_close_after_finish() = true; + http.response_body_finished().await; Ok(()) } #[op2(fast)] +pub fn op_http_close_after_finish(external: *const c_void) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_close_after_finish") }; + http.close_after_finish(); +} + +#[op2(fast)] pub fn op_http_set_response_body_text( external: *const c_void, #[string] text: String, status: u16, ) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_set_response_body_text") }; if !text.is_empty() { - set_response(external, Some(text.len()), status, false, |compression| { + set_response(http, Some(text.len()), status, false, |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); } else { - op_http_set_promise_complete::call(external, status); + set_promise_complete(http, status); } } @@ -743,45 +763,21 @@ pub fn op_http_set_response_body_bytes( #[buffer] buffer: JsBuffer, status: u16, ) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_set_response_body_bytes") }; if !buffer.is_empty() { - set_response(external, Some(buffer.len()), status, false, |compression| { + set_response(http, Some(buffer.len()), status, false, |compression| { ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) }); } else { - op_http_set_promise_complete::call(external, status); - } -} - -#[op2(async)] -pub async fn op_http_track( - state: Rc<RefCell<OpState>>, - external: *const c_void, - #[smi] server_rid: ResourceId, -) -> Result<(), AnyError> { - // SAFETY: op is called with external. - let http = unsafe { clone_external!(external, "op_http_track") }; - let handle = http.body_promise(); - - let join_handle = state - .borrow_mut() - .resource_table - .get::<HttpJoinHandle>(server_rid)?; - - match handle - .or_cancel(join_handle.connection_cancel_handle()) - .await - { - Ok(true) => Ok(()), - Ok(false) => { - Err(AnyError::msg("connection closed before message completed")) - } - Err(_e) => Ok(()), + set_promise_complete(http, status); } } fn serve_http11_unconditional( io: impl HttpServeStream, - svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, + svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, ) -> impl Future<Output = Result<(), hyper1::Error>> + 'static { let conn = http1::Builder::new() @@ -803,7 +799,7 @@ fn serve_http11_unconditional( fn serve_http2_unconditional( io: impl HttpServeStream, - svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, + svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, ) -> impl Future<Output = Result<(), hyper1::Error>> + 'static { let conn = @@ -821,7 +817,7 @@ fn serve_http2_unconditional( async fn serve_http2_autodetect( io: impl HttpServeStream, - svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, + svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, ) -> Result<(), AnyError> { let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); @@ -1194,14 +1190,13 @@ pub async fn op_http_close( // In a graceful shutdown, we close the listener and allow all the remaining connections to drain join_handle.listen_cancel_handle().cancel(); - // Async spin on the server_state while we wait for everything to drain - while Rc::strong_count(&join_handle.server_state) > 1 { - tokio::time::sleep(Duration::from_millis(10)).await; - } + join_handle.server_state.drain().await; } else { // In a forceful shutdown, we close everything join_handle.listen_cancel_handle().cancel(); join_handle.connection_cancel_handle().cancel(); + // Give streaming responses a tick to close + tokio::task::yield_now().await; } let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle) diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 0460a3707..af0f0f0c9 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -108,6 +108,7 @@ deno_core::extension!( op_http_write_headers, op_http_write_resource, op_http_write, + http_next::op_http_close_after_finish, http_next::op_http_get_request_header, http_next::op_http_get_request_headers, http_next::op_http_get_request_method_and_url<HTTP>, @@ -121,7 +122,6 @@ deno_core::extension!( http_next::op_http_set_response_header, http_next::op_http_set_response_headers, http_next::op_http_set_response_trailers, - http_next::op_http_track, http_next::op_http_upgrade_websocket_next, http_next::op_http_upgrade_raw, http_next::op_raw_write_vectored, diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 7d91dce6b..7201855cc 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -1,10 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use std::cell::RefCell; -use std::future::Future; use std::io::Write; use std::pin::Pin; use std::rc::Rc; -use std::task::Waker; use brotli::enc::encode::BrotliEncoderParameter; use brotli::ffi::compressor::BrotliEncoderState; @@ -18,16 +15,13 @@ use deno_core::BufView; use deno_core::Resource; use flate2::write::GzEncoder; use http::HeaderMap; -use hyper1::body::Body; use hyper1::body::Frame; use hyper1::body::SizeHint; use pin_project::pin_project; -use crate::service::HttpRequestBodyAutocloser; - /// Simplification for nested types we use for our streams. We provide a way to convert from /// this type into Hyper's body [`Frame`]. -enum ResponseStreamResult { +pub enum ResponseStreamResult { /// Stream is over. EndOfStream, /// Stream provided non-empty data. @@ -57,53 +51,7 @@ impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> { } } -#[derive(Clone, Debug, Default)] -pub struct CompletionHandle { - inner: Rc<RefCell<CompletionHandleInner>>, -} - -#[derive(Debug, Default)] -struct CompletionHandleInner { - complete: bool, - success: bool, - waker: Option<Waker>, -} - -impl CompletionHandle { - pub fn complete(&self, success: bool) { - let mut mut_self = self.inner.borrow_mut(); - mut_self.complete = true; - mut_self.success = success; - if let Some(waker) = mut_self.waker.take() { - drop(mut_self); - waker.wake(); - } - } - - #[allow(dead_code)] - pub fn is_completed(&self) -> bool { - self.inner.borrow().complete - } -} - -impl Future for CompletionHandle { - type Output = bool; - - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Self::Output> { - let mut mut_self = self.inner.borrow_mut(); - if mut_self.complete { - return std::task::Poll::Ready(mut_self.success); - } - - mut_self.waker = Some(cx.waker().clone()); - std::task::Poll::Pending - } -} - -trait PollFrame: Unpin { +pub trait PollFrame: Unpin { fn poll_frame( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -166,52 +114,6 @@ impl std::fmt::Debug for ResponseBytesInner { } } -/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface -/// required by hyper. As the API requires information about request completion (including a success/fail -/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on. -#[derive(Default)] -pub struct ResponseBytes { - inner: ResponseBytesInner, - completion_handle: CompletionHandle, - headers: Rc<RefCell<Option<HeaderMap>>>, - res: Option<HttpRequestBodyAutocloser>, -} - -impl ResponseBytes { - pub fn initialize( - &mut self, - inner: ResponseBytesInner, - req_body_resource: Option<HttpRequestBodyAutocloser>, - ) { - debug_assert!(matches!(self.inner, ResponseBytesInner::Empty)); - self.inner = inner; - self.res = req_body_resource; - } - - pub fn completion_handle(&self) -> CompletionHandle { - self.completion_handle.clone() - } - - pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> { - self.headers.clone() - } - - fn complete(&mut self, success: bool) -> ResponseBytesInner { - if matches!(self.inner, ResponseBytesInner::Done) { - return ResponseBytesInner::Done; - } - - let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done); - self.completion_handle.complete(success); - if success { - current - } else { - current.abort(); - ResponseBytesInner::Done - } - } -} - impl ResponseBytesInner { pub fn abort(self) { match self { @@ -298,75 +200,6 @@ impl ResponseBytesInner { } } -impl Body for ResponseBytes { - type Data = BufView; - type Error = AnyError; - - fn poll_frame( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { - let res = loop { - let res = match &mut self.inner { - ResponseBytesInner::Done | ResponseBytesInner::Empty => { - if let Some(trailers) = self.headers.borrow_mut().take() { - return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); - } - unreachable!() - } - ResponseBytesInner::Bytes(..) => { - let ResponseBytesInner::Bytes(data) = self.complete(true) else { - unreachable!(); - }; - return std::task::Poll::Ready(Some(Ok(Frame::data(data)))); - } - ResponseBytesInner::UncompressedStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - ResponseBytesInner::GZipStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - ResponseBytesInner::BrotliStream(stm) => { - ready!(Pin::new(stm).poll_frame(cx)) - } - }; - // This is where we retry the NoData response - if matches!(res, ResponseStreamResult::NoData) { - continue; - } - break res; - }; - - if matches!(res, ResponseStreamResult::EndOfStream) { - if let Some(trailers) = self.headers.borrow_mut().take() { - return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers)))); - } - self.complete(true); - } - std::task::Poll::Ready(res.into()) - } - - fn is_end_stream(&self) -> bool { - matches!( - self.inner, - ResponseBytesInner::Done | ResponseBytesInner::Empty - ) && self.headers.borrow_mut().is_none() - } - - fn size_hint(&self) -> SizeHint { - // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through - // anyways just in case hyper needs it. - self.inner.size_hint() - } -} - -impl Drop for ResponseBytes { - fn drop(&mut self) { - // We won't actually poll_frame for Empty responses so this is where we return success - self.complete(matches!(self.inner, ResponseBytesInner::Empty)); - } -} - pub struct ResourceBodyAdapter { auto_close: bool, stm: Rc<dyn Resource>, diff --git a/ext/http/service.rs b/ext/http/service.rs index 87b308755..fbd533cac 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -1,13 +1,18 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use crate::request_properties::HttpConnectionProperties; -use crate::response_body::CompletionHandle; -use crate::response_body::ResponseBytes; +use crate::response_body::ResponseBytesInner; +use crate::response_body::ResponseStreamResult; use deno_core::error::AnyError; +use deno_core::futures::ready; +use deno_core::BufView; use deno_core::OpState; use deno_core::ResourceId; use http::request::Parts; use http::HeaderMap; +use hyper1::body::Body; +use hyper1::body::Frame; use hyper1::body::Incoming; +use hyper1::body::SizeHint; use hyper1::upgrade::OnUpgrade; use scopeguard::guard; @@ -16,10 +21,15 @@ use std::cell::Ref; use std::cell::RefCell; use std::cell::RefMut; use std::future::Future; +use std::mem::ManuallyDrop; +use std::pin::Pin; use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; pub type Request = hyper1::Request<Incoming>; -pub type Response = hyper1::Response<ResponseBytes>; +pub type Response = hyper1::Response<HttpRecordResponse>; macro_rules! http_trace { ($record:expr $(, $args:expr)*) => { @@ -39,6 +49,7 @@ pub(crate) use http_trace; struct HttpServerStateInner { pool: Vec<(Rc<HttpRecord>, HeaderMap)>, + drain_waker: Option<Waker>, } pub struct HttpServerState(RefCell<HttpServerStateInner>); @@ -47,8 +58,32 @@ impl HttpServerState { pub fn new() -> Rc<Self> { Rc::new(Self(RefCell::new(HttpServerStateInner { pool: Vec::new(), + drain_waker: None, }))) } + + pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a { + struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>); + + impl<'a> Future for HttpServerStateDrain<'a> { + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + let server_state = self.0; + http_trace!(server_state, "HttpServerState::drain poll"); + if Rc::strong_count(server_state) <= 1 { + return Poll::Ready(()); + } + server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + HttpServerStateDrain(self) + } } enum RequestBodyState { @@ -102,25 +137,28 @@ pub async fn handle_request( http_trace!(*guarded_record, "handle_request response_ready.await"); guarded_record.response_ready().await; - // Defuse the guard. Must not await after the point. + // Defuse the guard. Must not await after this point. let record = ScopeGuard::into_inner(guarded_record); http_trace!(record, "handle_request complete"); - let response = record.take_response(); - record.recycle(); + let response = record.into_response(); Ok(response) } struct HttpRecordInner { server_state: Rc<HttpServerState>, request_info: HttpConnectionProperties, - request_parts: Parts, + request_parts: http::request::Parts, request_body: Option<RequestBodyState>, - /// The response may get taken before we tear this down - response: Option<Response>, + response_parts: Option<http::response::Parts>, response_ready: bool, - response_waker: Option<std::task::Waker>, - trailers: Rc<RefCell<Option<HeaderMap>>>, + response_waker: Option<Waker>, + response_body: ResponseBytesInner, + response_body_finished: bool, + response_body_waker: Option<Waker>, + trailers: Option<HeaderMap>, been_dropped: bool, + finished: bool, + needs_close_after_finish: bool, } pub struct HttpRecord(RefCell<Option<HttpRecordInner>>); @@ -147,45 +185,72 @@ impl HttpRecord { server_state: Rc<HttpServerState>, ) -> Rc<Self> { let (request_parts, request_body) = request.into_parts(); - let body = ResponseBytes::default(); - let trailers = body.trailers(); let request_body = Some(request_body.into()); - let mut response = Response::new(body); - let reuse_record = + let (mut response_parts, _) = http::Response::new(()).into_parts(); + let record = if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() { - *response.headers_mut() = headers; - Some(record) + response_parts.headers = headers; + http_trace!(record, "HttpRecord::reuse"); + record } else { - None + #[cfg(feature = "__http_tracing")] + { + RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + + #[allow(clippy::let_and_return)] + let record = Rc::new(Self(RefCell::new(None))); + http_trace!(record, "HttpRecord::new"); + record }; - let inner = HttpRecordInner { + *record.0.borrow_mut() = Some(HttpRecordInner { server_state, request_info, request_parts, request_body, - response: Some(response), + response_parts: Some(response_parts), response_ready: false, response_waker: None, - trailers, + response_body: ResponseBytesInner::Empty, + response_body_finished: false, + response_body_waker: None, + trailers: None, been_dropped: false, - }; - if let Some(record) = reuse_record { - *record.0.borrow_mut() = Some(inner); - http_trace!(record, "HttpRecord::reuse"); - record - } else { - #[cfg(feature = "__http_tracing")] - { - RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } + finished: false, + needs_close_after_finish: false, + }); + record + } + + fn finish(self: Rc<Self>) { + http_trace!(self, "HttpRecord::finish"); + let mut inner = self.self_mut(); + inner.response_body_finished = true; + let response_body_waker = inner.response_body_waker.take(); + let needs_close_after_finish = inner.needs_close_after_finish; + drop(inner); + if let Some(waker) = response_body_waker { + waker.wake(); + } + if !needs_close_after_finish { + self.recycle(); + } + } - #[allow(clippy::let_and_return)] - let record = Rc::new(Self(RefCell::new(Some(inner)))); - http_trace!(record, "HttpRecord::new"); - record + pub fn close_after_finish(self: Rc<Self>) { + debug_assert!(self.self_ref().needs_close_after_finish); + let mut inner = self.self_mut(); + inner.needs_close_after_finish = false; + if !inner.finished { + drop(inner); + self.recycle(); } } + pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> { + RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) + } + fn recycle(self: Rc<Self>) { assert!( Rc::strong_count(&self) == 1, @@ -196,12 +261,19 @@ impl HttpRecord { request_parts: Parts { mut headers, .. }, .. } = self.0.borrow_mut().take().unwrap(); + let mut server_state_mut = server_state.0.borrow_mut(); let inflight = Rc::strong_count(&server_state); http_trace!(self, "HttpRecord::recycle inflight={}", inflight); - // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling - // the to-drop objects off to another thread. + // Server is shutting down so wake the drain future. + if let Some(waker) = server_state_mut.drain_waker.take() { + drop(server_state_mut); + drop(server_state); + http_trace!(self, "HttpRecord::recycle wake"); + waker.wake(); + return; + } // Keep a buffer of allocations on hand to be reused by incoming requests. // Estimated target size is 16 + 1/8 the number of inflight requests. @@ -235,7 +307,7 @@ impl HttpRecord { } /// Take the Hyper body from this record. - pub fn take_body(&self) -> Option<Incoming> { + pub fn take_request_body(&self) -> Option<Incoming> { let body_holder = &mut self.self_mut().request_body; let body = body_holder.take(); match body { @@ -247,18 +319,6 @@ impl HttpRecord { } } - pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Resource(res)) => Some(res), - x => { - *body_holder = x; - None - } - } - } - /// Replace the request body with a resource ID and the OpState we'll need to shut it down. /// We cannot keep just the resource itself, as JS code might be reading from the resource ID /// to generate the response data (requiring us to keep it in the resource table). @@ -273,7 +333,7 @@ impl HttpRecord { if inner.response_ready { // Future dropped between wake() and async fn resuming. drop(inner); - self.recycle(); + self.finish(); return; } inner.been_dropped = true; @@ -291,7 +351,7 @@ impl HttpRecord { ); if inner.been_dropped { drop(inner); - self.recycle(); + self.finish(); return; } inner.response_ready = true; @@ -301,25 +361,44 @@ impl HttpRecord { } } + fn take_response_body(&self) -> ResponseBytesInner { + let mut inner = self.self_mut(); + debug_assert!( + !matches!(inner.response_body, ResponseBytesInner::Done), + "HTTP state error: response body already complete" + ); + std::mem::replace(&mut inner.response_body, ResponseBytesInner::Done) + } + /// Has the future for this record been dropped? ie, has the underlying TCP connection /// been closed? pub fn cancelled(&self) -> bool { self.self_ref().been_dropped } - /// Get a mutable reference to the response. - pub fn response(&self) -> RefMut<'_, Response> { - RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap()) + /// Get a mutable reference to the response status and headers. + pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> { + RefMut::map(self.self_mut(), |inner| { + inner.response_parts.as_mut().unwrap() + }) } /// Get a mutable reference to the trailers. - pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> { - Ref::map(self.self_ref(), |inner| &inner.trailers) + pub fn trailers(&self) -> RefMut<'_, Option<HeaderMap>> { + RefMut::map(self.self_mut(), |inner| &mut inner.trailers) + } + + pub fn set_response_body(&self, response_body: ResponseBytesInner) { + let mut inner = self.self_mut(); + debug_assert!(matches!(inner.response_body, ResponseBytesInner::Empty)); + inner.response_body = response_body; } /// Take the response. - fn take_response(&self) -> Response { - self.self_mut().response.take().unwrap() + fn into_response(self: Rc<Self>) -> Response { + let parts = self.self_mut().response_parts.take().unwrap(); + let body = HttpRecordResponse(ManuallyDrop::new(self)); + http::Response::from_parts(parts, body) } /// Get a reference to the connection properties. @@ -332,38 +411,131 @@ impl HttpRecord { Ref::map(self.self_ref(), |inner| &inner.request_parts) } - /// Get a reference to the completion handle. + /// Resolves when response head is ready. fn response_ready(&self) -> impl Future<Output = ()> + '_ { - struct HttpRecordComplete<'a>(&'a HttpRecord); + struct HttpRecordReady<'a>(&'a HttpRecord); - impl<'a> Future for HttpRecordComplete<'a> { + impl<'a> Future for HttpRecordReady<'a> { type Output = (); fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Self::Output> { + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { let mut mut_self = self.0.self_mut(); if mut_self.response_ready { - return std::task::Poll::Ready(()); + return Poll::Ready(()); } mut_self.response_waker = Some(cx.waker().clone()); - std::task::Poll::Pending + Poll::Pending } } - HttpRecordComplete(self) + HttpRecordReady(self) } - /// Get a reference to the response body completion handle. - pub fn body_promise(&self) -> CompletionHandle { - self - .self_ref() - .response - .as_ref() - .unwrap() - .body() - .completion_handle() + /// Resolves when response body has finished streaming. + pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ { + struct HttpRecordFinished<'a>(&'a HttpRecord); + + impl<'a> Future for HttpRecordFinished<'a> { + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + let mut mut_self = self.0.self_mut(); + if mut_self.response_body_finished { + return Poll::Ready(()); + } + mut_self.response_body_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + HttpRecordFinished(self) + } +} + +#[repr(transparent)] +pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>); + +impl Body for HttpRecordResponse { + type Data = BufView; + type Error = AnyError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { + use crate::response_body::PollFrame; + let record = &self.0; + + let res = loop { + let mut inner = record.self_mut(); + let res = match &mut inner.response_body { + ResponseBytesInner::Done | ResponseBytesInner::Empty => { + if let Some(trailers) = inner.trailers.take() { + return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); + } + unreachable!() + } + ResponseBytesInner::Bytes(..) => { + drop(inner); + let ResponseBytesInner::Bytes(data) = record.take_response_body() + else { + unreachable!(); + }; + return Poll::Ready(Some(Ok(Frame::data(data)))); + } + ResponseBytesInner::UncompressedStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + ResponseBytesInner::GZipStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + ResponseBytesInner::BrotliStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + }; + // This is where we retry the NoData response + if matches!(res, ResponseStreamResult::NoData) { + continue; + } + break res; + }; + + if matches!(res, ResponseStreamResult::EndOfStream) { + if let Some(trailers) = record.self_mut().trailers.take() { + return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); + } + record.take_response_body(); + } + Poll::Ready(res.into()) + } + + fn is_end_stream(&self) -> bool { + let inner = self.0.self_ref(); + matches!( + inner.response_body, + ResponseBytesInner::Done | ResponseBytesInner::Empty + ) && inner.trailers.is_none() + } + + fn size_hint(&self) -> SizeHint { + // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through + // anyways just in case hyper needs it. + self.0.self_ref().response_body.size_hint() + } +} + +impl Drop for HttpRecordResponse { + fn drop(&mut self) { + // SAFETY: this ManuallyDrop is not used again. + let record = unsafe { ManuallyDrop::take(&mut self.0) }; + http_trace!(record, "HttpRecordResponse::drop"); + record.finish(); } } @@ -442,14 +614,10 @@ mod tests { async move { // JavaScript handler produces response let record = rx.recv().await.unwrap(); - let resource = record.take_resource(); - record.response().body_mut().initialize( - ResponseBytesInner::from_vec( - Compression::None, - b"hello world".to_vec(), - ), - resource, - ); + record.set_response_body(ResponseBytesInner::from_vec( + Compression::None, + b"hello world".to_vec(), + )); record.complete(); Ok(()) }, |