summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs230
1 files changed, 153 insertions, 77 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 7fc396b38..399515159 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -10,12 +10,11 @@ use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
-use crate::slab::new_slab_future;
-use crate::slab::slab_get;
-use crate::slab::slab_init;
-use crate::slab::HttpRequestBodyAutocloser;
-use crate::slab::RefCount;
-use crate::slab::SlabId;
+use crate::service::handle_request;
+use crate::service::http_trace;
+use crate::service::HttpRecord;
+use crate::service::HttpRequestBodyAutocloser;
+use crate::service::RefCount;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
@@ -33,6 +32,7 @@ use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
+use deno_core::ExternalPointer;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
@@ -62,9 +62,11 @@ use once_cell::sync::Lazy;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::cell::RefCell;
+use std::ffi::c_void;
use std::future::Future;
use std::io;
use std::pin::Pin;
+use std::ptr::null;
use std::rc::Rc;
use std::time::Duration;
@@ -116,14 +118,66 @@ impl<
{
}
+#[repr(transparent)]
+struct RcHttpRecord(Rc<HttpRecord>);
+
+// Temp copy
+/// Define an external type.
+macro_rules! external {
+ ($type:ident, $name:literal) => {
+ impl deno_core::Externalizable for $type {
+ fn external_marker() -> usize {
+ // Use the address of a static mut as a way to get around lack of usize-sized TypeId. Because it is mutable, the
+ // compiler cannot collapse multiple definitions into one.
+ static mut DEFINITION: deno_core::ExternalDefinition =
+ deno_core::ExternalDefinition::new($name);
+ // Wash the pointer through black_box so the compiler cannot see what we're going to do with it and needs
+ // to assume it will be used for valid purposes.
+ // SAFETY: temporary while waiting on deno core bump
+ let ptr = std::hint::black_box(unsafe { &mut DEFINITION } as *mut _);
+ ptr as usize
+ }
+
+ fn external_name() -> &'static str {
+ $name
+ }
+ }
+ };
+}
+
+// Register the [`HttpRecord`] as an external.
+external!(RcHttpRecord, "http record");
+
+/// Construct Rc<HttpRecord> from raw external pointer, consuming
+/// refcount. You must make sure the external is deleted on the JS side.
+macro_rules! take_external {
+ ($external:expr, $args:tt) => {{
+ let ptr = ExternalPointer::<RcHttpRecord>::from_raw($external);
+ let record = ptr.unsafely_take().0;
+ http_trace!(record, $args);
+ record
+ }};
+}
+
+/// Clone Rc<HttpRecord> from raw external pointer.
+macro_rules! clone_external {
+ ($external:expr, $args:tt) => {{
+ let ptr = ExternalPointer::<RcHttpRecord>::from_raw($external);
+ ptr.unsafely_deref().0.clone()
+ }};
+}
+
#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> Result<ResourceId, AnyError> {
+ // SAFETY: external is deleted before calling this op.
+ let http = unsafe { take_external!(external, "op_http_upgrade_raw") };
+
// Stage 1: extract the upgrade future
- let upgrade = slab_get(slab_id).upgrade()?;
+ let upgrade = http.upgrade()?;
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
@@ -137,7 +191,6 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
- let mut http = slab_get(slab_id);
*http.response() = response;
http.complete();
let mut upgraded = TokioIo::new(upgrade.await?);
@@ -188,20 +241,23 @@ pub fn op_http_upgrade_raw(
#[smi]
pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_upgrade_websocket_next") };
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = http.upgrade()?;
-
- let response = http.response();
- *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
- for (name, value) in headers {
- response.headers_mut().append(
- HeaderName::from_bytes(&name).unwrap(),
- HeaderValue::from_bytes(&value).unwrap(),
- );
+ {
+ let mut response = http.response();
+ *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ for (name, value) in headers {
+ response.headers_mut().append(
+ HeaderName::from_bytes(&name).unwrap(),
+ HeaderValue::from_bytes(&value).unwrap(),
+ );
+ }
}
http.complete();
@@ -214,8 +270,10 @@ pub async fn op_http_upgrade_websocket_next(
}
#[op2(fast)]
-pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) {
- let mut http = slab_get(slab_id);
+pub fn op_http_set_promise_complete(external: *const c_void, status: u16) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_promise_complete") };
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
@@ -227,16 +285,18 @@ pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) {
#[op2]
pub fn op_http_get_request_method_and_url<'scope, HTTP>(
scope: &mut v8::HandleScope<'scope>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> v8::Local<'scope, v8::Array>
where
HTTP: HttpPropertyExtractor,
{
- let http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_method_and_url") };
let request_info = http.request_info();
let request_parts = http.request_parts();
let request_properties = HTTP::request_properties(
- request_info,
+ &request_info,
&request_parts.uri,
&request_parts.headers,
);
@@ -291,20 +351,25 @@ where
#[op2]
#[serde]
pub fn op_http_get_request_header(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string] name: String,
) -> Option<ByteString> {
- let http = slab_get(slab_id);
- let value = http.request_parts().headers.get(name);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_header") };
+ let request_parts = http.request_parts();
+ let value = request_parts.headers.get(name);
value.map(|value| value.as_bytes().into())
}
#[op2]
pub fn op_http_get_request_headers<'scope>(
scope: &mut v8::HandleScope<'scope>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> v8::Local<'scope, v8::Array> {
- let http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_get_request_headers") };
let headers = &http.request_parts().headers;
// Two slots for each header key/value pair
let mut vec: SmallVec<[v8::Local<v8::Value>; 32]> =
@@ -371,9 +436,11 @@ pub fn op_http_get_request_headers<'scope>(
#[smi]
pub fn op_http_read_request_body(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
) -> ResourceId {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_read_request_body") };
let rid = if let Some(incoming) = http.take_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
@@ -388,12 +455,15 @@ pub fn op_http_read_request_body(
#[op2(fast)]
pub fn op_http_set_response_header(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string(onebyte)] name: Cow<[u8]>,
#[string(onebyte)] value: Cow<[u8]>,
) {
- let mut http = slab_get(slab_id);
- let resp_headers = http.response().headers_mut();
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_header") };
+ let mut response = http.response();
+ let resp_headers = response.headers_mut();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = match value {
@@ -409,12 +479,15 @@ pub fn op_http_set_response_header(
#[op2]
pub fn op_http_set_response_headers(
scope: &mut v8::HandleScope,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
headers: v8::Local<v8::Array>,
) {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_headers") };
// TODO(mmastrac): Invalid headers should be handled?
- let resp_headers = http.response().headers_mut();
+ let mut response = http.response();
+ let resp_headers = response.headers_mut();
let len = headers.length();
let header_len = len * 2;
@@ -438,10 +511,12 @@ pub fn op_http_set_response_headers(
#[op2]
pub fn op_http_set_response_trailers(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[serde] trailers: Vec<(ByteString, ByteString)>,
) {
- let mut http = slab_get(slab_id);
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_trailers") };
let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
for (name, value) in trailers {
// These are valid latin-1 strings
@@ -577,20 +652,21 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
- slab_id: SlabId,
+ external: *const c_void,
length: Option<usize>,
status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
- let mut http = slab_get(slab_id);
+ // SAFETY: external is deleted before calling this op.
+ let http = unsafe { take_external!(external, "set_response") };
// The request may have been cancelled by this point and if so, there's no need for us to
// do all of this work to send the response.
if !http.cancelled() {
let resource = http.take_resource();
let compression =
is_request_compressible(length, &http.request_parts().headers);
- let response = http.response();
+ let mut response = http.response();
let compression =
modify_compressibility_from_response(compression, response.headers_mut());
response
@@ -612,7 +688,7 @@ fn set_response(
#[op2(fast)]
pub fn op_http_set_response_body_resource(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
@@ -634,7 +710,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
- slab_id,
+ external,
resource.size_hint().1.map(|s| s as usize),
status,
true,
@@ -648,41 +724,42 @@ pub fn op_http_set_response_body_resource(
#[op2(fast)]
pub fn op_http_set_response_body_text(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[string] text: String,
status: u16,
) {
if !text.is_empty() {
- set_response(slab_id, Some(text.len()), status, false, |compression| {
+ set_response(external, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
- op_http_set_promise_complete::call(slab_id, status);
+ op_http_set_promise_complete::call(external, status);
}
}
#[op2(fast)]
pub fn op_http_set_response_body_bytes(
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[buffer] buffer: JsBuffer,
status: u16,
) {
if !buffer.is_empty() {
- set_response(slab_id, Some(buffer.len()), status, false, |compression| {
+ set_response(external, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
- op_http_set_promise_complete::call(slab_id, status);
+ op_http_set_promise_complete::call(external, status);
}
}
#[op2(async)]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
- #[smi] slab_id: SlabId,
+ external: *const c_void,
#[smi] server_rid: ResourceId,
) -> Result<(), AnyError> {
- let http = slab_get(slab_id);
+ // SAFETY: op is called with external.
+ let http = unsafe { clone_external!(external, "op_http_track") };
let handle = http.body_promise();
let join_handle = state
@@ -764,7 +841,7 @@ fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
@@ -773,7 +850,7 @@ fn serve_https(
} = lifetime;
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
@@ -801,7 +878,7 @@ fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
@@ -810,7 +887,7 @@ fn serve_http(
} = lifetime;
let svc = service_fn(move |req: Request| {
- new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
+ handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
serve_http2_autodetect(io, svc, listen_cancel_handle)
@@ -822,7 +899,7 @@ fn serve_http_on<HTTP>(
connection: HTTP::Connection,
listen_properties: &HttpListenProperties,
lifetime: HttpLifetime,
- tx: tokio::sync::mpsc::Sender<SlabId>,
+ tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> JoinHandle<Result<(), AnyError>>
where
HTTP: HttpPropertyExtractor,
@@ -857,12 +934,12 @@ struct HttpJoinHandle {
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
connection_cancel_handle: Rc<CancelHandle>,
listen_cancel_handle: Rc<CancelHandle>,
- rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
+ rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
refcount: RefCount,
}
impl HttpJoinHandle {
- fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
+ fn new(rx: tokio::sync::mpsc::Receiver<Rc<HttpRecord>>) -> Self {
Self {
join_handle: AsyncRefCell::new(None),
connection_cancel_handle: CancelHandle::new_rc(),
@@ -918,8 +995,6 @@ pub fn op_http_serve<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- slab_init();
-
let listener =
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
@@ -969,8 +1044,6 @@ pub fn op_http_serve_on<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
- slab_init();
-
let connection =
HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
@@ -1000,36 +1073,38 @@ where
}
/// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything
-/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error.
+/// goes wrong in this method we return null and let the async handler pick up the real error.
#[op2(fast)]
-#[smi]
-pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
+pub fn op_http_try_wait(
+ state: &mut OpState,
+ #[smi] rid: ResourceId,
+) -> *const c_void {
// The resource needs to exist.
let Ok(join_handle) = state.resource_table.get::<HttpJoinHandle>(rid) else {
- return SlabId::MAX;
+ return null();
};
// If join handle is somehow locked, just abort.
let Some(mut handle) =
RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
else {
- return SlabId::MAX;
+ return null();
};
// See if there are any requests waiting on this channel. If not, return.
- let Ok(id) = handle.try_recv() else {
- return SlabId::MAX;
+ let Ok(record) = handle.try_recv() else {
+ return null();
};
- id
+ let ptr = ExternalPointer::new(RcHttpRecord(record));
+ ptr.into_raw()
}
#[op2(async)]
-#[smi]
pub async fn op_http_wait(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
-) -> Result<SlabId, AnyError> {
+) -> Result<*const c_void, AnyError> {
// We will get the join handle initially, as we might be consuming requests still
let join_handle = state
.borrow_mut()
@@ -1046,8 +1121,9 @@ pub async fn op_http_wait(
.await;
// Do we have a request?
- if let Some(req) = next {
- return Ok(req);
+ if let Some(record) = next {
+ let ptr = ExternalPointer::new(RcHttpRecord(record));
+ return Ok(ptr.into_raw());
}
// No - we're shutting down
@@ -1063,14 +1139,14 @@ pub async fn op_http_wait(
if let Some(err) = err.source() {
if let Some(err) = err.downcast_ref::<io::Error>() {
if err.kind() == io::ErrorKind::NotConnected {
- return Ok(SlabId::MAX);
+ return Ok(null());
}
}
}
return Err(err);
}
- Ok(SlabId::MAX)
+ Ok(null())
}
/// Cancels the HTTP handle.