summaryrefslogtreecommitdiff
path: root/ext/http/http_next.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r--ext/http/http_next.rs131
1 files changed, 63 insertions, 68 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs
index 4035fe259..f42275b0e 100644
--- a/ext/http/http_next.rs
+++ b/ext/http/http_next.rs
@@ -8,11 +8,11 @@ use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
-use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::service::handle_request;
use crate::service::http_trace;
use crate::service::HttpRecord;
+use crate::service::HttpRecordResponse;
use crate::service::HttpRequestBodyAutocloser;
use crate::service::HttpServerState;
use crate::websocket_upgrade::WebSocketUpgrade;
@@ -68,7 +68,6 @@ use std::io;
use std::pin::Pin;
use std::ptr::null;
use std::rc::Rc;
-use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
@@ -182,7 +181,7 @@ pub fn op_http_upgrade_raw(
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
spawn(async move {
- let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
+ let mut upgrade_stream = WebSocketUpgrade::<()>::default();
// Stage 2: Extract the Upgraded connection
let mut buf = [0; 1024];
@@ -191,7 +190,8 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
- *http.response() = response;
+ let (response_parts, _) = response.into_parts();
+ *http.response_parts() = response_parts;
http.complete();
let mut upgraded = TokioIo::new(upgrade.await?);
upgraded.write_all(&bytes).await?;
@@ -250,10 +250,10 @@ pub async fn op_http_upgrade_websocket_next(
// Stage 1: set the response to 101 Switching Protocols and send it
let upgrade = http.upgrade()?;
{
- let mut response = http.response();
- *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
+ let mut response_parts = http.response_parts();
+ response_parts.status = StatusCode::SWITCHING_PROTOCOLS;
for (name, value) in headers {
- response.headers_mut().append(
+ response_parts.headers.append(
HeaderName::from_bytes(&name).unwrap(),
HeaderValue::from_bytes(&value).unwrap(),
);
@@ -274,10 +274,14 @@ pub fn op_http_set_promise_complete(external: *const c_void, status: u16) {
let http =
// SAFETY: external is deleted before calling this op.
unsafe { take_external!(external, "op_http_set_promise_complete") };
+ set_promise_complete(http, status);
+}
+
+fn set_promise_complete(http: Rc<HttpRecord>, status: u16) {
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
- *http.response().status_mut() = code;
+ http.response_parts().status = code;
}
http.complete();
}
@@ -441,7 +445,7 @@ pub fn op_http_read_request_body(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_read_request_body") };
- let rid = if let Some(incoming) = http.take_body() {
+ let rid = if let Some(incoming) = http.take_request_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
} else {
@@ -462,8 +466,7 @@ pub fn op_http_set_response_header(
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_header") };
- let mut response = http.response();
- let resp_headers = response.headers_mut();
+ let mut response_parts = http.response_parts();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = match value {
@@ -473,7 +476,7 @@ pub fn op_http_set_response_header(
HeaderValue::from_maybe_shared_unchecked(bytes::Bytes::from(bytes_vec))
},
};
- resp_headers.append(name, value);
+ response_parts.headers.append(name, value);
}
#[op2]
@@ -486,12 +489,13 @@ pub fn op_http_set_response_headers(
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_set_response_headers") };
// TODO(mmastrac): Invalid headers should be handled?
- let mut response = http.response();
- let resp_headers = response.headers_mut();
+ let mut response_parts = http.response_parts();
let len = headers.length();
let header_len = len * 2;
- resp_headers.reserve(header_len.try_into().unwrap());
+ response_parts
+ .headers
+ .reserve(header_len.try_into().unwrap());
for i in 0..len {
let item = headers.get_index(scope, i).unwrap();
@@ -505,7 +509,7 @@ pub fn op_http_set_response_headers(
let header_value =
// SAFETY: These are valid latin-1 strings
unsafe { HeaderValue::from_maybe_shared_unchecked(v8_value) };
- resp_headers.append(header_name, header_value);
+ response_parts.headers.append(header_name, header_value);
}
}
@@ -525,7 +529,7 @@ pub fn op_http_set_response_trailers(
let value = unsafe { HeaderValue::from_maybe_shared_unchecked(value) };
trailer_map.append(name, value);
}
- *http.trailers().borrow_mut() = Some(trailer_map);
+ *http.trailers() = Some(trailer_map);
}
fn is_request_compressible(
@@ -652,31 +656,28 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response(
- external: *const c_void,
+ http: Rc<HttpRecord>,
length: Option<usize>,
status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
- // SAFETY: external is deleted before calling this op.
- let http = unsafe { take_external!(external, "set_response") };
// The request may have been cancelled by this point and if so, there's no need for us to
// do all of this work to send the response.
if !http.cancelled() {
- let resource = http.take_resource();
let compression =
is_request_compressible(length, &http.request_parts().headers);
- let mut response = http.response();
+ let mut response_headers =
+ std::cell::RefMut::map(http.response_parts(), |this| &mut this.headers);
let compression =
- modify_compressibility_from_response(compression, response.headers_mut());
- response
- .body_mut()
- .initialize(response_fn(compression), resource);
+ modify_compressibility_from_response(compression, &mut response_headers);
+ drop(response_headers);
+ http.set_response_body(response_fn(compression));
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
// will quitely ignore invalid values.
if let Ok(code) = StatusCode::from_u16(status) {
- *response.status_mut() = code;
+ http.response_parts().status = code;
}
} else if force_instantiate_body {
response_fn(Compression::None).abort();
@@ -685,14 +686,20 @@ fn set_response(
http.complete();
}
-#[op2(fast)]
-pub fn op_http_set_response_body_resource(
+/// Returned promise resolves when body streaming finishes.
+/// Call [`op_http_close_after_finish`] when done with the external.
+#[op2(async)]
+pub async fn op_http_set_response_body_resource(
state: Rc<RefCell<OpState>>,
external: *const c_void,
#[smi] stream_rid: ResourceId,
auto_close: bool,
status: u16,
) -> Result<(), AnyError> {
+ let http =
+ // SAFETY: op is called with external.
+ unsafe { clone_external!(external, "op_http_set_response_body_resource") };
+
// IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request
// body resource so we _cannot_ hold the OpState lock longer than necessary.
@@ -710,7 +717,7 @@ pub fn op_http_set_response_body_resource(
};
set_response(
- external,
+ http.clone(),
resource.size_hint().1.map(|s| s as usize),
status,
true,
@@ -719,21 +726,34 @@ pub fn op_http_set_response_body_resource(
},
);
+ *http.needs_close_after_finish() = true;
+ http.response_body_finished().await;
Ok(())
}
#[op2(fast)]
+pub fn op_http_close_after_finish(external: *const c_void) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_close_after_finish") };
+ http.close_after_finish();
+}
+
+#[op2(fast)]
pub fn op_http_set_response_body_text(
external: *const c_void,
#[string] text: String,
status: u16,
) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_response_body_text") };
if !text.is_empty() {
- set_response(external, Some(text.len()), status, false, |compression| {
+ set_response(http, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes())
});
} else {
- op_http_set_promise_complete::call(external, status);
+ set_promise_complete(http, status);
}
}
@@ -743,45 +763,21 @@ pub fn op_http_set_response_body_bytes(
#[buffer] buffer: JsBuffer,
status: u16,
) {
+ let http =
+ // SAFETY: external is deleted before calling this op.
+ unsafe { take_external!(external, "op_http_set_response_body_bytes") };
if !buffer.is_empty() {
- set_response(external, Some(buffer.len()), status, false, |compression| {
+ set_response(http, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
});
} else {
- op_http_set_promise_complete::call(external, status);
- }
-}
-
-#[op2(async)]
-pub async fn op_http_track(
- state: Rc<RefCell<OpState>>,
- external: *const c_void,
- #[smi] server_rid: ResourceId,
-) -> Result<(), AnyError> {
- // SAFETY: op is called with external.
- let http = unsafe { clone_external!(external, "op_http_track") };
- let handle = http.body_promise();
-
- let join_handle = state
- .borrow_mut()
- .resource_table
- .get::<HttpJoinHandle>(server_rid)?;
-
- match handle
- .or_cancel(join_handle.connection_cancel_handle())
- .await
- {
- Ok(true) => Ok(()),
- Ok(false) => {
- Err(AnyError::msg("connection closed before message completed"))
- }
- Err(_e) => Ok(()),
+ set_promise_complete(http, status);
}
}
fn serve_http11_unconditional(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn = http1::Builder::new()
@@ -803,7 +799,7 @@ fn serve_http11_unconditional(
fn serve_http2_unconditional(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
let conn =
@@ -821,7 +817,7 @@ fn serve_http2_unconditional(
async fn serve_http2_autodetect(
io: impl HttpServeStream,
- svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
+ svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static,
cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
@@ -1194,14 +1190,13 @@ pub async fn op_http_close(
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
join_handle.listen_cancel_handle().cancel();
- // Async spin on the server_state while we wait for everything to drain
- while Rc::strong_count(&join_handle.server_state) > 1 {
- tokio::time::sleep(Duration::from_millis(10)).await;
- }
+ join_handle.server_state.drain().await;
} else {
// In a forceful shutdown, we close everything
join_handle.listen_cancel_handle().cancel();
join_handle.connection_cancel_handle().cancel();
+ // Give streaming responses a tick to close
+ tokio::task::yield_now().await;
}
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)