summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron O'Mullan <aaron.omullan@gmail.com>2021-08-14 13:25:05 +0200
committerGitHub <noreply@github.com>2021-08-14 13:25:05 +0200
commit8fa46a7b4444cfbdaefe0bd0aa882601f338cef9 (patch)
tree1ad0d644964de73d2cc88c89be4066b9d3e6798d
parentf90231924d96130ec80b31e3589253a15e250896 (diff)
cleanup(ext/http): simplify op_http_request_next (#11691)
* cleanup(ext/http): simplify op_http_request_next Keep op_http_request_next's high-level logic simple, factor out NextRequestResponse building to prepare_next_request() for improved readability & maintainability * cleanup(ext/http): break prepare_next_request() into meaningful sub-funcs
-rw-r--r--ext/http/lib.rs273
1 files changed, 145 insertions, 128 deletions
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 38c14409c..4e569ae22 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -182,143 +182,40 @@ async fn op_http_request_next(
poll_fn(|cx| {
conn_resource.deno_service.waker.register(cx.waker());
- let connection_closed = match conn_resource.poll(cx) {
- Poll::Pending => false,
- Poll::Ready(Ok(())) => {
- // try to close ConnResource, but don't unwrap as it might
- // already be closed
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid);
- true
- }
+
+ // Check if conn is open/close/errored
+ let (conn_closed, conn_result) = match conn_resource.poll(cx) {
+ Poll::Pending => (false, Ok(())),
+ Poll::Ready(Ok(())) => (true, Ok(())),
Poll::Ready(Err(e)) => {
- // TODO(ry) close RequestResource associated with connection
- // TODO(ry) close ResponseBodyResource associated with connection
- // try to close ConnResource, but don't unwrap as it might
- // already be closed
- let _ = state
- .borrow_mut()
- .resource_table
- .take::<ConnResource>(conn_rid);
if should_ignore_error(&e) {
- true
+ (true, Ok(()))
} else {
- return Poll::Ready(Err(e));
+ (true, Err(e))
}
}
};
- if let Some(request_resource) =
- conn_resource.deno_service.inner.borrow_mut().take()
- {
- let tx = request_resource.response_tx;
- let req = request_resource.request;
- let method = req.method().to_string();
-
- // We treat cookies specially, because we don't want them to get them
- // mangled by the `Headers` object in JS. What we do is take all cookie
- // headers and concat them into a single cookie header, seperated by
- // semicolons.
- let mut total_cookie_length = 0;
- let mut cookies = vec![];
-
- let mut headers = Vec::with_capacity(req.headers().len());
- for (name, value) in req.headers().iter() {
- if name == hyper::header::COOKIE {
- let bytes = value.as_bytes();
- total_cookie_length += bytes.len();
- cookies.push(bytes);
- } else {
- let name: &[u8] = name.as_ref();
- let value = value.as_bytes();
- headers
- .push((ByteString(name.to_owned()), ByteString(value.to_owned())));
- }
- }
-
- if !cookies.is_empty() {
- let cookie_count = cookies.len();
- total_cookie_length += (cookie_count * 2) - 2;
- let mut bytes = Vec::with_capacity(total_cookie_length);
- for (i, cookie) in cookies.into_iter().enumerate() {
- bytes.extend(cookie);
- if i != cookie_count - 1 {
- bytes.extend("; ".as_bytes());
- }
- }
- headers.push((
- ByteString("cookie".as_bytes().to_owned()),
- ByteString(bytes),
- ));
- }
+ // Drop conn resource if closed
+ if conn_closed {
+ // TODO(ry) close RequestResource associated with connection
+ // TODO(ry) close ResponseBodyResource associated with connection
+ // try to close ConnResource, but don't unwrap as it might
+ // already be closed
+ let _ = state
+ .borrow_mut()
+ .resource_table
+ .take::<ConnResource>(conn_rid);
- let url = {
- let scheme = &conn_resource.hyper_connection.scheme;
- let host: Cow<str> = if let Some(host) = req.uri().host() {
- Cow::Borrowed(host)
- } else if let Some(host) = req.headers().get("HOST") {
- Cow::Borrowed(host.to_str()?)
- } else {
- Cow::Owned(conn_resource.hyper_connection.addr.to_string())
- };
- let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
- format!("{}://{}{}", scheme, host, path)
- };
-
- let is_websocket_request = req
- .headers()
- .get_all(hyper::header::CONNECTION)
- .iter()
- .any(|v| {
- v.to_str()
- .map(|s| s.to_lowercase().contains("upgrade"))
- .unwrap_or(false)
- })
- && req
- .headers()
- .get_all(hyper::header::UPGRADE)
- .iter()
- .any(|v| {
- v.to_str()
- .map(|s| s.to_lowercase().contains("websocket"))
- .unwrap_or(false)
- });
-
- let has_body = if let Some(exact_size) = req.size_hint().exact() {
- exact_size > 0
- } else {
- true
- };
-
- let maybe_request_rid = if is_websocket_request || has_body {
- let mut state = state.borrow_mut();
- let request_rid = state.resource_table.add(RequestResource {
- conn_rid,
- inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
- cancel: CancelHandle::default(),
- });
- Some(request_rid)
- } else {
- None
- };
+ // Fail with err if unexpected conn error, early return None otherwise
+ return Poll::Ready(conn_result.map(|_| None));
+ }
+ if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() {
+ let Conn { scheme, addr, .. } = conn_resource.hyper_connection;
let mut state = state.borrow_mut();
- let response_sender_rid =
- state.resource_table.add(ResponseSenderResource {
- sender: tx,
- conn_rid,
- });
-
- Poll::Ready(Ok(Some(NextRequestResponse(
- maybe_request_rid,
- response_sender_rid,
- method,
- headers,
- url,
- ))))
- } else if connection_closed {
- Poll::Ready(Ok(None))
+ let next =
+ prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?;
+ Poll::Ready(Ok(Some(next)))
} else {
Poll::Pending
}
@@ -328,6 +225,126 @@ async fn op_http_request_next(
.map_err(AnyError::from)
}
+fn prepare_next_request(
+ state: &mut OpState,
+ conn_rid: ResourceId,
+ request_resource: ServiceInner,
+ scheme: &'static str,
+ addr: SocketAddr,
+) -> Result<NextRequestResponse, AnyError> {
+ let tx = request_resource.response_tx;
+ let req = request_resource.request;
+ let method = req.method().to_string();
+ let headers = req_headers(&req);
+ let url = req_url(&req, scheme, addr)?;
+
+ let is_websocket = is_websocket_request(&req);
+ let has_body = if let Some(exact_size) = req.size_hint().exact() {
+ exact_size > 0
+ } else {
+ true
+ };
+
+ let maybe_request_rid = if is_websocket || has_body {
+ let request_rid = state.resource_table.add(RequestResource {
+ conn_rid,
+ inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))),
+ cancel: CancelHandle::default(),
+ });
+ Some(request_rid)
+ } else {
+ None
+ };
+
+ let response_sender_rid = state.resource_table.add(ResponseSenderResource {
+ sender: tx,
+ conn_rid,
+ });
+
+ Ok(NextRequestResponse(
+ maybe_request_rid,
+ response_sender_rid,
+ method,
+ headers,
+ url,
+ ))
+}
+
+fn req_url(
+ req: &hyper::Request<hyper::Body>,
+ scheme: &'static str,
+ addr: SocketAddr,
+) -> Result<String, AnyError> {
+ let host: Cow<str> = if let Some(host) = req.uri().host() {
+ Cow::Borrowed(host)
+ } else if let Some(host) = req.headers().get("HOST") {
+ Cow::Borrowed(host.to_str()?)
+ } else {
+ Cow::Owned(addr.to_string())
+ };
+ let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
+ Ok(format!("{}://{}{}", scheme, host, path))
+}
+
+fn req_headers(
+ req: &hyper::Request<hyper::Body>,
+) -> Vec<(ByteString, ByteString)> {
+ // We treat cookies specially, because we don't want them to get them
+ // mangled by the `Headers` object in JS. What we do is take all cookie
+ // headers and concat them into a single cookie header, seperated by
+ // semicolons.
+ let mut total_cookie_length = 0;
+ let mut cookies = vec![];
+
+ let mut headers = Vec::with_capacity(req.headers().len());
+ for (name, value) in req.headers().iter() {
+ if name == hyper::header::COOKIE {
+ let bytes = value.as_bytes();
+ total_cookie_length += bytes.len();
+ cookies.push(bytes);
+ } else {
+ let name: &[u8] = name.as_ref();
+ let value = value.as_bytes();
+ headers.push((ByteString(name.to_owned()), ByteString(value.to_owned())));
+ }
+ }
+
+ if !cookies.is_empty() {
+ let cookie_count = cookies.len();
+ total_cookie_length += (cookie_count * 2) - 2;
+ let mut bytes = Vec::with_capacity(total_cookie_length);
+ for (i, cookie) in cookies.into_iter().enumerate() {
+ bytes.extend(cookie);
+ if i != cookie_count - 1 {
+ bytes.extend("; ".as_bytes());
+ }
+ }
+ headers.push((
+ ByteString("cookie".as_bytes().to_owned()),
+ ByteString(bytes),
+ ));
+ }
+
+ headers
+}
+
+fn is_websocket_request(req: &hyper::Request<hyper::Body>) -> bool {
+ req_header_contains(req, hyper::header::CONNECTION, "upgrade")
+ && req_header_contains(req, hyper::header::UPGRADE, "websocket")
+}
+
+fn req_header_contains(
+ req: &hyper::Request<hyper::Body>,
+ key: impl hyper::header::AsHeaderName,
+ value: &str,
+) -> bool {
+ req.headers().get_all(key).iter().any(|v| {
+ v.to_str()
+ .map(|s| s.to_lowercase().contains(value))
+ .unwrap_or(false)
+ })
+}
+
fn should_ignore_error(e: &AnyError) -> bool {
if let Some(e) = e.downcast_ref::<hyper::Error>() {
use std::error::Error;