diff options
author | Aaron O'Mullan <aaron.omullan@gmail.com> | 2021-08-14 13:25:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-14 13:25:05 +0200 |
commit | 8fa46a7b4444cfbdaefe0bd0aa882601f338cef9 (patch) | |
tree | 1ad0d644964de73d2cc88c89be4066b9d3e6798d | |
parent | f90231924d96130ec80b31e3589253a15e250896 (diff) |
cleanup(ext/http): simplify op_http_request_next (#11691)
* cleanup(ext/http): simplify op_http_request_next
Keep op_http_request_next's high-level logic simple, factor out NextRequestResponse building to prepare_next_request() for improved readability & maintainability
* cleanup(ext/http): break prepare_next_request() into meaningful sub-funcs
-rw-r--r-- | ext/http/lib.rs | 273 |
1 files changed, 145 insertions, 128 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 38c14409c..4e569ae22 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -182,143 +182,40 @@ async fn op_http_request_next( poll_fn(|cx| { conn_resource.deno_service.waker.register(cx.waker()); - let connection_closed = match conn_resource.poll(cx) { - Poll::Pending => false, - Poll::Ready(Ok(())) => { - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid); - true - } + + // Check if conn is open/close/errored + let (conn_closed, conn_result) = match conn_resource.poll(cx) { + Poll::Pending => (false, Ok(())), + Poll::Ready(Ok(())) => (true, Ok(())), Poll::Ready(Err(e)) => { - // TODO(ry) close RequestResource associated with connection - // TODO(ry) close ResponseBodyResource associated with connection - // try to close ConnResource, but don't unwrap as it might - // already be closed - let _ = state - .borrow_mut() - .resource_table - .take::<ConnResource>(conn_rid); if should_ignore_error(&e) { - true + (true, Ok(())) } else { - return Poll::Ready(Err(e)); + (true, Err(e)) } } }; - if let Some(request_resource) = - conn_resource.deno_service.inner.borrow_mut().take() - { - let tx = request_resource.response_tx; - let req = request_resource.request; - let method = req.method().to_string(); - - // We treat cookies specially, because we don't want them to get them - // mangled by the `Headers` object in JS. What we do is take all cookie - // headers and concat them into a single cookie header, seperated by - // semicolons. - let mut total_cookie_length = 0; - let mut cookies = vec![]; - - let mut headers = Vec::with_capacity(req.headers().len()); - for (name, value) in req.headers().iter() { - if name == hyper::header::COOKIE { - let bytes = value.as_bytes(); - total_cookie_length += bytes.len(); - cookies.push(bytes); - } else { - let name: &[u8] = name.as_ref(); - let value = value.as_bytes(); - headers - .push((ByteString(name.to_owned()), ByteString(value.to_owned()))); - } - } - - if !cookies.is_empty() { - let cookie_count = cookies.len(); - total_cookie_length += (cookie_count * 2) - 2; - let mut bytes = Vec::with_capacity(total_cookie_length); - for (i, cookie) in cookies.into_iter().enumerate() { - bytes.extend(cookie); - if i != cookie_count - 1 { - bytes.extend("; ".as_bytes()); - } - } - headers.push(( - ByteString("cookie".as_bytes().to_owned()), - ByteString(bytes), - )); - } + // Drop conn resource if closed + if conn_closed { + // TODO(ry) close RequestResource associated with connection + // TODO(ry) close ResponseBodyResource associated with connection + // try to close ConnResource, but don't unwrap as it might + // already be closed + let _ = state + .borrow_mut() + .resource_table + .take::<ConnResource>(conn_rid); - let url = { - let scheme = &conn_resource.hyper_connection.scheme; - let host: Cow<str> = if let Some(host) = req.uri().host() { - Cow::Borrowed(host) - } else if let Some(host) = req.headers().get("HOST") { - Cow::Borrowed(host.to_str()?) - } else { - Cow::Owned(conn_resource.hyper_connection.addr.to_string()) - }; - let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - format!("{}://{}{}", scheme, host, path) - }; - - let is_websocket_request = req - .headers() - .get_all(hyper::header::CONNECTION) - .iter() - .any(|v| { - v.to_str() - .map(|s| s.to_lowercase().contains("upgrade")) - .unwrap_or(false) - }) - && req - .headers() - .get_all(hyper::header::UPGRADE) - .iter() - .any(|v| { - v.to_str() - .map(|s| s.to_lowercase().contains("websocket")) - .unwrap_or(false) - }); - - let has_body = if let Some(exact_size) = req.size_hint().exact() { - exact_size > 0 - } else { - true - }; - - let maybe_request_rid = if is_websocket_request || has_body { - let mut state = state.borrow_mut(); - let request_rid = state.resource_table.add(RequestResource { - conn_rid, - inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), - cancel: CancelHandle::default(), - }); - Some(request_rid) - } else { - None - }; + // Fail with err if unexpected conn error, early return None otherwise + return Poll::Ready(conn_result.map(|_| None)); + } + if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() { + let Conn { scheme, addr, .. } = conn_resource.hyper_connection; let mut state = state.borrow_mut(); - let response_sender_rid = - state.resource_table.add(ResponseSenderResource { - sender: tx, - conn_rid, - }); - - Poll::Ready(Ok(Some(NextRequestResponse( - maybe_request_rid, - response_sender_rid, - method, - headers, - url, - )))) - } else if connection_closed { - Poll::Ready(Ok(None)) + let next = + prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?; + Poll::Ready(Ok(Some(next))) } else { Poll::Pending } @@ -328,6 +225,126 @@ async fn op_http_request_next( .map_err(AnyError::from) } +fn prepare_next_request( + state: &mut OpState, + conn_rid: ResourceId, + request_resource: ServiceInner, + scheme: &'static str, + addr: SocketAddr, +) -> Result<NextRequestResponse, AnyError> { + let tx = request_resource.response_tx; + let req = request_resource.request; + let method = req.method().to_string(); + let headers = req_headers(&req); + let url = req_url(&req, scheme, addr)?; + + let is_websocket = is_websocket_request(&req); + let has_body = if let Some(exact_size) = req.size_hint().exact() { + exact_size > 0 + } else { + true + }; + + let maybe_request_rid = if is_websocket || has_body { + let request_rid = state.resource_table.add(RequestResource { + conn_rid, + inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), + cancel: CancelHandle::default(), + }); + Some(request_rid) + } else { + None + }; + + let response_sender_rid = state.resource_table.add(ResponseSenderResource { + sender: tx, + conn_rid, + }); + + Ok(NextRequestResponse( + maybe_request_rid, + response_sender_rid, + method, + headers, + url, + )) +} + +fn req_url( + req: &hyper::Request<hyper::Body>, + scheme: &'static str, + addr: SocketAddr, +) -> Result<String, AnyError> { + let host: Cow<str> = if let Some(host) = req.uri().host() { + Cow::Borrowed(host) + } else if let Some(host) = req.headers().get("HOST") { + Cow::Borrowed(host.to_str()?) + } else { + Cow::Owned(addr.to_string()) + }; + let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); + Ok(format!("{}://{}{}", scheme, host, path)) +} + +fn req_headers( + req: &hyper::Request<hyper::Body>, +) -> Vec<(ByteString, ByteString)> { + // We treat cookies specially, because we don't want them to get them + // mangled by the `Headers` object in JS. What we do is take all cookie + // headers and concat them into a single cookie header, seperated by + // semicolons. + let mut total_cookie_length = 0; + let mut cookies = vec![]; + + let mut headers = Vec::with_capacity(req.headers().len()); + for (name, value) in req.headers().iter() { + if name == hyper::header::COOKIE { + let bytes = value.as_bytes(); + total_cookie_length += bytes.len(); + cookies.push(bytes); + } else { + let name: &[u8] = name.as_ref(); + let value = value.as_bytes(); + headers.push((ByteString(name.to_owned()), ByteString(value.to_owned()))); + } + } + + if !cookies.is_empty() { + let cookie_count = cookies.len(); + total_cookie_length += (cookie_count * 2) - 2; + let mut bytes = Vec::with_capacity(total_cookie_length); + for (i, cookie) in cookies.into_iter().enumerate() { + bytes.extend(cookie); + if i != cookie_count - 1 { + bytes.extend("; ".as_bytes()); + } + } + headers.push(( + ByteString("cookie".as_bytes().to_owned()), + ByteString(bytes), + )); + } + + headers +} + +fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool { + req_header_contains(req, hyper::header::CONNECTION, "upgrade") + && req_header_contains(req, hyper::header::UPGRADE, "websocket") +} + +fn req_header_contains( + req: &hyper::Request<hyper::Body>, + key: impl hyper::header::AsHeaderName, + value: &str, +) -> bool { + req.headers().get_all(key).iter().any(|v| { + v.to_str() + .map(|s| s.to_lowercase().contains(value)) + .unwrap_or(false) + }) +} + fn should_ignore_error(e: &AnyError) -> bool { if let Some(e) = e.downcast_ref::<hyper::Error>() { use std::error::Error; |