diff options
author | Bartek IwaĆczuk <biwanczuk@gmail.com> | 2019-11-22 18:46:57 +0100 |
---|---|---|
committer | Ry Dahl <ry@tinyclouds.org> | 2019-11-22 12:46:57 -0500 |
commit | c6bb3d5a10ba8acceadcaa66050abcaefb7bc0bb (patch) | |
tree | e60f007215fc0f146db4457db85eda8f4716d314 | |
parent | 363b968bfcef26c30f84e485beec6194e5b1dd98 (diff) |
remove tokio_util::block_on (#3388)
This PR removes tokio_util::block_on - refactored compiler and file
fetcher slightly so that we can safely block there - that's because
only blocking path consist of only synchronous operations.
Additionally I removed excessive use of tokio_util::panic_on_error
and tokio_util::run_in_task and moved both functions to cli/worker.rs,
to tests module.
Closes #2960
-rw-r--r-- | cli/compilers/ts.rs | 9 | ||||
-rw-r--r-- | cli/file_fetcher.rs | 151 | ||||
-rw-r--r-- | cli/global_timer.rs | 9 | ||||
-rw-r--r-- | cli/tokio_util.rs | 64 | ||||
-rw-r--r-- | cli/worker.rs | 34 |
5 files changed, 120 insertions, 147 deletions
diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index c255e18be..ff5405f52 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -473,7 +473,7 @@ impl TsCompiler { let source_file = self .file_fetcher - .fetch_source_file(&module_specifier) + .fetch_cached_source_file(&module_specifier) .expect("Source file not found"); let version_hash = source_code_version_hash( @@ -581,10 +581,9 @@ impl TsCompiler { script_name: &str, ) -> Option<SourceFile> { if let Some(module_specifier) = self.try_to_resolve(script_name) { - return match self.file_fetcher.fetch_source_file(&module_specifier) { - Ok(out) => Some(out), - Err(_) => None, - }; + return self + .file_fetcher + .fetch_cached_source_file(&module_specifier); } None diff --git a/cli/file_fetcher.rs b/cli/file_fetcher.rs index 4bf6abd93..79ea3ab81 100644 --- a/cli/file_fetcher.rs +++ b/cli/file_fetcher.rs @@ -8,7 +8,6 @@ use crate::http_util; use crate::http_util::FetchOnceResult; use crate::msg; use crate::progress::Progress; -use crate::tokio_util; use deno::ErrBox; use deno::ModuleSpecifier; use futures::future::Either; @@ -111,12 +110,25 @@ impl SourceFileFetcher { Ok(()) } - /// Required for TS compiler. - pub fn fetch_source_file( + /// Required for TS compiler and source maps. + pub fn fetch_cached_source_file( self: &Self, specifier: &ModuleSpecifier, - ) -> Result<SourceFile, ErrBox> { - tokio_util::block_on(self.fetch_source_file_async(specifier)) + ) -> Option<SourceFile> { + let maybe_source_file = self.source_file_cache.get(specifier.to_string()); + + if maybe_source_file.is_some() { + return maybe_source_file; + } + + // If file is not in memory cache check if it can be found + // in local cache - which effectively means trying to fetch + // using "--no-fetch" flag. We can safely block on this + // future, because it doesn't do any asynchronous action + // it that path. + let fut = self.get_source_file_async(specifier.as_url(), true, true); + + futures::executor::block_on(fut).ok() } pub fn fetch_source_file_async( @@ -663,6 +675,7 @@ impl SourceCodeHeaders { mod tests { use super::*; use crate::fs as deno_fs; + use crate::tokio_util; use tempfile::TempDir; fn setup_file_fetcher(dir_path: &Path) -> SourceFileFetcher { @@ -987,45 +1000,45 @@ mod tests { fn test_get_source_code_multiple_downloads_of_same_file() { let http_server_guard = crate::test_util::http_server(); let (_temp_dir, fetcher) = test_setup(); - // http_util::fetch_sync_string requires tokio - tokio_util::init(|| { - let specifier = ModuleSpecifier::resolve_url( - "http://localhost:4545/tests/subdir/mismatch_ext.ts", - ) - .unwrap(); - let headers_file_name = fetcher.deps_cache.location.join( - fetcher.deps_cache.get_cache_filename_with_extension( - specifier.as_url(), - "headers.json", - ), - ); + let specifier = ModuleSpecifier::resolve_url( + "http://localhost:4545/tests/subdir/mismatch_ext.ts", + ) + .unwrap(); + let headers_file_name = fetcher.deps_cache.location.join( + fetcher + .deps_cache + .get_cache_filename_with_extension(specifier.as_url(), "headers.json"), + ); - // first download - let result = fetcher.fetch_source_file(&specifier); - assert!(result.is_ok()); - - let result = fs::File::open(&headers_file_name); - assert!(result.is_ok()); - let headers_file = result.unwrap(); - // save modified timestamp for headers file - let headers_file_metadata = headers_file.metadata().unwrap(); - let headers_file_modified = headers_file_metadata.modified().unwrap(); - - // download file again, it should use already fetched file even though `use_disk_cache` is set to - // false, this can be verified using source header file creation timestamp (should be - // the same as after first download) - let result = fetcher.fetch_source_file(&specifier); - assert!(result.is_ok()); - - let result = fs::File::open(&headers_file_name); - assert!(result.is_ok()); - let headers_file_2 = result.unwrap(); - // save modified timestamp for headers file - let headers_file_metadata_2 = headers_file_2.metadata().unwrap(); - let headers_file_modified_2 = headers_file_metadata_2.modified().unwrap(); - - assert_eq!(headers_file_modified, headers_file_modified_2); - }); + // first download + tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| { + assert!(r.is_ok()); + futures::future::ok(()) + })); + + let result = fs::File::open(&headers_file_name); + assert!(result.is_ok()); + let headers_file = result.unwrap(); + // save modified timestamp for headers file + let headers_file_metadata = headers_file.metadata().unwrap(); + let headers_file_modified = headers_file_metadata.modified().unwrap(); + + // download file again, it should use already fetched file even though `use_disk_cache` is set to + // false, this can be verified using source header file creation timestamp (should be + // the same as after first download) + tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| { + assert!(r.is_ok()); + futures::future::ok(()) + })); + + let result = fs::File::open(&headers_file_name); + assert!(result.is_ok()); + let headers_file_2 = result.unwrap(); + // save modified timestamp for headers file + let headers_file_metadata_2 = headers_file_2.metadata().unwrap(); + let headers_file_modified_2 = headers_file_metadata_2.modified().unwrap(); + + assert_eq!(headers_file_modified, headers_file_modified_2); drop(http_server_guard); } @@ -1427,21 +1440,23 @@ mod tests { fn test_fetch_source_file() { let (_temp_dir, fetcher) = test_setup(); - tokio_util::init(|| { - // Test failure case. - let specifier = - ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap(); - let r = fetcher.fetch_source_file(&specifier); + // Test failure case. + let specifier = + ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap(); + tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| { assert!(r.is_err()); + futures::future::ok(()) + })); - let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("js/main.ts") - .to_owned(); - let specifier = - ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap(); - let r = fetcher.fetch_source_file(&specifier); + let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("js/main.ts") + .to_owned(); + let specifier = + ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap(); + tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| { assert!(r.is_ok()); - }) + futures::future::ok(()) + })); } #[test] @@ -1449,21 +1464,23 @@ mod tests { /*recompile ts file*/ let (_temp_dir, fetcher) = test_setup(); - tokio_util::init(|| { - // Test failure case. - let specifier = - ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap(); - let r = fetcher.fetch_source_file(&specifier); + // Test failure case. + let specifier = + ModuleSpecifier::resolve_url(file_url!("/baddir/hello.ts")).unwrap(); + tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| { assert!(r.is_err()); + futures::future::ok(()) + })); - let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("js/main.ts") - .to_owned(); - let specifier = - ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap(); - let r = fetcher.fetch_source_file(&specifier); + let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("js/main.ts") + .to_owned(); + let specifier = + ModuleSpecifier::resolve_url_or_path(p.to_str().unwrap()).unwrap(); + tokio_util::run(fetcher.fetch_source_file_async(&specifier).then(|r| { assert!(r.is_ok()); - }) + futures::future::ok(()) + })); } #[test] diff --git a/cli/global_timer.rs b/cli/global_timer.rs index 9ab760ab4..e06cabc48 100644 --- a/cli/global_timer.rs +++ b/cli/global_timer.rs @@ -8,7 +8,7 @@ //! only need to be able to start and cancel a single timer (or Delay, as Tokio //! calls it) for an entire Isolate. This is what is implemented here. -use crate::tokio_util::panic_on_error; +use crate::futures::TryFutureExt; use futures::channel::oneshot; use futures::future::FutureExt; use std::future::Future; @@ -43,9 +43,10 @@ impl GlobalTimer { let (tx, rx) = oneshot::channel(); self.tx = Some(tx); - let delay = - panic_on_error(futures::compat::Compat01As03::new(Delay::new(deadline))); - let rx = panic_on_error(rx); + let delay = futures::compat::Compat01As03::new(Delay::new(deadline)) + .map_err(|err| panic!("Unexpected error in timeout {:?}", err)); + let rx = rx + .map_err(|err| panic!("Unexpected error in receiving channel {:?}", err)); futures::future::select(delay, rx).then(|_| futures::future::ok(())) } diff --git a/cli/tokio_util.rs b/cli/tokio_util.rs index 050080b70..fea47792e 100644 --- a/cli/tokio_util.rs +++ b/cli/tokio_util.rs @@ -1,5 +1,4 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use deno::ErrBox; use futures; use futures::future::FutureExt; use futures::future::TryFutureExt; @@ -29,66 +28,3 @@ where { tokio::runtime::current_thread::run(future.boxed().compat()); } - -/// THIS IS A HACK AND SHOULD BE AVOIDED. -/// -/// This spawns a new thread and creates a single-threaded tokio runtime on that thread, -/// to execute the given future. -/// -/// This is useful when we want to block the main runtime to -/// resolve a future without worrying that we'll use up all the threads in the -/// main runtime. -pub fn block_on<F, R>(future: F) -> Result<R, ErrBox> -where - F: Send + 'static + Future<Output = Result<R, ErrBox>> + Unpin, - R: Send + 'static, -{ - use std::sync::mpsc::channel; - use std::thread; - let (sender, receiver) = channel(); - // Create a new runtime to evaluate the future asynchronously. - thread::spawn(move || { - let r = tokio::runtime::current_thread::block_on_all(future.compat()); - sender - .send(r) - .expect("Unable to send blocking future result") - }); - receiver - .recv() - .expect("Unable to receive blocking future result") -} - -// Set the default executor so we can use tokio::spawn(). It's difficult to -// pass around mut references to the runtime, so using with_default is -// preferable. Ideally Tokio would provide this function. -#[cfg(test)] -pub fn init<F>(f: F) -where - F: FnOnce(), -{ - let rt = create_threadpool_runtime().expect("Unable to create Tokio runtime"); - let mut executor = rt.executor(); - let mut enter = tokio_executor::enter().expect("Multiple executors at once"); - tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f()); -} - -pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Output = Result<I, ()>> -where - F: Future<Output = Result<I, E>>, - E: std::fmt::Debug, -{ - f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) -} - -#[cfg(test)] -pub fn run_in_task<F>(f: F) -where - F: FnOnce() + Send + 'static, -{ - let fut = futures::future::lazy(move |_cx| { - f(); - Ok(()) - }); - - run(fut) -} diff --git a/cli/worker.rs b/cli/worker.rs index 08ac43659..b9802d581 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -209,6 +209,26 @@ mod tests { use futures::executor::block_on; use std::sync::atomic::Ordering; + pub fn run_in_task<F>(f: F) + where + F: FnOnce() + Send + 'static, + { + let fut = futures::future::lazy(move |_cx| { + f(); + Ok(()) + }); + + tokio_util::run(fut) + } + + pub fn panic_on_error<I, E, F>(f: F) -> impl Future<Output = Result<I, ()>> + where + F: Future<Output = Result<I, E>>, + E: std::fmt::Debug, + { + f.map_err(|err| panic!("Future got unexpected error: {:?}", err)) + } + #[test] fn execute_mod_esm_imports_a() { let p = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) @@ -243,7 +263,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - tokio_util::panic_on_error(worker).await + panic_on_error(worker).await }); let metrics = &state_.metrics; @@ -283,7 +303,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - tokio_util::panic_on_error(worker).await + panic_on_error(worker).await }); let metrics = &state_.metrics; @@ -333,7 +353,7 @@ mod tests { if let Err(err) = result { eprintln!("execute_mod err {:?}", err); } - tokio_util::panic_on_error(worker).await + panic_on_error(worker).await }); assert_eq!(state_.metrics.resolve_count.load(Ordering::SeqCst), 3); @@ -364,7 +384,7 @@ mod tests { #[test] fn test_worker_messages() { - tokio_util::run_in_task(|| { + run_in_task(|| { let mut worker = create_test_worker(); let source = r#" onmessage = function(e) { @@ -412,7 +432,7 @@ mod tests { #[test] fn removed_from_resource_table_on_close() { - tokio_util::run_in_task(|| { + run_in_task(|| { let mut worker = create_test_worker(); worker .execute("onmessage = () => { delete window.onmessage; }") @@ -444,7 +464,7 @@ mod tests { #[test] fn execute_mod_resolve_error() { - tokio_util::run_in_task(|| { + run_in_task(|| { // "foo" is not a valid module specifier so this should return an error. let mut worker = create_test_worker(); let module_specifier = @@ -457,7 +477,7 @@ mod tests { #[test] fn execute_mod_002_hello() { - tokio_util::run_in_task(|| { + run_in_task(|| { // This assumes cwd is project root (an assumption made throughout the // tests). let mut worker = create_test_worker(); |