summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-05-16 17:00:59 -0600
committerGitHub <noreply@github.com>2023-05-16 17:00:59 -0600
commita22388bbd1377f75d3b873c59f6836cd12c2abe5 (patch)
tree04b9879ab7172884b3417fdf1a3d170564c54726
parent9ba2c4c42fcbadea1f19c67c88b5cbc4c97804f3 (diff)
fix(ext/http): Ensure cancelled requests don't crash Deno.serve (#19154)
Fixes for various `Attemped to access invalid request` bugs (#19058, #15427, #17213). We did not wait for both a drop event and a completion event before removing items from the slab table. This ensures that we do so. In addition, the slab methods are refactored out into `slab.rs` for maintainability.
-rw-r--r--cli/tests/unit/serve_test.ts33
-rw-r--r--ext/http/00_serve.js2
-rw-r--r--ext/http/Cargo.toml1
-rw-r--r--ext/http/http_next.rs447
-rw-r--r--ext/http/lib.rs1
-rw-r--r--ext/http/response_body.rs4
-rw-r--r--ext/http/slab.rs241
7 files changed, 416 insertions, 313 deletions
diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts
index 2bd2314b7..15dc84a28 100644
--- a/cli/tests/unit/serve_test.ts
+++ b/cli/tests/unit/serve_test.ts
@@ -15,7 +15,6 @@ import {
deferred,
fail,
} from "./test_util.ts";
-import { consoleSize } from "../../../runtime/js/40_tty.js";
const {
upgradeHttpRaw,
@@ -665,6 +664,38 @@ Deno.test({ permissions: { net: true } }, async function httpServerClose() {
await server;
});
+// https://github.com/denoland/deno/issues/15427
+Deno.test({ permissions: { net: true } }, async function httpServerCloseGet() {
+ const ac = new AbortController();
+ const listeningPromise = deferred();
+ const requestPromise = deferred();
+ const responsePromise = deferred();
+ const server = Deno.serve({
+ handler: async () => {
+ requestPromise.resolve();
+ await new Promise((r) => setTimeout(r, 500));
+ responsePromise.resolve();
+ return new Response("ok");
+ },
+ port: 4501,
+ signal: ac.signal,
+ onListen: onListen(listeningPromise),
+ onError: createOnErrorCb(ac),
+ });
+ await listeningPromise;
+ const conn = await Deno.connect({ port: 4501 });
+ const encoder = new TextEncoder();
+ const body =
+ `GET / HTTP/1.1\r\nHost: example.domain\r\nConnection: close\r\n\r\n`;
+ const writeResult = await conn.write(encoder.encode(body));
+ assertEquals(body.length, writeResult);
+ await requestPromise;
+ conn.close();
+ await responsePromise;
+ ac.abort();
+ await server;
+});
+
// FIXME:
Deno.test(
{ permissions: { net: true } },
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index 69ad88566..35af49b04 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -534,6 +534,8 @@ function mapToCallback(responseBodies, context, signal, callback, onError) {
// Did everything shut down while we were waiting?
if (context.closed) {
+ // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
+ op_http_set_promise_complete(req, 503);
innerRequest?.close();
return;
}
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index e555d742e..1b3d075d1 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -12,6 +12,7 @@ description = "HTTP server implementation for Deno"
[features]
"__zombie_http_tracking" = []
+"__http_tracing" = []
[lib]
path = "lib.rs"
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index a986de7f3..34281ee92 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -6,11 +6,14 @@ use crate::request_body::HttpRequestBody;
use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
-use crate::response_body::CompletionHandle;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::response_body::V8StreamHttpResponseBody;
+use crate::slab::slab_drop;
+use crate::slab::slab_get;
+use crate::slab::slab_insert;
+use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
@@ -39,7 +42,6 @@ use http::header::CONTENT_ENCODING;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_RANGE;
use http::header::CONTENT_TYPE;
-use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::header::COOKIE;
@@ -49,12 +51,10 @@ use hyper1::server::conn::http1;
use hyper1::server::conn::http2;
use hyper1::service::service_fn;
use hyper1::service::HttpService;
-use hyper1::upgrade::OnUpgrade;
use hyper1::StatusCode;
use pin_project::pin_project;
use pin_project::pinned_drop;
-use slab::Slab;
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
@@ -99,163 +99,16 @@ impl<
{
}
-pub struct HttpSlabRecord {
- request_info: HttpConnectionProperties,
- request_parts: Parts,
- request_body: Option<Incoming>,
- // The response may get taken before we tear this down
- response: Option<Response>,
- body: Option<Rc<HttpRequestBody>>,
- promise: CompletionHandle,
- #[cfg(__zombie_http_tracking)]
- alive: bool,
-}
-
-thread_local! {
- pub static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
-}
-
-/// Generates getters and setters for the [`SLAB`]. For example,
-/// `with!(with_req, with_req_mut, Parts, http, http.request_parts);` expands to:
-///
-/// ```ignore
-/// #[inline(always)]
-/// #[allow(dead_code)]
-/// pub(crate) fn with_req_mut<T>(key: usize, f: impl FnOnce(&mut Parts) -> T) -> T {
-/// SLAB.with(|slab| {
-/// let mut borrow = slab.borrow_mut();
-/// let mut http = borrow.get_mut(key).unwrap();
-/// #[cfg(__zombie_http_tracking)]
-/// if !http.alive {
-/// panic!("Attempted to access a dead HTTP object")
-/// }
-/// f(&mut http.expr)
-/// })
-/// }
-
-/// #[inline(always)]
-/// #[allow(dead_code)]
-/// pub(crate) fn with_req<T>(key: usize, f: impl FnOnce(&Parts) -> T) -> T {
-/// SLAB.with(|slab| {
-/// let mut borrow = slab.borrow();
-/// let mut http = borrow.get(key).unwrap();
-/// #[cfg(__zombie_http_tracking)]
-/// if !http.alive {
-/// panic!("Attempted to access a dead HTTP object")
-/// }
-/// f(&http.expr)
-/// })
-/// }
-/// ```
-macro_rules! with {
- ($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => {
- #[inline(always)]
- #[allow(dead_code)]
- pub(crate) fn $mut<T>(key: u32, f: impl FnOnce(&mut $type) -> T) -> T {
- SLAB.with(|slab| {
- let mut borrow = slab.borrow_mut();
- #[allow(unused_mut)] // TODO(mmastrac): compiler issue?
- let mut $http = match borrow.get_mut(key as usize) {
- Some(http) => http,
- None => panic!(
- "Attemped to access invalid request {} ({} in total available)",
- key,
- borrow.len()
- ),
- };
- #[cfg(__zombie_http_tracking)]
- if !$http.alive {
- panic!("Attempted to access a dead HTTP object")
- }
- f(&mut $expr)
- })
- }
-
- #[inline(always)]
- #[allow(dead_code)]
- pub(crate) fn $ref<T>(key: u32, f: impl FnOnce(&$type) -> T) -> T {
- SLAB.with(|slab| {
- let borrow = slab.borrow();
- let $http = borrow.get(key as usize).unwrap();
- #[cfg(__zombie_http_tracking)]
- if !$http.alive {
- panic!("Attempted to access a dead HTTP object")
- }
- f(&$expr)
- })
- }
- };
-}
-
-with!(with_req, with_req_mut, Parts, http, http.request_parts);
-with!(
- with_req_body,
- with_req_body_mut,
- Option<Incoming>,
- http,
- http.request_body
-);
-with!(
- with_resp,
- with_resp_mut,
- Option<Response>,
- http,
- http.response
-);
-with!(
- with_body,
- with_body_mut,
- Option<Rc<HttpRequestBody>>,
- http,
- http.body
-);
-with!(
- with_promise,
- with_promise_mut,
- CompletionHandle,
- http,
- http.promise
-);
-with!(with_http, with_http_mut, HttpSlabRecord, http, http);
-
-fn slab_insert(
- request: Request,
- request_info: HttpConnectionProperties,
-) -> u32 {
- SLAB.with(|slab| {
- let (request_parts, request_body) = request.into_parts();
- slab.borrow_mut().insert(HttpSlabRecord {
- request_info,
- request_parts,
- request_body: Some(request_body),
- response: Some(Response::new(ResponseBytes::default())),
- body: None,
- promise: CompletionHandle::default(),
- #[cfg(__zombie_http_tracking)]
- alive: true,
- })
- }) as u32
-}
-
#[op]
pub fn op_http_upgrade_raw(
state: &mut OpState,
- index: u32,
+ slab_id: SlabId,
) -> Result<ResourceId, AnyError> {
// Stage 1: extract the upgrade future
- let upgrade = with_http_mut(index, |http| {
- // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
- http
- .request_parts
- .extensions
- .remove::<OnUpgrade>()
- .ok_or_else(|| AnyError::msg("upgrade unavailable"))
- })?;
-
+ let upgrade = slab_get(slab_id).upgrade()?;
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
-
spawn(async move {
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
@@ -266,8 +119,9 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
- with_resp_mut(index, |resp| *resp = Some(response));
- with_promise_mut(index, |promise| promise.complete(true));
+ let mut http = slab_get(slab_id);
+ *http.response() = response;
+ http.complete();
let mut upgraded = upgrade.await?;
upgraded.write_all(&bytes).await?;
break upgraded;
@@ -315,29 +169,22 @@ pub fn op_http_upgrade_raw(
#[op]
pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
- index: u32,
+ slab_id: SlabId,
headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> {
- // Stage 1: set the respnse to 101 Switching Protocols and send it
- let upgrade = with_http_mut(index, |http| {
- // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
- let upgrade = http
- .request_parts
- .extensions
- .remove::<OnUpgrade>()
- .ok_or_else(|| AnyError::msg("upgrade unavailable"))?;
-
- let response = http.response.as_mut().unwrap();
- *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
- for (name, value) in headers {
- response.headers_mut().append(
- HeaderName::from_bytes(&name).unwrap(),
- HeaderValue::from_bytes(&value).unwrap(),
- );
- }
- http.promise.complete(true);
- Ok::<_, AnyError>(upgrade)
- })?;
+ let mut http = slab_get(slab_id);
+ // Stage 1: set the response to 101 Switching Protocols and send it
+ let upgrade = http.upgrade()?;
+
+ let response = http.response();
+ *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ for (name, value) in headers {
+ response.headers_mut().append(
+ HeaderName::from_bytes(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
+ http.complete();
// Stage 2: wait for the request to finish upgrading
let upgraded = upgrade.await?;
@@ -348,137 +195,127 @@ pub async fn op_http_upgrade_websocket_next(
}
#[op(fast)]
-pub fn op_http_set_promise_complete(index: u32, status: u16) {
- with_resp_mut(index, |resp| {
- // The Javascript code will never provide a status that is invalid here (see 23_response.js)
- *resp.as_mut().unwrap().status_mut() =
- StatusCode::from_u16(status).unwrap();
- });
- with_promise_mut(index, |promise| {
- promise.complete(true);
- });
+pub fn op_http_set_promise_complete(slab_id: SlabId, status: u16) {
+ let mut http = slab_get(slab_id);
+ // The Javascript code will never provide a status that is invalid here (see 23_response.js)
+ *http.response().status_mut() = StatusCode::from_u16(status).unwrap();
+ http.complete();
}
#[op]
pub fn op_http_get_request_method_and_url<HTTP>(
- index: u32,
+ slab_id: SlabId,
) -> (String, Option<String>, String, String, Option<u16>)
where
HTTP: HttpPropertyExtractor,
{
- // TODO(mmastrac): Passing method can be optimized
- with_http(index, |http| {
- let request_properties = HTTP::request_properties(
- &http.request_info,
- &http.request_parts.uri,
- &http.request_parts.headers,
- );
+ let http = slab_get(slab_id);
+ let request_info = http.request_info();
+ let request_parts = http.request_parts();
+ let request_properties = HTTP::request_properties(
+ request_info,
+ &request_parts.uri,
+ &request_parts.headers,
+ );
- // Only extract the path part - we handle authority elsewhere
- let path = match &http.request_parts.uri.path_and_query() {
- Some(path_and_query) => path_and_query.to_string(),
- None => "".to_owned(),
- };
+ // Only extract the path part - we handle authority elsewhere
+ let path = match &request_parts.uri.path_and_query() {
+ Some(path_and_query) => path_and_query.to_string(),
+ None => "".to_owned(),
+ };
- (
- http.request_parts.method.as_str().to_owned(),
- request_properties.authority,
- path,
- String::from(http.request_info.peer_address.as_ref()),
- http.request_info.peer_port,
- )
- })
+ // TODO(mmastrac): Passing method can be optimized
+ (
+ request_parts.method.as_str().to_owned(),
+ request_properties.authority,
+ path,
+ String::from(request_info.peer_address.as_ref()),
+ request_info.peer_port,
+ )
}
#[op]
pub fn op_http_get_request_header(
- index: u32,
+ slab_id: SlabId,
name: String,
) -> Option<ByteString> {
- with_req(index, |req| {
- let value = req.headers.get(name);
- value.map(|value| value.as_bytes().into())
- })
+ let http = slab_get(slab_id);
+ let value = http.request_parts().headers.get(name);
+ value.map(|value| value.as_bytes().into())
}
#[op]
pub fn op_http_get_request_headers(
- index: u32,
+ slab_id: SlabId,
) -> Vec<(ByteString, ByteString)> {
- with_req(index, |req| {
- let headers = &req.headers;
- let mut vec = Vec::with_capacity(headers.len());
- let mut cookies: Option<Vec<&[u8]>> = None;
- for (name, value) in headers {
- if name == COOKIE {
- if let Some(ref mut cookies) = cookies {
- cookies.push(value.as_bytes());
- } else {
- cookies = Some(vec![value.as_bytes()]);
- }
+ let http = slab_get(slab_id);
+ let headers = &http.request_parts().headers;
+ let mut vec = Vec::with_capacity(headers.len());
+ let mut cookies: Option<Vec<&[u8]>> = None;
+ for (name, value) in headers {
+ if name == COOKIE {
+ if let Some(ref mut cookies) = cookies {
+ cookies.push(value.as_bytes());
} else {
- let name: &[u8] = name.as_ref();
- vec.push((name.into(), value.as_bytes().into()))
+ cookies = Some(vec![value.as_bytes()]);
}
+ } else {
+ let name: &[u8] = name.as_ref();
+ vec.push((name.into(), value.as_bytes().into()))
}
+ }
- // We treat cookies specially, because we don't want them to get them
- // mangled by the `Headers` object in JS. What we do is take all cookie
- // headers and concat them into a single cookie header, separated by
- // semicolons.
- // TODO(mmastrac): This should probably happen on the JS side on-demand
- if let Some(cookies) = cookies {
- let cookie_sep = "; ".as_bytes();
- vec.push((
- ByteString::from(COOKIE.as_str()),
- ByteString::from(cookies.join(cookie_sep)),
- ));
- }
- vec
- })
+ // We treat cookies specially, because we don't want them to get them
+ // mangled by the `Headers` object in JS. What we do is take all cookie
+ // headers and concat them into a single cookie header, separated by
+ // semicolons.
+ // TODO(mmastrac): This should probably happen on the JS side on-demand
+ if let Some(cookies) = cookies {
+ let cookie_sep = "; ".as_bytes();
+ vec.push((
+ ByteString::from(COOKIE.as_str()),
+ ByteString::from(cookies.join(cookie_sep)),
+ ));
+ }
+ vec
}
#[op(fast)]
pub fn op_http_read_request_body(
state: &mut OpState,
- index: u32,
+ slab_id: SlabId,
) -> ResourceId {
- let incoming = with_req_body_mut(index, |body| body.take().unwrap());
+ let mut http = slab_get(slab_id);
+ let incoming = http.take_body();
let body_resource = Rc::new(HttpRequestBody::new(incoming));
- let res = state.resource_table.add_rc(body_resource.clone());
- with_body_mut(index, |body| {
- *body = Some(body_resource);
- });
- res
+ state.resource_table.add_rc(body_resource)
}
#[op(fast)]
-pub fn op_http_set_response_header(index: u32, name: &str, value: &str) {
- with_resp_mut(index, |resp| {
- let resp_headers = resp.as_mut().unwrap().headers_mut();
- // These are valid latin-1 strings
- let name = HeaderName::from_bytes(name.as_bytes()).unwrap();
- let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
- resp_headers.append(name, value);
- });
+pub fn op_http_set_response_header(slab_id: SlabId, name: &str, value: &str) {
+ let mut http = slab_get(slab_id);
+ let resp_headers = http.response().headers_mut();
+ // These are valid latin-1 strings
+ let name = HeaderName::from_bytes(name.as_bytes()).unwrap();
+ let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
+ resp_headers.append(name, value);
}
#[op]
pub fn op_http_set_response_headers(
- index: u32,
+ slab_id: SlabId,
headers: Vec<(ByteString, ByteString)>,
) {
+ let mut http = slab_get(slab_id);
// TODO(mmastrac): Invalid headers should be handled?
- with_resp_mut(index, |resp| {
- let resp_headers = resp.as_mut().unwrap().headers_mut();
- resp_headers.reserve(headers.len());
- for (name, value) in headers {
- // These are valid latin-1 strings
- let name = HeaderName::from_bytes(&name).unwrap();
- let value = HeaderValue::from_bytes(&value).unwrap();
- resp_headers.append(name, value);
- }
- })
+ let resp_headers = http.response().headers_mut();
+ resp_headers.reserve(headers.len());
+ for (name, value) in headers {
+ // These are valid latin-1 strings
+ let name = HeaderName::from_bytes(&name).unwrap();
+ let value = HeaderValue::from_bytes(&value).unwrap();
+ resp_headers.append(name, value);
+ }
}
fn is_request_compressible(headers: &HeaderMap) -> Compression {
@@ -588,28 +425,25 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
}
fn set_response(
- index: u32,
+ slab_id: SlabId,
length: Option<usize>,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
- let compression =
- with_req(index, |req| is_request_compressible(&req.headers));
-
- with_resp_mut(index, move |response| {
- let response = response.as_mut().unwrap();
- let compression = modify_compressibility_from_response(
- compression,
- length,
- response.headers_mut(),
- );
- response.body_mut().initialize(response_fn(compression))
- });
+ let mut http = slab_get(slab_id);
+ let compression = is_request_compressible(&http.request_parts().headers);
+ let response = http.response();
+ let compression = modify_compressibility_from_response(
+ compression,
+ length,
+ response.headers_mut(),
+ );
+ response.body_mut().initialize(response_fn(compression))
}
#[op(fast)]
pub fn op_http_set_response_body_resource(
state: &mut OpState,
- index: u32,
+ slab_id: SlabId,
stream_rid: ResourceId,
auto_close: bool,
) -> Result<(), AnyError> {
@@ -621,7 +455,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
- index,
+ slab_id,
resource.size_hint().1.map(|s| s as usize),
move |compression| {
ResponseBytesInner::from_resource(compression, resource, auto_close)
@@ -634,12 +468,11 @@ pub fn op_http_set_response_body_resource(
#[op(fast)]
pub fn op_http_set_response_body_stream(
state: &mut OpState,
- index: u32,
+ slab_id: SlabId,
) -> Result<ResourceId, AnyError> {
// TODO(mmastrac): what should this channel size be?
let (tx, rx) = tokio::sync::mpsc::channel(1);
-
- set_response(index, None, |compression| {
+ set_response(slab_id, None, |compression| {
ResponseBytesInner::from_v8(compression, rx)
});
@@ -647,18 +480,18 @@ pub fn op_http_set_response_body_stream(
}
#[op(fast)]
-pub fn op_http_set_response_body_text(index: u32, text: String) {
+pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) {
if !text.is_empty() {
- set_response(index, Some(text.len()), |compression| {
+ set_response(slab_id, Some(text.len()), |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
}
}
#[op(fast)]
-pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) {
+pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) {
if !buffer.is_empty() {
- set_response(index, Some(buffer.len()), |compression| {
+ set_response(slab_id, Some(buffer.len()), |compression| {
ResponseBytesInner::from_slice(compression, buffer)
});
};
@@ -667,12 +500,11 @@ pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) {
#[op]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
- index: u32,
+ slab_id: SlabId,
server_rid: ResourceId,
) -> Result<(), AnyError> {
- let handle = with_resp(index, |resp| {
- resp.as_ref().unwrap().body().completion_handle()
- });
+ let http = slab_get(slab_id);
+ let handle = http.body_promise();
let join_handle = state
.borrow_mut()
@@ -689,15 +521,15 @@ pub async fn op_http_track(
}
#[pin_project(PinnedDrop)]
-pub struct SlabFuture<F: Future<Output = ()>>(u32, #[pin] F);
+pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
- tx: tokio::sync::mpsc::Sender<u32>,
+ tx: tokio::sync::mpsc::Sender<SlabId>,
) -> SlabFuture<impl Future<Output = ()>> {
let index = slab_insert(request, request_info);
- let rx = with_promise(index, |promise| promise.clone());
+ let rx = slab_get(index).promise();
SlabFuture(index, async move {
if tx.send(index).await.is_ok() {
// We only need to wait for completion if we aren't closed
@@ -711,16 +543,7 @@ impl<F: Future<Output = ()>> SlabFuture<F> {}
#[pinned_drop]
impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> {
fn drop(self: Pin<&mut Self>) {
- SLAB.with(|slab| {
- #[cfg(__zombie_http_tracking)]
- {
- slab.borrow_mut().get_mut(self.0 as usize).unwrap().alive = false;
- }
- #[cfg(not(__zombie_http_tracking))]
- {
- slab.borrow_mut().remove(self.0 as usize);
- }
- });
+ slab_drop(self.0);
}
}
@@ -736,7 +559,7 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
.project()
.1
.poll(cx)
- .map(|_| Ok(with_resp_mut(index, |resp| resp.take().unwrap())))
+ .map(|_| Ok(slab_get(index).take_response()))
}
}
@@ -776,7 +599,7 @@ fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
cancel: Rc<CancelHandle>,
- tx: tokio::sync::mpsc::Sender<u32>,
+ tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
@@ -803,7 +626,7 @@ fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
cancel: Rc<CancelHandle>,
- tx: tokio::sync::mpsc::Sender<u32>,
+ tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone())
@@ -815,7 +638,7 @@ fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
cancel: Rc<CancelHandle>,
- tx: tokio::sync::mpsc::Sender<u32>,
+ tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>>
where
HTTP: HttpPropertyExtractor,
@@ -843,7 +666,7 @@ struct HttpJoinHandle(
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
// Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
Rc<CancelHandle>,
- AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>,
+ AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
);
impl HttpJoinHandle {
@@ -963,7 +786,7 @@ where
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
-) -> Result<u32, AnyError> {
+) -> Result<SlabId, AnyError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
@@ -1003,14 +826,14 @@ pub async fn op_http_wait(
if let Some(err) = err.source() {
if let Some(err) = err.downcast_ref::<io::Error>() {
if err.kind() == io::ErrorKind::NotConnected {
- return Ok(u32::MAX);
+ return Ok(SlabId::MAX);
}
}
}
return Err(err);
}
- Ok(u32::MAX)
+ Ok(SlabId::MAX)
}
struct UpgradeStream {
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 1ed1e60b7..2660f4653 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -81,6 +81,7 @@ mod reader_stream;
mod request_body;
mod request_properties;
mod response_body;
+mod slab;
mod websocket_upgrade;
pub use request_properties::DefaultHttpPropertyExtractor;
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 288d74758..e30c917c3 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -82,6 +82,10 @@ impl CompletionHandle {
waker.wake();
}
}
+
+ pub fn is_completed(&self) -> bool {
+ self.inner.borrow().complete
+ }
}
impl Future for CompletionHandle {
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
new file mode 100644
index 000000000..24554d689
--- /dev/null
+++ b/ext/http/slab.rs
@@ -0,0 +1,241 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use crate::request_properties::HttpConnectionProperties;
+use crate::response_body::CompletionHandle;
+use crate::response_body::ResponseBytes;
+use deno_core::error::AnyError;
+use http::request::Parts;
+use hyper1::body::Incoming;
+use hyper1::upgrade::OnUpgrade;
+
+use slab::Slab;
+use std::cell::RefCell;
+use std::cell::RefMut;
+use std::ptr::NonNull;
+
+pub type Request = hyper1::Request<Incoming>;
+pub type Response = hyper1::Response<ResponseBytes>;
+pub type SlabId = u32;
+
+pub struct HttpSlabRecord {
+ request_info: HttpConnectionProperties,
+ request_parts: Parts,
+ request_body: Option<Incoming>,
+ // The response may get taken before we tear this down
+ response: Option<Response>,
+ promise: CompletionHandle,
+ been_dropped: bool,
+ #[cfg(feature = "__zombie_http_tracking")]
+ alive: bool,
+}
+
+thread_local! {
+ static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
+}
+
+macro_rules! http_trace {
+ ($index:expr, $args:tt) => {
+ #[cfg(feature = "__http_tracing")]
+ {
+ let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
+ if let Ok(total) = total {
+ println!("HTTP id={} total={}: {}", $index, total, format!($args));
+ } else {
+ println!("HTTP id={} total=?: {}", $index, format!($args));
+ }
+ }
+ };
+}
+
+/// Hold a lock on the slab table and a reference to one entry in the table.
+pub struct SlabEntry(
+ NonNull<HttpSlabRecord>,
+ SlabId,
+ RefMut<'static, Slab<HttpSlabRecord>>,
+);
+
+pub fn slab_get(index: SlabId) -> SlabEntry {
+ http_trace!(index, "slab_get");
+ let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| {
+ // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static
+ unsafe { std::mem::transmute(x.borrow_mut()) }
+ });
+ let Some(entry) = lock.get_mut(index as usize) else {
+ panic!("HTTP state error: Attemped to access invalid request {} ({} in total available)",
+ index,
+ lock.len())
+ };
+ #[cfg(feature = "__zombie_http_tracking")]
+ {
+ assert!(entry.alive, "HTTP state error: Entry is not alive");
+ }
+ let entry = NonNull::new(entry as _).unwrap();
+
+ SlabEntry(entry, index, lock)
+}
+
+#[allow(clippy::let_and_return)]
+fn slab_insert_raw(
+ request_parts: Parts,
+ request_body: Option<Incoming>,
+ request_info: HttpConnectionProperties,
+) -> SlabId {
+ let index = SLAB.with(|slab| {
+ let mut slab = slab.borrow_mut();
+ slab.insert(HttpSlabRecord {
+ request_info,
+ request_parts,
+ request_body,
+ response: Some(Response::new(ResponseBytes::default())),
+ been_dropped: false,
+ promise: CompletionHandle::default(),
+ #[cfg(feature = "__zombie_http_tracking")]
+ alive: true,
+ })
+ }) as u32;
+ http_trace!(index, "slab_insert");
+ index
+}
+
+pub fn slab_insert(
+ request: Request,
+ request_info: HttpConnectionProperties,
+) -> SlabId {
+ let (request_parts, request_body) = request.into_parts();
+ slab_insert_raw(request_parts, Some(request_body), request_info)
+}
+
+pub fn slab_drop(index: SlabId) {
+ http_trace!(index, "slab_drop");
+ let mut entry = slab_get(index);
+ let record = entry.self_mut();
+ assert!(
+ !record.been_dropped,
+ "HTTP state error: Entry has already been dropped"
+ );
+ record.been_dropped = true;
+ if record.promise.is_completed() {
+ drop(entry);
+ slab_expunge(index);
+ }
+}
+
+fn slab_expunge(index: SlabId) {
+ SLAB.with(|slab| {
+ #[cfg(__zombie_http_tracking)]
+ {
+ slab.borrow_mut().get_mut(index as usize).unwrap().alive = false;
+ }
+ #[cfg(not(__zombie_http_tracking))]
+ {
+ slab.borrow_mut().remove(index as usize);
+ }
+ });
+ http_trace!(index, "slab_expunge");
+}
+
+impl SlabEntry {
+ fn self_ref(&self) -> &HttpSlabRecord {
+ // SAFETY: We have the lock and we're borrowing lifetime from self
+ unsafe { self.0.as_ref() }
+ }
+
+ fn self_mut(&mut self) -> &mut HttpSlabRecord {
+ // SAFETY: We have the lock and we're borrowing lifetime from self
+ unsafe { self.0.as_mut() }
+ }
+
+ /// Perform the Hyper upgrade on this entry.
+ pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> {
+ // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
+ self
+ .self_mut()
+ .request_parts
+ .extensions
+ .remove::<OnUpgrade>()
+ .ok_or_else(|| AnyError::msg("upgrade unavailable"))
+ }
+
+ /// Take the Hyper body from this entry.
+ pub fn take_body(&mut self) -> Incoming {
+ self.self_mut().request_body.take().unwrap()
+ }
+
+ /// Complete this entry, potentially expunging it if it is complete.
+ pub fn complete(self) {
+ let promise = &self.self_ref().promise;
+ assert!(
+ !promise.is_completed(),
+ "HTTP state error: Entry has already been completed"
+ );
+ http_trace!(self.1, "SlabEntry::complete");
+ promise.complete(true);
+ // If we're all done, we need to drop ourself to release the lock before we expunge this record
+ if self.self_ref().been_dropped {
+ let index = self.1;
+ drop(self);
+ slab_expunge(index);
+ }
+ }
+
+ /// Get a mutable reference to the response.
+ pub fn response(&mut self) -> &mut Response {
+ self.self_mut().response.as_mut().unwrap()
+ }
+
+ /// Take the response.
+ pub fn take_response(&mut self) -> Response {
+ self.self_mut().response.take().unwrap()
+ }
+
+ /// Get a reference to the connection properties.
+ pub fn request_info(&self) -> &HttpConnectionProperties {
+ &self.self_ref().request_info
+ }
+
+ /// Get a reference to the request parts.
+ pub fn request_parts(&self) -> &Parts {
+ &self.self_ref().request_parts
+ }
+
+ /// Get a reference to the completion handle.
+ pub fn promise(&self) -> CompletionHandle {
+ self.self_ref().promise.clone()
+ }
+
+ /// Get a reference to the response body completion handle.
+ pub fn body_promise(&self) -> CompletionHandle {
+ self
+ .self_ref()
+ .response
+ .as_ref()
+ .unwrap()
+ .body()
+ .completion_handle()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use deno_net::raw::NetworkStreamType;
+ use http::Request;
+
+ #[test]
+ fn test_slab() {
+ let req = Request::builder().body(()).unwrap();
+ let (parts, _) = req.into_parts();
+ let id = slab_insert_raw(
+ parts,
+ None,
+ HttpConnectionProperties {
+ peer_address: "".into(),
+ peer_port: None,
+ local_port: None,
+ stream_type: NetworkStreamType::Tcp,
+ },
+ );
+ let entry = slab_get(id);
+ entry.complete();
+ slab_drop(id);
+ }
+}