diff options
author | David Sherret <dsherret@users.noreply.github.com> | 2024-05-05 10:07:21 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-05 10:07:21 -0400 |
commit | d527b635753566e7d01391d675bf010c4856eff9 (patch) | |
tree | ad424ff7554d782402528a0701995bfdbca3aff2 | |
parent | 4b0f22eee7d9217cb19af2730818dd70280b7c6b (diff) |
fix(workers): `importScripts` concurrently and use a new `reqwest::Client` per importScripts (#23699)
1. We were polling each future in sequence, so this meant it was
fetching scripts in sequence.
2. It's not safe to share `reqwest::Client` across tokio runtimes
(https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788)
-rw-r--r-- | ext/fetch/lib.rs | 38 | ||||
-rw-r--r-- | runtime/ops/web_worker/sync_fetch.rs | 187 |
2 files changed, 118 insertions, 107 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index e384a918e..3e43370d3 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -187,27 +187,33 @@ pub fn get_or_create_client_from_state( Ok(client.clone()) } else { let options = state.borrow::<Options>(); - let client = create_http_client( - &options.user_agent, - CreateHttpClientOptions { - root_cert_store: options.root_cert_store()?, - ca_certs: vec![], - proxy: options.proxy.clone(), - unsafely_ignore_certificate_errors: options - .unsafely_ignore_certificate_errors - .clone(), - client_cert_chain_and_key: options.client_cert_chain_and_key.clone(), - pool_max_idle_per_host: None, - pool_idle_timeout: None, - http1: true, - http2: true, - }, - )?; + let client = create_client_from_options(options)?; state.put::<reqwest::Client>(client.clone()); Ok(client) } } +pub fn create_client_from_options( + options: &Options, +) -> Result<reqwest::Client, AnyError> { + create_http_client( + &options.user_agent, + CreateHttpClientOptions { + root_cert_store: options.root_cert_store()?, + ca_certs: vec![], + proxy: options.proxy.clone(), + unsafely_ignore_certificate_errors: options + .unsafely_ignore_certificate_errors + .clone(), + client_cert_chain_and_key: options.client_cert_chain_and_key.clone(), + pool_max_idle_per_host: None, + pool_idle_timeout: None, + http1: true, + http2: true, + }, + ) +} + #[allow(clippy::type_complexity)] pub struct ResourceToBodyAdapter( Rc<dyn Resource>, diff --git a/runtime/ops/web_worker/sync_fetch.rs b/runtime/ops/web_worker/sync_fetch.rs index b8a588624..37286ca62 100644 --- a/runtime/ops/web_worker/sync_fetch.rs +++ b/runtime/ops/web_worker/sync_fetch.rs @@ -6,6 +6,7 @@ use crate::web_worker::WebWorkerInternalHandle; use crate::web_worker::WebWorkerType; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::url::Url; use deno_core::OpState; @@ -15,7 +16,6 @@ use deno_websocket::DomExceptionNetworkError; use hyper::body::Bytes; use serde::Deserialize; use serde::Serialize; -use tokio::task::JoinHandle; // TODO(andreubotella) Properly parse the MIME type fn mime_type_essence(mime_type: &str) -> String { @@ -38,12 +38,15 @@ pub struct SyncFetchScript { pub fn op_worker_sync_fetch( state: &mut OpState, #[serde] scripts: Vec<String>, - mut loose_mime_checks: bool, + loose_mime_checks: bool, ) -> Result<Vec<SyncFetchScript>, AnyError> { let handle = state.borrow::<WebWorkerInternalHandle>().clone(); assert_eq!(handle.worker_type, WebWorkerType::Classic); - let client = deno_fetch::get_or_create_client_from_state(state)?; + // it's not safe to share a client across tokio runtimes, so create a fresh one + // https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788 + let options = state.borrow::<deno_fetch::Options>().clone(); + let client = deno_fetch::create_client_from_options(&options)?; // TODO(andreubotella) It's not good to throw an exception related to blob // URLs when none of the script URLs use the blob scheme. @@ -62,107 +65,109 @@ pub fn op_worker_sync_fetch( .enable_time() .build()?; - let handles: Vec<_> = scripts - .into_iter() - .map(|script| -> JoinHandle<Result<SyncFetchScript, AnyError>> { - let client = client.clone(); - let blob_store = blob_store.clone(); - runtime.spawn(async move { - let script_url = Url::parse(&script) - .map_err(|_| type_error("Invalid script URL"))?; - - let (body, mime_type, res_url) = match script_url.scheme() { - "http" | "https" => { - let resp = - client.get(script_url).send().await?.error_for_status()?; - - let res_url = resp.url().to_string(); - - // TODO(andreubotella) Properly run fetch's "extract a MIME type". - let mime_type = resp - .headers() - .get("Content-Type") - .and_then(|v| v.to_str().ok()) - .map(mime_type_essence); - - // Always check the MIME type with HTTP(S). - loose_mime_checks = false; - - let body = resp.bytes().await?; - - (body, mime_type, res_url) - } - "data" => { - let data_url = DataUrl::process(&script) - .map_err(|e| type_error(format!("{e:?}")))?; + runtime.block_on(async move { + let mut futures = scripts + .into_iter() + .map(|script| { + let client = client.clone(); + let blob_store = blob_store.clone(); + deno_core::unsync::spawn(async move { + let script_url = Url::parse(&script) + .map_err(|_| type_error("Invalid script URL"))?; + let mut loose_mime_checks = loose_mime_checks; + + let (body, mime_type, res_url) = match script_url.scheme() { + "http" | "https" => { + let resp = + client.get(script_url).send().await?.error_for_status()?; + + let res_url = resp.url().to_string(); + + // TODO(andreubotella) Properly run fetch's "extract a MIME type". + let mime_type = resp + .headers() + .get("Content-Type") + .and_then(|v| v.to_str().ok()) + .map(mime_type_essence); + + // Always check the MIME type with HTTP(S). + loose_mime_checks = false; + + let body = resp.bytes().await?; + + (body, mime_type, res_url) + } + "data" => { + let data_url = DataUrl::process(&script) + .map_err(|e| type_error(format!("{e:?}")))?; - let mime_type = { - let mime = data_url.mime_type(); - format!("{}/{}", mime.type_, mime.subtype) - }; + let mime_type = { + let mime = data_url.mime_type(); + format!("{}/{}", mime.type_, mime.subtype) + }; - let (body, _) = data_url - .decode_to_vec() - .map_err(|e| type_error(format!("{e:?}")))?; + let (body, _) = data_url + .decode_to_vec() + .map_err(|e| type_error(format!("{e:?}")))?; - (Bytes::from(body), Some(mime_type), script) - } - "blob" => { - let blob = - blob_store.get_object_url(script_url).ok_or_else(|| { - type_error("Blob for the given URL not found.") - })?; + (Bytes::from(body), Some(mime_type), script) + } + "blob" => { + let blob = + blob_store.get_object_url(script_url).ok_or_else(|| { + type_error("Blob for the given URL not found.") + })?; - let mime_type = mime_type_essence(&blob.media_type); + let mime_type = mime_type_essence(&blob.media_type); - let body = blob.read_all().await?; + let body = blob.read_all().await?; - (Bytes::from(body), Some(mime_type), script) - } - _ => { - return Err(type_error(format!( - "Classic scripts with scheme {}: are not supported in workers.", - script_url.scheme() - ))) - } - }; - - if !loose_mime_checks { - // TODO(andreubotella) Check properly for a Javascript MIME type. - match mime_type.as_deref() { - Some("application/javascript" | "text/javascript") => {} - Some(mime_type) => { - return Err( - DomExceptionNetworkError { - msg: format!("Invalid MIME type {mime_type:?}."), - } - .into(), - ) + (Bytes::from(body), Some(mime_type), script) + } + _ => { + return Err(type_error(format!( + "Classic scripts with scheme {}: are not supported in workers.", + script_url.scheme() + ))) } - None => { - return Err( - DomExceptionNetworkError::new("Missing MIME type.").into(), - ) + }; + + if !loose_mime_checks { + // TODO(andreubotella) Check properly for a Javascript MIME type. + match mime_type.as_deref() { + Some("application/javascript" | "text/javascript") => {} + Some(mime_type) => { + return Err( + DomExceptionNetworkError { + msg: format!("Invalid MIME type {mime_type:?}."), + } + .into(), + ) + } + None => { + return Err( + DomExceptionNetworkError::new("Missing MIME type.").into(), + ) + } } } - } - let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body); + let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body); - Ok(SyncFetchScript { - url: res_url, - script: text.into_owned(), + Ok(SyncFetchScript { + url: res_url, + script: text.into_owned(), + }) }) }) - }) - .collect(); - - let mut ret = Vec::with_capacity(handles.len()); - for handle in handles { - let script = runtime.block_on(handle)??; - ret.push(script); - } - Ok(ret) + .collect::<deno_core::futures::stream::FuturesUnordered<_>>(); + let mut ret = Vec::with_capacity(futures.len()); + while let Some(result) = futures.next().await { + let script = result??; + ret.push(script); + } + Ok(ret) + }) }); thread.join().unwrap() |