summaryrefslogtreecommitdiff
path: root/ext/http/slab.rs
blob: 9f7c1f3e988a3496e477935b5cb56894b24cc2ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use deno_core::error::AnyError;
use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::upgrade::OnUpgrade;

use slab::Slab;
use std::cell::RefCell;
use std::cell::RefMut;
use std::ptr::NonNull;
use std::rc::Rc;

pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type SlabId = u32;

pub struct HttpSlabRecord {
  request_info: HttpConnectionProperties,
  request_parts: Parts,
  request_body: Option<Incoming>,
  // The response may get taken before we tear this down
  response: Option<Response>,
  promise: CompletionHandle,
  trailers: Rc<RefCell<Option<HeaderMap>>>,
  been_dropped: bool,
  #[cfg(feature = "__zombie_http_tracking")]
  alive: bool,
}

thread_local! {
  static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
}

macro_rules! http_trace {
  ($index:expr, $args:tt) => {
    #[cfg(feature = "__http_tracing")]
    {
      let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
      if let Ok(total) = total {
        println!("HTTP id={} total={}: {}", $index, total, format!($args));
      } else {
        println!("HTTP id={} total=?: {}", $index, format!($args));
      }
    }
  };
}

/// Hold a lock on the slab table and a reference to one entry in the table.
pub struct SlabEntry(
  NonNull<HttpSlabRecord>,
  SlabId,
  RefMut<'static, Slab<HttpSlabRecord>>,
);

pub fn slab_get(index: SlabId) -> SlabEntry {
  http_trace!(index, "slab_get");
  let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| {
    // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static
    unsafe { std::mem::transmute(x.borrow_mut()) }
  });
  let Some(entry) = lock.get_mut(index as usize) else {
    panic!("HTTP state error: Attempted to access invalid request {} ({} in total available)",
    index,
    lock.len())
  };
  #[cfg(feature = "__zombie_http_tracking")]
  {
    assert!(entry.alive, "HTTP state error: Entry is not alive");
  }
  let entry = NonNull::new(entry as _).unwrap();

  SlabEntry(entry, index, lock)
}

#[allow(clippy::let_and_return)]
fn slab_insert_raw(
  request_parts: Parts,
  request_body: Option<Incoming>,
  request_info: HttpConnectionProperties,
) -> SlabId {
  let index = SLAB.with(|slab| {
    let mut slab = slab.borrow_mut();
    let body = ResponseBytes::default();
    let trailers = body.trailers();
    slab.insert(HttpSlabRecord {
      request_info,
      request_parts,
      request_body,
      response: Some(Response::new(body)),
      trailers,
      been_dropped: false,
      promise: CompletionHandle::default(),
      #[cfg(feature = "__zombie_http_tracking")]
      alive: true,
    })
  }) as u32;
  http_trace!(index, "slab_insert");
  index
}

pub fn slab_insert(
  request: Request,
  request_info: HttpConnectionProperties,
) -> SlabId {
  let (request_parts, request_body) = request.into_parts();
  slab_insert_raw(request_parts, Some(request_body), request_info)
}

pub fn slab_drop(index: SlabId) {
  http_trace!(index, "slab_drop");
  let mut entry = slab_get(index);
  let record = entry.self_mut();
  assert!(
    !record.been_dropped,
    "HTTP state error: Entry has already been dropped"
  );
  record.been_dropped = true;
  if record.promise.is_completed() {
    drop(entry);
    slab_expunge(index);
  }
}

fn slab_expunge(index: SlabId) {
  SLAB.with(|slab| {
    #[cfg(__zombie_http_tracking)]
    {
      slab.borrow_mut().get_mut(index as usize).unwrap().alive = false;
    }
    #[cfg(not(__zombie_http_tracking))]
    {
      slab.borrow_mut().remove(index as usize);
    }
  });
  http_trace!(index, "slab_expunge");
}

impl SlabEntry {
  fn self_ref(&self) -> &HttpSlabRecord {
    // SAFETY: We have the lock and we're borrowing lifetime from self
    unsafe { self.0.as_ref() }
  }

  fn self_mut(&mut self) -> &mut HttpSlabRecord {
    // SAFETY: We have the lock and we're borrowing lifetime from self
    unsafe { self.0.as_mut() }
  }

  /// Perform the Hyper upgrade on this entry.
  pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> {
    // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
    self
      .self_mut()
      .request_parts
      .extensions
      .remove::<OnUpgrade>()
      .ok_or_else(|| AnyError::msg("upgrade unavailable"))
  }

  /// Take the Hyper body from this entry.
  pub fn take_body(&mut self) -> Incoming {
    self.self_mut().request_body.take().unwrap()
  }

  /// Complete this entry, potentially expunging it if it is complete.
  pub fn complete(self) {
    let promise = &self.self_ref().promise;
    assert!(
      !promise.is_completed(),
      "HTTP state error: Entry has already been completed"
    );
    http_trace!(self.1, "SlabEntry::complete");
    promise.complete(true);
    // If we're all done, we need to drop ourself to release the lock before we expunge this record
    if self.self_ref().been_dropped {
      let index = self.1;
      drop(self);
      slab_expunge(index);
    }
  }

  /// Get a mutable reference to the response.
  pub fn response(&mut self) -> &mut Response {
    self.self_mut().response.as_mut().unwrap()
  }

  /// Get a mutable reference to the trailers.
  pub fn trailers(&mut self) -> &RefCell<Option<HeaderMap>> {
    &self.self_mut().trailers
  }

  /// Take the response.
  pub fn take_response(&mut self) -> Response {
    self.self_mut().response.take().unwrap()
  }

  /// Get a reference to the connection properties.
  pub fn request_info(&self) -> &HttpConnectionProperties {
    &self.self_ref().request_info
  }

  /// Get a reference to the request parts.
  pub fn request_parts(&self) -> &Parts {
    &self.self_ref().request_parts
  }

  /// Get a reference to the completion handle.
  pub fn promise(&self) -> CompletionHandle {
    self.self_ref().promise.clone()
  }

  /// Get a reference to the response body completion handle.
  pub fn body_promise(&self) -> CompletionHandle {
    self
      .self_ref()
      .response
      .as_ref()
      .unwrap()
      .body()
      .completion_handle()
  }
}

#[cfg(test)]
mod tests {
  use super::*;
  use deno_net::raw::NetworkStreamType;
  use http::Request;

  #[test]
  fn test_slab() {
    let req = Request::builder().body(()).unwrap();
    let (parts, _) = req.into_parts();
    let id = slab_insert_raw(
      parts,
      None,
      HttpConnectionProperties {
        peer_address: "".into(),
        peer_port: None,
        local_port: None,
        stream_type: NetworkStreamType::Tcp,
      },
    );
    let entry = slab_get(id);
    entry.complete();
    slab_drop(id);
  }
}