summaryrefslogtreecommitdiff
path: root/ext/http/slab.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/slab.rs')
-rw-r--r--ext/http/slab.rs241
1 files changed, 241 insertions, 0 deletions
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
new file mode 100644
index 000000000..24554d689
--- /dev/null
+++ b/ext/http/slab.rs
@@ -0,0 +1,241 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use crate::request_properties::HttpConnectionProperties;
+use crate::response_body::CompletionHandle;
+use crate::response_body::ResponseBytes;
+use deno_core::error::AnyError;
+use http::request::Parts;
+use hyper1::body::Incoming;
+use hyper1::upgrade::OnUpgrade;
+
+use slab::Slab;
+use std::cell::RefCell;
+use std::cell::RefMut;
+use std::ptr::NonNull;
+
+pub type Request = hyper1::Request<Incoming>;
+pub type Response = hyper1::Response<ResponseBytes>;
+pub type SlabId = u32;
+
+pub struct HttpSlabRecord {
+ request_info: HttpConnectionProperties,
+ request_parts: Parts,
+ request_body: Option<Incoming>,
+ // The response may get taken before we tear this down
+ response: Option<Response>,
+ promise: CompletionHandle,
+ been_dropped: bool,
+ #[cfg(feature = "__zombie_http_tracking")]
+ alive: bool,
+}
+
+thread_local! {
+ static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
+}
+
+macro_rules! http_trace {
+ ($index:expr, $args:tt) => {
+ #[cfg(feature = "__http_tracing")]
+ {
+ let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
+ if let Ok(total) = total {
+ println!("HTTP id={} total={}: {}", $index, total, format!($args));
+ } else {
+ println!("HTTP id={} total=?: {}", $index, format!($args));
+ }
+ }
+ };
+}
+
+/// Hold a lock on the slab table and a reference to one entry in the table.
+pub struct SlabEntry(
+ NonNull<HttpSlabRecord>,
+ SlabId,
+ RefMut<'static, Slab<HttpSlabRecord>>,
+);
+
+pub fn slab_get(index: SlabId) -> SlabEntry {
+ http_trace!(index, "slab_get");
+ let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| {
+ // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static
+ unsafe { std::mem::transmute(x.borrow_mut()) }
+ });
+ let Some(entry) = lock.get_mut(index as usize) else {
+ panic!("HTTP state error: Attemped to access invalid request {} ({} in total available)",
+ index,
+ lock.len())
+ };
+ #[cfg(feature = "__zombie_http_tracking")]
+ {
+ assert!(entry.alive, "HTTP state error: Entry is not alive");
+ }
+ let entry = NonNull::new(entry as _).unwrap();
+
+ SlabEntry(entry, index, lock)
+}
+
+#[allow(clippy::let_and_return)]
+fn slab_insert_raw(
+ request_parts: Parts,
+ request_body: Option<Incoming>,
+ request_info: HttpConnectionProperties,
+) -> SlabId {
+ let index = SLAB.with(|slab| {
+ let mut slab = slab.borrow_mut();
+ slab.insert(HttpSlabRecord {
+ request_info,
+ request_parts,
+ request_body,
+ response: Some(Response::new(ResponseBytes::default())),
+ been_dropped: false,
+ promise: CompletionHandle::default(),
+ #[cfg(feature = "__zombie_http_tracking")]
+ alive: true,
+ })
+ }) as u32;
+ http_trace!(index, "slab_insert");
+ index
+}
+
+pub fn slab_insert(
+ request: Request,
+ request_info: HttpConnectionProperties,
+) -> SlabId {
+ let (request_parts, request_body) = request.into_parts();
+ slab_insert_raw(request_parts, Some(request_body), request_info)
+}
+
+pub fn slab_drop(index: SlabId) {
+ http_trace!(index, "slab_drop");
+ let mut entry = slab_get(index);
+ let record = entry.self_mut();
+ assert!(
+ !record.been_dropped,
+ "HTTP state error: Entry has already been dropped"
+ );
+ record.been_dropped = true;
+ if record.promise.is_completed() {
+ drop(entry);
+ slab_expunge(index);
+ }
+}
+
+fn slab_expunge(index: SlabId) {
+ SLAB.with(|slab| {
+ #[cfg(__zombie_http_tracking)]
+ {
+ slab.borrow_mut().get_mut(index as usize).unwrap().alive = false;
+ }
+ #[cfg(not(__zombie_http_tracking))]
+ {
+ slab.borrow_mut().remove(index as usize);
+ }
+ });
+ http_trace!(index, "slab_expunge");
+}
+
+impl SlabEntry {
+ fn self_ref(&self) -> &HttpSlabRecord {
+ // SAFETY: We have the lock and we're borrowing lifetime from self
+ unsafe { self.0.as_ref() }
+ }
+
+ fn self_mut(&mut self) -> &mut HttpSlabRecord {
+ // SAFETY: We have the lock and we're borrowing lifetime from self
+ unsafe { self.0.as_mut() }
+ }
+
+ /// Perform the Hyper upgrade on this entry.
+ pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> {
+ // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
+ self
+ .self_mut()
+ .request_parts
+ .extensions
+ .remove::<OnUpgrade>()
+ .ok_or_else(|| AnyError::msg("upgrade unavailable"))
+ }
+
+ /// Take the Hyper body from this entry.
+ pub fn take_body(&mut self) -> Incoming {
+ self.self_mut().request_body.take().unwrap()
+ }
+
+ /// Complete this entry, potentially expunging it if it is complete.
+ pub fn complete(self) {
+ let promise = &self.self_ref().promise;
+ assert!(
+ !promise.is_completed(),
+ "HTTP state error: Entry has already been completed"
+ );
+ http_trace!(self.1, "SlabEntry::complete");
+ promise.complete(true);
+ // If we're all done, we need to drop ourself to release the lock before we expunge this record
+ if self.self_ref().been_dropped {
+ let index = self.1;
+ drop(self);
+ slab_expunge(index);
+ }
+ }
+
+ /// Get a mutable reference to the response.
+ pub fn response(&mut self) -> &mut Response {
+ self.self_mut().response.as_mut().unwrap()
+ }
+
+ /// Take the response.
+ pub fn take_response(&mut self) -> Response {
+ self.self_mut().response.take().unwrap()
+ }
+
+ /// Get a reference to the connection properties.
+ pub fn request_info(&self) -> &HttpConnectionProperties {
+ &self.self_ref().request_info
+ }
+
+ /// Get a reference to the request parts.
+ pub fn request_parts(&self) -> &Parts {
+ &self.self_ref().request_parts
+ }
+
+ /// Get a reference to the completion handle.
+ pub fn promise(&self) -> CompletionHandle {
+ self.self_ref().promise.clone()
+ }
+
+ /// Get a reference to the response body completion handle.
+ pub fn body_promise(&self) -> CompletionHandle {
+ self
+ .self_ref()
+ .response
+ .as_ref()
+ .unwrap()
+ .body()
+ .completion_handle()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use deno_net::raw::NetworkStreamType;
+ use http::Request;
+
+ #[test]
+ fn test_slab() {
+ let req = Request::builder().body(()).unwrap();
+ let (parts, _) = req.into_parts();
+ let id = slab_insert_raw(
+ parts,
+ None,
+ HttpConnectionProperties {
+ peer_address: "".into(),
+ peer_port: None,
+ local_port: None,
+ stream_type: NetworkStreamType::Tcp,
+ },
+ );
+ let entry = slab_get(id);
+ entry.complete();
+ slab_drop(id);
+ }
+}