summaryrefslogtreecommitdiff
path: root/cli/http_util.rs
diff options
context:
space:
mode:
authorBartek IwaƄczuk <biwanczuk@gmail.com>2024-07-18 00:37:31 +0100
committerGitHub <noreply@github.com>2024-07-18 01:37:31 +0200
commit7b33623b1d70803b43e511a58666a73dd0b2ed67 (patch)
tree2d900f2be67caebf6a886d6e06a340b095e636cc /cli/http_util.rs
parentf4b9d8586215fc07c28998e5d896fefa876139b7 (diff)
Reland "refactor(fetch): reimplement fetch with hyper instead of reqwest" (#24593)
Originally landed in https://github.com/denoland/deno/commit/f6fd6619e708a515831f707438368d81b0c9aa56. Reverted in https://github.com/denoland/deno/pull/24574. This reland contains a fix that sends "Accept: */*" header for calls made from "FileFetcher". Absence of this header made downloading source code from JSR broken. This is tested by ensuring this header is present in the test server that servers JSR packages. --------- Co-authored-by: Sean McArthur <sean@seanmonstar.com>
Diffstat (limited to 'cli/http_util.rs')
-rw-r--r--cli/http_util.rs240
1 files changed, 175 insertions, 65 deletions
diff --git a/cli/http_util.rs b/cli/http_util.rs
index 18c0687bd..a8646c188 100644
--- a/cli/http_util.rs
+++ b/cli/http_util.rs
@@ -12,18 +12,22 @@ use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::futures::StreamExt;
use deno_core::parking_lot::Mutex;
+use deno_core::serde;
+use deno_core::serde_json;
use deno_core::url::Url;
+use deno_runtime::deno_fetch;
use deno_runtime::deno_fetch::create_http_client;
-use deno_runtime::deno_fetch::reqwest;
-use deno_runtime::deno_fetch::reqwest::header::HeaderName;
-use deno_runtime::deno_fetch::reqwest::header::HeaderValue;
-use deno_runtime::deno_fetch::reqwest::header::ACCEPT;
-use deno_runtime::deno_fetch::reqwest::header::AUTHORIZATION;
-use deno_runtime::deno_fetch::reqwest::header::IF_NONE_MATCH;
-use deno_runtime::deno_fetch::reqwest::header::LOCATION;
-use deno_runtime::deno_fetch::reqwest::StatusCode;
use deno_runtime::deno_fetch::CreateHttpClientOptions;
use deno_runtime::deno_tls::RootCertStoreProvider;
+use http::header::HeaderName;
+use http::header::HeaderValue;
+use http::header::ACCEPT;
+use http::header::AUTHORIZATION;
+use http::header::IF_NONE_MATCH;
+use http::header::LOCATION;
+use http::StatusCode;
+use http_body_util::BodyExt;
+
use std::collections::HashMap;
use std::sync::Arc;
use std::thread::ThreadId;
@@ -208,8 +212,7 @@ pub struct HttpClientProvider {
// it's not safe to share a reqwest::Client across tokio runtimes,
// so we store these Clients keyed by thread id
// https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788
- #[allow(clippy::disallowed_types)] // reqwest::Client allowed here
- clients_by_thread_id: Mutex<HashMap<ThreadId, reqwest::Client>>,
+ clients_by_thread_id: Mutex<HashMap<ThreadId, deno_fetch::Client>>,
}
impl std::fmt::Debug for HttpClientProvider {
@@ -270,9 +273,15 @@ pub struct BadResponseError {
#[derive(Debug, Error)]
pub enum DownloadError {
#[error(transparent)]
- Reqwest(#[from] reqwest::Error),
+ Fetch(AnyError),
+ #[error(transparent)]
+ UrlParse(#[from] deno_core::url::ParseError),
+ #[error(transparent)]
+ HttpParse(#[from] http::Error),
+ #[error(transparent)]
+ Json(#[from] serde_json::Error),
#[error(transparent)]
- ToStr(#[from] reqwest::header::ToStrError),
+ ToStr(#[from] http::header::ToStrError),
#[error("Redirection from '{}' did not provide location header", .request_url)]
NoRedirectHeader { request_url: Url },
#[error("Too many redirects.")]
@@ -283,8 +292,7 @@ pub enum DownloadError {
#[derive(Debug)]
pub struct HttpClient {
- #[allow(clippy::disallowed_types)] // reqwest::Client allowed here
- client: reqwest::Client,
+ client: deno_fetch::Client,
// don't allow sending this across threads because then
// it might be shared accidentally across tokio runtimes
// which will cause issues
@@ -295,22 +303,56 @@ pub struct HttpClient {
impl HttpClient {
// DO NOT make this public. You should always be creating one of these from
// the HttpClientProvider
- #[allow(clippy::disallowed_types)] // reqwest::Client allowed here
- fn new(client: reqwest::Client) -> Self {
+ fn new(client: deno_fetch::Client) -> Self {
Self {
client,
_unsend_marker: deno_core::unsync::UnsendMarker::default(),
}
}
- // todo(dsherret): don't expose `reqwest::RequestBuilder` because it
- // is `Sync` and could accidentally be shared with multiple tokio runtimes
- pub fn get(&self, url: impl reqwest::IntoUrl) -> reqwest::RequestBuilder {
- self.client.get(url)
+ pub fn get(&self, url: Url) -> Result<RequestBuilder, http::Error> {
+ let body = http_body_util::Empty::new()
+ .map_err(|never| match never {})
+ .boxed();
+ let mut req = http::Request::new(body);
+ *req.uri_mut() = url.as_str().parse()?;
+ Ok(RequestBuilder {
+ client: self.client.clone(),
+ req,
+ })
+ }
+
+ pub fn post(
+ &self,
+ url: Url,
+ body: deno_fetch::ReqBody,
+ ) -> Result<RequestBuilder, http::Error> {
+ let mut req = http::Request::new(body);
+ *req.method_mut() = http::Method::POST;
+ *req.uri_mut() = url.as_str().parse()?;
+ Ok(RequestBuilder {
+ client: self.client.clone(),
+ req,
+ })
}
- pub fn post(&self, url: impl reqwest::IntoUrl) -> reqwest::RequestBuilder {
- self.client.post(url)
+ pub fn post_json<S>(
+ &self,
+ url: Url,
+ ser: &S,
+ ) -> Result<RequestBuilder, DownloadError>
+ where
+ S: serde::Serialize,
+ {
+ let json = deno_core::serde_json::to_vec(ser)?;
+ let body = http_body_util::Full::new(json.into())
+ .map_err(|never| match never {})
+ .boxed();
+ let builder = self.post(url, body)?;
+ Ok(builder.header(
+ http::header::CONTENT_TYPE,
+ "application/json".parse().map_err(http::Error::from)?,
+ ))
}
/// Asynchronously fetches the given HTTP URL one pass only.
@@ -322,27 +364,35 @@ impl HttpClient {
&self,
args: FetchOnceArgs<'a>,
) -> Result<FetchOnceResult, AnyError> {
- let mut request = self.client.get(args.url.clone());
+ let body = http_body_util::Empty::new()
+ .map_err(|never| match never {})
+ .boxed();
+ let mut request = http::Request::new(body);
+ *request.uri_mut() = args.url.as_str().parse()?;
if let Some(etag) = args.maybe_etag {
let if_none_match_val = HeaderValue::from_str(&etag)?;
- request = request.header(IF_NONE_MATCH, if_none_match_val);
+ request
+ .headers_mut()
+ .insert(IF_NONE_MATCH, if_none_match_val);
}
if let Some(auth_token) = args.maybe_auth_token {
let authorization_val = HeaderValue::from_str(&auth_token.to_string())?;
- request = request.header(AUTHORIZATION, authorization_val);
+ request
+ .headers_mut()
+ .insert(AUTHORIZATION, authorization_val);
}
if let Some(accept) = args.maybe_accept {
let accepts_val = HeaderValue::from_str(&accept)?;
- request = request.header(ACCEPT, accepts_val);
+ request.headers_mut().insert(ACCEPT, accepts_val);
}
- let response = match request.send().await {
+ let response = match self.client.clone().send(request).await {
Ok(resp) => resp,
Err(err) => {
- if err.is_connect() || err.is_timeout() {
+ if is_error_connect(&err) {
return Ok(FetchOnceResult::RequestError(err.to_string()));
}
- return Err(err.into());
+ return Err(err);
}
};
@@ -406,18 +456,12 @@ impl HttpClient {
Ok(FetchOnceResult::Code(body, result_headers))
}
- pub async fn download_text(
- &self,
- url: impl reqwest::IntoUrl,
- ) -> Result<String, AnyError> {
+ pub async fn download_text(&self, url: Url) -> Result<String, AnyError> {
let bytes = self.download(url).await?;
Ok(String::from_utf8(bytes)?)
}
- pub async fn download(
- &self,
- url: impl reqwest::IntoUrl,
- ) -> Result<Vec<u8>, AnyError> {
+ pub async fn download(&self, url: Url) -> Result<Vec<u8>, AnyError> {
let maybe_bytes = self.download_inner(url, None, None).await?;
match maybe_bytes {
Some(bytes) => Ok(bytes),
@@ -427,7 +471,7 @@ impl HttpClient {
pub async fn download_with_progress(
&self,
- url: impl reqwest::IntoUrl,
+ url: Url,
maybe_header: Option<(HeaderName, HeaderValue)>,
progress_guard: &UpdateGuard,
) -> Result<Option<Vec<u8>>, DownloadError> {
@@ -438,26 +482,26 @@ impl HttpClient {
pub async fn get_redirected_url(
&self,
- url: impl reqwest::IntoUrl,
+ url: Url,
maybe_header: Option<(HeaderName, HeaderValue)>,
) -> Result<Url, AnyError> {
- let response = self.get_redirected_response(url, maybe_header).await?;
- Ok(response.url().clone())
+ let (_, url) = self.get_redirected_response(url, maybe_header).await?;
+ Ok(url)
}
async fn download_inner(
&self,
- url: impl reqwest::IntoUrl,
+ url: Url,
maybe_header: Option<(HeaderName, HeaderValue)>,
progress_guard: Option<&UpdateGuard>,
) -> Result<Option<Vec<u8>>, DownloadError> {
- let response = self.get_redirected_response(url, maybe_header).await?;
+ let (response, _) = self.get_redirected_response(url, maybe_header).await?;
if response.status() == 404 {
return Ok(None);
} else if !response.status().is_success() {
let status = response.status();
- let maybe_response_text = response.text().await.ok();
+ let maybe_response_text = body_to_string(response).await.ok();
return Err(DownloadError::BadResponse(BadResponseError {
status_code: status,
response_text: maybe_response_text
@@ -469,60 +513,77 @@ impl HttpClient {
get_response_body_with_progress(response, progress_guard)
.await
.map(Some)
- .map_err(Into::into)
+ .map_err(DownloadError::Fetch)
}
async fn get_redirected_response(
&self,
- url: impl reqwest::IntoUrl,
+ mut url: Url,
mut maybe_header: Option<(HeaderName, HeaderValue)>,
- ) -> Result<reqwest::Response, DownloadError> {
- let mut url = url.into_url()?;
- let mut builder = self.get(url.clone());
+ ) -> Result<(http::Response<deno_fetch::ResBody>, Url), DownloadError> {
+ let mut req = self.get(url.clone())?.build();
if let Some((header_name, header_value)) = maybe_header.as_ref() {
- builder = builder.header(header_name, header_value);
+ req.headers_mut().append(header_name, header_value.clone());
}
- let mut response = builder.send().await?;
+ let mut response = self
+ .client
+ .clone()
+ .send(req)
+ .await
+ .map_err(DownloadError::Fetch)?;
let status = response.status();
if status.is_redirection() {
for _ in 0..5 {
let new_url = resolve_redirect_from_response(&url, &response)?;
- let mut builder = self.get(new_url.clone());
+ let mut req = self.get(new_url.clone())?.build();
if new_url.origin() == url.origin() {
if let Some((header_name, header_value)) = maybe_header.as_ref() {
- builder = builder.header(header_name, header_value);
+ req.headers_mut().append(header_name, header_value.clone());
}
} else {
maybe_header = None;
}
- let new_response = builder.send().await?;
+ let new_response = self
+ .client
+ .clone()
+ .send(req)
+ .await
+ .map_err(DownloadError::Fetch)?;
let status = new_response.status();
if status.is_redirection() {
response = new_response;
url = new_url;
} else {
- return Ok(new_response);
+ return Ok((new_response, new_url));
}
}
Err(DownloadError::TooManyRedirects)
} else {
- Ok(response)
+ Ok((response, url))
}
}
}
+fn is_error_connect(err: &AnyError) -> bool {
+ err
+ .downcast_ref::<hyper_util::client::legacy::Error>()
+ .map(|err| err.is_connect())
+ .unwrap_or(false)
+}
+
async fn get_response_body_with_progress(
- response: reqwest::Response,
+ response: http::Response<deno_fetch::ResBody>,
progress_guard: Option<&UpdateGuard>,
-) -> Result<Vec<u8>, reqwest::Error> {
+) -> Result<Vec<u8>, AnyError> {
+ use http_body::Body as _;
if let Some(progress_guard) = progress_guard {
- if let Some(total_size) = response.content_length() {
+ if let Some(total_size) = response.body().size_hint().exact() {
progress_guard.set_total_size(total_size);
let mut current_size = 0;
let mut data = Vec::with_capacity(total_size as usize);
- let mut stream = response.bytes_stream();
+ let mut stream = response.into_body().into_data_stream();
while let Some(item) = stream.next().await {
let bytes = item?;
current_size += bytes.len() as u64;
@@ -532,7 +593,7 @@ async fn get_response_body_with_progress(
return Ok(data);
}
}
- let bytes = response.bytes().await?;
+ let bytes = response.collect().await?.to_bytes();
Ok(bytes.into())
}
@@ -563,9 +624,9 @@ fn resolve_url_from_location(base_url: &Url, location: &str) -> Url {
}
}
-fn resolve_redirect_from_response(
+fn resolve_redirect_from_response<B>(
request_url: &Url,
- response: &reqwest::Response,
+ response: &http::Response<B>,
) -> Result<Url, DownloadError> {
debug_assert!(response.status().is_redirection());
if let Some(location) = response.headers().get(LOCATION) {
@@ -580,6 +641,49 @@ fn resolve_redirect_from_response(
}
}
+pub async fn body_to_string<B>(body: B) -> Result<String, AnyError>
+where
+ B: http_body::Body,
+ AnyError: From<B::Error>,
+{
+ let bytes = body.collect().await?.to_bytes();
+ let s = std::str::from_utf8(&bytes)?;
+ Ok(s.into())
+}
+
+pub async fn body_to_json<B, D>(body: B) -> Result<D, AnyError>
+where
+ B: http_body::Body,
+ AnyError: From<B::Error>,
+ D: serde::de::DeserializeOwned,
+{
+ let bytes = body.collect().await?.to_bytes();
+ let val = deno_core::serde_json::from_slice(&bytes)?;
+ Ok(val)
+}
+
+pub struct RequestBuilder {
+ client: deno_fetch::Client,
+ req: http::Request<deno_fetch::ReqBody>,
+}
+
+impl RequestBuilder {
+ pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self {
+ self.req.headers_mut().append(name, value);
+ self
+ }
+
+ pub async fn send(
+ self,
+ ) -> Result<http::Response<deno_fetch::ResBody>, AnyError> {
+ self.client.send(self.req).await
+ }
+
+ pub fn build(self) -> http::Request<deno_fetch::ReqBody> {
+ self.req
+ }
+}
+
#[allow(clippy::print_stdout)]
#[allow(clippy::print_stderr)]
#[cfg(test)]
@@ -600,14 +704,20 @@ mod test {
// make a request to the redirect server
let text = client
- .download_text("http://localhost:4546/subdir/redirects/redirect1.js")
+ .download_text(
+ Url::parse("http://localhost:4546/subdir/redirects/redirect1.js")
+ .unwrap(),
+ )
.await
.unwrap();
assert_eq!(text, "export const redirect = 1;\n");
// now make one to the infinite redirects server
let err = client
- .download_text("http://localhost:4549/subdir/redirects/redirect1.js")
+ .download_text(
+ Url::parse("http://localhost:4549/subdir/redirects/redirect1.js")
+ .unwrap(),
+ )
.await
.err()
.unwrap();