summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock15
-rw-r--r--ext/http/00_serve.js59
-rw-r--r--ext/http/Cargo.toml3
-rw-r--r--ext/http/http_next.rs230
-rw-r--r--ext/http/lib.rs2
-rw-r--r--ext/http/response_body.rs3
-rw-r--r--ext/http/service.rs401
-rw-r--r--ext/http/slab.rs374
8 files changed, 602 insertions, 485 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 6b7f42c11..7c9816668 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1285,6 +1285,7 @@ dependencies = [
"flate2",
"fly-accept-encoding",
"http",
+ "http-body-util",
"httparse",
"hyper 0.14.27",
"hyper 1.0.0-rc.4",
@@ -1298,7 +1299,6 @@ dependencies = [
"ring",
"scopeguard",
"serde",
- "slab",
"smallvec",
"thiserror",
"tokio",
@@ -2717,6 +2717,19 @@ dependencies = [
]
[[package]]
+name = "http-body-util"
+version = "0.1.0-rc.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08ef12f041acdd397010e5fb6433270c147d3b8b2d0a840cd7fff8e531dca5c8"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http",
+ "http-body 1.0.0-rc.2",
+ "pin-project-lite",
+]
+
+[[package]]
name = "httparse"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js
index fbd2014a7..05e0bb5c3 100644
--- a/ext/http/00_serve.js
+++ b/ext/http/00_serve.js
@@ -117,11 +117,11 @@ function upgradeHttpRaw(req, conn) {
function addTrailers(resp, headerList) {
const inner = toInnerResponse(resp);
- op_http_set_response_trailers(inner.slabId, headerList);
+ op_http_set_response_trailers(inner.external, headerList);
}
class InnerRequest {
- #slabId;
+ #external;
#context;
#methodAndUri;
#streamRid;
@@ -129,14 +129,14 @@ class InnerRequest {
#upgraded;
#urlValue;
- constructor(slabId, context) {
- this.#slabId = slabId;
+ constructor(external, context) {
+ this.#external = external;
this.#context = context;
this.#upgraded = false;
}
close() {
- this.#slabId = undefined;
+ this.#external = null;
}
get [_upgraded]() {
@@ -147,7 +147,7 @@ class InnerRequest {
if (this.#upgraded) {
throw new Deno.errors.Http("already upgraded");
}
- if (this.#slabId === undefined) {
+ if (this.#external === null) {
throw new Deno.errors.Http("already closed");
}
@@ -159,7 +159,7 @@ class InnerRequest {
// upgradeHttpRaw is sync
if (upgradeType == "upgradeHttpRaw") {
- const slabId = this.#slabId;
+ const external = this.#external;
const underlyingConn = originalArgs[0];
this.url();
@@ -168,7 +168,7 @@ class InnerRequest {
this.#upgraded = () => {};
- const upgradeRid = op_http_upgrade_raw(slabId);
+ const upgradeRid = op_http_upgrade_raw(external);
const conn = new TcpConn(
upgradeRid,
@@ -184,7 +184,7 @@ class InnerRequest {
const response = originalArgs[0];
const ws = originalArgs[1];
- const slabId = this.#slabId;
+ const external = this.#external;
this.url();
this.headerList;
@@ -194,15 +194,16 @@ class InnerRequest {
this.#upgraded = () => {
goAhead.resolve();
};
+ const wsPromise = op_http_upgrade_websocket_next(
+ external,
+ response.headerList,
+ );
// Start the upgrade in the background.
(async () => {
try {
// Returns the upgraded websocket connection
- const wsRid = await op_http_upgrade_websocket_next(
- slabId,
- response.headerList,
- );
+ const wsRid = await wsPromise;
// We have to wait for the go-ahead signal
await goAhead;
@@ -236,12 +237,12 @@ class InnerRequest {
}
if (this.#methodAndUri === undefined) {
- if (this.#slabId === undefined) {
+ if (this.#external === null) {
throw new TypeError("request closed");
}
// TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider
// splitting this up into multiple ops.
- this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId);
+ this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
}
const path = this.#methodAndUri[2];
@@ -281,10 +282,10 @@ class InnerRequest {
};
}
if (this.#methodAndUri === undefined) {
- if (this.#slabId === undefined) {
+ if (this.#external === null) {
throw new TypeError("request closed");
}
- this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId);
+ this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
}
return {
transport: "tcp",
@@ -295,16 +296,16 @@ class InnerRequest {
get method() {
if (this.#methodAndUri === undefined) {
- if (this.#slabId === undefined) {
+ if (this.#external === null) {
throw new TypeError("request closed");
}
- this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId);
+ this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
}
return this.#methodAndUri[0];
}
get body() {
- if (this.#slabId === undefined) {
+ if (this.#external === null) {
throw new TypeError("request closed");
}
if (this.#body !== undefined) {
@@ -316,25 +317,25 @@ class InnerRequest {
this.#body = null;
return null;
}
- this.#streamRid = op_http_read_request_body(this.#slabId);
+ this.#streamRid = op_http_read_request_body(this.#external);
this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false));
return this.#body;
}
get headerList() {
- if (this.#slabId === undefined) {
+ if (this.#external === null) {
throw new TypeError("request closed");
}
const headers = [];
- const reqHeaders = op_http_get_request_headers(this.#slabId);
+ const reqHeaders = op_http_get_request_headers(this.#external);
for (let i = 0; i < reqHeaders.length; i += 2) {
ArrayPrototypePush(headers, [reqHeaders[i], reqHeaders[i + 1]]);
}
return headers;
}
- get slabId() {
- return this.#slabId;
+ get external() {
+ return this.#external;
}
}
@@ -483,8 +484,8 @@ function mapToCallback(context, callback, onError) {
// Did everything shut down while we were waiting?
if (context.closed) {
// We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
- op_http_set_promise_complete(req, 503);
innerRequest?.close();
+ op_http_set_promise_complete(req, 503);
return;
}
@@ -498,8 +499,8 @@ function mapToCallback(context, callback, onError) {
}
}
- fastSyncResponseOrStream(req, inner.body, status);
innerRequest?.close();
+ fastSyncResponseOrStream(req, inner.body, status);
};
}
@@ -659,7 +660,7 @@ function serveHttpOn(context, callback) {
try {
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
- while ((req = op_http_try_wait(rid)) !== -1) {
+ while ((req = op_http_try_wait(rid)) !== null) {
PromisePrototypeCatch(callback(req), promiseErrorHandler);
}
currentPromise = op_http_wait(rid);
@@ -677,7 +678,7 @@ function serveHttpOn(context, callback) {
}
throw new Deno.errors.Http(error);
}
- if (req === -1) {
+ if (req === null) {
break;
}
PromisePrototypeCatch(callback(req), promiseErrorHandler);
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index ff44a6992..923d9e2ea 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -11,7 +11,6 @@ repository.workspace = true
description = "HTTP server implementation for Deno"
[features]
-"__zombie_http_tracking" = []
"__http_tracing" = []
[lib]
@@ -46,7 +45,6 @@ pin-project.workspace = true
ring.workspace = true
scopeguard.workspace = true
serde.workspace = true
-slab.workspace = true
smallvec.workspace = true
thiserror.workspace = true
tokio.workspace = true
@@ -54,4 +52,5 @@ tokio-util = { workspace = true, features = ["io"] }
[dev-dependencies]
bencher.workspace = true
+http-body-util = "=0.1.0-rc.3"
rand.workspace = true
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 7fc396b38..399515159 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,12 +10,11 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
-use crate::slab::new_slab_future;
-use crate::slab::slab_get;
-use crate::slab::slab_init;
-use crate::slab::HttpRequestBodyAutocloser;
-use crate::slab::RefCount;
-use crate::slab::SlabId;
+use crate::service::handle_request;
+use crate::service::http_trace;
+use crate::service::HttpRecord;
+use crate::service::HttpRequestBodyAutocloser;
+use crate::service::RefCount;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
@@ -33,6 +32,7 @@ use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
+use deno_core::ExternalPointer;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
@@ -62,9 +62,11 @@ use once_cell::sync::Lazy;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::ffi::c_void;
use std::future::Future;
use std::io;
use std::pin::Pin;
+use std::ptr::null;
use std::rc::Rc;
use std::time::Duration;
@@ -116,14 +118,66 @@ impl<
{
}
+#[repr(transparent)]
+struct RcHttpRecord(Rc<HttpRecord>);
+
+// Temp copy
+/// Define an external type.
+macro_rules! external {
+ ($type:ident, $name:literal) => {
+ impl deno_core::Externalizable for $type {
+ fn external_marker() -> usize {
+ // Use the address of a static mut as a way to get around lack of usize-sized TypeId. Because it is mutable, the
+ // compiler cannot collapse multiple definitions into one.
+ static mut DEFINITION: deno_core::ExternalDefinition =
+ deno_core::ExternalDefinition::new($name);
+ // Wash the pointer through black_box so the compiler cannot see what we're going to do with it and needs
+ // to assume it will be used for valid purposes.
+ // SAFETY: temporary while waiting on deno core bump
+ let ptr = std::hint::black_box(unsafe { &mut DEFINITION } as *mut _);
+ ptr as usize
+ }
+
+ fn external_name() -> &'static str {
+ $name
+ }
+ }
+ };
+}
+
+// Register the [`HttpRecord`] as an external.
+external!(RcHttpRecord, "http record");
+
+/// Construct Rc<HttpRecord> from raw external pointer, consuming
+/// refcount. You must make sure the external is deleted on the JS side.
+macro_rules! take_external {
+ ($external:expr, $args:tt) => {{
+ let ptr = ExternalPointer::<RcHttpRecord>::from_raw($external);
+ let record = ptr.unsafely_take().0;
+ http_trace!(record, $args);
+ record
+ }};
+}
+
+/// Clone Rc<HttpRecord> from raw external pointer.
+macro_rules! clone_external {
+ ($external:expr, $args:tt) => {{
+ let ptr = ExternalPointer::<RcHttpRecord>::from_raw($external);
+ ptr.unsafely_deref().0.clone()
+ }};
+}
+
#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> Result<ResourceId, AnyError> {
+ // SAFETY: external is deleted before calling this op.
+ let http = unsafe { take_external!(external, "op_http_upgrade_raw") };
+
// Stage 1: extract the upgrade future
- let upgrade = slab_get(slab_id).upgrade()?;
+ let upgrade = http.upgrade()?;
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
@@ -137,7 +191,6 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
- let mut http = slab_get(slab_id);
*http.response() = response;
http.complete();
let mut upgraded = TokioIo::new(upgrade.await?);
@@ -188,20 +241,23 @@ pub fn op_http_upgrade_raw(
#[smi]
pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = http.upgrade()?;
-
- let response = http.response();
- *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
- for (name, value) in headers {
- response.headers_mut().append(
- HeaderName::from_bytes(&name).unwrap(),
- HeaderValue::from_bytes(&value).unwrap(),
- );
+ {
+ let mut response = http.response();
+ *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ for (name, value) in headers {
+ response.headers_mut().append(
+ HeaderName::from_bytes(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
}
http.complete();
@@ -214,8 +270,10 @@ pub async fn op_http_upgrade_websocket_next(
}
#[op2(fast)]
-pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) {
- let mut http = slab_get(slab_id);
+pub fn op_http_set_promise_complete(external: *const c_void, status: u16) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_promise_complete") };
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
@@ -227,16 +285,18 @@ pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) {
#[op2]
pub fn op_http_get_request_method_and_url<'scope, HTTP>(
scope: &mut v8::HandleScope<'scope>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> v8::Local<'scope, v8::Array>
where
HTTP: HttpPropertyExtractor,
{
- let http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_method_and_url") };
let request_info = http.request_info();
let request_parts = http.request_parts();
let request_properties = HTTP::request_properties(
- request_info,
+ &request_info,
&request_parts.uri,
&request_parts.headers,
);
@@ -291,20 +351,25 @@ where
#[op2]
#[serde]
pub fn op_http_get_request_header(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string] name: String,
) -> Option<ByteString> {
- let http = slab_get(slab_id);
- let value = http.request_parts().headers.get(name);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_header") };
+ let request_parts = http.request_parts();
+ let value = request_parts.headers.get(name);
value.map(|value| value.as_bytes().into())
}
#[op2]
pub fn op_http_get_request_headers<'scope>(
scope: &mut v8::HandleScope<'scope>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> v8::Local<'scope, v8::Array> {
- let http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_headers") };
let headers = &http.request_parts().headers;
// Two slots for each header key/value pair
let mut vec: SmallVec<[v8::Local<v8::Value>; 32]> =
@@ -371,9 +436,11 @@ pub fn op_http_get_request_headers<'scope>(
#[smi]
pub fn op_http_read_request_body(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> ResourceId {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_read_request_body") };
let rid = if let Some(incoming) = http.take_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
@@ -388,12 +455,15 @@ pub fn op_http_read_request_body(
#[op2(fast)]
pub fn op_http_set_response_header(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string(onebyte)] name: Cow<[u8]>,
#[string(onebyte)] value: Cow<[u8]>,
) {
- let mut http = slab_get(slab_id);
- let resp_headers = http.response().headers_mut();
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_header") };
+ let mut response = http.response();
+ let resp_headers = response.headers_mut();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = match value {
@@ -409,12 +479,15 @@ pub fn op_http_set_response_header(
#[op2]
pub fn op_http_set_response_headers(
scope: &mut v8::HandleScope,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
headers: v8::Local<v8::Array>,
) {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_headers") };
// TODO(mmastrac): Invalid headers should be handled?
- let resp_headers = http.response().headers_mut();
+ let mut response = http.response();
+ let resp_headers = response.headers_mut();
let len = headers.length();
let header_len = len * 2;
@@ -438,10 +511,12 @@ pub fn op_http_set_response_headers(
#[op2]
pub fn op_http_set_response_trailers(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[serde] trailers: Vec<(ByteString, ByteString)>,
) {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_trailers") };
let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
for (name, value) in trailers {
// These are valid latin-1 strings
@@ -577,20 +652,21 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
- slab_id: SlabId,
+ external: *const c_void,
length: Option<usize>,
status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
- let mut http = slab_get(slab_id);
+ // SAFETY: external is deleted before calling this op.
+ let http = unsafe { take_external!(external, "set_response") };
// The request may have been cancelled by this point and if so, there's no need for us to
// do all of this work to send the response.
if !http.cancelled() {
let resource = http.take_resource();
let compression =
is_request_compressible(length, &http.request_parts().headers);
- let response = http.response();
+ let mut response = http.response();
let compression =
modify_compressibility_from_response(compression, response.headers_mut());
response
@@ -612,7 +688,7 @@ fn set_response(
#[op2(fast)]
pub fn op_http_set_response_body_resource(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
@@ -634,7 +710,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
- slab_id,
+ external,
resource.size_hint().1.map(|s| s as usize),
status,
true,
@@ -648,41 +724,42 @@ pub fn op_http_set_response_body_resource(
#[op2(fast)]
pub fn op_http_set_response_body_text(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string] text: String,
status: u16,
) {
if !text.is_empty() {
- set_response(slab_id, Some(text.len()), status, false, |compression| {
+ set_response(external, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
- op_http_set_promise_complete::call(slab_id, status);
+ op_http_set_promise_complete::call(external, status);
}
}
#[op2(fast)]
pub fn op_http_set_response_body_bytes(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[buffer] buffer: JsBuffer,
status: u16,
) {
if !buffer.is_empty() {
- set_response(slab_id, Some(buffer.len()), status, false, |compression| {
+ set_response(external, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
- op_http_set_promise_complete::call(slab_id, status);
+ op_http_set_promise_complete::call(external, status);
}
}
#[op2(async)]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[smi] server_rid: ResourceId,
) -> Result<(), AnyError> {
- let http = slab_get(slab_id);
+ // SAFETY: op is called with external.
+ let http = unsafe { clone_external!(external, "op_http_track") };
let handle = http.body_promise();
let join_handle = state
@@ -764,7 +841,7 @@ fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
@@ -773,7 +850,7 @@ fn serve_https(
} = lifetime;
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
@@ -801,7 +878,7 @@ fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
@@ -810,7 +887,7 @@ fn serve_http(
} = lifetime;
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
serve_http2_autodetect(io, svc, listen_cancel_handle)
@@ -822,7 +899,7 @@ fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>>
where
HTTP: HttpPropertyExtractor,
@@ -857,12 +934,12 @@ struct HttpJoinHandle {
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
- rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+ rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
refcount: RefCount,
}
impl HttpJoinHandle {
- fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+ fn new(rx: tokio::sync::mpsc::Receiver<Rc<HttpRecord>>) -> Self {
Self {
join_handle: AsyncRefCell::new(None),
connection_cancel_handle: CancelHandle::new_rc(),
@@ -918,8 +995,6 @@ pub fn op_http_serve<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- slab_init();
-
let listener =
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
@@ -969,8 +1044,6 @@ pub fn op_http_serve_on<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- slab_init();
-
let connection =
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
@@ -1000,36 +1073,38 @@ where
}
/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything
-/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error.
+/// goes wrong in this method we return null and let the async handler pick up the real error.
#[op2(fast)]
-#[smi]
-pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
+pub fn op_http_try_wait(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> *const c_void {
// The resource needs to exist.
let Ok(join_handle) = state.resource_table.get::<HttpJoinHandle>(rid) else {
- return SlabId::MAX;
+ return null();
};
// If join handle is somehow locked, just abort.
let Some(mut handle) =
RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
else {
- return SlabId::MAX;
+ return null();
};
// See if there are any requests waiting on this channel. If not, return.
- let Ok(id) = handle.try_recv() else {
- return SlabId::MAX;
+ let Ok(record) = handle.try_recv() else {
+ return null();
};
- id
+ let ptr = ExternalPointer::new(RcHttpRecord(record));
+ ptr.into_raw()
}
#[op2(async)]
-#[smi]
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<SlabId, AnyError> {
+) -> Result<*const c_void, AnyError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
@@ -1046,8 +1121,9 @@ pub async fn op_http_wait(
.await;
// Do we have a request?
- if let Some(req) = next {
- return Ok(req);
+ if let Some(record) = next {
+ let ptr = ExternalPointer::new(RcHttpRecord(record));
+ return Ok(ptr.into_raw());
}
// No - we're shutting down
@@ -1063,14 +1139,14 @@ pub async fn op_http_wait(
if let Some(err) = err.source() {
if let Some(err) = err.downcast_ref::<io::Error>() {
if err.kind() == io::ErrorKind::NotConnected {
- return Ok(SlabId::MAX);
+ return Ok(null());
}
}
}
return Err(err);
}
- Ok(SlabId::MAX)
+ Ok(null())
}
/// Cancels the HTTP handle.
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index d47011119..0460a3707 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -86,7 +86,7 @@ mod reader_stream;
mod request_body;
mod request_properties;
mod response_body;
-mod slab;
+mod service;
mod websocket_upgrade;
pub use request_properties::DefaultHttpPropertyExtractor;
diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs
index 4f7e3b0a5..7d91dce6b 100644
--- a/ext/http/response_body.rs
+++ b/ext/http/response_body.rs
@@ -23,7 +23,7 @@ use hyper1::body::Frame;
use hyper1::body::SizeHint;
use pin_project::pin_project;
-use crate::slab::HttpRequestBodyAutocloser;
+use crate::service::HttpRequestBodyAutocloser;
/// Simplification for nested types we use for our streams. We provide a way to convert from
/// this type into Hyper's body [`Frame`].
@@ -80,6 +80,7 @@ impl CompletionHandle {
}
}
+ #[allow(dead_code)]
pub fn is_completed(&self) -> bool {
self.inner.borrow().complete
}
diff --git a/ext/http/service.rs b/ext/http/service.rs
new file mode 100644
index 000000000..ea67980f3
--- /dev/null
+++ b/ext/http/service.rs
@@ -0,0 +1,401 @@
+// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
+use crate::request_properties::HttpConnectionProperties;
+use crate::response_body::CompletionHandle;
+use crate::response_body::ResponseBytes;
+use deno_core::error::AnyError;
+use deno_core::OpState;
+use deno_core::ResourceId;
+use http::request::Parts;
+use http::HeaderMap;
+use hyper1::body::Incoming;
+use hyper1::upgrade::OnUpgrade;
+
+use scopeguard::guard;
+use scopeguard::ScopeGuard;
+use std::cell::Ref;
+use std::cell::RefCell;
+use std::cell::RefMut;
+use std::future::Future;
+use std::rc::Rc;
+
+pub type Request = hyper1::Request<Incoming>;
+pub type Response = hyper1::Response<ResponseBytes>;
+
+macro_rules! http_trace {
+ ($record:expr, $args:tt) => {
+ #[cfg(feature = "__http_tracing")]
+ {
+ println!(
+ "HTTP id={:p} strong={}: {}",
+ $record,
+ std::rc::Rc::strong_count(&$record),
+ format!($args),
+ );
+ }
+ };
+}
+
+pub(crate) use http_trace;
+
+#[repr(transparent)]
+#[derive(Clone, Default)]
+pub struct RefCount(pub Rc<()>);
+
+enum RequestBodyState {
+ Incoming(Incoming),
+ Resource(HttpRequestBodyAutocloser),
+}
+
+impl From<Incoming> for RequestBodyState {
+ fn from(value: Incoming) -> Self {
+ RequestBodyState::Incoming(value)
+ }
+}
+
+/// Ensures that the request body closes itself when no longer needed.
+pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>);
+
+impl HttpRequestBodyAutocloser {
+ pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self {
+ Self(res, op_state)
+ }
+}
+
+impl Drop for HttpRequestBodyAutocloser {
+ fn drop(&mut self) {
+ if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) {
+ res.close();
+ }
+ }
+}
+
+pub async fn handle_request(
+ request: Request,
+ request_info: HttpConnectionProperties,
+ _refcount: RefCount, // Keep server alive for duration of this future.
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
+) -> Result<Response, hyper::Error> {
+ // If the underlying TCP connection is closed, this future will be dropped
+ // and execution could stop at any await point.
+ // The HttpRecord must live until JavaScript is done processing so is wrapped
+ // in an Rc. The guard ensures unneeded resources are freed at cancellation.
+ let guarded_record =
+ guard(HttpRecord::new(request, request_info), HttpRecord::cancel);
+
+ // Clone HttpRecord and send to JavaScript for processing.
+ // Safe to unwrap as channel receiver is never closed.
+ tx.send(guarded_record.clone()).await.unwrap();
+
+ // Wait for JavaScript handler to return request.
+ http_trace!(*guarded_record, "handle_request response_ready.await");
+ guarded_record.response_ready().await;
+
+ // Defuse the guard. Must not await after the point.
+ let record = ScopeGuard::into_inner(guarded_record);
+ http_trace!(record, "handle_request complete");
+ assert!(
+ Rc::strong_count(&record) == 1,
+ "HTTP state error: Expected to be last strong reference (handle_request)"
+ );
+ let response = record.take_response();
+ Ok(response)
+}
+
+struct HttpRecordInner {
+ request_info: HttpConnectionProperties,
+ request_parts: Parts,
+ request_body: Option<RequestBodyState>,
+ /// The response may get taken before we tear this down
+ response: Option<Response>,
+ response_ready: bool,
+ response_waker: Option<std::task::Waker>,
+ trailers: Rc<RefCell<Option<HeaderMap>>>,
+ been_dropped: bool,
+}
+
+pub struct HttpRecord(RefCell<HttpRecordInner>);
+
+#[cfg(feature = "__http_tracing")]
+pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
+ std::sync::atomic::AtomicUsize::new(0);
+
+#[cfg(feature = "__http_tracing")]
+impl Drop for HttpRecord {
+ fn drop(&mut self) {
+ let count = RECORD_COUNT
+ .fetch_sub(1, std::sync::atomic::Ordering::SeqCst)
+ .checked_sub(1)
+ .expect("Count went below zero");
+ println!("HTTP count={count}: HttpRecord::drop");
+ }
+}
+
+impl HttpRecord {
+ fn new(request: Request, request_info: HttpConnectionProperties) -> Rc<Self> {
+ #[cfg(feature = "__http_tracing")]
+ {
+ RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+ }
+ let (request_parts, request_body) = request.into_parts();
+ let body = ResponseBytes::default();
+ let trailers = body.trailers();
+ let request_body = Some(request_body.into());
+ let inner = HttpRecordInner {
+ request_info,
+ request_parts,
+ request_body,
+ response: Some(Response::new(body)),
+ response_ready: false,
+ response_waker: None,
+ trailers,
+ been_dropped: false,
+ };
+ #[allow(clippy::let_and_return)]
+ let record = Rc::new(Self(RefCell::new(inner)));
+ http_trace!(record, "HttpRecord::new");
+ record
+ }
+
+ fn self_ref(&self) -> Ref<'_, HttpRecordInner> {
+ self.0.borrow()
+ }
+
+ fn self_mut(&self) -> RefMut<'_, HttpRecordInner> {
+ self.0.borrow_mut()
+ }
+
+ /// Perform the Hyper upgrade on this record.
+ pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> {
+ // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
+ self
+ .self_mut()
+ .request_parts
+ .extensions
+ .remove::<OnUpgrade>()
+ .ok_or_else(|| AnyError::msg("upgrade unavailable"))
+ }
+
+ /// Take the Hyper body from this record.
+ pub fn take_body(&self) -> Option<Incoming> {
+ let body_holder = &mut self.self_mut().request_body;
+ let body = body_holder.take();
+ match body {
+ Some(RequestBodyState::Incoming(body)) => Some(body),
+ x => {
+ *body_holder = x;
+ None
+ }
+ }
+ }
+
+ pub fn take_resource(&self) -> Option<HttpRequestBodyAutocloser> {
+ let body_holder = &mut self.self_mut().request_body;
+ let body = body_holder.take();
+ match body {
+ Some(RequestBodyState::Resource(res)) => Some(res),
+ x => {
+ *body_holder = x;
+ None
+ }
+ }
+ }
+
+ /// Replace the request body with a resource ID and the OpState we'll need to shut it down.
+ /// We cannot keep just the resource itself, as JS code might be reading from the resource ID
+ /// to generate the response data (requiring us to keep it in the resource table).
+ pub fn put_resource(&self, res: HttpRequestBodyAutocloser) {
+ self.self_mut().request_body = Some(RequestBodyState::Resource(res));
+ }
+
+ /// Cleanup resources not needed after the future is dropped.
+ fn cancel(self: Rc<Self>) {
+ http_trace!(self, "HttpRecord::cancel");
+ let mut inner = self.0.borrow_mut();
+ inner.been_dropped = true;
+ // The request body might include actual resources.
+ inner.request_body.take();
+ }
+
+ /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well).
+ pub fn complete(self: Rc<Self>) {
+ http_trace!(self, "HttpRecord::complete");
+ let mut inner = self.self_mut();
+ assert!(
+ !inner.been_dropped || Rc::strong_count(&self) == 1,
+ "HTTP state error: Expected to be last strong reference (been_dropped)"
+ );
+ assert!(
+ !inner.response_ready,
+ "HTTP state error: Entry has already been completed"
+ );
+ inner.response_ready = true;
+ if let Some(waker) = inner.response_waker.take() {
+ drop(inner);
+ waker.wake();
+ }
+ }
+
+ /// Has the future for this record been dropped? ie, has the underlying TCP connection
+ /// been closed?
+ pub fn cancelled(&self) -> bool {
+ self.self_ref().been_dropped
+ }
+
+ /// Get a mutable reference to the response.
+ pub fn response(&self) -> RefMut<'_, Response> {
+ RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap())
+ }
+
+ /// Get a mutable reference to the trailers.
+ pub fn trailers(&self) -> Ref<'_, Rc<RefCell<Option<HeaderMap>>>> {
+ Ref::map(self.self_ref(), |inner| &inner.trailers)
+ }
+
+ /// Take the response.
+ fn take_response(&self) -> Response {
+ self.self_mut().response.take().unwrap()
+ }
+
+ /// Get a reference to the connection properties.
+ pub fn request_info(&self) -> Ref<'_, HttpConnectionProperties> {
+ Ref::map(self.self_ref(), |inner| &inner.request_info)
+ }
+
+ /// Get a reference to the request parts.
+ pub fn request_parts(&self) -> Ref<'_, Parts> {
+ Ref::map(self.self_ref(), |inner| &inner.request_parts)
+ }
+
+ /// Get a reference to the completion handle.
+ fn response_ready(&self) -> impl Future<Output = ()> + '_ {
+ struct HttpRecordComplete<'a>(&'a HttpRecord);
+
+ impl<'a> Future for HttpRecordComplete<'a> {
+ type Output = ();
+
+ fn poll(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Self::Output> {
+ let mut mut_self = self.0 .0.borrow_mut();
+ if mut_self.response_ready {
+ return std::task::Poll::Ready(());
+ }
+ mut_self.response_waker = Some(cx.waker().clone());
+ std::task::Poll::Pending
+ }
+ }
+
+ HttpRecordComplete(self)
+ }
+
+ /// Get a reference to the response body completion handle.
+ pub fn body_promise(&self) -> CompletionHandle {
+ self
+ .self_ref()
+ .response
+ .as_ref()
+ .unwrap()
+ .body()
+ .completion_handle()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::hyper_util_tokioio::TokioIo;
+ use crate::response_body::Compression;
+ use crate::response_body::ResponseBytesInner;
+ use bytes::Buf;
+ use deno_net::raw::NetworkStreamType;
+ use hyper1::body::Body;
+ use hyper1::service::service_fn;
+ use hyper1::service::HttpService;
+ use std::error::Error as StdError;
+
+ /// Execute client request on service and concurrently map the response.
+ async fn serve_request<B, S, T, F>(
+ req: http::Request<B>,
+ service: S,
+ map_response: impl FnOnce(hyper1::Response<Incoming>) -> F,
+ ) -> hyper1::Result<T>
+ where
+ B: Body + Send + 'static, // Send bound due to DuplexStream
+ B::Data: Send,
+ B::Error: Into<Box<dyn StdError + Send + Sync>>,
+ S: HttpService<Incoming>,
+ S::Error: Into<Box<dyn StdError + Send + Sync>>,
+ S::ResBody: 'static,
+ <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
+ F: std::future::Future<Output = hyper1::Result<T>>,
+ {
+ use hyper1::client::conn::http1::handshake;
+ use hyper1::server::conn::http1::Builder;
+ let (stream_client, stream_server) = tokio::io::duplex(16 * 1024);
+ let conn_server =
+ Builder::new().serve_connection(TokioIo::new(stream_server), service);
+ let (mut sender, conn_client) =
+ handshake(TokioIo::new(stream_client)).await?;
+
+ let (res, _, _) = tokio::try_join!(
+ async move {
+ let res = sender.send_request(req).await?;
+ map_response(res).await
+ },
+ conn_server,
+ conn_client,
+ )?;
+ Ok(res)
+ }
+
+ #[tokio::test]
+ async fn test_handle_request() -> Result<(), AnyError> {
+ let (tx, mut rx) = tokio::sync::mpsc::channel(10);
+ let refcount = RefCount::default();
+ let refcount_check = refcount.clone();
+ let request_info = HttpConnectionProperties {
+ peer_address: "".into(),
+ peer_port: None,
+ local_port: None,
+ stream_type: NetworkStreamType::Tcp,
+ };
+ let svc = service_fn(move |req: hyper1::Request<Incoming>| {
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
+ });
+
+ let client_req = http::Request::builder().uri("/").body("".to_string())?;
+
+ // Response produced by concurrent tasks
+ tokio::try_join!(
+ async move {
+ // JavaScript handler produces response
+ let record = rx.recv().await.unwrap();
+ let resource = record.take_resource();
+ record.response().body_mut().initialize(
+ ResponseBytesInner::from_vec(
+ Compression::None,
+ b"hello world".to_vec(),
+ ),
+ resource,
+ );
+ record.complete();
+ Ok(())
+ },
+ // Server connection executes service
+ async move {
+ serve_request(client_req, svc, |res| async {
+ // Client reads the response
+ use http_body_util::BodyExt;
+ assert_eq!(res.status(), 200);
+ let body = res.collect().await?.to_bytes();
+ assert_eq!(body.chunk(), b"hello world");
+ Ok(())
+ })
+ .await
+ },
+ )?;
+ assert_eq!(Rc::strong_count(&refcount_check.0), 1);
+ Ok(())
+ }
+}
diff --git a/ext/http/slab.rs b/ext/http/slab.rs
deleted file mode 100644
index 790b4649a..000000000
--- a/ext/http/slab.rs
+++ /dev/null
@@ -1,374 +0,0 @@
-// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
-use crate::request_properties::HttpConnectionProperties;
-use crate::response_body::CompletionHandle;
-use crate::response_body::ResponseBytes;
-use deno_core::error::AnyError;
-use deno_core::OpState;
-use deno_core::ResourceId;
-use http::request::Parts;
-use http::HeaderMap;
-use hyper1::body::Incoming;
-use hyper1::upgrade::OnUpgrade;
-
-use scopeguard::defer;
-use slab::Slab;
-use std::cell::RefCell;
-use std::cell::RefMut;
-use std::ptr::NonNull;
-use std::rc::Rc;
-
-pub type Request = hyper1::Request<Incoming>;
-pub type Response = hyper1::Response<ResponseBytes>;
-pub type SlabId = u32;
-
-#[repr(transparent)]
-#[derive(Clone, Default)]
-pub struct RefCount(pub Rc<()>);
-
-enum RequestBodyState {
- Incoming(Incoming),
- Resource(HttpRequestBodyAutocloser),
-}
-
-impl From<Incoming> for RequestBodyState {
- fn from(value: Incoming) -> Self {
- RequestBodyState::Incoming(value)
- }
-}
-
-/// Ensures that the request body closes itself when no longer needed.
-pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>);
-
-impl HttpRequestBodyAutocloser {
- pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self {
- Self(res, op_state)
- }
-}
-
-impl Drop for HttpRequestBodyAutocloser {
- fn drop(&mut self) {
- if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) {
- res.close();
- }
- }
-}
-
-pub async fn new_slab_future(
- request: Request,
- request_info: HttpConnectionProperties,
- refcount: RefCount,
- tx: tokio::sync::mpsc::Sender<SlabId>,
-) -> Result<Response, hyper::Error> {
- let index = slab_insert(request, request_info, refcount);
- defer! {
- slab_drop(index);
- }
- let rx = slab_get(index).promise();
- if tx.send(index).await.is_ok() {
- http_trace!(index, "SlabFuture await");
- // We only need to wait for completion if we aren't closed
- rx.await;
- http_trace!(index, "SlabFuture complete");
- }
- let response = slab_get(index).take_response();
- Ok(response)
-}
-
-pub struct HttpSlabRecord {
- request_info: HttpConnectionProperties,
- request_parts: Parts,
- request_body: Option<RequestBodyState>,
- /// The response may get taken before we tear this down
- response: Option<Response>,
- promise: CompletionHandle,
- trailers: Rc<RefCell<Option<HeaderMap>>>,
- been_dropped: bool,
- /// Use a `Rc` to keep track of outstanding requests. We don't use this, but
- /// when it drops, it decrements the refcount of the server itself.
- refcount: Option<RefCount>,
- #[cfg(feature = "__zombie_http_tracking")]
- alive: bool,
-}
-
-thread_local! {
- pub(crate) static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
-}
-
-macro_rules! http_trace {
- ($index:expr, $args:tt) => {
- #[cfg(feature = "__http_tracing")]
- {
- let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len()));
- if let Ok(total) = total {
- println!("HTTP id={} total={}: {}", $index, total, format!($args));
- } else {
- println!("HTTP id={} total=?: {}", $index, format!($args));
- }
- }
- };
-}
-
-pub(crate) use http_trace;
-
-/// Hold a lock on the slab table and a reference to one entry in the table.
-pub struct SlabEntry(
- NonNull<HttpSlabRecord>,
- SlabId,
- RefMut<'static, Slab<HttpSlabRecord>>,
-);
-
-const SLAB_CAPACITY: usize = 1024;
-
-pub fn slab_init() {
- SLAB.with(|slab: &RefCell<Slab<HttpSlabRecord>>| {
- // Note that there might already be an active HTTP server, so this may just
- // end up adding room for an additional SLAB_CAPACITY items. All HTTP servers
- // on a single thread share the same slab.
- let mut slab = slab.borrow_mut();
- slab.reserve(SLAB_CAPACITY);
- })
-}
-
-pub fn slab_get(index: SlabId) -> SlabEntry {
- http_trace!(index, "slab_get");
- let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| {
- // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static
- unsafe { std::mem::transmute(x.borrow_mut()) }
- });
- let Some(entry) = lock.get_mut(index as usize) else {
- panic!("HTTP state error: Attempted to access invalid request {} ({} in total available)",
- index,
- lock.len())
- };
- #[cfg(feature = "__zombie_http_tracking")]
- {
- assert!(entry.alive, "HTTP state error: Entry is not alive");
- }
- let entry = NonNull::new(entry as _).unwrap();
-
- SlabEntry(entry, index, lock)
-}
-
-#[allow(clippy::let_and_return)]
-fn slab_insert_raw(
- request_parts: Parts,
- request_body: Option<Incoming>,
- request_info: HttpConnectionProperties,
- refcount: RefCount,
-) -> SlabId {
- let index = SLAB.with(|slab| {
- let mut slab = slab.borrow_mut();
- let body = ResponseBytes::default();
- let trailers = body.trailers();
- let request_body = request_body.map(|r| r.into());
- slab.insert(HttpSlabRecord {
- request_info,
- request_parts,
- request_body,
- response: Some(Response::new(body)),
- trailers,
- been_dropped: false,
- promise: CompletionHandle::default(),
- refcount: Some(refcount),
- #[cfg(feature = "__zombie_http_tracking")]
- alive: true,
- })
- }) as u32;
- http_trace!(index, "slab_insert");
- index
-}
-
-pub fn slab_insert(
- request: Request,
- request_info: HttpConnectionProperties,
- refcount: RefCount,
-) -> SlabId {
- let (request_parts, request_body) = request.into_parts();
- slab_insert_raw(request_parts, Some(request_body), request_info, refcount)
-}
-
-pub fn slab_drop(index: SlabId) {
- http_trace!(index, "slab_drop");
- let mut entry = slab_get(index);
- let record = entry.self_mut();
- assert!(
- !record.been_dropped,
- "HTTP state error: Entry has already been dropped"
- );
-
- // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND
- // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished
- // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which
- // might include actual resources, and the refcount, which is keeping the server alive.
- record.been_dropped = true;
- if record.promise.is_completed() {
- drop(entry);
- slab_expunge(index);
- } else {
- // Take the request body, as the future has been dropped and this will allow some resources to close
- record.request_body.take();
- // Take the refcount keeping the server alive. The future is no longer alive, which means this request
- // is toast.
- record.refcount.take();
- }
-}
-
-fn slab_expunge(index: SlabId) {
- SLAB.with(|slab| {
- #[cfg(__zombie_http_tracking)]
- {
- slab.borrow_mut().get_mut(index as usize).unwrap().alive = false;
- }
- #[cfg(not(__zombie_http_tracking))]
- {
- slab.borrow_mut().remove(index as usize);
- }
- });
- http_trace!(index, "slab_expunge");
-}
-
-impl SlabEntry {
- fn self_ref(&self) -> &HttpSlabRecord {
- // SAFETY: We have the lock and we're borrowing lifetime from self
- unsafe { self.0.as_ref() }
- }
-
- fn self_mut(&mut self) -> &mut HttpSlabRecord {
- // SAFETY: We have the lock and we're borrowing lifetime from self
- unsafe { self.0.as_mut() }
- }
-
- /// Perform the Hyper upgrade on this entry.
- pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> {
- // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
- self
- .self_mut()
- .request_parts
- .extensions
- .remove::<OnUpgrade>()
- .ok_or_else(|| AnyError::msg("upgrade unavailable"))
- }
-
- /// Take the Hyper body from this entry.
- pub fn take_body(&mut self) -> Option<Incoming> {
- let body_holder = &mut self.self_mut().request_body;
- let body = body_holder.take();
- match body {
- Some(RequestBodyState::Incoming(body)) => Some(body),
- x => {
- *body_holder = x;
- None
- }
- }
- }
-
- pub fn take_resource(&mut self) -> Option<HttpRequestBodyAutocloser> {
- let body_holder = &mut self.self_mut().request_body;
- let body = body_holder.take();
- match body {
- Some(RequestBodyState::Resource(res)) => Some(res),
- x => {
- *body_holder = x;
- None
- }
- }
- }
-
- /// Replace the request body with a resource ID and the OpState we'll need to shut it down.
- /// We cannot keep just the resource itself, as JS code might be reading from the resource ID
- /// to generate the response data (requiring us to keep it in the resource table).
- pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) {
- self.self_mut().request_body = Some(RequestBodyState::Resource(res));
- }
-
- /// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well).
- pub fn complete(self) {
- let promise = &self.self_ref().promise;
- assert!(
- !promise.is_completed(),
- "HTTP state error: Entry has already been completed"
- );
- http_trace!(self.1, "SlabEntry::complete");
- promise.complete(true);
- // If we're all done, we need to drop ourself to release the lock before we expunge this record
- if self.self_ref().been_dropped {
- let index = self.1;
- drop(self);
- slab_expunge(index);
- }
- }
-
- /// Has the future for this entry been dropped? ie, has the underlying TCP connection
- /// been closed?
- pub fn cancelled(&self) -> bool {
- self.self_ref().been_dropped
- }
-
- /// Get a mutable reference to the response.
- pub fn response(&mut self) -> &mut Response {
- self.self_mut().response.as_mut().unwrap()
- }
-
- /// Get a mutable reference to the trailers.
- pub fn trailers(&mut self) -> &RefCell<Option<HeaderMap>> {
- &self.self_mut().trailers
- }
-
- /// Take the response.
- pub fn take_response(&mut self) -> Response {
- self.self_mut().response.take().unwrap()
- }
-
- /// Get a reference to the connection properties.
- pub fn request_info(&self) -> &HttpConnectionProperties {
- &self.self_ref().request_info
- }
-
- /// Get a reference to the request parts.
- pub fn request_parts(&self) -> &Parts {
- &self.self_ref().request_parts
- }
-
- /// Get a reference to the completion handle.
- pub fn promise(&self) -> CompletionHandle {
- self.self_ref().promise.clone()
- }
-
- /// Get a reference to the response body completion handle.
- pub fn body_promise(&self) -> CompletionHandle {
- self
- .self_ref()
- .response
- .as_ref()
- .unwrap()
- .body()
- .completion_handle()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use deno_net::raw::NetworkStreamType;
- use http::Request;
-
- #[test]
- fn test_slab() {
- let req = Request::builder().body(()).unwrap();
- let (parts, _) = req.into_parts();
- let id = slab_insert_raw(
- parts,
- None,
- HttpConnectionProperties {
- peer_address: "".into(),
- peer_port: None,
- local_port: None,
- stream_type: NetworkStreamType::Tcp,
- },
- RefCount::default(),
- );
- let entry = slab_get(id);
- entry.complete();
- slab_drop(id);
- }
-}