diff options
author | Bartek Iwańczuk <biwanczuk@gmail.com> | 2024-07-18 00:37:31 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-18 01:37:31 +0200 |
commit | 7b33623b1d70803b43e511a58666a73dd0b2ed67 (patch) | |
tree | 2d900f2be67caebf6a886d6e06a340b095e636cc /ext/fetch/lib.rs | |
parent | f4b9d8586215fc07c28998e5d896fefa876139b7 (diff) |
Reland "refactor(fetch): reimplement fetch with hyper instead of reqwest" (#24593)
Originally landed in
https://github.com/denoland/deno/commit/f6fd6619e708a515831f707438368d81b0c9aa56.
Reverted in https://github.com/denoland/deno/pull/24574.
This reland contains a fix that sends an "Accept: */*" header for calls made
from "FileFetcher". The absence of this header broke downloading source code
from JSR. This is tested by ensuring the header is present in the
test server that serves JSR packages.
---------
Co-authored-by: Sean McArthur <sean@seanmonstar.com>
Diffstat (limited to 'ext/fetch/lib.rs')
-rw-r--r-- | ext/fetch/lib.rs | 318 |
1 files changed, 208 insertions, 110 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 75ceb86d9..1372329c4 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,6 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. mod fs_fetch_handler; +mod proxy; use std::borrow::Cow; use std::cell::RefCell; @@ -14,7 +15,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -use bytes::Bytes; +use deno_core::anyhow::anyhow; use deno_core::anyhow::Error; use deno_core::error::type_error; use deno_core::error::AnyError; @@ -42,34 +43,39 @@ use deno_core::ResourceId; use deno_tls::rustls::RootCertStore; use deno_tls::Proxy; use deno_tls::RootCertStoreProvider; - -use data_url::DataUrl; use deno_tls::TlsKey; use deno_tls::TlsKeys; use deno_tls::TlsKeysHolder; + +use bytes::Bytes; +use data_url::DataUrl; +use http::header::HeaderName; +use http::header::HeaderValue; +use http::header::ACCEPT; +use http::header::ACCEPT_ENCODING; use http::header::CONTENT_LENGTH; +use http::header::HOST; +use http::header::PROXY_AUTHORIZATION; +use http::header::RANGE; +use http::header::USER_AGENT; +use http::Method; use http::Uri; -use reqwest::header::HeaderMap; -use reqwest::header::HeaderName; -use reqwest::header::HeaderValue; -use reqwest::header::ACCEPT_ENCODING; -use reqwest::header::HOST; -use reqwest::header::RANGE; -use reqwest::header::USER_AGENT; -use reqwest::redirect::Policy; -use reqwest::Body; -use reqwest::Client; -use reqwest::Method; -use reqwest::RequestBuilder; -use reqwest::Response; +use http_body_util::BodyExt; +use hyper::body::Frame; +use hyper_rustls::HttpsConnector; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::rt::TokioExecutor; +use hyper_util::rt::TokioIo; +use hyper_util::rt::TokioTimer; use serde::Deserialize; use serde::Serialize; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; +use tower::ServiceExt; +use tower_http::decompression::Decompression; -// Re-export reqwest and data_url +// Re-export data_url pub 
use data_url; -pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; @@ -78,8 +84,9 @@ pub struct Options { pub user_agent: String, pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>, pub proxy: Option<Proxy>, + #[allow(clippy::type_complexity)] pub request_builder_hook: - Option<fn(RequestBuilder) -> Result<RequestBuilder, AnyError>>, + Option<fn(&mut http::Request<ReqBody>) -> Result<(), AnyError>>, pub unsafely_ignore_certificate_errors: Option<Vec<String>>, pub client_cert_chain_and_key: TlsKeys, pub file_fetch_handler: Rc<dyn FetchHandler>, @@ -146,7 +153,7 @@ pub trait FetchHandler: dyn_clone::DynClone { fn fetch_file( &self, state: &mut OpState, - url: Url, + url: &Url, ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>); } @@ -160,7 +167,7 @@ impl FetchHandler for DefaultFileFetchHandler { fn fetch_file( &self, _state: &mut OpState, - _url: Url, + _url: &Url, ) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) { let fut = async move { Ok(Err(type_error( @@ -183,20 +190,20 @@ pub struct FetchReturn { pub fn get_or_create_client_from_state( state: &mut OpState, -) -> Result<reqwest::Client, AnyError> { - if let Some(client) = state.try_borrow::<reqwest::Client>() { +) -> Result<Client, AnyError> { + if let Some(client) = state.try_borrow::<Client>() { Ok(client.clone()) } else { let options = state.borrow::<Options>(); let client = create_client_from_options(options)?; - state.put::<reqwest::Client>(client.clone()); + state.put::<Client>(client.clone()); Ok(client) } } pub fn create_client_from_options( options: &Options, -) -> Result<reqwest::Client, AnyError> { +) -> Result<Client, AnyError> { create_http_client( &options.user_agent, CreateHttpClientOptions { @@ -253,11 +260,11 @@ impl Stream for ResourceToBodyAdapter { } Poll::Ready(res) => match res { Ok(buf) if buf.is_empty() => Poll::Ready(None), - Ok(_) => { + Ok(buf) => { this.1 = Some(this.0.clone().read(64 * 1024)); - Poll::Ready(Some(res.map(|b| 
b.to_vec().into()))) + Poll::Ready(Some(Ok(buf.to_vec().into()))) } - _ => Poll::Ready(Some(res.map(|b| b.to_vec().into()))), + Err(err) => Poll::Ready(Some(Err(err))), }, } } else { @@ -266,6 +273,22 @@ impl Stream for ResourceToBodyAdapter { } } +impl hyper::body::Body for ResourceToBodyAdapter { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { + match self.poll_next(cx) { + Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + impl Drop for ResourceToBodyAdapter { fn drop(&mut self) { self.0.clone().close() @@ -347,9 +370,11 @@ where file_fetch_handler, .. } = state.borrow_mut::<Options>(); let file_fetch_handler = file_fetch_handler.clone(); - let (request, maybe_cancel_handle) = - file_fetch_handler.fetch_file(state, url); - let request_rid = state.resource_table.add(FetchRequestResource(request)); + let (future, maybe_cancel_handle) = + file_fetch_handler.fetch_file(state, &url); + let request_rid = state + .resource_table + .add(FetchRequestResource { future, url }); let maybe_cancel_handle_rid = maybe_cancel_handle .map(|ch| state.resource_table.add(FetchCancelHandle(ch))); @@ -359,31 +384,31 @@ where let permissions = state.borrow_mut::<FP>(); permissions.check_net_url(&url, "fetch()")?; - // Make sure that we have a valid URI early, as reqwest's `RequestBuilder::send` - // internally uses `expect_uri`, which panics instead of returning a usable `Result`. 
- if url.as_str().parse::<Uri>().is_err() { - return Err(type_error("Invalid URL")); - } - - let mut request = client.request(method.clone(), url); + let uri = url + .as_str() + .parse::<Uri>() + .map_err(|_| type_error("Invalid URL"))?; - if has_body { + let mut con_len = None; + let body = if has_body { match (data, resource) { (Some(data), _) => { // If a body is passed, we use it, and don't return a body for streaming. - request = request.body(data.to_vec()); + con_len = Some(data.len() as u64); + + http_body_util::Full::new(data.to_vec().into()) + .map_err(|never| match never {}) + .boxed() } (_, Some(resource)) => { let resource = state.resource_table.take_any(resource)?; match resource.size_hint() { (body_size, Some(n)) if body_size == n && body_size > 0 => { - request = - request.header(CONTENT_LENGTH, HeaderValue::from(body_size)); + con_len = Some(body_size); } _ => {} } - request = request - .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource))) + ReqBody::new(ResourceToBodyAdapter::new(resource)) } (None, None) => unreachable!(), } @@ -391,11 +416,21 @@ where // POST and PUT requests should always have a 0 length content-length, // if there is no body. 
https://fetch.spec.whatwg.org/#http-network-or-cache-fetch if matches!(method, Method::POST | Method::PUT) { - request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); + con_len = Some(0); } + http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed() }; - let mut header_map = HeaderMap::new(); + let mut request = http::Request::new(body); + *request.method_mut() = method.clone(); + *request.uri_mut() = uri; + + if let Some(len) = con_len { + request.headers_mut().insert(CONTENT_LENGTH, len.into()); + } + for (key, value) in headers { let name = HeaderName::from_bytes(&key) .map_err(|err| type_error(err.to_string()))?; @@ -403,38 +438,34 @@ where .map_err(|err| type_error(err.to_string()))?; if (name != HOST || allow_host) && name != CONTENT_LENGTH { - header_map.append(name, v); + request.headers_mut().append(name, v); } } - if header_map.contains_key(RANGE) { + if request.headers().contains_key(RANGE) { // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18 // If httpRequest’s header list contains `Range`, then append (`Accept-Encoding`, `identity`) - header_map + request + .headers_mut() .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity")); } - request = request.headers(header_map); let options = state.borrow::<Options>(); if let Some(request_builder_hook) = options.request_builder_hook { - request = request_builder_hook(request) + request_builder_hook(&mut request) .map_err(|err| type_error(err.to_string()))?; } let cancel_handle = CancelHandle::new_rc(); let cancel_handle_ = cancel_handle.clone(); - let fut = async move { - request - .send() - .or_cancel(cancel_handle_) - .await - .map(|res| res.map_err(|err| err.into())) - }; + let fut = + async move { client.send(request).or_cancel(cancel_handle_).await }; - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); + let request_rid = state.resource_table.add(FetchRequestResource { + future: Box::pin(fut), + url, + }); let 
cancel_handle_rid = state.resource_table.add(FetchCancelHandle(cancel_handle)); @@ -448,17 +479,21 @@ where let (body, _) = data_url .decode_to_vec() .map_err(|e| type_error(format!("{e:?}")))?; + let body = http_body_util::Full::new(body.into()) + .map_err(|never| match never {}) + .boxed(); let response = http::Response::builder() .status(http::StatusCode::OK) .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string()) - .body(reqwest::Body::from(body))?; + .body(body)?; - let fut = async move { Ok(Ok(Response::from(response))) }; + let fut = async move { Ok(Ok(response)) }; - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); + let request_rid = state.resource_table.add(FetchRequestResource { + future: Box::pin(fut), + url, + }); (request_rid, None) } @@ -505,24 +540,21 @@ pub async fn op_fetch_send( .ok() .expect("multiple op_fetch_send ongoing"); - let res = match request.0.await { + let res = match request.future.await { Ok(Ok(res)) => res, Ok(Err(err)) => { // We're going to try and rescue the error cause from a stream and return it from this fetch. - // If any error in the chain is a reqwest body error, return that as a special result we can use to + // If any error in the chain is a hyper body error, return that as a special result we can use to // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`). 
// TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead let mut err_ref: &dyn std::error::Error = err.as_ref(); while let Some(err) = std::error::Error::source(err_ref) { - if let Some(err) = err.downcast_ref::<reqwest::Error>() { - if err.is_body() { - // Extracts the next error cause and uses that for the message - if let Some(err) = std::error::Error::source(err) { - return Ok(FetchResponse { - error: Some(err.to_string()), - ..Default::default() - }); - } + if let Some(err) = err.downcast_ref::<hyper::Error>() { + if let Some(err) = std::error::Error::source(err) { + return Ok(FetchResponse { + error: Some(err.to_string()), + ..Default::default() + }); } } err_ref = err; @@ -534,14 +566,17 @@ pub async fn op_fetch_send( }; let status = res.status(); - let url = res.url().to_string(); + let url = request.url.into(); let mut res_headers = Vec::new(); for (key, val) in res.headers().iter() { res_headers.push((key.as_str().into(), val.as_bytes().into())); } - let content_length = res.content_length(); - let remote_addr = res.remote_addr(); + let content_length = hyper::body::Body::size_hint(res.body()).exact(); + let remote_addr = res + .extensions() + .get::<hyper_util::client::legacy::connect::HttpInfo>() + .map(|info| info.remote_addr()); let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { (Some(addr.ip().to_string()), Some(addr.port())) } else { @@ -585,7 +620,8 @@ pub async fn op_fetch_response_upgrade( let upgraded = raw_response.upgrade().await?; { // Stage 3: Pump the data - let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + let (mut upgraded_rx, mut upgraded_tx) = + tokio::io::split(TokioIo::new(upgraded)); spawn(async move { let mut buf = [0; 1024]; @@ -673,11 +709,13 @@ impl Resource for UpgradeStream { } } -type CancelableResponseResult = Result<Result<Response, AnyError>, Canceled>; +type CancelableResponseResult = + Result<Result<http::Response<ResBody>, AnyError>, 
Canceled>; -pub struct FetchRequestResource( - pub Pin<Box<dyn Future<Output = CancelableResponseResult>>>, -); +pub struct FetchRequestResource { + pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>, + pub url: Url, +} impl Resource for FetchRequestResource { fn name(&self) -> Cow<str> { @@ -701,7 +739,7 @@ type BytesStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>; pub enum FetchResponseReader { - Start(Response), + Start(http::Response<ResBody>), BodyReader(Peekable<BytesStream>), } @@ -719,7 +757,7 @@ pub struct FetchResponseResource { } impl FetchResponseResource { - pub fn new(response: Response, size: Option<u64>) -> Self { + pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self { Self { response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)), cancel: CancelHandle::default(), @@ -727,10 +765,10 @@ impl FetchResponseResource { } } - pub async fn upgrade(self) -> Result<reqwest::Upgraded, AnyError> { + pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, AnyError> { let reader = self.response_reader.into_inner(); match reader { - FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?), + FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?), _ => unreachable!(), } } @@ -754,11 +792,12 @@ impl Resource for FetchResponseResource { match std::mem::take(&mut *reader) { FetchResponseReader::Start(resp) => { - let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| { - r.map_err(|err| { - std::io::Error::new(std::io::ErrorKind::Other, err) - }) - })); + let stream: BytesStream = + Box::pin(resp.into_body().into_data_stream().map(|r| { + r.map_err(|err| { + std::io::Error::new(std::io::ErrorKind::Other, err) + }) + })); *reader = FetchResponseReader::BodyReader(stream.peekable()); } FetchResponseReader::BodyReader(_) => unreachable!(), @@ -922,7 +961,7 @@ impl Default for CreateHttpClientOptions { } } -/// Create new instance of async 
reqwest::Client. This client supports +/// Create new instance of async Client. This client supports /// proxies and doesn't follow redirects. pub fn create_http_client( user_agent: &str, @@ -944,43 +983,64 @@ pub fn create_http_client( alpn_protocols.push("http/1.1".into()); } tls_config.alpn_protocols = alpn_protocols; + let tls_config = Arc::from(tls_config); + + let mut http_connector = HttpConnector::new(); + http_connector.enforce_http(false); + let connector = HttpsConnector::from((http_connector, tls_config.clone())); - let mut headers = HeaderMap::new(); - headers.insert(USER_AGENT, user_agent.parse().unwrap()); - let mut builder = Client::builder() - .redirect(Policy::none()) - .default_headers(headers) - .use_preconfigured_tls(tls_config); + let user_agent = user_agent + .parse::<HeaderValue>() + .map_err(|_| type_error("illegal characters in User-Agent"))?; + let mut builder = + hyper_util::client::legacy::Builder::new(TokioExecutor::new()); + builder.timer(TokioTimer::new()); + builder.pool_timer(TokioTimer::new()); + + let mut proxies = proxy::from_env(); if let Some(proxy) = options.proxy { - let mut reqwest_proxy = reqwest::Proxy::all(&proxy.url)?; + let mut intercept = proxy::Intercept::all(&proxy.url) + .ok_or_else(|| type_error("invalid proxy url"))?; if let Some(basic_auth) = &proxy.basic_auth { - reqwest_proxy = - reqwest_proxy.basic_auth(&basic_auth.username, &basic_auth.password); + intercept.set_auth(&basic_auth.username, &basic_auth.password); } - builder = builder.proxy(reqwest_proxy); + proxies.prepend(intercept); } + let proxies = Arc::new(proxies); + let mut connector = + proxy::ProxyConnector::new(proxies.clone(), connector, tls_config); + connector.user_agent(user_agent.clone()); if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host { - builder = builder.pool_max_idle_per_host(pool_max_idle_per_host); + builder.pool_max_idle_per_host(pool_max_idle_per_host); } if let Some(pool_idle_timeout) = options.pool_idle_timeout { 
- builder = builder.pool_idle_timeout( + builder.pool_idle_timeout( pool_idle_timeout.map(std::time::Duration::from_millis), ); } match (options.http1, options.http2) { - (true, false) => builder = builder.http1_only(), - (false, true) => builder = builder.http2_prior_knowledge(), + (true, false) => {} // noop, handled by ALPN above + (false, true) => { + builder.http2_only(true); + } (true, true) => {} (false, false) => { return Err(type_error("Either `http1` or `http2` needs to be true")) } } - builder.build().map_err(|e| e.into()) + let pooled_client = builder.build(connector); + let decompress = Decompression::new(pooled_client).gzip(true).br(true); + + Ok(Client { + inner: decompress, + proxies, + user_agent, + }) } #[op2] @@ -990,3 +1050,41 @@ pub fn op_utf8_to_byte_string( ) -> Result<ByteString, AnyError> { Ok(input.into()) } + +#[derive(Clone, Debug)] +pub struct Client { + inner: Decompression<hyper_util::client::legacy::Client<Connector, ReqBody>>, + // Used to check whether to include a proxy-authorization header + proxies: Arc<proxy::Proxies>, + user_agent: HeaderValue, +} + +type Connector = proxy::ProxyConnector<HttpsConnector<HttpConnector>>; + +// clippy is wrong here +#[allow(clippy::declare_interior_mutable_const)] +const STAR_STAR: HeaderValue = HeaderValue::from_static("*/*"); + +impl Client { + pub async fn send( + self, + mut req: http::Request<ReqBody>, + ) -> Result<http::Response<ResBody>, AnyError> { + req + .headers_mut() + .entry(USER_AGENT) + .or_insert_with(|| self.user_agent.clone()); + + req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR); + + if let Some(auth) = self.proxies.http_forward_auth(req.uri()) { + req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone()); + } + + let resp = self.inner.oneshot(req).await?; + Ok(resp.map(|b| b.map_err(|e| anyhow!(e)).boxed())) + } +} + +pub type ReqBody = http_body_util::combinators::BoxBody<Bytes, Error>; +pub type ResBody = http_body_util::combinators::BoxBody<Bytes, Error>; |