diff options
Diffstat (limited to 'ext/http/http_next.rs')
-rw-r--r-- | ext/http/http_next.rs | 447 |
1 files changed, 135 insertions, 312 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index a986de7f3..34281ee92 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -6,11 +6,14 @@ use crate::request_body::HttpRequestBody; use crate::request_properties::HttpConnectionProperties; use crate::request_properties::HttpListenProperties; use crate::request_properties::HttpPropertyExtractor; -use crate::response_body::CompletionHandle; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; use crate::response_body::V8StreamHttpResponseBody; +use crate::slab::slab_drop; +use crate::slab::slab_get; +use crate::slab::slab_insert; +use crate::slab::SlabId; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use cache_control::CacheControl; @@ -39,7 +42,6 @@ use http::header::CONTENT_ENCODING; use http::header::CONTENT_LENGTH; use http::header::CONTENT_RANGE; use http::header::CONTENT_TYPE; -use http::request::Parts; use http::HeaderMap; use hyper1::body::Incoming; use hyper1::header::COOKIE; @@ -49,12 +51,10 @@ use hyper1::server::conn::http1; use hyper1::server::conn::http2; use hyper1::service::service_fn; use hyper1::service::HttpService; -use hyper1::upgrade::OnUpgrade; use hyper1::StatusCode; use pin_project::pin_project; use pin_project::pinned_drop; -use slab::Slab; use std::borrow::Cow; use std::cell::RefCell; use std::future::Future; @@ -99,163 +99,16 @@ impl< { } -pub struct HttpSlabRecord { - request_info: HttpConnectionProperties, - request_parts: Parts, - request_body: Option<Incoming>, - // The response may get taken before we tear this down - response: Option<Response>, - body: Option<Rc<HttpRequestBody>>, - promise: CompletionHandle, - #[cfg(__zombie_http_tracking)] - alive: bool, -} - -thread_local! { - pub static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024)); -} - -/// Generates getters and setters for the [`SLAB`]. For example, -/// `with!(with_req, with_req_mut, Parts, http, http.request_parts);` expands to: -/// -/// ```ignore -/// #[inline(always)] -/// #[allow(dead_code)] -/// pub(crate) fn with_req_mut<T>(key: usize, f: impl FnOnce(&mut Parts) -> T) -> T { -/// SLAB.with(|slab| { -/// let mut borrow = slab.borrow_mut(); -/// let mut http = borrow.get_mut(key).unwrap(); -/// #[cfg(__zombie_http_tracking)] -/// if !http.alive { -/// panic!("Attempted to access a dead HTTP object") -/// } -/// f(&mut http.expr) -/// }) -/// } - -/// #[inline(always)] -/// #[allow(dead_code)] -/// pub(crate) fn with_req<T>(key: usize, f: impl FnOnce(&Parts) -> T) -> T { -/// SLAB.with(|slab| { -/// let mut borrow = slab.borrow(); -/// let mut http = borrow.get(key).unwrap(); -/// #[cfg(__zombie_http_tracking)] -/// if !http.alive { -/// panic!("Attempted to access a dead HTTP object") -/// } -/// f(&http.expr) -/// }) -/// } -/// ``` -macro_rules! with { - ($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => { - #[inline(always)] - #[allow(dead_code)] - pub(crate) fn $mut<T>(key: u32, f: impl FnOnce(&mut $type) -> T) -> T { - SLAB.with(|slab| { - let mut borrow = slab.borrow_mut(); - #[allow(unused_mut)] // TODO(mmastrac): compiler issue? - let mut $http = match borrow.get_mut(key as usize) { - Some(http) => http, - None => panic!( - "Attemped to access invalid request {} ({} in total available)", - key, - borrow.len() - ), - }; - #[cfg(__zombie_http_tracking)] - if !$http.alive { - panic!("Attempted to access a dead HTTP object") - } - f(&mut $expr) - }) - } - - #[inline(always)] - #[allow(dead_code)] - pub(crate) fn $ref<T>(key: u32, f: impl FnOnce(&$type) -> T) -> T { - SLAB.with(|slab| { - let borrow = slab.borrow(); - let $http = borrow.get(key as usize).unwrap(); - #[cfg(__zombie_http_tracking)] - if !$http.alive { - panic!("Attempted to access a dead HTTP object") - } - f(&$expr) - }) - } - }; -} - -with!(with_req, with_req_mut, Parts, http, http.request_parts); -with!( - with_req_body, - with_req_body_mut, - Option<Incoming>, - http, - http.request_body -); -with!( - with_resp, - with_resp_mut, - Option<Response>, - http, - http.response -); -with!( - with_body, - with_body_mut, - Option<Rc<HttpRequestBody>>, - http, - http.body -); -with!( - with_promise, - with_promise_mut, - CompletionHandle, - http, - http.promise -); -with!(with_http, with_http_mut, HttpSlabRecord, http, http); - -fn slab_insert( - request: Request, - request_info: HttpConnectionProperties, -) -> u32 { - SLAB.with(|slab| { - let (request_parts, request_body) = request.into_parts(); - slab.borrow_mut().insert(HttpSlabRecord { - request_info, - request_parts, - request_body: Some(request_body), - response: Some(Response::new(ResponseBytes::default())), - body: None, - promise: CompletionHandle::default(), - #[cfg(__zombie_http_tracking)] - alive: true, - }) - }) as u32 -} - #[op] pub fn op_http_upgrade_raw( state: &mut OpState, - index: u32, + slab_id: SlabId, ) -> Result<ResourceId, AnyError> { // Stage 1: extract the upgrade future - let upgrade = with_http_mut(index, |http| { - // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - http - .request_parts - .extensions - .remove::<OnUpgrade>() - .ok_or_else(|| AnyError::msg("upgrade unavailable")) - })?; - + let upgrade = slab_get(slab_id).upgrade()?; let (read, write) = tokio::io::duplex(1024); let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); - spawn(async move { let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default(); @@ -266,8 +119,9 @@ pub fn op_http_upgrade_raw( match upgrade_stream.write(&buf[..read]) { Ok(None) => continue, Ok(Some((response, bytes))) => { - with_resp_mut(index, |resp| *resp = Some(response)); - with_promise_mut(index, |promise| promise.complete(true)); + let mut http = slab_get(slab_id); + *http.response() = response; + http.complete(); let mut upgraded = upgrade.await?; upgraded.write_all(&bytes).await?; break upgraded; @@ -315,29 +169,22 @@ pub fn op_http_upgrade_raw( #[op] pub async fn op_http_upgrade_websocket_next( state: Rc<RefCell<OpState>>, - index: u32, + slab_id: SlabId, headers: Vec<(ByteString, ByteString)>, ) -> Result<ResourceId, AnyError> { - // Stage 1: set the respnse to 101 Switching Protocols and send it - let upgrade = with_http_mut(index, |http| { - // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - let upgrade = http - .request_parts - .extensions - .remove::<OnUpgrade>() - .ok_or_else(|| AnyError::msg("upgrade unavailable"))?; - - let response = http.response.as_mut().unwrap(); - *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - for (name, value) in headers { - response.headers_mut().append( - HeaderName::from_bytes(&name).unwrap(), - HeaderValue::from_bytes(&value).unwrap(), - ); - } - http.promise.complete(true); - Ok::<_, AnyError>(upgrade) - })?; + let mut http = slab_get(slab_id); + // Stage 1: set the response to 101 Switching Protocols and send it + let upgrade = http.upgrade()?; + + let response = http.response(); + *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + for (name, value) in headers { + response.headers_mut().append( + HeaderName::from_bytes(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + http.complete(); // Stage 2: wait for the request to finish upgrading let upgraded = upgrade.await?; @@ -348,137 +195,127 @@ pub async fn op_http_upgrade_websocket_next( } #[op(fast)] -pub fn op_http_set_promise_complete(index: u32, status: u16) { - with_resp_mut(index, |resp| { - // The Javascript code will never provide a status that is invalid here (see 23_response.js) - *resp.as_mut().unwrap().status_mut() = - StatusCode::from_u16(status).unwrap(); - }); - with_promise_mut(index, |promise| { - promise.complete(true); - }); +pub fn op_http_set_promise_complete(slab_id: SlabId, status: u16) { + let mut http = slab_get(slab_id); + // The Javascript code will never provide a status that is invalid here (see 23_response.js) + *http.response().status_mut() = StatusCode::from_u16(status).unwrap(); + http.complete(); } #[op] pub fn op_http_get_request_method_and_url<HTTP>( - index: u32, + slab_id: SlabId, ) -> (String, Option<String>, String, String, Option<u16>) where HTTP: HttpPropertyExtractor, { - // TODO(mmastrac): Passing method can be optimized - with_http(index, |http| { - let request_properties = HTTP::request_properties( - &http.request_info, - &http.request_parts.uri, - &http.request_parts.headers, - ); + let http = slab_get(slab_id); + let request_info = http.request_info(); + let request_parts = http.request_parts(); + let request_properties = HTTP::request_properties( + request_info, + &request_parts.uri, + &request_parts.headers, + ); - // Only extract the path part - we handle authority elsewhere - let path = match &http.request_parts.uri.path_and_query() { - Some(path_and_query) => path_and_query.to_string(), - None => "".to_owned(), - }; + // Only extract the path part - we handle authority elsewhere + let path = match &request_parts.uri.path_and_query() { + Some(path_and_query) => path_and_query.to_string(), + None => "".to_owned(), + }; - ( - http.request_parts.method.as_str().to_owned(), - request_properties.authority, - path, - String::from(http.request_info.peer_address.as_ref()), - http.request_info.peer_port, - ) - }) + // TODO(mmastrac): Passing method can be optimized + ( + request_parts.method.as_str().to_owned(), + request_properties.authority, + path, + String::from(request_info.peer_address.as_ref()), + request_info.peer_port, + ) } #[op] pub fn op_http_get_request_header( - index: u32, + slab_id: SlabId, name: String, ) -> Option<ByteString> { - with_req(index, |req| { - let value = req.headers.get(name); - value.map(|value| value.as_bytes().into()) - }) + let http = slab_get(slab_id); + let value = http.request_parts().headers.get(name); + value.map(|value| value.as_bytes().into()) } #[op] pub fn op_http_get_request_headers( - index: u32, + slab_id: SlabId, ) -> Vec<(ByteString, ByteString)> { - with_req(index, |req| { - let headers = &req.headers; - let mut vec = Vec::with_capacity(headers.len()); - let mut cookies: Option<Vec<&[u8]>> = None; - for (name, value) in headers { - if name == COOKIE { - if let Some(ref mut cookies) = cookies { - cookies.push(value.as_bytes()); - } else { - cookies = Some(vec![value.as_bytes()]); - } + let http = slab_get(slab_id); + let headers = &http.request_parts().headers; + let mut vec = Vec::with_capacity(headers.len()); + let mut cookies: Option<Vec<&[u8]>> = None; + for (name, value) in headers { + if name == COOKIE { + if let Some(ref mut cookies) = cookies { + cookies.push(value.as_bytes()); } else { - let name: &[u8] = name.as_ref(); - vec.push((name.into(), value.as_bytes().into())) + cookies = Some(vec![value.as_bytes()]); } + } else { + let name: &[u8] = name.as_ref(); + vec.push((name.into(), value.as_bytes().into())) } + } - // We treat cookies specially, because we don't want them to get them - // mangled by the `Headers` object in JS. What we do is take all cookie - // headers and concat them into a single cookie header, separated by - // semicolons. - // TODO(mmastrac): This should probably happen on the JS side on-demand - if let Some(cookies) = cookies { - let cookie_sep = "; ".as_bytes(); - vec.push(( - ByteString::from(COOKIE.as_str()), - ByteString::from(cookies.join(cookie_sep)), - )); - } - vec - }) + // We treat cookies specially, because we don't want them to get them + // mangled by the `Headers` object in JS. What we do is take all cookie + // headers and concat them into a single cookie header, separated by + // semicolons. + // TODO(mmastrac): This should probably happen on the JS side on-demand + if let Some(cookies) = cookies { + let cookie_sep = "; ".as_bytes(); + vec.push(( + ByteString::from(COOKIE.as_str()), + ByteString::from(cookies.join(cookie_sep)), + )); + } + vec } #[op(fast)] pub fn op_http_read_request_body( state: &mut OpState, - index: u32, + slab_id: SlabId, ) -> ResourceId { - let incoming = with_req_body_mut(index, |body| body.take().unwrap()); + let mut http = slab_get(slab_id); + let incoming = http.take_body(); let body_resource = Rc::new(HttpRequestBody::new(incoming)); - let res = state.resource_table.add_rc(body_resource.clone()); - with_body_mut(index, |body| { - *body = Some(body_resource); - }); - res + state.resource_table.add_rc(body_resource) } #[op(fast)] -pub fn op_http_set_response_header(index: u32, name: &str, value: &str) { - with_resp_mut(index, |resp| { - let resp_headers = resp.as_mut().unwrap().headers_mut(); - // These are valid latin-1 strings - let name = HeaderName::from_bytes(name.as_bytes()).unwrap(); - let value = HeaderValue::from_bytes(value.as_bytes()).unwrap(); - resp_headers.append(name, value); - }); +pub fn op_http_set_response_header(slab_id: SlabId, name: &str, value: &str) { + let mut http = slab_get(slab_id); + let resp_headers = http.response().headers_mut(); + // These are valid latin-1 strings + let name = HeaderName::from_bytes(name.as_bytes()).unwrap(); + let value = HeaderValue::from_bytes(value.as_bytes()).unwrap(); + resp_headers.append(name, value); } #[op] pub fn op_http_set_response_headers( - index: u32, + slab_id: SlabId, headers: Vec<(ByteString, ByteString)>, ) { + let mut http = slab_get(slab_id); // TODO(mmastrac): Invalid headers should be handled? - with_resp_mut(index, |resp| { - let resp_headers = resp.as_mut().unwrap().headers_mut(); - resp_headers.reserve(headers.len()); - for (name, value) in headers { - // These are valid latin-1 strings - let name = HeaderName::from_bytes(&name).unwrap(); - let value = HeaderValue::from_bytes(&value).unwrap(); - resp_headers.append(name, value); - } - }) + let resp_headers = http.response().headers_mut(); + resp_headers.reserve(headers.len()); + for (name, value) in headers { + // These are valid latin-1 strings + let name = HeaderName::from_bytes(&name).unwrap(); + let value = HeaderValue::from_bytes(&value).unwrap(); + resp_headers.append(name, value); + } } fn is_request_compressible(headers: &HeaderMap) -> Compression { @@ -588,28 +425,25 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { } fn set_response( - index: u32, + slab_id: SlabId, length: Option<usize>, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { - let compression = - with_req(index, |req| is_request_compressible(&req.headers)); - - with_resp_mut(index, move |response| { - let response = response.as_mut().unwrap(); - let compression = modify_compressibility_from_response( - compression, - length, - response.headers_mut(), - ); - response.body_mut().initialize(response_fn(compression)) - }); + let mut http = slab_get(slab_id); + let compression = is_request_compressible(&http.request_parts().headers); + let response = http.response(); + let compression = modify_compressibility_from_response( + compression, + length, + response.headers_mut(), + ); + response.body_mut().initialize(response_fn(compression)) } #[op(fast)] pub fn op_http_set_response_body_resource( state: &mut OpState, - index: u32, + slab_id: SlabId, stream_rid: ResourceId, auto_close: bool, ) -> Result<(), AnyError> { @@ -621,7 +455,7 @@ pub fn op_http_set_response_body_resource( }; set_response( - index, + slab_id, resource.size_hint().1.map(|s| s as usize), move |compression| { ResponseBytesInner::from_resource(compression, resource, auto_close) @@ -634,12 +468,11 @@ pub fn op_http_set_response_body_resource( #[op(fast)] pub fn op_http_set_response_body_stream( state: &mut OpState, - index: u32, + slab_id: SlabId, ) -> Result<ResourceId, AnyError> { // TODO(mmastrac): what should this channel size be? let (tx, rx) = tokio::sync::mpsc::channel(1); - - set_response(index, None, |compression| { + set_response(slab_id, None, |compression| { ResponseBytesInner::from_v8(compression, rx) }); @@ -647,18 +480,18 @@ pub fn op_http_set_response_body_stream( } #[op(fast)] -pub fn op_http_set_response_body_text(index: u32, text: String) { +pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) { if !text.is_empty() { - set_response(index, Some(text.len()), |compression| { + set_response(slab_id, Some(text.len()), |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); } } #[op(fast)] -pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) { +pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) { if !buffer.is_empty() { - set_response(index, Some(buffer.len()), |compression| { + set_response(slab_id, Some(buffer.len()), |compression| { ResponseBytesInner::from_slice(compression, buffer) }); }; @@ -667,12 +500,11 @@ pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) { #[op] pub async fn op_http_track( state: Rc<RefCell<OpState>>, - index: u32, + slab_id: SlabId, server_rid: ResourceId, ) -> Result<(), AnyError> { - let handle = with_resp(index, |resp| { - resp.as_ref().unwrap().body().completion_handle() - }); + let http = slab_get(slab_id); + let handle = http.body_promise(); let join_handle = state .borrow_mut() @@ -689,15 +521,15 @@ pub async fn op_http_track( } #[pin_project(PinnedDrop)] -pub struct SlabFuture<F: Future<Output = ()>>(u32, #[pin] F); +pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F); pub fn new_slab_future( request: Request, request_info: HttpConnectionProperties, - tx: tokio::sync::mpsc::Sender<u32>, + tx: tokio::sync::mpsc::Sender<SlabId>, ) -> SlabFuture<impl Future<Output = ()>> { let index = slab_insert(request, request_info); - let rx = with_promise(index, |promise| promise.clone()); + let rx = slab_get(index).promise(); SlabFuture(index, async move { if tx.send(index).await.is_ok() { // We only need to wait for completion if we aren't closed @@ -711,16 +543,7 @@ impl<F: Future<Output = ()>> SlabFuture<F> {} #[pinned_drop] impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> { fn drop(self: Pin<&mut Self>) { - SLAB.with(|slab| { - #[cfg(__zombie_http_tracking)] - { - slab.borrow_mut().get_mut(self.0 as usize).unwrap().alive = false; - } - #[cfg(not(__zombie_http_tracking))] - { - slab.borrow_mut().remove(self.0 as usize); - } - }); + slab_drop(self.0); } } @@ -736,7 +559,7 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> { .project() .1 .poll(cx) - .map(|_| Ok(with_resp_mut(index, |resp| resp.take().unwrap()))) + .map(|_| Ok(slab_get(index).take_response())) } } @@ -776,7 +599,7 @@ fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, cancel: Rc<CancelHandle>, - tx: tokio::sync::mpsc::Sender<u32>, + tx: tokio::sync::mpsc::Sender<SlabId>, ) -> JoinHandle<Result<(), AnyError>> { let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) @@ -803,7 +626,7 @@ fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, cancel: Rc<CancelHandle>, - tx: tokio::sync::mpsc::Sender<u32>, + tx: tokio::sync::mpsc::Sender<SlabId>, ) -> JoinHandle<Result<(), AnyError>> { let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) @@ -815,7 +638,7 @@ fn serve_http_on<HTTP>( connection: HTTP::Connection, listen_properties: &HttpListenProperties, cancel: Rc<CancelHandle>, - tx: tokio::sync::mpsc::Sender<u32>, + tx: tokio::sync::mpsc::Sender<SlabId>, ) -> JoinHandle<Result<(), AnyError>> where HTTP: HttpPropertyExtractor, @@ -843,7 +666,7 @@ struct HttpJoinHandle( AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>, // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd Rc<CancelHandle>, - AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>, + AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>, ); impl HttpJoinHandle { @@ -963,7 +786,7 @@ where pub async fn op_http_wait( state: Rc<RefCell<OpState>>, rid: ResourceId, -) -> Result<u32, AnyError> { +) -> Result<SlabId, AnyError> { // We will get the join handle initially, as we might be consuming requests still let join_handle = state .borrow_mut() @@ -1003,14 +826,14 @@ pub async fn op_http_wait( if let Some(err) = err.source() { if let Some(err) = err.downcast_ref::<io::Error>() { if err.kind() == io::ErrorKind::NotConnected { - return Ok(u32::MAX); + return Ok(SlabId::MAX); } } } return Err(err); } - Ok(u32::MAX) + Ok(SlabId::MAX) } struct UpgradeStream { |