diff options
author | Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> | 2024-08-14 15:26:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-14 22:26:21 +0000 |
commit | e92a05b5518e5fd30559c96c5990b08657bbc3e4 (patch) | |
tree | 037cad394db9097d8f695810426a2de9ba03d825 /cli | |
parent | 875ee618d318ea748e38641108d906eff34a9f86 (diff) |
feat(serve): Opt-in parallelism for `deno serve` (#24920)
Adds a `parallel` flag to `deno serve`. When present, we spawn multiple
workers to parallelize serving requests.
```bash
deno serve --parallel main.ts
```
Currently on linux we use `SO_REUSEPORT` and rely on the fact that the
kernel will distribute connections in a round-robin manner.
On mac and windows, we sort of emulate this by cloning the underlying
file descriptor and passing a handle to each worker. The connections
will not be guaranteed to be fairly distributed (and in practice almost
certainly won't be), but the distribution is still spread enough to
provide a significant performance increase.
---
(Run on an Macbook Pro with an M3 Max, serving `deno.com`
baseline::
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
2 threads and 125 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 239.78ms 13.56ms 330.54ms 79.12%
Req/Sec 258.58 35.56 360.00 70.64%
Latency Distribution
50% 236.72ms
75% 248.46ms
90% 256.84ms
99% 268.23ms
15458 requests in 30.02s, 2.47GB read
Requests/sec: 514.89
Transfer/sec: 84.33MB
```
this PR (`with --parallel` flag)
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
2 threads and 125 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 117.40ms 142.84ms 590.45ms 79.07%
Req/Sec 1.33k 175.19 1.77k 69.00%
Latency Distribution
50% 22.34ms
75% 223.67ms
90% 357.32ms
99% 460.50ms
79636 requests in 30.07s, 12.74GB read
Requests/sec: 2647.96
Transfer/sec: 433.71MB
```
Diffstat (limited to 'cli')
-rw-r--r-- | cli/args/flags.rs | 93 | ||||
-rw-r--r-- | cli/main.rs | 2 | ||||
-rw-r--r-- | cli/tools/mod.rs | 1 | ||||
-rw-r--r-- | cli/tools/run/mod.rs | 16 | ||||
-rw-r--r-- | cli/tools/serve.rs | 192 | ||||
-rw-r--r-- | cli/worker.rs | 8 |
6 files changed, 266 insertions, 46 deletions
diff --git a/cli/args/flags.rs b/cli/args/flags.rs index f8577ed1b..800d6ff5a 100644 --- a/cli/args/flags.rs +++ b/cli/args/flags.rs @@ -339,6 +339,7 @@ pub struct ServeFlags { pub watch: Option<WatchFlagsWithPaths>, pub port: u16, pub host: String, + pub worker_count: Option<usize>, } impl ServeFlags { @@ -349,6 +350,7 @@ impl ServeFlags { watch: None, port, host: host.to_owned(), + worker_count: None, } } } @@ -2693,6 +2695,9 @@ fn serve_subcommand() -> Command { .help("The TCP address to serve on, defaulting to 0.0.0.0 (all interfaces).") .value_parser(serve_host_validator), ) + .arg( + parallel_arg("multiple server workers", false) + ) .arg(check_arg(false)) .arg(watch_arg(true)) .arg(watch_exclude_arg()) @@ -2854,11 +2859,7 @@ Directory arguments are expanded to all contained files matching the glob .action(ArgAction::SetTrue), ) .arg( - Arg::new("parallel") - .long("parallel") - .help("Run test modules in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable.") - .conflicts_with("jobs") - .action(ArgAction::SetTrue) + parallel_arg("test modules", true) ) .arg( Arg::new("jobs") @@ -2901,6 +2902,18 @@ Directory arguments are expanded to all contained files matching the glob ) } +fn parallel_arg(descr: &str, jobs_fallback: bool) -> Arg { + let arg = Arg::new("parallel") + .long("parallel") + .help(format!("Run {descr} in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable.")) + .action(ArgAction::SetTrue); + if jobs_fallback { + arg.conflicts_with("jobs") + } else { + arg + } +} + fn types_subcommand() -> Command { Command::new("types").about( "Print runtime TypeScript declarations. @@ -4416,6 +4429,8 @@ fn serve_parse( .remove_one::<String>("host") .unwrap_or_else(|| "0.0.0.0".to_owned()); + let worker_count = parallel_arg_parse(matches, false).map(|v| v.get()); + runtime_args_parse(flags, matches, true, true); // If the user didn't pass --allow-net, add this port to the network // allowlist. If the host is 0.0.0.0, we add :{port} and allow the same network perms @@ -4455,6 +4470,7 @@ fn serve_parse( watch: watch_arg_parse_with_paths(matches), port, host, + worker_count, }); Ok(()) @@ -4486,6 +4502,42 @@ fn task_parse(flags: &mut Flags, matches: &mut ArgMatches) { flags.subcommand = DenoSubcommand::Task(task_flags); } +fn parallel_arg_parse( + matches: &mut ArgMatches, + fallback_to_jobs: bool, +) -> Option<NonZeroUsize> { + if matches.get_flag("parallel") { + if let Ok(value) = env::var("DENO_JOBS") { + value.parse::<NonZeroUsize>().ok() + } else { + std::thread::available_parallelism().ok() + } + } else if fallback_to_jobs && matches.contains_id("jobs") { + // We can't change this to use the log crate because its not configured + // yet at this point since the flags haven't been parsed. This flag is + // deprecated though so it's not worth changing the code to use the log + // crate here and this is only done for testing anyway. + #[allow(clippy::print_stderr)] + { + eprintln!( + "⚠️ {}", + crate::colors::yellow(concat!( + "The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n", + "Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n", + "Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables" + )), + ); + } + if let Some(value) = matches.remove_one::<NonZeroUsize>("jobs") { + Some(value) + } else { + std::thread::available_parallelism().ok() + } + } else { + None + } +} + fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) { flags.type_check_mode = TypeCheckMode::Local; runtime_args_parse(flags, matches, true, true); @@ -4552,36 +4604,7 @@ fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) { flags.argv.extend(script_arg); } - let concurrent_jobs = if matches.get_flag("parallel") { - if let Ok(value) = env::var("DENO_JOBS") { - value.parse::<NonZeroUsize>().ok() - } else { - std::thread::available_parallelism().ok() - } - } else if matches.contains_id("jobs") { - // We can't change this to use the log crate because its not configured - // yet at this point since the flags haven't been parsed. This flag is - // deprecated though so it's not worth changing the code to use the log - // crate here and this is only done for testing anyway. - #[allow(clippy::print_stderr)] - { - eprintln!( - "⚠️ {}", - crate::colors::yellow(concat!( - "The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n", - "Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n", - "Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables" - )), - ); - } - if let Some(value) = matches.remove_one::<NonZeroUsize>("jobs") { - Some(value) - } else { - std::thread::available_parallelism().ok() - } - } else { - None - }; + let concurrent_jobs = parallel_arg_parse(matches, true); let include = if let Some(files) = matches.remove_many::<String>("files") { files.collect() diff --git a/cli/main.rs b/cli/main.rs index e8ecaa393..1752c3373 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -236,7 +236,7 @@ async fn run_subcommand(flags: Arc<Flags>) -> Result<i32, AnyError> { } }), DenoSubcommand::Serve(serve_flags) => spawn_subcommand(async move { - tools::run::run_script(WorkerExecutionMode::Serve, flags, serve_flags.watch).await + tools::serve::serve(flags, serve_flags).await }), DenoSubcommand::Task(task_flags) => spawn_subcommand(async { tools::task::execute_script(flags, task_flags, false).await diff --git a/cli/tools/mod.rs b/cli/tools/mod.rs index 7bb9b7cf6..1e1c65565 100644 --- a/cli/tools/mod.rs +++ b/cli/tools/mod.rs @@ -16,6 +16,7 @@ pub mod lint; pub mod registry; pub mod repl; pub mod run; +pub mod serve; pub mod task; pub mod test; pub mod upgrade; diff --git a/cli/tools/run/mod.rs b/cli/tools/run/mod.rs index 65044fbad..1964cfdd9 100644 --- a/cli/tools/run/mod.rs +++ b/cli/tools/run/mod.rs @@ -18,11 +18,7 @@ use crate::util::file_watcher::WatcherRestartMode; pub mod hmr; -pub async fn run_script( - mode: WorkerExecutionMode, - flags: Arc<Flags>, - watch: Option<WatchFlagsWithPaths>, -) -> Result<i32, AnyError> { +pub fn check_permission_before_script(flags: &Flags) { if !flags.has_permission() && flags.has_permission_in_argv() { log::warn!( "{}", @@ -33,6 +29,14 @@ To grant permissions, set them before the script argument. For example: ) ); } +} + +pub async fn run_script( + mode: WorkerExecutionMode, + flags: Arc<Flags>, + watch: Option<WatchFlagsWithPaths>, +) -> Result<i32, AnyError> { + check_permission_before_script(&flags); if let Some(watch_flags) = watch { return run_with_watch(mode, flags, watch_flags).await; @@ -187,7 +191,7 @@ pub async fn eval_command( Ok(exit_code) } -async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> { +pub async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> { // ensure an "npm install" is done if the user has explicitly // opted into using a managed node_modules directory if factory.cli_options()?.node_modules_dir_enablement() == Some(true) { diff --git a/cli/tools/serve.rs b/cli/tools/serve.rs new file mode 100644 index 000000000..24666b8f6 --- /dev/null +++ b/cli/tools/serve.rs @@ -0,0 +1,192 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use std::sync::Arc; + +use deno_core::error::AnyError; +use deno_core::futures::TryFutureExt; +use deno_core::ModuleSpecifier; +use deno_runtime::deno_permissions::Permissions; +use deno_runtime::deno_permissions::PermissionsContainer; + +use super::run::check_permission_before_script; +use super::run::maybe_npm_install; +use crate::args::Flags; +use crate::args::ServeFlags; +use crate::args::WatchFlagsWithPaths; +use crate::factory::CliFactory; +use crate::util::file_watcher::WatcherRestartMode; +use crate::worker::CliMainWorkerFactory; + +pub async fn serve( + flags: Arc<Flags>, + serve_flags: ServeFlags, +) -> Result<i32, AnyError> { + check_permission_before_script(&flags); + + if let Some(watch_flags) = serve_flags.watch { + return serve_with_watch(flags, watch_flags, serve_flags.worker_count) + .await; + } + + let factory = CliFactory::from_flags(flags); + let cli_options = factory.cli_options()?; + let deno_dir = factory.deno_dir()?; + let http_client = factory.http_client_provider(); + + // Run a background task that checks for available upgrades or output + // if an earlier run of this background task found a new version of Deno. + #[cfg(feature = "upgrade")] + super::upgrade::check_for_upgrades( + http_client.clone(), + deno_dir.upgrade_check_file_path(), + ); + + let main_module = cli_options.resolve_main_module()?; + + maybe_npm_install(&factory).await?; + + let permissions = PermissionsContainer::new(Permissions::from_options( + &cli_options.permissions_options()?, + )?); + let worker_factory = factory.create_cli_main_worker_factory().await?; + + do_serve( + worker_factory, + main_module, + permissions, + serve_flags.worker_count, + false, + ) + .await +} + +async fn do_serve( + worker_factory: CliMainWorkerFactory, + main_module: ModuleSpecifier, + permissions: PermissionsContainer, + worker_count: Option<usize>, + hmr: bool, +) -> Result<i32, AnyError> { + let mut worker = worker_factory + .create_main_worker( + deno_runtime::WorkerExecutionMode::Serve { + is_main: true, + worker_count, + }, + main_module.clone(), + permissions.clone(), + ) + .await?; + let worker_count = match worker_count { + None | Some(1) => return worker.run().await, + Some(c) => c, + }; + + let main = deno_core::unsync::spawn(async move { worker.run().await }); + + let extra_workers = worker_count.saturating_sub(1); + + let mut channels = Vec::with_capacity(extra_workers); + for i in 0..extra_workers { + let worker_factory = worker_factory.clone(); + let main_module = main_module.clone(); + let permissions = permissions.clone(); + let (tx, rx) = tokio::sync::oneshot::channel(); + channels.push(rx); + std::thread::Builder::new() + .name(format!("serve-worker-{i}")) + .spawn(move || { + deno_runtime::tokio_util::create_and_run_current_thread(async move { + let result = + run_worker(i, worker_factory, main_module, permissions, hmr).await; + let _ = tx.send(result); + }); + })?; + } + + let (main_result, worker_results) = tokio::try_join!( + main.map_err(AnyError::from), + deno_core::futures::future::try_join_all( + channels.into_iter().map(|r| r.map_err(AnyError::from)) + ) + )?; + + let mut exit_code = main_result?; + for res in worker_results { + let ret = res?; + if ret != 0 && exit_code == 0 { + exit_code = ret; + } + } + Ok(exit_code) + + // main.await? +} + +async fn run_worker( + worker_count: usize, + worker_factory: CliMainWorkerFactory, + main_module: ModuleSpecifier, + permissions: PermissionsContainer, + hmr: bool, +) -> Result<i32, AnyError> { + let mut worker = worker_factory + .create_main_worker( + deno_runtime::WorkerExecutionMode::Serve { + is_main: false, + worker_count: Some(worker_count), + }, + main_module, + permissions, + ) + .await?; + if hmr { + worker.run_for_watcher().await?; + Ok(0) + } else { + worker.run().await + } +} + +async fn serve_with_watch( + flags: Arc<Flags>, + watch_flags: WatchFlagsWithPaths, + worker_count: Option<usize>, +) -> Result<i32, AnyError> { + let hmr = watch_flags.hmr; + crate::util::file_watcher::watch_recv( + flags, + crate::util::file_watcher::PrintConfig::new_with_banner( + if watch_flags.hmr { "HMR" } else { "Watcher" }, + "Process", + !watch_flags.no_clear_screen, + ), + WatcherRestartMode::Automatic, + move |flags, watcher_communicator, _changed_paths| { + Ok(async move { + let factory = CliFactory::from_flags_for_watcher( + flags, + watcher_communicator.clone(), + ); + let cli_options = factory.cli_options()?; + let main_module = cli_options.resolve_main_module()?; + + maybe_npm_install(&factory).await?; + + let _ = watcher_communicator.watch_paths(cli_options.watch_paths()); + + let permissions = PermissionsContainer::new(Permissions::from_options( + &cli_options.permissions_options()?, + )?); + let worker_factory = factory.create_cli_main_worker_factory().await?; + + do_serve(worker_factory, main_module, permissions, worker_count, hmr) + .await?; + + Ok(()) + }) + }, + ) + .await?; + Ok(0) +} diff --git a/cli/worker.rs b/cli/worker.rs index 82051da6c..7f04e8846 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -414,6 +414,7 @@ impl CliMainWorker { } } +#[derive(Clone)] pub struct CliMainWorkerFactory { shared: Arc<SharedWorkerState>, } @@ -546,7 +547,7 @@ impl CliMainWorkerFactory { let maybe_inspector_server = shared.maybe_inspector_server.clone(); let create_web_worker_cb = - create_web_worker_callback(mode, shared.clone(), stdio.clone()); + create_web_worker_callback(shared.clone(), stdio.clone()); let maybe_storage_key = shared .storage_key_resolver @@ -739,7 +740,6 @@ impl CliMainWorkerFactory { } fn create_web_worker_callback( - mode: WorkerExecutionMode, shared: Arc<SharedWorkerState>, stdio: deno_runtime::deno_io::Stdio, ) -> Arc<CreateWebWorkerCb> { @@ -752,7 +752,7 @@ fn create_web_worker_callback( args.permissions.clone(), ); let create_web_worker_cb = - create_web_worker_callback(mode, shared.clone(), stdio.clone()); + create_web_worker_callback(shared.clone(), stdio.clone()); let maybe_storage_key = shared .storage_key_resolver @@ -802,7 +802,7 @@ fn create_web_worker_callback( disable_deprecated_api_warning: shared.disable_deprecated_api_warning, verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning, future: shared.enable_future_features, - mode, + mode: WorkerExecutionMode::Worker, serve_port: shared.serve_port, serve_host: shared.serve_host.clone(), }, |