diff options
Diffstat (limited to 'cli/http_util.rs')
-rw-r--r-- | cli/http_util.rs | 240 |
1 files changed, 175 insertions, 65 deletions
diff --git a/cli/http_util.rs b/cli/http_util.rs index 18c0687bd..a8646c188 100644 --- a/cli/http_util.rs +++ b/cli/http_util.rs @@ -12,18 +12,22 @@ use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::futures::StreamExt; use deno_core::parking_lot::Mutex; +use deno_core::serde; +use deno_core::serde_json; use deno_core::url::Url; +use deno_runtime::deno_fetch; use deno_runtime::deno_fetch::create_http_client; -use deno_runtime::deno_fetch::reqwest; -use deno_runtime::deno_fetch::reqwest::header::HeaderName; -use deno_runtime::deno_fetch::reqwest::header::HeaderValue; -use deno_runtime::deno_fetch::reqwest::header::ACCEPT; -use deno_runtime::deno_fetch::reqwest::header::AUTHORIZATION; -use deno_runtime::deno_fetch::reqwest::header::IF_NONE_MATCH; -use deno_runtime::deno_fetch::reqwest::header::LOCATION; -use deno_runtime::deno_fetch::reqwest::StatusCode; use deno_runtime::deno_fetch::CreateHttpClientOptions; use deno_runtime::deno_tls::RootCertStoreProvider; +use http::header::HeaderName; +use http::header::HeaderValue; +use http::header::ACCEPT; +use http::header::AUTHORIZATION; +use http::header::IF_NONE_MATCH; +use http::header::LOCATION; +use http::StatusCode; +use http_body_util::BodyExt; + use std::collections::HashMap; use std::sync::Arc; use std::thread::ThreadId; @@ -208,8 +212,7 @@ pub struct HttpClientProvider { // it's not safe to share a reqwest::Client across tokio runtimes, // so we store these Clients keyed by thread id // https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788 - #[allow(clippy::disallowed_types)] // reqwest::Client allowed here - clients_by_thread_id: Mutex<HashMap<ThreadId, reqwest::Client>>, + clients_by_thread_id: Mutex<HashMap<ThreadId, deno_fetch::Client>>, } impl std::fmt::Debug for HttpClientProvider { @@ -270,9 +273,15 @@ pub struct BadResponseError { #[derive(Debug, Error)] pub enum DownloadError { #[error(transparent)] - Reqwest(#[from] reqwest::Error), + Fetch(AnyError), + #[error(transparent)] + UrlParse(#[from] deno_core::url::ParseError), + #[error(transparent)] + HttpParse(#[from] http::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), #[error(transparent)] - ToStr(#[from] reqwest::header::ToStrError), + ToStr(#[from] http::header::ToStrError), #[error("Redirection from '{}' did not provide location header", .request_url)] NoRedirectHeader { request_url: Url }, #[error("Too many redirects.")] @@ -283,8 +292,7 @@ pub enum DownloadError { #[derive(Debug)] pub struct HttpClient { - #[allow(clippy::disallowed_types)] // reqwest::Client allowed here - client: reqwest::Client, + client: deno_fetch::Client, // don't allow sending this across threads because then // it might be shared accidentally across tokio runtimes // which will cause issues @@ -295,22 +303,56 @@ pub struct HttpClient { impl HttpClient { // DO NOT make this public. You should always be creating one of these from // the HttpClientProvider - #[allow(clippy::disallowed_types)] // reqwest::Client allowed here - fn new(client: reqwest::Client) -> Self { + fn new(client: deno_fetch::Client) -> Self { Self { client, _unsend_marker: deno_core::unsync::UnsendMarker::default(), } } - // todo(dsherret): don't expose `reqwest::RequestBuilder` because it - // is `Sync` and could accidentally be shared with multiple tokio runtimes - pub fn get(&self, url: impl reqwest::IntoUrl) -> reqwest::RequestBuilder { - self.client.get(url) + pub fn get(&self, url: Url) -> Result<RequestBuilder, http::Error> { + let body = http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed(); + let mut req = http::Request::new(body); + *req.uri_mut() = url.as_str().parse()?; + Ok(RequestBuilder { + client: self.client.clone(), + req, + }) + } + + pub fn post( + &self, + url: Url, + body: deno_fetch::ReqBody, + ) -> Result<RequestBuilder, http::Error> { + let mut req = http::Request::new(body); + *req.method_mut() = http::Method::POST; + *req.uri_mut() = url.as_str().parse()?; + Ok(RequestBuilder { + client: self.client.clone(), + req, + }) } - pub fn post(&self, url: impl reqwest::IntoUrl) -> reqwest::RequestBuilder { - self.client.post(url) + pub fn post_json<S>( + &self, + url: Url, + ser: &S, + ) -> Result<RequestBuilder, DownloadError> + where + S: serde::Serialize, + { + let json = deno_core::serde_json::to_vec(ser)?; + let body = http_body_util::Full::new(json.into()) + .map_err(|never| match never {}) + .boxed(); + let builder = self.post(url, body)?; + Ok(builder.header( + http::header::CONTENT_TYPE, + "application/json".parse().map_err(http::Error::from)?, + )) } /// Asynchronously fetches the given HTTP URL one pass only. @@ -322,27 +364,35 @@ impl HttpClient { &self, args: FetchOnceArgs<'a>, ) -> Result<FetchOnceResult, AnyError> { - let mut request = self.client.get(args.url.clone()); + let body = http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed(); + let mut request = http::Request::new(body); + *request.uri_mut() = args.url.as_str().parse()?; if let Some(etag) = args.maybe_etag { let if_none_match_val = HeaderValue::from_str(&etag)?; - request = request.header(IF_NONE_MATCH, if_none_match_val); + request + .headers_mut() + .insert(IF_NONE_MATCH, if_none_match_val); } if let Some(auth_token) = args.maybe_auth_token { let authorization_val = HeaderValue::from_str(&auth_token.to_string())?; - request = request.header(AUTHORIZATION, authorization_val); + request + .headers_mut() + .insert(AUTHORIZATION, authorization_val); } if let Some(accept) = args.maybe_accept { let accepts_val = HeaderValue::from_str(&accept)?; - request = request.header(ACCEPT, accepts_val); + request.headers_mut().insert(ACCEPT, accepts_val); } - let response = match request.send().await { + let response = match self.client.clone().send(request).await { Ok(resp) => resp, Err(err) => { - if err.is_connect() || err.is_timeout() { + if is_error_connect(&err) { return Ok(FetchOnceResult::RequestError(err.to_string())); } - return Err(err.into()); + return Err(err); } }; @@ -406,18 +456,12 @@ impl HttpClient { Ok(FetchOnceResult::Code(body, result_headers)) } - pub async fn download_text( - &self, - url: impl reqwest::IntoUrl, - ) -> Result<String, AnyError> { + pub async fn download_text(&self, url: Url) -> Result<String, AnyError> { let bytes = self.download(url).await?; Ok(String::from_utf8(bytes)?) } - pub async fn download( - &self, - url: impl reqwest::IntoUrl, - ) -> Result<Vec<u8>, AnyError> { + pub async fn download(&self, url: Url) -> Result<Vec<u8>, AnyError> { let maybe_bytes = self.download_inner(url, None, None).await?; match maybe_bytes { Some(bytes) => Ok(bytes), @@ -427,7 +471,7 @@ impl HttpClient { pub async fn download_with_progress( &self, - url: impl reqwest::IntoUrl, + url: Url, maybe_header: Option<(HeaderName, HeaderValue)>, progress_guard: &UpdateGuard, ) -> Result<Option<Vec<u8>>, DownloadError> { @@ -438,26 +482,26 @@ impl HttpClient { pub async fn get_redirected_url( &self, - url: impl reqwest::IntoUrl, + url: Url, maybe_header: Option<(HeaderName, HeaderValue)>, ) -> Result<Url, AnyError> { - let response = self.get_redirected_response(url, maybe_header).await?; - Ok(response.url().clone()) + let (_, url) = self.get_redirected_response(url, maybe_header).await?; + Ok(url) } async fn download_inner( &self, - url: impl reqwest::IntoUrl, + url: Url, maybe_header: Option<(HeaderName, HeaderValue)>, progress_guard: Option<&UpdateGuard>, ) -> Result<Option<Vec<u8>>, DownloadError> { - let response = self.get_redirected_response(url, maybe_header).await?; + let (response, _) = self.get_redirected_response(url, maybe_header).await?; if response.status() == 404 { return Ok(None); } else if !response.status().is_success() { let status = response.status(); - let maybe_response_text = response.text().await.ok(); + let maybe_response_text = body_to_string(response).await.ok(); return Err(DownloadError::BadResponse(BadResponseError { status_code: status, response_text: maybe_response_text @@ -469,60 +513,77 @@ impl HttpClient { get_response_body_with_progress(response, progress_guard) .await .map(Some) - .map_err(Into::into) + .map_err(DownloadError::Fetch) } async fn get_redirected_response( &self, - url: impl reqwest::IntoUrl, + mut url: Url, mut maybe_header: Option<(HeaderName, HeaderValue)>, - ) -> Result<reqwest::Response, DownloadError> { - let mut url = url.into_url()?; - let mut builder = self.get(url.clone()); + ) -> Result<(http::Response<deno_fetch::ResBody>, Url), DownloadError> { + let mut req = self.get(url.clone())?.build(); if let Some((header_name, header_value)) = maybe_header.as_ref() { - builder = builder.header(header_name, header_value); + req.headers_mut().append(header_name, header_value.clone()); } - let mut response = builder.send().await?; + let mut response = self + .client + .clone() + .send(req) + .await + .map_err(DownloadError::Fetch)?; let status = response.status(); if status.is_redirection() { for _ in 0..5 { let new_url = resolve_redirect_from_response(&url, &response)?; - let mut builder = self.get(new_url.clone()); + let mut req = self.get(new_url.clone())?.build(); if new_url.origin() == url.origin() { if let Some((header_name, header_value)) = maybe_header.as_ref() { - builder = builder.header(header_name, header_value); + req.headers_mut().append(header_name, header_value.clone()); } } else { maybe_header = None; } - let new_response = builder.send().await?; + let new_response = self + .client + .clone() + .send(req) + .await + .map_err(DownloadError::Fetch)?; let status = new_response.status(); if status.is_redirection() { response = new_response; url = new_url; } else { - return Ok(new_response); + return Ok((new_response, new_url)); } } Err(DownloadError::TooManyRedirects) } else { - Ok(response) + Ok((response, url)) } } } +fn is_error_connect(err: &AnyError) -> bool { + err + .downcast_ref::<hyper_util::client::legacy::Error>() + .map(|err| err.is_connect()) + .unwrap_or(false) +} + async fn get_response_body_with_progress( - response: reqwest::Response, + response: http::Response<deno_fetch::ResBody>, progress_guard: Option<&UpdateGuard>, -) -> Result<Vec<u8>, reqwest::Error> { +) -> Result<Vec<u8>, AnyError> { + use http_body::Body as _; if let Some(progress_guard) = progress_guard { - if let Some(total_size) = response.content_length() { + if let Some(total_size) = response.body().size_hint().exact() { progress_guard.set_total_size(total_size); let mut current_size = 0; let mut data = Vec::with_capacity(total_size as usize); - let mut stream = response.bytes_stream(); + let mut stream = response.into_body().into_data_stream(); while let Some(item) = stream.next().await { let bytes = item?; current_size += bytes.len() as u64; @@ -532,7 +593,7 @@ async fn get_response_body_with_progress( return Ok(data); } } - let bytes = response.bytes().await?; + let bytes = response.collect().await?.to_bytes(); Ok(bytes.into()) } @@ -563,9 +624,9 @@ fn resolve_url_from_location(base_url: &Url, location: &str) -> Url { } } -fn resolve_redirect_from_response( +fn resolve_redirect_from_response<B>( request_url: &Url, - response: &reqwest::Response, + response: &http::Response<B>, ) -> Result<Url, DownloadError> { debug_assert!(response.status().is_redirection()); if let Some(location) = response.headers().get(LOCATION) { @@ -580,6 +641,49 @@ fn resolve_redirect_from_response( } } +pub async fn body_to_string<B>(body: B) -> Result<String, AnyError> +where + B: http_body::Body, + AnyError: From<B::Error>, +{ + let bytes = body.collect().await?.to_bytes(); + let s = std::str::from_utf8(&bytes)?; + Ok(s.into()) +} + +pub async fn body_to_json<B, D>(body: B) -> Result<D, AnyError> +where + B: http_body::Body, + AnyError: From<B::Error>, + D: serde::de::DeserializeOwned, +{ + let bytes = body.collect().await?.to_bytes(); + let val = deno_core::serde_json::from_slice(&bytes)?; + Ok(val) +} + +pub struct RequestBuilder { + client: deno_fetch::Client, + req: http::Request<deno_fetch::ReqBody>, +} + +impl RequestBuilder { + pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self { + self.req.headers_mut().append(name, value); + self + } + + pub async fn send( + self, + ) -> Result<http::Response<deno_fetch::ResBody>, AnyError> { + self.client.send(self.req).await + } + + pub fn build(self) -> http::Request<deno_fetch::ReqBody> { + self.req + } +} + #[allow(clippy::print_stdout)] #[allow(clippy::print_stderr)] #[cfg(test)] @@ -600,14 +704,20 @@ mod test { // make a request to the redirect server let text = client - .download_text("http://localhost:4546/subdir/redirects/redirect1.js") + .download_text( + Url::parse("http://localhost:4546/subdir/redirects/redirect1.js") + .unwrap(), + ) .await .unwrap(); assert_eq!(text, "export const redirect = 1;\n"); // now make one to the infinite redirects server let err = client - .download_text("http://localhost:4549/subdir/redirects/redirect1.js") + .download_text( + Url::parse("http://localhost:4549/subdir/redirects/redirect1.js") + .unwrap(), + ) .await .err() .unwrap(); |