diff options
Diffstat (limited to 'ext/http/service.rs')
-rw-r--r-- | ext/http/service.rs | 338 |
1 files changed, 253 insertions, 85 deletions
diff --git a/ext/http/service.rs b/ext/http/service.rs index 87b308755..fbd533cac 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -1,13 +1,18 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use crate::request_properties::HttpConnectionProperties; -use crate::response_body::CompletionHandle; -use crate::response_body::ResponseBytes; +use crate::response_body::ResponseBytesInner; +use crate::response_body::ResponseStreamResult; use deno_core::error::AnyError; +use deno_core::futures::ready; +use deno_core::BufView; use deno_core::OpState; use deno_core::ResourceId; use http::request::Parts; use http::HeaderMap; +use hyper1::body::Body; +use hyper1::body::Frame; use hyper1::body::Incoming; +use hyper1::body::SizeHint; use hyper1::upgrade::OnUpgrade; use scopeguard::guard; @@ -16,10 +21,15 @@ use std::cell::Ref; use std::cell::RefCell; use std::cell::RefMut; use std::future::Future; +use std::mem::ManuallyDrop; +use std::pin::Pin; use std::rc::Rc; +use std::task::Context; +use std::task::Poll; +use std::task::Waker; pub type Request = hyper1::Request<Incoming>; -pub type Response = hyper1::Response<ResponseBytes>; +pub type Response = hyper1::Response<HttpRecordResponse>; macro_rules! http_trace { ($record:expr $(, $args:expr)*) => { @@ -39,6 +49,7 @@ pub(crate) use http_trace; struct HttpServerStateInner { pool: Vec<(Rc<HttpRecord>, HeaderMap)>, + drain_waker: Option<Waker>, } pub struct HttpServerState(RefCell<HttpServerStateInner>); @@ -47,8 +58,32 @@ impl HttpServerState { pub fn new() -> Rc<Self> { Rc::new(Self(RefCell::new(HttpServerStateInner { pool: Vec::new(), + drain_waker: None, }))) } + + pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a { + struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>); + + impl<'a> Future for HttpServerStateDrain<'a> { + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + let server_state = self.0; + http_trace!(server_state, "HttpServerState::drain poll"); + if Rc::strong_count(server_state) <= 1 { + return Poll::Ready(()); + } + server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + HttpServerStateDrain(self) + } } enum RequestBodyState { @@ -102,25 +137,28 @@ pub async fn handle_request( http_trace!(*guarded_record, "handle_request response_ready.await"); guarded_record.response_ready().await; - // Defuse the guard. Must not await after the point. + // Defuse the guard. Must not await after this point. let record = ScopeGuard::into_inner(guarded_record); http_trace!(record, "handle_request complete"); - let response = record.take_response(); - record.recycle(); + let response = record.into_response(); Ok(response) } struct HttpRecordInner { server_state: Rc<HttpServerState>, request_info: HttpConnectionProperties, - request_parts: Parts, + request_parts: http::request::Parts, request_body: Option<RequestBodyState>, - /// The response may get taken before we tear this down - response: Option<Response>, + response_parts: Option<http::response::Parts>, response_ready: bool, - response_waker: Option<std::task::Waker>, - trailers: Rc<RefCell<Option<HeaderMap>>>, + response_waker: Option<Waker>, + response_body: ResponseBytesInner, + response_body_finished: bool, + response_body_waker: Option<Waker>, + trailers: Option<HeaderMap>, been_dropped: bool, + finished: bool, + needs_close_after_finish: bool, } pub struct HttpRecord(RefCell<Option<HttpRecordInner>>); @@ -147,45 +185,72 @@ impl HttpRecord { server_state: Rc<HttpServerState>, ) -> Rc<Self> { let (request_parts, request_body) = request.into_parts(); - let body = ResponseBytes::default(); - let trailers = body.trailers(); let request_body = Some(request_body.into()); - let mut response = Response::new(body); - let reuse_record = + let (mut response_parts, _) = http::Response::new(()).into_parts(); + let record = if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() { - *response.headers_mut() = headers; - Some(record) + response_parts.headers = headers; + http_trace!(record, "HttpRecord::reuse"); + record } else { - None + #[cfg(feature = "__http_tracing")] + { + RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + + #[allow(clippy::let_and_return)] + let record = Rc::new(Self(RefCell::new(None))); + http_trace!(record, "HttpRecord::new"); + record }; - let inner = HttpRecordInner { + *record.0.borrow_mut() = Some(HttpRecordInner { server_state, request_info, request_parts, request_body, - response: Some(response), + response_parts: Some(response_parts), response_ready: false, response_waker: None, - trailers, + response_body: ResponseBytesInner::Empty, + response_body_finished: false, + response_body_waker: None, + trailers: None, been_dropped: false, - }; - if let Some(record) = reuse_record { - *record.0.borrow_mut() = Some(inner); - http_trace!(record, "HttpRecord::reuse"); - record - } else { - #[cfg(feature = "__http_tracing")] - { - RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } + finished: false, + needs_close_after_finish: false, + }); + record + } + + fn finish(self: Rc<Self>) { + http_trace!(self, "HttpRecord::finish"); + let mut inner = self.self_mut(); + inner.response_body_finished = true; + let response_body_waker = inner.response_body_waker.take(); + let needs_close_after_finish = inner.needs_close_after_finish; + drop(inner); + if let Some(waker) = response_body_waker { + waker.wake(); + } + if !needs_close_after_finish { + self.recycle(); + } + } - #[allow(clippy::let_and_return)] - let record = Rc::new(Self(RefCell::new(Some(inner)))); - http_trace!(record, "HttpRecord::new"); - record + pub fn close_after_finish(self: Rc<Self>) { + debug_assert!(self.self_ref().needs_close_after_finish); + let mut inner = self.self_mut(); + inner.needs_close_after_finish = false; + if !inner.finished { + drop(inner); + self.recycle(); } } + pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> { + RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) + } + fn recycle(self: Rc<Self>) { assert!( Rc::strong_count(&self) == 1, @@ -196,12 +261,19 @@ impl HttpRecord { request_parts: Parts { mut headers, .. }, .. } = self.0.borrow_mut().take().unwrap(); + let mut server_state_mut = server_state.0.borrow_mut(); let inflight = Rc::strong_count(&server_state); http_trace!(self, "HttpRecord::recycle inflight={}", inflight); - // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling - // the to-drop objects off to another thread. + // Server is shutting down so wake the drain future. + if let Some(waker) = server_state_mut.drain_waker.take() { + drop(server_state_mut); + drop(server_state); + http_trace!(self, "HttpRecord::recycle wake"); + waker.wake(); + return; + } // Keep a buffer of allocations on hand to be reused by incoming requests. // Estimated target size is 16 + 1/8 the number of inflight requests. @@ -235,7 +307,7 @@ impl HttpRecord { } /// Take the Hyper body from this record. - pub fn take_body(&self) -> Option<Incoming> { + pub fn take_request_body(&self) -> Option<Incoming> { let body_holder = &mut self.self_mut().request_body; let body = body_holder.take(); match body { @@ -247,18 +319,6 @@ impl HttpRecord { } } - pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Resource(res)) => Some(res), - x => { - *body_holder = x; - None - } - } - } - /// Replace the request body with a resource ID and the OpState we'll need to shut it down. /// We cannot keep just the resource itself, as JS code might be reading from the resource ID /// to generate the response data (requiring us to keep it in the resource table). @@ -273,7 +333,7 @@ impl HttpRecord { if inner.response_ready { // Future dropped between wake() and async fn resuming. drop(inner); - self.recycle(); + self.finish(); return; } inner.been_dropped = true; @@ -291,7 +351,7 @@ impl HttpRecord { ); if inner.been_dropped { drop(inner); - self.recycle(); + self.finish(); return; } inner.response_ready = true; @@ -301,25 +361,44 @@ impl HttpRecord { } } + fn take_response_body(&self) -> ResponseBytesInner { + let mut inner = self.self_mut(); + debug_assert!( + !matches!(inner.response_body, ResponseBytesInner::Done), + "HTTP state error: response body already complete" + ); + std::mem::replace(&mut inner.response_body, ResponseBytesInner::Done) + } + /// Has the future for this record been dropped? ie, has the underlying TCP connection /// been closed? pub fn cancelled(&self) -> bool { self.self_ref().been_dropped } - /// Get a mutable reference to the response. - pub fn response(&self) -> RefMut<'_, Response> { - RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap()) + /// Get a mutable reference to the response status and headers. + pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> { + RefMut::map(self.self_mut(), |inner| { + inner.response_parts.as_mut().unwrap() + }) } /// Get a mutable reference to the trailers. - pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> { - Ref::map(self.self_ref(), |inner| &inner.trailers) + pub fn trailers(&self) -> RefMut<'_, Option<HeaderMap>> { + RefMut::map(self.self_mut(), |inner| &mut inner.trailers) + } + + pub fn set_response_body(&self, response_body: ResponseBytesInner) { + let mut inner = self.self_mut(); + debug_assert!(matches!(inner.response_body, ResponseBytesInner::Empty)); + inner.response_body = response_body; } /// Take the response. - fn take_response(&self) -> Response { - self.self_mut().response.take().unwrap() + fn into_response(self: Rc<Self>) -> Response { + let parts = self.self_mut().response_parts.take().unwrap(); + let body = HttpRecordResponse(ManuallyDrop::new(self)); + http::Response::from_parts(parts, body) } /// Get a reference to the connection properties. @@ -332,38 +411,131 @@ impl HttpRecord { Ref::map(self.self_ref(), |inner| &inner.request_parts) } - /// Get a reference to the completion handle. + /// Resolves when response head is ready. fn response_ready(&self) -> impl Future<Output = ()> + '_ { - struct HttpRecordComplete<'a>(&'a HttpRecord); + struct HttpRecordReady<'a>(&'a HttpRecord); - impl<'a> Future for HttpRecordComplete<'a> { + impl<'a> Future for HttpRecordReady<'a> { type Output = (); fn poll( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll<Self::Output> { + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { let mut mut_self = self.0.self_mut(); if mut_self.response_ready { - return std::task::Poll::Ready(()); + return Poll::Ready(()); } mut_self.response_waker = Some(cx.waker().clone()); - std::task::Poll::Pending + Poll::Pending } } - HttpRecordComplete(self) + HttpRecordReady(self) } - /// Get a reference to the response body completion handle. - pub fn body_promise(&self) -> CompletionHandle { - self - .self_ref() - .response - .as_ref() - .unwrap() - .body() - .completion_handle() + /// Resolves when response body has finished streaming. + pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ { + struct HttpRecordFinished<'a>(&'a HttpRecord); + + impl<'a> Future for HttpRecordFinished<'a> { + type Output = (); + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Self::Output> { + let mut mut_self = self.0.self_mut(); + if mut_self.response_body_finished { + return Poll::Ready(()); + } + mut_self.response_body_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + + HttpRecordFinished(self) + } +} + +#[repr(transparent)] +pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>); + +impl Body for HttpRecordResponse { + type Data = BufView; + type Error = AnyError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { + use crate::response_body::PollFrame; + let record = &self.0; + + let res = loop { + let mut inner = record.self_mut(); + let res = match &mut inner.response_body { + ResponseBytesInner::Done | ResponseBytesInner::Empty => { + if let Some(trailers) = inner.trailers.take() { + return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); + } + unreachable!() + } + ResponseBytesInner::Bytes(..) => { + drop(inner); + let ResponseBytesInner::Bytes(data) = record.take_response_body() + else { + unreachable!(); + }; + return Poll::Ready(Some(Ok(Frame::data(data)))); + } + ResponseBytesInner::UncompressedStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + ResponseBytesInner::GZipStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + ResponseBytesInner::BrotliStream(stm) => { + ready!(Pin::new(stm).poll_frame(cx)) + } + }; + // This is where we retry the NoData response + if matches!(res, ResponseStreamResult::NoData) { + continue; + } + break res; + }; + + if matches!(res, ResponseStreamResult::EndOfStream) { + if let Some(trailers) = record.self_mut().trailers.take() { + return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); + } + record.take_response_body(); + } + Poll::Ready(res.into()) + } + + fn is_end_stream(&self) -> bool { + let inner = self.0.self_ref(); + matches!( + inner.response_body, + ResponseBytesInner::Done | ResponseBytesInner::Empty + ) && inner.trailers.is_none() + } + + fn size_hint(&self) -> SizeHint { + // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through + // anyways just in case hyper needs it. + self.0.self_ref().response_body.size_hint() + } +} + +impl Drop for HttpRecordResponse { + fn drop(&mut self) { + // SAFETY: this ManuallyDrop is not used again. + let record = unsafe { ManuallyDrop::take(&mut self.0) }; + http_trace!(record, "HttpRecordResponse::drop"); + record.finish(); } } @@ -442,14 +614,10 @@ mod tests { async move { // JavaScript handler produces response let record = rx.recv().await.unwrap(); - let resource = record.take_resource(); - record.response().body_mut().initialize( - ResponseBytesInner::from_vec( - Compression::None, - b"hello world".to_vec(), - ), - resource, - ); + record.set_response_body(ResponseBytesInner::from_vec( + Compression::None, + b"hello world".to_vec(), + )); record.complete(); Ok(()) }, |