diff options
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r-- | ext/http/http_next.rs | 131 |
1 files changed, 63 insertions, 68 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 4035fe259..f42275b0e 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -8,11 +8,11 @@ use crate::request_properties::HttpConnectionProperties; use crate::request_properties::HttpListenProperties; use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; -use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; use crate::service::handle_request; use crate::service::http_trace; use crate::service::HttpRecord; +use crate::service::HttpRecordResponse; use crate::service::HttpRequestBodyAutocloser; use crate::service::HttpServerState; use crate::websocket_upgrade::WebSocketUpgrade; @@ -68,7 +68,6 @@ use std::io; use std::pin::Pin; use std::ptr::null; use std::rc::Rc; -use std::time::Duration; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -182,7 +181,7 @@ pub fn op_http_upgrade_raw( let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); spawn(async move { - let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default(); + let mut upgrade_stream = WebSocketUpgrade::<()>::default(); // Stage 2: Extract the Upgraded connection let mut buf = [0; 1024]; @@ -191,7 +190,8 @@ pub fn op_http_upgrade_raw( match upgrade_stream.write(&buf[..read]) { Ok(None) => continue, Ok(Some((response, bytes))) => { - *http.response() = response; + let (response_parts, _) = response.into_parts(); + *http.response_parts() = response_parts; http.complete(); let mut upgraded = TokioIo::new(upgrade.await?); upgraded.write_all(&bytes).await?; @@ -250,10 +250,10 @@ pub async fn op_http_upgrade_websocket_next( // Stage 1: set the response to 101 Switching Protocols and send it let upgrade = http.upgrade()?; { - let mut response = http.response(); - *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + let mut response_parts = http.response_parts(); + response_parts.status = StatusCode::SWITCHING_PROTOCOLS; for (name, value) in headers { - response.headers_mut().append( + response_parts.headers.append( HeaderName::from_bytes(&name).unwrap(), HeaderValue::from_bytes(&value).unwrap(), ); @@ -274,10 +274,14 @@ pub fn op_http_set_promise_complete(external: *const c_void, status: u16) { let http = // SAFETY: external is deleted before calling this op. unsafe { take_external!(external, "op_http_set_promise_complete") }; + set_promise_complete(http, status); +} + +fn set_promise_complete(http: Rc<HttpRecord>, status: u16) { // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we // will quitely ignore invalid values. if let Ok(code) = StatusCode::from_u16(status) { - *http.response().status_mut() = code; + http.response_parts().status = code; } http.complete(); } @@ -441,7 +445,7 @@ pub fn op_http_read_request_body( let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_read_request_body") }; - let rid = if let Some(incoming) = http.take_body() { + let rid = if let Some(incoming) = http.take_request_body() { let body_resource = Rc::new(HttpRequestBody::new(incoming)); state.borrow_mut().resource_table.add_rc(body_resource) } else { @@ -462,8 +466,7 @@ pub fn op_http_set_response_header( let http = // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_header") }; - let mut response = http.response(); - let resp_headers = response.headers_mut(); + let mut response_parts = http.response_parts(); // These are valid latin-1 strings let name = HeaderName::from_bytes(&name).unwrap(); let value = match value { @@ -473,7 +476,7 @@ pub fn op_http_set_response_header( HeaderValue::from_maybe_shared_unchecked(bytes::Bytes::from(bytes_vec)) }, }; - resp_headers.append(name, value); + response_parts.headers.append(name, value); } #[op2] @@ -486,12 +489,13 @@ pub fn op_http_set_response_headers( // SAFETY: op is called with external. unsafe { clone_external!(external, "op_http_set_response_headers") }; // TODO(mmastrac): Invalid headers should be handled? - let mut response = http.response(); - let resp_headers = response.headers_mut(); + let mut response_parts = http.response_parts(); let len = headers.length(); let header_len = len * 2; - resp_headers.reserve(header_len.try_into().unwrap()); + response_parts + .headers + .reserve(header_len.try_into().unwrap()); for i in 0..len { let item = headers.get_index(scope, i).unwrap(); @@ -505,7 +509,7 @@ pub fn op_http_set_response_headers( let header_value = // SAFETY: These are valid latin-1 strings unsafe { HeaderValue::from_maybe_shared_unchecked(v8_value) }; - resp_headers.append(header_name, header_value); + response_parts.headers.append(header_name, header_value); } } @@ -525,7 +529,7 @@ pub fn op_http_set_response_trailers( let value = unsafe { HeaderValue::from_maybe_shared_unchecked(value) }; trailer_map.append(name, value); } - *http.trailers().borrow_mut() = Some(trailer_map); + *http.trailers() = Some(trailer_map); } fn is_request_compressible( @@ -652,31 +656,28 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { /// Sets the appropriate response body. Use `force_instantiate_body` if you need /// to ensure that the response is cleaned up correctly (eg: for resources). fn set_response( - external: *const c_void, + http: Rc<HttpRecord>, length: Option<usize>, status: u16, force_instantiate_body: bool, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { - // SAFETY: external is deleted before calling this op. - let http = unsafe { take_external!(external, "set_response") }; // The request may have been cancelled by this point and if so, there's no need for us to // do all of this work to send the response. if !http.cancelled() { - let resource = http.take_resource(); let compression = is_request_compressible(length, &http.request_parts().headers); - let mut response = http.response(); + let mut response_headers = + std::cell::RefMut::map(http.response_parts(), |this| &mut this.headers); let compression = - modify_compressibility_from_response(compression, response.headers_mut()); - response - .body_mut() - .initialize(response_fn(compression), resource); + modify_compressibility_from_response(compression, &mut response_headers); + drop(response_headers); + http.set_response_body(response_fn(compression)); // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we // will quitely ignore invalid values. if let Ok(code) = StatusCode::from_u16(status) { - *response.status_mut() = code; + http.response_parts().status = code; } } else if force_instantiate_body { response_fn(Compression::None).abort(); @@ -685,14 +686,20 @@ fn set_response( http.complete(); } -#[op2(fast)] -pub fn op_http_set_response_body_resource( +/// Returned promise resolves when body streaming finishes. +/// Call [`op_http_close_after_finish`] when done with the external. +#[op2(async)] +pub async fn op_http_set_response_body_resource( state: Rc<RefCell<OpState>>, external: *const c_void, #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, ) -> Result<(), AnyError> { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_body_resource") }; + // IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request // body resource so we _cannot_ hold the OpState lock longer than necessary. @@ -710,7 +717,7 @@ pub fn op_http_set_response_body_resource( }; set_response( - external, + http.clone(), resource.size_hint().1.map(|s| s as usize), status, true, @@ -719,21 +726,34 @@ pub fn op_http_set_response_body_resource( }, ); + *http.needs_close_after_finish() = true; + http.response_body_finished().await; Ok(()) } #[op2(fast)] +pub fn op_http_close_after_finish(external: *const c_void) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_close_after_finish") }; + http.close_after_finish(); +} + +#[op2(fast)] pub fn op_http_set_response_body_text( external: *const c_void, #[string] text: String, status: u16, ) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_set_response_body_text") }; if !text.is_empty() { - set_response(external, Some(text.len()), status, false, |compression| { + set_response(http, Some(text.len()), status, false, |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); } else { - op_http_set_promise_complete::call(external, status); + set_promise_complete(http, status); } } @@ -743,45 +763,21 @@ pub fn op_http_set_response_body_bytes( #[buffer] buffer: JsBuffer, status: u16, ) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_set_response_body_bytes") }; if !buffer.is_empty() { - set_response(external, Some(buffer.len()), status, false, |compression| { + set_response(http, Some(buffer.len()), status, false, |compression| { ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) }); } else { - op_http_set_promise_complete::call(external, status); - } -} - -#[op2(async)] -pub async fn op_http_track( - state: Rc<RefCell<OpState>>, - external: *const c_void, - #[smi] server_rid: ResourceId, -) -> Result<(), AnyError> { - // SAFETY: op is called with external. - let http = unsafe { clone_external!(external, "op_http_track") }; - let handle = http.body_promise(); - - let join_handle = state - .borrow_mut() - .resource_table - .get::<HttpJoinHandle>(server_rid)?; - - match handle - .or_cancel(join_handle.connection_cancel_handle()) - .await - { - Ok(true) => Ok(()), - Ok(false) => { - Err(AnyError::msg("connection closed before message completed")) - } - Err(_e) => Ok(()), + set_promise_complete(http, status); } } fn serve_http11_unconditional( io: impl HttpServeStream, - svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, + svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, ) -> impl Future<Output = Result<(), hyper1::Error>> + 'static { let conn = http1::Builder::new() @@ -803,7 +799,7 @@ fn serve_http11_unconditional( fn serve_http2_unconditional( io: impl HttpServeStream, - svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, + svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, ) -> impl Future<Output = Result<(), hyper1::Error>> + 'static { let conn = @@ -821,7 +817,7 @@ fn serve_http2_unconditional( async fn serve_http2_autodetect( io: impl HttpServeStream, - svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, + svc: impl HttpService<Incoming, ResBody = HttpRecordResponse> + 'static, cancel: Rc<CancelHandle>, ) -> Result<(), AnyError> { let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); @@ -1194,14 +1190,13 @@ pub async fn op_http_close( // In a graceful shutdown, we close the listener and allow all the remaining connections to drain join_handle.listen_cancel_handle().cancel(); - // Async spin on the server_state while we wait for everything to drain - while Rc::strong_count(&join_handle.server_state) > 1 { - tokio::time::sleep(Duration::from_millis(10)).await; - } + join_handle.server_state.drain().await; } else { // In a forceful shutdown, we close everything join_handle.listen_cancel_handle().cancel(); join_handle.connection_cancel_handle().cancel(); + // Give streaming responses a tick to close + tokio::task::yield_now().await; } let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle) |