summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Sherret <dsherret@users.noreply.github.com>2024-05-05 10:07:21 -0400
committerGitHub <noreply@github.com>2024-05-05 10:07:21 -0400
commitd527b635753566e7d01391d675bf010c4856eff9 (patch)
treead424ff7554d782402528a0701995bfdbca3aff2
parent4b0f22eee7d9217cb19af2730818dd70280b7c6b (diff)
fix(workers): `importScripts` concurrently and use a new `reqwest::Client` per importScripts (#23699)
1. We were polling each future in sequence, so this meant it was fetching scripts in sequence. 2. It's not safe to share `reqwest::Client` across tokio runtimes (https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788)
-rw-r--r--ext/fetch/lib.rs38
-rw-r--r--runtime/ops/web_worker/sync_fetch.rs187
2 files changed, 118 insertions, 107 deletions
diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs
index e384a918e..3e43370d3 100644
--- a/ext/fetch/lib.rs
+++ b/ext/fetch/lib.rs
@@ -187,27 +187,33 @@ pub fn get_or_create_client_from_state(
Ok(client.clone())
} else {
let options = state.borrow::<Options>();
- let client = create_http_client(
- &options.user_agent,
- CreateHttpClientOptions {
- root_cert_store: options.root_cert_store()?,
- ca_certs: vec![],
- proxy: options.proxy.clone(),
- unsafely_ignore_certificate_errors: options
- .unsafely_ignore_certificate_errors
- .clone(),
- client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
- pool_max_idle_per_host: None,
- pool_idle_timeout: None,
- http1: true,
- http2: true,
- },
- )?;
+ let client = create_client_from_options(options)?;
state.put::<reqwest::Client>(client.clone());
Ok(client)
}
}
+pub fn create_client_from_options(
+ options: &Options,
+) -> Result<reqwest::Client, AnyError> {
+ create_http_client(
+ &options.user_agent,
+ CreateHttpClientOptions {
+ root_cert_store: options.root_cert_store()?,
+ ca_certs: vec![],
+ proxy: options.proxy.clone(),
+ unsafely_ignore_certificate_errors: options
+ .unsafely_ignore_certificate_errors
+ .clone(),
+ client_cert_chain_and_key: options.client_cert_chain_and_key.clone(),
+ pool_max_idle_per_host: None,
+ pool_idle_timeout: None,
+ http1: true,
+ http2: true,
+ },
+ )
+}
+
#[allow(clippy::type_complexity)]
pub struct ResourceToBodyAdapter(
Rc<dyn Resource>,
diff --git a/runtime/ops/web_worker/sync_fetch.rs b/runtime/ops/web_worker/sync_fetch.rs
index b8a588624..37286ca62 100644
--- a/runtime/ops/web_worker/sync_fetch.rs
+++ b/runtime/ops/web_worker/sync_fetch.rs
@@ -6,6 +6,7 @@ use crate::web_worker::WebWorkerInternalHandle;
use crate::web_worker::WebWorkerType;
use deno_core::error::type_error;
use deno_core::error::AnyError;
+use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::url::Url;
use deno_core::OpState;
@@ -15,7 +16,6 @@ use deno_websocket::DomExceptionNetworkError;
use hyper::body::Bytes;
use serde::Deserialize;
use serde::Serialize;
-use tokio::task::JoinHandle;
// TODO(andreubotella) Properly parse the MIME type
fn mime_type_essence(mime_type: &str) -> String {
@@ -38,12 +38,15 @@ pub struct SyncFetchScript {
pub fn op_worker_sync_fetch(
state: &mut OpState,
#[serde] scripts: Vec<String>,
- mut loose_mime_checks: bool,
+ loose_mime_checks: bool,
) -> Result<Vec<SyncFetchScript>, AnyError> {
let handle = state.borrow::<WebWorkerInternalHandle>().clone();
assert_eq!(handle.worker_type, WebWorkerType::Classic);
- let client = deno_fetch::get_or_create_client_from_state(state)?;
+ // it's not safe to share a client across tokio runtimes, so create a fresh one
+ // https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788
+ let options = state.borrow::<deno_fetch::Options>().clone();
+ let client = deno_fetch::create_client_from_options(&options)?;
// TODO(andreubotella) It's not good to throw an exception related to blob
// URLs when none of the script URLs use the blob scheme.
@@ -62,107 +65,109 @@ pub fn op_worker_sync_fetch(
.enable_time()
.build()?;
- let handles: Vec<_> = scripts
- .into_iter()
- .map(|script| -> JoinHandle<Result<SyncFetchScript, AnyError>> {
- let client = client.clone();
- let blob_store = blob_store.clone();
- runtime.spawn(async move {
- let script_url = Url::parse(&script)
- .map_err(|_| type_error("Invalid script URL"))?;
-
- let (body, mime_type, res_url) = match script_url.scheme() {
- "http" | "https" => {
- let resp =
- client.get(script_url).send().await?.error_for_status()?;
-
- let res_url = resp.url().to_string();
-
- // TODO(andreubotella) Properly run fetch's "extract a MIME type".
- let mime_type = resp
- .headers()
- .get("Content-Type")
- .and_then(|v| v.to_str().ok())
- .map(mime_type_essence);
-
- // Always check the MIME type with HTTP(S).
- loose_mime_checks = false;
-
- let body = resp.bytes().await?;
-
- (body, mime_type, res_url)
- }
- "data" => {
- let data_url = DataUrl::process(&script)
- .map_err(|e| type_error(format!("{e:?}")))?;
+ runtime.block_on(async move {
+ let mut futures = scripts
+ .into_iter()
+ .map(|script| {
+ let client = client.clone();
+ let blob_store = blob_store.clone();
+ deno_core::unsync::spawn(async move {
+ let script_url = Url::parse(&script)
+ .map_err(|_| type_error("Invalid script URL"))?;
+ let mut loose_mime_checks = loose_mime_checks;
+
+ let (body, mime_type, res_url) = match script_url.scheme() {
+ "http" | "https" => {
+ let resp =
+ client.get(script_url).send().await?.error_for_status()?;
+
+ let res_url = resp.url().to_string();
+
+ // TODO(andreubotella) Properly run fetch's "extract a MIME type".
+ let mime_type = resp
+ .headers()
+ .get("Content-Type")
+ .and_then(|v| v.to_str().ok())
+ .map(mime_type_essence);
+
+ // Always check the MIME type with HTTP(S).
+ loose_mime_checks = false;
+
+ let body = resp.bytes().await?;
+
+ (body, mime_type, res_url)
+ }
+ "data" => {
+ let data_url = DataUrl::process(&script)
+ .map_err(|e| type_error(format!("{e:?}")))?;
- let mime_type = {
- let mime = data_url.mime_type();
- format!("{}/{}", mime.type_, mime.subtype)
- };
+ let mime_type = {
+ let mime = data_url.mime_type();
+ format!("{}/{}", mime.type_, mime.subtype)
+ };
- let (body, _) = data_url
- .decode_to_vec()
- .map_err(|e| type_error(format!("{e:?}")))?;
+ let (body, _) = data_url
+ .decode_to_vec()
+ .map_err(|e| type_error(format!("{e:?}")))?;
- (Bytes::from(body), Some(mime_type), script)
- }
- "blob" => {
- let blob =
- blob_store.get_object_url(script_url).ok_or_else(|| {
- type_error("Blob for the given URL not found.")
- })?;
+ (Bytes::from(body), Some(mime_type), script)
+ }
+ "blob" => {
+ let blob =
+ blob_store.get_object_url(script_url).ok_or_else(|| {
+ type_error("Blob for the given URL not found.")
+ })?;
- let mime_type = mime_type_essence(&blob.media_type);
+ let mime_type = mime_type_essence(&blob.media_type);
- let body = blob.read_all().await?;
+ let body = blob.read_all().await?;
- (Bytes::from(body), Some(mime_type), script)
- }
- _ => {
- return Err(type_error(format!(
- "Classic scripts with scheme {}: are not supported in workers.",
- script_url.scheme()
- )))
- }
- };
-
- if !loose_mime_checks {
- // TODO(andreubotella) Check properly for a Javascript MIME type.
- match mime_type.as_deref() {
- Some("application/javascript" | "text/javascript") => {}
- Some(mime_type) => {
- return Err(
- DomExceptionNetworkError {
- msg: format!("Invalid MIME type {mime_type:?}."),
- }
- .into(),
- )
+ (Bytes::from(body), Some(mime_type), script)
+ }
+ _ => {
+ return Err(type_error(format!(
+ "Classic scripts with scheme {}: are not supported in workers.",
+ script_url.scheme()
+ )))
}
- None => {
- return Err(
- DomExceptionNetworkError::new("Missing MIME type.").into(),
- )
+ };
+
+ if !loose_mime_checks {
+ // TODO(andreubotella) Check properly for a Javascript MIME type.
+ match mime_type.as_deref() {
+ Some("application/javascript" | "text/javascript") => {}
+ Some(mime_type) => {
+ return Err(
+ DomExceptionNetworkError {
+ msg: format!("Invalid MIME type {mime_type:?}."),
+ }
+ .into(),
+ )
+ }
+ None => {
+ return Err(
+ DomExceptionNetworkError::new("Missing MIME type.").into(),
+ )
+ }
}
}
- }
- let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body);
+ let (text, _) = encoding_rs::UTF_8.decode_with_bom_removal(&body);
- Ok(SyncFetchScript {
- url: res_url,
- script: text.into_owned(),
+ Ok(SyncFetchScript {
+ url: res_url,
+ script: text.into_owned(),
+ })
})
})
- })
- .collect();
-
- let mut ret = Vec::with_capacity(handles.len());
- for handle in handles {
- let script = runtime.block_on(handle)??;
- ret.push(script);
- }
- Ok(ret)
+ .collect::<deno_core::futures::stream::FuturesUnordered<_>>();
+ let mut ret = Vec::with_capacity(futures.len());
+ while let Some(result) = futures.next().await {
+ let script = result??;
+ ret.push(script);
+ }
+ Ok(ret)
+ })
});
thread.join().unwrap()