diff options
Diffstat (limited to 'ext/http/slab.rs')
-rw-r--r-- | ext/http/slab.rs | 374 |
1 files changed, 0 insertions, 374 deletions
diff --git a/ext/http/slab.rs b/ext/http/slab.rs deleted file mode 100644 index 790b4649a..000000000 --- a/ext/http/slab.rs +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use crate::request_properties::HttpConnectionProperties; -use crate::response_body::CompletionHandle; -use crate::response_body::ResponseBytes; -use deno_core::error::AnyError; -use deno_core::OpState; -use deno_core::ResourceId; -use http::request::Parts; -use http::HeaderMap; -use hyper1::body::Incoming; -use hyper1::upgrade::OnUpgrade; - -use scopeguard::defer; -use slab::Slab; -use std::cell::RefCell; -use std::cell::RefMut; -use std::ptr::NonNull; -use std::rc::Rc; - -pub type Request = hyper1::Request<Incoming>; -pub type Response = hyper1::Response<ResponseBytes>; -pub type SlabId = u32; - -#[repr(transparent)] -#[derive(Clone, Default)] -pub struct RefCount(pub Rc<()>); - -enum RequestBodyState { - Incoming(Incoming), - Resource(HttpRequestBodyAutocloser), -} - -impl From<Incoming> for RequestBodyState { - fn from(value: Incoming) -> Self { - RequestBodyState::Incoming(value) - } -} - -/// Ensures that the request body closes itself when no longer needed. -pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>); - -impl HttpRequestBodyAutocloser { - pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self { - Self(res, op_state) - } -} - -impl Drop for HttpRequestBodyAutocloser { - fn drop(&mut self) { - if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) { - res.close(); - } - } -} - -pub async fn new_slab_future( - request: Request, - request_info: HttpConnectionProperties, - refcount: RefCount, - tx: tokio::sync::mpsc::Sender<SlabId>, -) -> Result<Response, hyper::Error> { - let index = slab_insert(request, request_info, refcount); - defer! { - slab_drop(index); - } - let rx = slab_get(index).promise(); - if tx.send(index).await.is_ok() { - http_trace!(index, "SlabFuture await"); - // We only need to wait for completion if we aren't closed - rx.await; - http_trace!(index, "SlabFuture complete"); - } - let response = slab_get(index).take_response(); - Ok(response) -} - -pub struct HttpSlabRecord { - request_info: HttpConnectionProperties, - request_parts: Parts, - request_body: Option<RequestBodyState>, - /// The response may get taken before we tear this down - response: Option<Response>, - promise: CompletionHandle, - trailers: Rc<RefCell<Option<HeaderMap>>>, - been_dropped: bool, - /// Use a `Rc` to keep track of outstanding requests. We don't use this, but - /// when it drops, it decrements the refcount of the server itself. - refcount: Option<RefCount>, - #[cfg(feature = "__zombie_http_tracking")] - alive: bool, -} - -thread_local! { - pub(crate) static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) }; -} - -macro_rules! http_trace { - ($index:expr, $args:tt) => { - #[cfg(feature = "__http_tracing")] - { - let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len())); - if let Ok(total) = total { - println!("HTTP id={} total={}: {}", $index, total, format!($args)); - } else { - println!("HTTP id={} total=?: {}", $index, format!($args)); - } - } - }; -} - -pub(crate) use http_trace; - -/// Hold a lock on the slab table and a reference to one entry in the table. -pub struct SlabEntry( - NonNull<HttpSlabRecord>, - SlabId, - RefMut<'static, Slab<HttpSlabRecord>>, -); - -const SLAB_CAPACITY: usize = 1024; - -pub fn slab_init() { - SLAB.with(|slab: &RefCell<Slab<HttpSlabRecord>>| { - // Note that there might already be an active HTTP server, so this may just - // end up adding room for an additional SLAB_CAPACITY items. All HTTP servers - // on a single thread share the same slab. - let mut slab = slab.borrow_mut(); - slab.reserve(SLAB_CAPACITY); - }) -} - -pub fn slab_get(index: SlabId) -> SlabEntry { - http_trace!(index, "slab_get"); - let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| { - // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static - unsafe { std::mem::transmute(x.borrow_mut()) } - }); - let Some(entry) = lock.get_mut(index as usize) else { - panic!("HTTP state error: Attempted to access invalid request {} ({} in total available)", - index, - lock.len()) - }; - #[cfg(feature = "__zombie_http_tracking")] - { - assert!(entry.alive, "HTTP state error: Entry is not alive"); - } - let entry = NonNull::new(entry as _).unwrap(); - - SlabEntry(entry, index, lock) -} - -#[allow(clippy::let_and_return)] -fn slab_insert_raw( - request_parts: Parts, - request_body: Option<Incoming>, - request_info: HttpConnectionProperties, - refcount: RefCount, -) -> SlabId { - let index = SLAB.with(|slab| { - let mut slab = slab.borrow_mut(); - let body = ResponseBytes::default(); - let trailers = body.trailers(); - let request_body = request_body.map(|r| r.into()); - slab.insert(HttpSlabRecord { - request_info, - request_parts, - request_body, - response: Some(Response::new(body)), - trailers, - been_dropped: false, - promise: CompletionHandle::default(), - refcount: Some(refcount), - #[cfg(feature = "__zombie_http_tracking")] - alive: true, - }) - }) as u32; - http_trace!(index, "slab_insert"); - index -} - -pub fn slab_insert( - request: Request, - request_info: HttpConnectionProperties, - refcount: RefCount, -) -> SlabId { - let (request_parts, request_body) = request.into_parts(); - slab_insert_raw(request_parts, Some(request_body), request_info, refcount) -} - -pub fn slab_drop(index: SlabId) { - http_trace!(index, "slab_drop"); - let mut entry = slab_get(index); - let record = entry.self_mut(); - assert!( - !record.been_dropped, - "HTTP state error: Entry has already been dropped" - ); - - // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND - // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished - // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which - // might include actual resources, and the refcount, which is keeping the server alive. - record.been_dropped = true; - if record.promise.is_completed() { - drop(entry); - slab_expunge(index); - } else { - // Take the request body, as the future has been dropped and this will allow some resources to close - record.request_body.take(); - // Take the refcount keeping the server alive. The future is no longer alive, which means this request - // is toast. - record.refcount.take(); - } -} - -fn slab_expunge(index: SlabId) { - SLAB.with(|slab| { - #[cfg(__zombie_http_tracking)] - { - slab.borrow_mut().get_mut(index as usize).unwrap().alive = false; - } - #[cfg(not(__zombie_http_tracking))] - { - slab.borrow_mut().remove(index as usize); - } - }); - http_trace!(index, "slab_expunge"); -} - -impl SlabEntry { - fn self_ref(&self) -> &HttpSlabRecord { - // SAFETY: We have the lock and we're borrowing lifetime from self - unsafe { self.0.as_ref() } - } - - fn self_mut(&mut self) -> &mut HttpSlabRecord { - // SAFETY: We have the lock and we're borrowing lifetime from self - unsafe { self.0.as_mut() } - } - - /// Perform the Hyper upgrade on this entry. - pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> { - // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - self - .self_mut() - .request_parts - .extensions - .remove::<OnUpgrade>() - .ok_or_else(|| AnyError::msg("upgrade unavailable")) - } - - /// Take the Hyper body from this entry. - pub fn take_body(&mut self) -> Option<Incoming> { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Incoming(body)) => Some(body), - x => { - *body_holder = x; - None - } - } - } - - pub fn take_resource(&mut self) -> Option<HttpRequestBodyAutocloser> { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Resource(res)) => Some(res), - x => { - *body_holder = x; - None - } - } - } - - /// Replace the request body with a resource ID and the OpState we'll need to shut it down. - /// We cannot keep just the resource itself, as JS code might be reading from the resource ID - /// to generate the response data (requiring us to keep it in the resource table). - pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) { - self.self_mut().request_body = Some(RequestBodyState::Resource(res)); - } - - /// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well). - pub fn complete(self) { - let promise = &self.self_ref().promise; - assert!( - !promise.is_completed(), - "HTTP state error: Entry has already been completed" - ); - http_trace!(self.1, "SlabEntry::complete"); - promise.complete(true); - // If we're all done, we need to drop ourself to release the lock before we expunge this record - if self.self_ref().been_dropped { - let index = self.1; - drop(self); - slab_expunge(index); - } - } - - /// Has the future for this entry been dropped? ie, has the underlying TCP connection - /// been closed? - pub fn cancelled(&self) -> bool { - self.self_ref().been_dropped - } - - /// Get a mutable reference to the response. - pub fn response(&mut self) -> &mut Response { - self.self_mut().response.as_mut().unwrap() - } - - /// Get a mutable reference to the trailers. - pub fn trailers(&mut self) -> &RefCell<Option<HeaderMap>> { - &self.self_mut().trailers - } - - /// Take the response. - pub fn take_response(&mut self) -> Response { - self.self_mut().response.take().unwrap() - } - - /// Get a reference to the connection properties. - pub fn request_info(&self) -> &HttpConnectionProperties { - &self.self_ref().request_info - } - - /// Get a reference to the request parts. - pub fn request_parts(&self) -> &Parts { - &self.self_ref().request_parts - } - - /// Get a reference to the completion handle. - pub fn promise(&self) -> CompletionHandle { - self.self_ref().promise.clone() - } - - /// Get a reference to the response body completion handle. - pub fn body_promise(&self) -> CompletionHandle { - self - .self_ref() - .response - .as_ref() - .unwrap() - .body() - .completion_handle() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use deno_net::raw::NetworkStreamType; - use http::Request; - - #[test] - fn test_slab() { - let req = Request::builder().body(()).unwrap(); - let (parts, _) = req.into_parts(); - let id = slab_insert_raw( - parts, - None, - HttpConnectionProperties { - peer_address: "".into(), - peer_port: None, - local_port: None, - stream_type: NetworkStreamType::Tcp, - }, - RefCount::default(), - ); - let entry = slab_get(id); - entry.complete(); - slab_drop(id); - } -} |