summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
authorLaurence Rowe <l@lrowe.co.uk>2023-11-13 06:04:49 -0800
committerGitHub <noreply@github.com>2023-11-13 07:04:49 -0700
commit542314a0becbba120dbee13b3f410f647b4c9cb7 (patch)
treed552d924bb68e0d5ae245cf9b65f87187cce4555 /ext/http/http_next.rs
parent1ef617e8f3d48098e69e222b6eb6fe981aeca1c3 (diff)
refactor(ext/http): refer to HttpRecord directly using v8::External (#20770)
Makes the JavaScript Request use a v8:External opaque pointer to directly refer to the Rust HttpRecord. The HttpRecord is now reference counted. To avoid leaks the strong count is checked at request completion. Performance seems unchanged on the minimal benchmark. 118614 req/s this branch vs 118564 req/s on main, but variance between runs on my laptop is pretty high. --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs230
1 files changed, 153 insertions, 77 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 7fc396b38..399515159 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,12 +10,11 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
-use crate::slab::new_slab_future;
-use crate::slab::slab_get;
-use crate::slab::slab_init;
-use crate::slab::HttpRequestBodyAutocloser;
-use crate::slab::RefCount;
-use crate::slab::SlabId;
+use crate::service::handle_request;
+use crate::service::http_trace;
+use crate::service::HttpRecord;
+use crate::service::HttpRequestBodyAutocloser;
+use crate::service::RefCount;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
@@ -33,6 +32,7 @@ use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
+use deno_core::ExternalPointer;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
@@ -62,9 +62,11 @@ use once_cell::sync::Lazy;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::ffi::c_void;
use std::future::Future;
use std::io;
use std::pin::Pin;
+use std::ptr::null;
use std::rc::Rc;
use std::time::Duration;
@@ -116,14 +118,66 @@ impl<
{
}
+#[repr(transparent)]
+struct RcHttpRecord(Rc<HttpRecord>);
+
+// Temp copy
+/// Define an external type.
+macro_rules! external {
+ ($type:ident, $name:literal) => {
+ impl deno_core::Externalizable for $type {
+ fn external_marker() -> usize {
+ // Use the address of a static mut as a way to get around lack of usize-sized TypeId. Because it is mutable, the
+ // compiler cannot collapse multiple definitions into one.
+ static mut DEFINITION: deno_core::ExternalDefinition =
+ deno_core::ExternalDefinition::new($name);
+ // Wash the pointer through black_box so the compiler cannot see what we're going to do with it and needs
+ // to assume it will be used for valid purposes.
+ // SAFETY: temporary while waiting on deno core bump
+ let ptr = std::hint::black_box(unsafe { &mut DEFINITION } as *mut _);
+ ptr as usize
+ }
+
+ fn external_name() -> &'static str {
+ $name
+ }
+ }
+ };
+}
+
+// Register the [`HttpRecord`] as an external.
+external!(RcHttpRecord, "http record");
+
+/// Construct Rc<HttpRecord> from raw external pointer, consuming
+/// refcount. You must make sure the external is deleted on the JS side.
+macro_rules! take_external {
+ ($external:expr, $args:tt) => {{
+ let ptr = ExternalPointer::<RcHttpRecord>::from_raw($external);
+ let record = ptr.unsafely_take().0;
+ http_trace!(record, $args);
+ record
+ }};
+}
+
+/// Clone Rc<HttpRecord> from raw external pointer.
+macro_rules! clone_external {
+ ($external:expr, $args:tt) => {{
+ let ptr = ExternalPointer::<RcHttpRecord>::from_raw($external);
+ ptr.unsafely_deref().0.clone()
+ }};
+}
+
#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> Result<ResourceId, AnyError> {
+ // SAFETY: external is deleted before calling this op.
+ let http = unsafe { take_external!(external, "op_http_upgrade_raw") };
+
// Stage 1: extract the upgrade future
- let upgrade = slab_get(slab_id).upgrade()?;
+ let upgrade = http.upgrade()?;
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
@@ -137,7 +191,6 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
- let mut http = slab_get(slab_id);
*http.response() = response;
http.complete();
let mut upgraded = TokioIo::new(upgrade.await?);
@@ -188,20 +241,23 @@ pub fn op_http_upgrade_raw(
#[smi]
pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = http.upgrade()?;
-
- let response = http.response();
- *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
- for (name, value) in headers {
- response.headers_mut().append(
- HeaderName::from_bytes(&name).unwrap(),
- HeaderValue::from_bytes(&value).unwrap(),
- );
+ {
+ let mut response = http.response();
+ *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ for (name, value) in headers {
+ response.headers_mut().append(
+ HeaderName::from_bytes(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
}
http.complete();
@@ -214,8 +270,10 @@ pub async fn op_http_upgrade_websocket_next(
}
#[op2(fast)]
-pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) {
- let mut http = slab_get(slab_id);
+pub fn op_http_set_promise_complete(external: *const c_void, status: u16) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_promise_complete") };
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
@@ -227,16 +285,18 @@ pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) {
#[op2]
pub fn op_http_get_request_method_and_url<'scope, HTTP>(
scope: &mut v8::HandleScope<'scope>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> v8::Local<'scope, v8::Array>
where
HTTP: HttpPropertyExtractor,
{
- let http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_method_and_url") };
let request_info = http.request_info();
let request_parts = http.request_parts();
let request_properties = HTTP::request_properties(
- request_info,
+ &request_info,
&request_parts.uri,
&request_parts.headers,
);
@@ -291,20 +351,25 @@ where
#[op2]
#[serde]
pub fn op_http_get_request_header(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string] name: String,
) -> Option<ByteString> {
- let http = slab_get(slab_id);
- let value = http.request_parts().headers.get(name);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_header") };
+ let request_parts = http.request_parts();
+ let value = request_parts.headers.get(name);
value.map(|value| value.as_bytes().into())
}
#[op2]
pub fn op_http_get_request_headers<'scope>(
scope: &mut v8::HandleScope<'scope>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> v8::Local<'scope, v8::Array> {
- let http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_headers") };
let headers = &http.request_parts().headers;
// Two slots for each header key/value pair
let mut vec: SmallVec<[v8::Local<v8::Value>; 32]> =
@@ -371,9 +436,11 @@ pub fn op_http_get_request_headers<'scope>(
#[smi]
pub fn op_http_read_request_body(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> ResourceId {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_read_request_body") };
let rid = if let Some(incoming) = http.take_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
@@ -388,12 +455,15 @@ pub fn op_http_read_request_body(
#[op2(fast)]
pub fn op_http_set_response_header(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string(onebyte)] name: Cow<[u8]>,
#[string(onebyte)] value: Cow<[u8]>,
) {
- let mut http = slab_get(slab_id);
- let resp_headers = http.response().headers_mut();
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_header") };
+ let mut response = http.response();
+ let resp_headers = response.headers_mut();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = match value {
@@ -409,12 +479,15 @@ pub fn op_http_set_response_header(
#[op2]
pub fn op_http_set_response_headers(
scope: &mut v8::HandleScope,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
headers: v8::Local<v8::Array>,
) {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_headers") };
// TODO(mmastrac): Invalid headers should be handled?
- let resp_headers = http.response().headers_mut();
+ let mut response = http.response();
+ let resp_headers = response.headers_mut();
let len = headers.length();
let header_len = len * 2;
@@ -438,10 +511,12 @@ pub fn op_http_set_response_headers(
#[op2]
pub fn op_http_set_response_trailers(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[serde] trailers: Vec<(ByteString, ByteString)>,
) {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_trailers") };
let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
for (name, value) in trailers {
// These are valid latin-1 strings
@@ -577,20 +652,21 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
- slab_id: SlabId,
+ external: *const c_void,
length: Option<usize>,
status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
- let mut http = slab_get(slab_id);
+ // SAFETY: external is deleted before calling this op.
+ let http = unsafe { take_external!(external, "set_response") };
// The request may have been cancelled by this point and if so, there's no need for us to
// do all of this work to send the response.
if !http.cancelled() {
let resource = http.take_resource();
let compression =
is_request_compressible(length, &http.request_parts().headers);
- let response = http.response();
+ let mut response = http.response();
let compression =
modify_compressibility_from_response(compression, response.headers_mut());
response
@@ -612,7 +688,7 @@ fn set_response(
#[op2(fast)]
pub fn op_http_set_response_body_resource(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
@@ -634,7 +710,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
- slab_id,
+ external,
resource.size_hint().1.map(|s| s as usize),
status,
true,
@@ -648,41 +724,42 @@ pub fn op_http_set_response_body_resource(
#[op2(fast)]
pub fn op_http_set_response_body_text(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string] text: String,
status: u16,
) {
if !text.is_empty() {
- set_response(slab_id, Some(text.len()), status, false, |compression| {
+ set_response(external, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
- op_http_set_promise_complete::call(slab_id, status);
+ op_http_set_promise_complete::call(external, status);
}
}
#[op2(fast)]
pub fn op_http_set_response_body_bytes(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[buffer] buffer: JsBuffer,
status: u16,
) {
if !buffer.is_empty() {
- set_response(slab_id, Some(buffer.len()), status, false, |compression| {
+ set_response(external, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
- op_http_set_promise_complete::call(slab_id, status);
+ op_http_set_promise_complete::call(external, status);
}
}
#[op2(async)]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[smi] server_rid: ResourceId,
) -> Result<(), AnyError> {
- let http = slab_get(slab_id);
+ // SAFETY: op is called with external.
+ let http = unsafe { clone_external!(external, "op_http_track") };
let handle = http.body_promise();
let join_handle = state
@@ -764,7 +841,7 @@ fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
@@ -773,7 +850,7 @@ fn serve_https(
} = lifetime;
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
@@ -801,7 +878,7 @@ fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
@@ -810,7 +887,7 @@ fn serve_http(
} = lifetime;
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
serve_http2_autodetect(io, svc, listen_cancel_handle)
@@ -822,7 +899,7 @@ fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>>
where
HTTP: HttpPropertyExtractor,
@@ -857,12 +934,12 @@ struct HttpJoinHandle {
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
- rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+ rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
refcount: RefCount,
}
impl HttpJoinHandle {
- fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+ fn new(rx: tokio::sync::mpsc::Receiver<Rc<HttpRecord>>) -> Self {
Self {
join_handle: AsyncRefCell::new(None),
connection_cancel_handle: CancelHandle::new_rc(),
@@ -918,8 +995,6 @@ pub fn op_http_serve<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- slab_init();
-
let listener =
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
@@ -969,8 +1044,6 @@ pub fn op_http_serve_on<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- slab_init();
-
let connection =
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
@@ -1000,36 +1073,38 @@ where
}
/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything
-/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error.
+/// goes wrong in this method we return null and let the async handler pick up the real error.
#[op2(fast)]
-#[smi]
-pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
+pub fn op_http_try_wait(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> *const c_void {
// The resource needs to exist.
let Ok(join_handle) = state.resource_table.get::<HttpJoinHandle>(rid) else {
- return SlabId::MAX;
+ return null();
};
// If join handle is somehow locked, just abort.
let Some(mut handle) =
RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
else {
- return SlabId::MAX;
+ return null();
};
// See if there are any requests waiting on this channel. If not, return.
- let Ok(id) = handle.try_recv() else {
- return SlabId::MAX;
+ let Ok(record) = handle.try_recv() else {
+ return null();
};
- id
+ let ptr = ExternalPointer::new(RcHttpRecord(record));
+ ptr.into_raw()
}
#[op2(async)]
-#[smi]
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<SlabId, AnyError> {
+) -> Result<*const c_void, AnyError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
@@ -1046,8 +1121,9 @@ pub async fn op_http_wait(
.await;
// Do we have a request?
- if let Some(req) = next {
- return Ok(req);
+ if let Some(record) = next {
+ let ptr = ExternalPointer::new(RcHttpRecord(record));
+ return Ok(ptr.into_raw());
}
// No - we're shutting down
@@ -1063,14 +1139,14 @@ pub async fn op_http_wait(
if let Some(err) = err.source() {
if let Some(err) = err.downcast_ref::<io::Error>() {
if err.kind() == io::ErrorKind::NotConnected {
- return Ok(SlabId::MAX);
+ return Ok(null());
}
}
}
return Err(err);
}
- Ok(SlabId::MAX)
+ Ok(null())
}
/// Cancels the HTTP handle.