diff options
author | Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> | 2024-08-14 15:26:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-14 22:26:21 +0000 |
commit | e92a05b5518e5fd30559c96c5990b08657bbc3e4 (patch) | |
tree | 037cad394db9097d8f695810426a2de9ba03d825 /cli/tools/serve.rs | |
parent | 875ee618d318ea748e38641108d906eff34a9f86 (diff) |
feat(serve): Opt-in parallelism for `deno serve` (#24920)
Adds a `parallel` flag to `deno serve`. When present, we spawn multiple
workers to parallelize serving requests.
```bash
deno serve --parallel main.ts
```
Currently on linux we use `SO_REUSEPORT` and rely on the fact that the
kernel will distribute connections in a round-robin manner.
On mac and windows, we sort of emulate this by cloning the underlying
file descriptor and passing a handle to each worker. The connections
will not be guaranteed to be fairly distributed (and in practice almost
certainly won't be), but the distribution is still spread enough to
provide a significant performance increase.
---
(Run on an Macbook Pro with an M3 Max, serving `deno.com`
baseline::
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
2 threads and 125 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 239.78ms 13.56ms 330.54ms 79.12%
Req/Sec 258.58 35.56 360.00 70.64%
Latency Distribution
50% 236.72ms
75% 248.46ms
90% 256.84ms
99% 268.23ms
15458 requests in 30.02s, 2.47GB read
Requests/sec: 514.89
Transfer/sec: 84.33MB
```
this PR (`with --parallel` flag)
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
2 threads and 125 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 117.40ms 142.84ms 590.45ms 79.07%
Req/Sec 1.33k 175.19 1.77k 69.00%
Latency Distribution
50% 22.34ms
75% 223.67ms
90% 357.32ms
99% 460.50ms
79636 requests in 30.07s, 12.74GB read
Requests/sec: 2647.96
Transfer/sec: 433.71MB
```
Diffstat (limited to 'cli/tools/serve.rs')
-rw-r--r-- | cli/tools/serve.rs | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/cli/tools/serve.rs b/cli/tools/serve.rs new file mode 100644 index 000000000..24666b8f6 --- /dev/null +++ b/cli/tools/serve.rs @@ -0,0 +1,192 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use std::sync::Arc; + +use deno_core::error::AnyError; +use deno_core::futures::TryFutureExt; +use deno_core::ModuleSpecifier; +use deno_runtime::deno_permissions::Permissions; +use deno_runtime::deno_permissions::PermissionsContainer; + +use super::run::check_permission_before_script; +use super::run::maybe_npm_install; +use crate::args::Flags; +use crate::args::ServeFlags; +use crate::args::WatchFlagsWithPaths; +use crate::factory::CliFactory; +use crate::util::file_watcher::WatcherRestartMode; +use crate::worker::CliMainWorkerFactory; + +pub async fn serve( + flags: Arc<Flags>, + serve_flags: ServeFlags, +) -> Result<i32, AnyError> { + check_permission_before_script(&flags); + + if let Some(watch_flags) = serve_flags.watch { + return serve_with_watch(flags, watch_flags, serve_flags.worker_count) + .await; + } + + let factory = CliFactory::from_flags(flags); + let cli_options = factory.cli_options()?; + let deno_dir = factory.deno_dir()?; + let http_client = factory.http_client_provider(); + + // Run a background task that checks for available upgrades or output + // if an earlier run of this background task found a new version of Deno. + #[cfg(feature = "upgrade")] + super::upgrade::check_for_upgrades( + http_client.clone(), + deno_dir.upgrade_check_file_path(), + ); + + let main_module = cli_options.resolve_main_module()?; + + maybe_npm_install(&factory).await?; + + let permissions = PermissionsContainer::new(Permissions::from_options( + &cli_options.permissions_options()?, + )?); + let worker_factory = factory.create_cli_main_worker_factory().await?; + + do_serve( + worker_factory, + main_module, + permissions, + serve_flags.worker_count, + false, + ) + .await +} + +async fn do_serve( + worker_factory: CliMainWorkerFactory, + main_module: ModuleSpecifier, + permissions: PermissionsContainer, + worker_count: Option<usize>, + hmr: bool, +) -> Result<i32, AnyError> { + let mut worker = worker_factory + .create_main_worker( + deno_runtime::WorkerExecutionMode::Serve { + is_main: true, + worker_count, + }, + main_module.clone(), + permissions.clone(), + ) + .await?; + let worker_count = match worker_count { + None | Some(1) => return worker.run().await, + Some(c) => c, + }; + + let main = deno_core::unsync::spawn(async move { worker.run().await }); + + let extra_workers = worker_count.saturating_sub(1); + + let mut channels = Vec::with_capacity(extra_workers); + for i in 0..extra_workers { + let worker_factory = worker_factory.clone(); + let main_module = main_module.clone(); + let permissions = permissions.clone(); + let (tx, rx) = tokio::sync::oneshot::channel(); + channels.push(rx); + std::thread::Builder::new() + .name(format!("serve-worker-{i}")) + .spawn(move || { + deno_runtime::tokio_util::create_and_run_current_thread(async move { + let result = + run_worker(i, worker_factory, main_module, permissions, hmr).await; + let _ = tx.send(result); + }); + })?; + } + + let (main_result, worker_results) = tokio::try_join!( + main.map_err(AnyError::from), + deno_core::futures::future::try_join_all( + channels.into_iter().map(|r| r.map_err(AnyError::from)) + ) + )?; + + let mut exit_code = main_result?; + for res in worker_results { + let ret = res?; + if ret != 0 && exit_code == 0 { + exit_code = ret; + } + } + Ok(exit_code) + + // main.await? +} + +async fn run_worker( + worker_count: usize, + worker_factory: CliMainWorkerFactory, + main_module: ModuleSpecifier, + permissions: PermissionsContainer, + hmr: bool, +) -> Result<i32, AnyError> { + let mut worker = worker_factory + .create_main_worker( + deno_runtime::WorkerExecutionMode::Serve { + is_main: false, + worker_count: Some(worker_count), + }, + main_module, + permissions, + ) + .await?; + if hmr { + worker.run_for_watcher().await?; + Ok(0) + } else { + worker.run().await + } +} + +async fn serve_with_watch( + flags: Arc<Flags>, + watch_flags: WatchFlagsWithPaths, + worker_count: Option<usize>, +) -> Result<i32, AnyError> { + let hmr = watch_flags.hmr; + crate::util::file_watcher::watch_recv( + flags, + crate::util::file_watcher::PrintConfig::new_with_banner( + if watch_flags.hmr { "HMR" } else { "Watcher" }, + "Process", + !watch_flags.no_clear_screen, + ), + WatcherRestartMode::Automatic, + move |flags, watcher_communicator, _changed_paths| { + Ok(async move { + let factory = CliFactory::from_flags_for_watcher( + flags, + watcher_communicator.clone(), + ); + let cli_options = factory.cli_options()?; + let main_module = cli_options.resolve_main_module()?; + + maybe_npm_install(&factory).await?; + + let _ = watcher_communicator.watch_paths(cli_options.watch_paths()); + + let permissions = PermissionsContainer::new(Permissions::from_options( + &cli_options.permissions_options()?, + )?); + let worker_factory = factory.create_cli_main_worker_factory().await?; + + do_serve(worker_factory, main_module, permissions, worker_count, hmr) + .await?; + + Ok(()) + }) + }, + ) + .await?; + Ok(0) +} |