summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNathan Whitaker <17734409+nathanwhit@users.noreply.github.com>2024-08-14 15:26:21 -0700
committerGitHub <noreply@github.com>2024-08-14 22:26:21 +0000
commite92a05b5518e5fd30559c96c5990b08657bbc3e4 (patch)
tree037cad394db9097d8f695810426a2de9ba03d825
parent875ee618d318ea748e38641108d906eff34a9f86 (diff)
feat(serve): Opt-in parallelism for `deno serve` (#24920)
Adds a `parallel` flag to `deno serve`. When present, we spawn multiple workers to parallelize serving requests. ```bash deno serve --parallel main.ts ``` Currently on linux we use `SO_REUSEPORT` and rely on the fact that the kernel will distribute connections in a round-robin manner. On mac and windows, we sort of emulate this by cloning the underlying file descriptor and passing a handle to each worker. The connections will not be guaranteed to be fairly distributed (and in practice almost certainly won't be), but the distribution is still spread enough to provide a significant performance increase. --- (Run on an Macbook Pro with an M3 Max, serving `deno.com` baseline:: ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 239.78ms 13.56ms 330.54ms 79.12% Req/Sec 258.58 35.56 360.00 70.64% Latency Distribution 50% 236.72ms 75% 248.46ms 90% 256.84ms 99% 268.23ms 15458 requests in 30.02s, 2.47GB read Requests/sec: 514.89 Transfer/sec: 84.33MB ``` this PR (`with --parallel` flag) ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 117.40ms 142.84ms 590.45ms 79.07% Req/Sec 1.33k 175.19 1.77k 69.00% Latency Distribution 50% 22.34ms 75% 223.67ms 90% 357.32ms 99% 460.50ms 79636 requests in 30.07s, 12.74GB read Requests/sec: 2647.96 Transfer/sec: 433.71MB ```
-rw-r--r--Cargo.toml6
-rw-r--r--cli/args/flags.rs93
-rw-r--r--cli/main.rs2
-rw-r--r--cli/tools/mod.rs1
-rw-r--r--cli/tools/run/mod.rs16
-rw-r--r--cli/tools/serve.rs192
-rw-r--r--cli/worker.rs8
-rw-r--r--ext/http/00_serve.ts26
-rw-r--r--ext/net/01_net.js12
-rw-r--r--ext/net/ops.rs7
-rw-r--r--ext/net/ops_tls.rs8
-rw-r--r--runtime/js/99_main.js49
-rw-r--r--runtime/worker_bootstrap.rs40
-rw-r--r--tests/integration/serve_tests.rs289
-rw-r--r--tests/testdata/serve/parallel.ts7
15 files changed, 616 insertions, 140 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 14437003e..3032b40b5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -229,6 +229,12 @@ opt-level = 'z' # Optimize for size
inherits = "release"
debug = true
+# Faster to compile than `release` but with similar performance.
+[profile.release-lite]
+inherits = "release"
+codegen-units = 128
+lto = "thin"
+
# NB: the `bench` and `release` profiles must remain EXACTLY the same.
[profile.bench]
codegen-units = 1
diff --git a/cli/args/flags.rs b/cli/args/flags.rs
index f8577ed1b..800d6ff5a 100644
--- a/cli/args/flags.rs
+++ b/cli/args/flags.rs
@@ -339,6 +339,7 @@ pub struct ServeFlags {
pub watch: Option<WatchFlagsWithPaths>,
pub port: u16,
pub host: String,
+ pub worker_count: Option<usize>,
}
impl ServeFlags {
@@ -349,6 +350,7 @@ impl ServeFlags {
watch: None,
port,
host: host.to_owned(),
+ worker_count: None,
}
}
}
@@ -2693,6 +2695,9 @@ fn serve_subcommand() -> Command {
.help("The TCP address to serve on, defaulting to 0.0.0.0 (all interfaces).")
.value_parser(serve_host_validator),
)
+ .arg(
+ parallel_arg("multiple server workers", false)
+ )
.arg(check_arg(false))
.arg(watch_arg(true))
.arg(watch_exclude_arg())
@@ -2854,11 +2859,7 @@ Directory arguments are expanded to all contained files matching the glob
.action(ArgAction::SetTrue),
)
.arg(
- Arg::new("parallel")
- .long("parallel")
- .help("Run test modules in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable.")
- .conflicts_with("jobs")
- .action(ArgAction::SetTrue)
+ parallel_arg("test modules", true)
)
.arg(
Arg::new("jobs")
@@ -2901,6 +2902,18 @@ Directory arguments are expanded to all contained files matching the glob
)
}
+fn parallel_arg(descr: &str, jobs_fallback: bool) -> Arg {
+ let arg = Arg::new("parallel")
+ .long("parallel")
+ .help(format!("Run {descr} in parallel. Parallelism defaults to the number of available CPUs or the value in the DENO_JOBS environment variable."))
+ .action(ArgAction::SetTrue);
+ if jobs_fallback {
+ arg.conflicts_with("jobs")
+ } else {
+ arg
+ }
+}
+
fn types_subcommand() -> Command {
Command::new("types").about(
"Print runtime TypeScript declarations.
@@ -4416,6 +4429,8 @@ fn serve_parse(
.remove_one::<String>("host")
.unwrap_or_else(|| "0.0.0.0".to_owned());
+ let worker_count = parallel_arg_parse(matches, false).map(|v| v.get());
+
runtime_args_parse(flags, matches, true, true);
// If the user didn't pass --allow-net, add this port to the network
// allowlist. If the host is 0.0.0.0, we add :{port} and allow the same network perms
@@ -4455,6 +4470,7 @@ fn serve_parse(
watch: watch_arg_parse_with_paths(matches),
port,
host,
+ worker_count,
});
Ok(())
@@ -4486,6 +4502,42 @@ fn task_parse(flags: &mut Flags, matches: &mut ArgMatches) {
flags.subcommand = DenoSubcommand::Task(task_flags);
}
+fn parallel_arg_parse(
+ matches: &mut ArgMatches,
+ fallback_to_jobs: bool,
+) -> Option<NonZeroUsize> {
+ if matches.get_flag("parallel") {
+ if let Ok(value) = env::var("DENO_JOBS") {
+ value.parse::<NonZeroUsize>().ok()
+ } else {
+ std::thread::available_parallelism().ok()
+ }
+ } else if fallback_to_jobs && matches.contains_id("jobs") {
+ // We can't change this to use the log crate because its not configured
+ // yet at this point since the flags haven't been parsed. This flag is
+ // deprecated though so it's not worth changing the code to use the log
+ // crate here and this is only done for testing anyway.
+ #[allow(clippy::print_stderr)]
+ {
+ eprintln!(
+ "⚠️ {}",
+ crate::colors::yellow(concat!(
+ "The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n",
+ "Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n",
+ "Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables"
+ )),
+ );
+ }
+ if let Some(value) = matches.remove_one::<NonZeroUsize>("jobs") {
+ Some(value)
+ } else {
+ std::thread::available_parallelism().ok()
+ }
+ } else {
+ None
+ }
+}
+
fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) {
flags.type_check_mode = TypeCheckMode::Local;
runtime_args_parse(flags, matches, true, true);
@@ -4552,36 +4604,7 @@ fn test_parse(flags: &mut Flags, matches: &mut ArgMatches) {
flags.argv.extend(script_arg);
}
- let concurrent_jobs = if matches.get_flag("parallel") {
- if let Ok(value) = env::var("DENO_JOBS") {
- value.parse::<NonZeroUsize>().ok()
- } else {
- std::thread::available_parallelism().ok()
- }
- } else if matches.contains_id("jobs") {
- // We can't change this to use the log crate because its not configured
- // yet at this point since the flags haven't been parsed. This flag is
- // deprecated though so it's not worth changing the code to use the log
- // crate here and this is only done for testing anyway.
- #[allow(clippy::print_stderr)]
- {
- eprintln!(
- "⚠️ {}",
- crate::colors::yellow(concat!(
- "The `--jobs` flag is deprecated and will be removed in Deno 2.0.\n",
- "Use the `--parallel` flag with possibly the `DENO_JOBS` environment variable instead.\n",
- "Learn more at: https://docs.deno.com/runtime/manual/basics/env_variables"
- )),
- );
- }
- if let Some(value) = matches.remove_one::<NonZeroUsize>("jobs") {
- Some(value)
- } else {
- std::thread::available_parallelism().ok()
- }
- } else {
- None
- };
+ let concurrent_jobs = parallel_arg_parse(matches, true);
let include = if let Some(files) = matches.remove_many::<String>("files") {
files.collect()
diff --git a/cli/main.rs b/cli/main.rs
index e8ecaa393..1752c3373 100644
--- a/cli/main.rs
+++ b/cli/main.rs
@@ -236,7 +236,7 @@ async fn run_subcommand(flags: Arc<Flags>) -> Result<i32, AnyError> {
}
}),
DenoSubcommand::Serve(serve_flags) => spawn_subcommand(async move {
- tools::run::run_script(WorkerExecutionMode::Serve, flags, serve_flags.watch).await
+ tools::serve::serve(flags, serve_flags).await
}),
DenoSubcommand::Task(task_flags) => spawn_subcommand(async {
tools::task::execute_script(flags, task_flags, false).await
diff --git a/cli/tools/mod.rs b/cli/tools/mod.rs
index 7bb9b7cf6..1e1c65565 100644
--- a/cli/tools/mod.rs
+++ b/cli/tools/mod.rs
@@ -16,6 +16,7 @@ pub mod lint;
pub mod registry;
pub mod repl;
pub mod run;
+pub mod serve;
pub mod task;
pub mod test;
pub mod upgrade;
diff --git a/cli/tools/run/mod.rs b/cli/tools/run/mod.rs
index 65044fbad..1964cfdd9 100644
--- a/cli/tools/run/mod.rs
+++ b/cli/tools/run/mod.rs
@@ -18,11 +18,7 @@ use crate::util::file_watcher::WatcherRestartMode;
pub mod hmr;
-pub async fn run_script(
- mode: WorkerExecutionMode,
- flags: Arc<Flags>,
- watch: Option<WatchFlagsWithPaths>,
-) -> Result<i32, AnyError> {
+pub fn check_permission_before_script(flags: &Flags) {
if !flags.has_permission() && flags.has_permission_in_argv() {
log::warn!(
"{}",
@@ -33,6 +29,14 @@ To grant permissions, set them before the script argument. For example:
)
);
}
+}
+
+pub async fn run_script(
+ mode: WorkerExecutionMode,
+ flags: Arc<Flags>,
+ watch: Option<WatchFlagsWithPaths>,
+) -> Result<i32, AnyError> {
+ check_permission_before_script(&flags);
if let Some(watch_flags) = watch {
return run_with_watch(mode, flags, watch_flags).await;
@@ -187,7 +191,7 @@ pub async fn eval_command(
Ok(exit_code)
}
-async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> {
+pub async fn maybe_npm_install(factory: &CliFactory) -> Result<(), AnyError> {
// ensure an "npm install" is done if the user has explicitly
// opted into using a managed node_modules directory
if factory.cli_options()?.node_modules_dir_enablement() == Some(true) {
diff --git a/cli/tools/serve.rs b/cli/tools/serve.rs
new file mode 100644
index 000000000..24666b8f6
--- /dev/null
+++ b/cli/tools/serve.rs
@@ -0,0 +1,192 @@
+// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+
+use std::sync::Arc;
+
+use deno_core::error::AnyError;
+use deno_core::futures::TryFutureExt;
+use deno_core::ModuleSpecifier;
+use deno_runtime::deno_permissions::Permissions;
+use deno_runtime::deno_permissions::PermissionsContainer;
+
+use super::run::check_permission_before_script;
+use super::run::maybe_npm_install;
+use crate::args::Flags;
+use crate::args::ServeFlags;
+use crate::args::WatchFlagsWithPaths;
+use crate::factory::CliFactory;
+use crate::util::file_watcher::WatcherRestartMode;
+use crate::worker::CliMainWorkerFactory;
+
+pub async fn serve(
+ flags: Arc<Flags>,
+ serve_flags: ServeFlags,
+) -> Result<i32, AnyError> {
+ check_permission_before_script(&flags);
+
+ if let Some(watch_flags) = serve_flags.watch {
+ return serve_with_watch(flags, watch_flags, serve_flags.worker_count)
+ .await;
+ }
+
+ let factory = CliFactory::from_flags(flags);
+ let cli_options = factory.cli_options()?;
+ let deno_dir = factory.deno_dir()?;
+ let http_client = factory.http_client_provider();
+
+ // Run a background task that checks for available upgrades or output
+ // if an earlier run of this background task found a new version of Deno.
+ #[cfg(feature = "upgrade")]
+ super::upgrade::check_for_upgrades(
+ http_client.clone(),
+ deno_dir.upgrade_check_file_path(),
+ );
+
+ let main_module = cli_options.resolve_main_module()?;
+
+ maybe_npm_install(&factory).await?;
+
+ let permissions = PermissionsContainer::new(Permissions::from_options(
+ &cli_options.permissions_options()?,
+ )?);
+ let worker_factory = factory.create_cli_main_worker_factory().await?;
+
+ do_serve(
+ worker_factory,
+ main_module,
+ permissions,
+ serve_flags.worker_count,
+ false,
+ )
+ .await
+}
+
+async fn do_serve(
+ worker_factory: CliMainWorkerFactory,
+ main_module: ModuleSpecifier,
+ permissions: PermissionsContainer,
+ worker_count: Option<usize>,
+ hmr: bool,
+) -> Result<i32, AnyError> {
+ let mut worker = worker_factory
+ .create_main_worker(
+ deno_runtime::WorkerExecutionMode::Serve {
+ is_main: true,
+ worker_count,
+ },
+ main_module.clone(),
+ permissions.clone(),
+ )
+ .await?;
+ let worker_count = match worker_count {
+ None | Some(1) => return worker.run().await,
+ Some(c) => c,
+ };
+
+ let main = deno_core::unsync::spawn(async move { worker.run().await });
+
+ let extra_workers = worker_count.saturating_sub(1);
+
+ let mut channels = Vec::with_capacity(extra_workers);
+ for i in 0..extra_workers {
+ let worker_factory = worker_factory.clone();
+ let main_module = main_module.clone();
+ let permissions = permissions.clone();
+ let (tx, rx) = tokio::sync::oneshot::channel();
+ channels.push(rx);
+ std::thread::Builder::new()
+ .name(format!("serve-worker-{i}"))
+ .spawn(move || {
+ deno_runtime::tokio_util::create_and_run_current_thread(async move {
+ let result =
+ run_worker(i, worker_factory, main_module, permissions, hmr).await;
+ let _ = tx.send(result);
+ });
+ })?;
+ }
+
+ let (main_result, worker_results) = tokio::try_join!(
+ main.map_err(AnyError::from),
+ deno_core::futures::future::try_join_all(
+ channels.into_iter().map(|r| r.map_err(AnyError::from))
+ )
+ )?;
+
+ let mut exit_code = main_result?;
+ for res in worker_results {
+ let ret = res?;
+ if ret != 0 && exit_code == 0 {
+ exit_code = ret;
+ }
+ }
+ Ok(exit_code)
+
+ // main.await?
+}
+
+async fn run_worker(
+ worker_count: usize,
+ worker_factory: CliMainWorkerFactory,
+ main_module: ModuleSpecifier,
+ permissions: PermissionsContainer,
+ hmr: bool,
+) -> Result<i32, AnyError> {
+ let mut worker = worker_factory
+ .create_main_worker(
+ deno_runtime::WorkerExecutionMode::Serve {
+ is_main: false,
+ worker_count: Some(worker_count),
+ },
+ main_module,
+ permissions,
+ )
+ .await?;
+ if hmr {
+ worker.run_for_watcher().await?;
+ Ok(0)
+ } else {
+ worker.run().await
+ }
+}
+
+async fn serve_with_watch(
+ flags: Arc<Flags>,
+ watch_flags: WatchFlagsWithPaths,
+ worker_count: Option<usize>,
+) -> Result<i32, AnyError> {
+ let hmr = watch_flags.hmr;
+ crate::util::file_watcher::watch_recv(
+ flags,
+ crate::util::file_watcher::PrintConfig::new_with_banner(
+ if watch_flags.hmr { "HMR" } else { "Watcher" },
+ "Process",
+ !watch_flags.no_clear_screen,
+ ),
+ WatcherRestartMode::Automatic,
+ move |flags, watcher_communicator, _changed_paths| {
+ Ok(async move {
+ let factory = CliFactory::from_flags_for_watcher(
+ flags,
+ watcher_communicator.clone(),
+ );
+ let cli_options = factory.cli_options()?;
+ let main_module = cli_options.resolve_main_module()?;
+
+ maybe_npm_install(&factory).await?;
+
+ let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
+
+ let permissions = PermissionsContainer::new(Permissions::from_options(
+ &cli_options.permissions_options()?,
+ )?);
+ let worker_factory = factory.create_cli_main_worker_factory().await?;
+
+ do_serve(worker_factory, main_module, permissions, worker_count, hmr)
+ .await?;
+
+ Ok(())
+ })
+ },
+ )
+ .await?;
+ Ok(0)
+}
diff --git a/cli/worker.rs b/cli/worker.rs
index 82051da6c..7f04e8846 100644
--- a/cli/worker.rs
+++ b/cli/worker.rs
@@ -414,6 +414,7 @@ impl CliMainWorker {
}
}
+#[derive(Clone)]
pub struct CliMainWorkerFactory {
shared: Arc<SharedWorkerState>,
}
@@ -546,7 +547,7 @@ impl CliMainWorkerFactory {
let maybe_inspector_server = shared.maybe_inspector_server.clone();
let create_web_worker_cb =
- create_web_worker_callback(mode, shared.clone(), stdio.clone());
+ create_web_worker_callback(shared.clone(), stdio.clone());
let maybe_storage_key = shared
.storage_key_resolver
@@ -739,7 +740,6 @@ impl CliMainWorkerFactory {
}
fn create_web_worker_callback(
- mode: WorkerExecutionMode,
shared: Arc<SharedWorkerState>,
stdio: deno_runtime::deno_io::Stdio,
) -> Arc<CreateWebWorkerCb> {
@@ -752,7 +752,7 @@ fn create_web_worker_callback(
args.permissions.clone(),
);
let create_web_worker_cb =
- create_web_worker_callback(mode, shared.clone(), stdio.clone());
+ create_web_worker_callback(shared.clone(), stdio.clone());
let maybe_storage_key = shared
.storage_key_resolver
@@ -802,7 +802,7 @@ fn create_web_worker_callback(
disable_deprecated_api_warning: shared.disable_deprecated_api_warning,
verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning,
future: shared.enable_future_features,
- mode,
+ mode: WorkerExecutionMode::Worker,
serve_port: shared.serve_port,
serve_host: shared.serve_host.clone(),
},
diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts
index 8ed1a1d04..9c6f80552 100644
--- a/ext/http/00_serve.ts
+++ b/ext/http/00_serve.ts
@@ -579,6 +579,8 @@ type RawServeOptions = {
handler?: RawHandler;
};
+const kLoadBalanced = Symbol("kLoadBalanced");
+
function serve(arg1, arg2) {
let options: RawServeOptions | undefined;
let handler: RawHandler | undefined;
@@ -634,6 +636,7 @@ function serve(arg1, arg2) {
hostname: options.hostname ?? "0.0.0.0",
port: options.port ?? 8000,
reusePort: options.reusePort ?? false,
+ loadBalanced: options[kLoadBalanced] ?? false,
};
if (options.certFile || options.keyFile) {
@@ -842,18 +845,25 @@ function registerDeclarativeServer(exports) {
"Invalid type for fetch: must be a function with a single or no parameter",
);
}
- return ({ servePort, serveHost }) => {
+ return ({ servePort, serveHost, serveIsMain, serveWorkerCount }) => {
Deno.serve({
port: servePort,
hostname: serveHost,
+ [kLoadBalanced]: (serveIsMain && serveWorkerCount > 1) ||
+ (serveWorkerCount !== null),
onListen: ({ port, hostname }) => {
- console.debug(
- `%cdeno serve%c: Listening on %chttp://${hostname}:${port}/%c`,
- "color: green",
- "color: inherit",
- "color: yellow",
- "color: inherit",
- );
+ if (serveIsMain) {
+ const nThreads = serveWorkerCount > 1
+ ? ` with ${serveWorkerCount} threads`
+ : "";
+ console.debug(
+ `%cdeno serve%c: Listening on %chttp://${hostname}:${port}/%c${nThreads}`,
+ "color: green",
+ "color: inherit",
+ "color: yellow",
+ "color: inherit",
+ );
+ }
},
handler: (req) => {
return exports.fetch(req);
diff --git a/ext/net/01_net.js b/ext/net/01_net.js
index 517ab127e..536f79bbf 100644
--- a/ext/net/01_net.js
+++ b/ext/net/01_net.js
@@ -531,10 +531,14 @@ const listenOptionApiName = Symbol("listenOptionApiName");
function listen(args) {
switch (args.transport ?? "tcp") {
case "tcp": {
- const { 0: rid, 1: addr } = op_net_listen_tcp({
- hostname: args.hostname ?? "0.0.0.0",
- port: Number(args.port),
- }, args.reusePort);
+ const { 0: rid, 1: addr } = op_net_listen_tcp(
+ {
+ hostname: args.hostname ?? "0.0.0.0",
+ port: Number(args.port),
+ },
+ args.reusePort,
+ args.loadBalanced ?? false,
+ );
addr.transport = "tcp";
return new Listener(rid, addr);
}
diff --git a/ext/net/ops.rs b/ext/net/ops.rs
index f28778d29..b74dc8d75 100644
--- a/ext/net/ops.rs
+++ b/ext/net/ops.rs
@@ -353,6 +353,7 @@ pub fn op_net_listen_tcp<NP>(
state: &mut OpState,
#[serde] addr: IpAddr,
reuse_port: bool,
+ load_balanced: bool,
) -> Result<(ResourceId, IpAddr), AnyError>
where
NP: NetPermissions + 'static,
@@ -367,7 +368,11 @@ where
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
- let listener = TcpListener::bind_direct(addr, reuse_port)?;
+ let listener = if load_balanced {
+ TcpListener::bind_load_balanced(addr)
+ } else {
+ TcpListener::bind_direct(addr, reuse_port)
+ }?;
let local_addr = listener.local_addr()?;
let listener_resource = NetworkListenerResource::new(listener);
let rid = state.resource_table.add(listener_resource);
diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs
index a2a27c4ad..8483e7e66 100644
--- a/ext/net/ops_tls.rs
+++ b/ext/net/ops_tls.rs
@@ -475,6 +475,8 @@ fn load_private_keys_from_file(
pub struct ListenTlsArgs {
alpn_protocols: Option<Vec<String>>,
reuse_port: bool,
+ #[serde(default)]
+ load_balanced: bool,
}
#[op2]
@@ -502,7 +504,11 @@ where
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
- let tcp_listener = TcpListener::bind_direct(bind_addr, args.reuse_port)?;
+ let tcp_listener = if args.load_balanced {
+ TcpListener::bind_load_balanced(bind_addr)
+ } else {
+ TcpListener::bind_direct(bind_addr, args.reuse_port)
+ }?;
let local_addr = tcp_listener.local_addr()?;
let alpn = args
.alpn_protocols
diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js
index ca96e34b7..5e25a3818 100644
--- a/runtime/js/99_main.js
+++ b/runtime/js/99_main.js
@@ -45,6 +45,7 @@ const {
PromiseResolve,
SafeSet,
StringPrototypeIncludes,
+ StringPrototypePadEnd,
StringPrototypeSplit,
StringPrototypeTrim,
Symbol,
@@ -709,8 +710,37 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
11: mode,
12: servePort,
13: serveHost,
+ 14: serveIsMain,
+ 15: serveWorkerCount,
} = runtimeOptions;
+ if (mode === executionModes.serve) {
+ if (serveIsMain && serveWorkerCount) {
+ const origLog = console.log;
+ const origError = console.error;
+ const prefix = `[serve-worker-0 ]`;
+ console.log = (...args) => {
+ return origLog(prefix, ...new primordials.SafeArrayIterator(args));
+ };
+ console.error = (...args) => {
+ return origError(prefix, ...new primordials.SafeArrayIterator(args));
+ };
+ } else if (serveWorkerCount !== null) {
+ const origLog = console.log;
+ const origError = console.error;
+ const base = `serve-worker-${serveWorkerCount + 1}`;
+ // 15 = "serve-worker-nn".length, assuming
+ // serveWorkerCount < 100
+ const prefix = `[${StringPrototypePadEnd(base, 15, " ")}]`;
+ console.log = (...args) => {
+ return origLog(prefix, ...new primordials.SafeArrayIterator(args));
+ };
+ console.error = (...args) => {
+ return origError(prefix, ...new primordials.SafeArrayIterator(args));
+ };
+ }
+ }
+
if (mode === executionModes.run || mode === executionModes.serve) {
let serve = undefined;
core.addMainModuleHandler((main) => {
@@ -725,13 +755,16 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
}
if (mode === executionModes.serve && !serve) {
- console.error(
- `%cerror: %cdeno serve requires %cexport default { fetch }%c in the main module, did you mean to run \"deno run\"?`,
- "color: yellow;",
- "color: inherit;",
- "font-weight: bold;",
- "font-weight: normal;",
- );
+ if (serveIsMain) {
+ // Only error if main worker
+ console.error(
+ `%cerror: %cdeno serve requires %cexport default { fetch }%c in the main module, did you mean to run \"deno run\"?`,
+ "color: yellow;",
+ "color: inherit;",
+ "font-weight: bold;",
+ "font-weight: normal;",
+ );
+ }
return;
}
@@ -746,7 +779,7 @@ function bootstrapMainRuntime(runtimeOptions, warmup = false) {
);
}
if (mode === executionModes.serve) {
- serve({ servePort, serveHost });
+ serve({ servePort, serveHost, serveIsMain, serveWorkerCount });
}
}
});
diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs
index b13c3c428..afd3242e8 100644
--- a/runtime/worker_bootstrap.rs
+++ b/runtime/worker_bootstrap.rs
@@ -10,7 +10,6 @@ use deno_terminal::colors;
/// The execution mode for this worker. Some modes may have implicit behaviour.
#[derive(Copy, Clone)]
-#[repr(u8)]
pub enum WorkerExecutionMode {
/// No special behaviour.
None,
@@ -28,11 +27,39 @@ pub enum WorkerExecutionMode {
/// `deno bench`
Bench,
/// `deno serve`
- Serve,
+ Serve {
+ is_main: bool,
+ worker_count: Option<usize>,
+ },
/// `deno jupyter`
Jupyter,
}
+impl WorkerExecutionMode {
+ pub fn discriminant(&self) -> u8 {
+ match self {
+ WorkerExecutionMode::None => 0,
+ WorkerExecutionMode::Worker => 1,
+ WorkerExecutionMode::Run => 2,
+ WorkerExecutionMode::Repl => 3,
+ WorkerExecutionMode::Eval => 4,
+ WorkerExecutionMode::Test => 5,
+ WorkerExecutionMode::Bench => 6,
+ WorkerExecutionMode::Serve { .. } => 7,
+ WorkerExecutionMode::Jupyter => 8,
+ }
+ }
+ pub fn serve_info(&self) -> (Option<bool>, Option<usize>) {
+ match *self {
+ WorkerExecutionMode::Serve {
+ is_main,
+ worker_count,
+ } => (Some(is_main), worker_count),
+ _ => (None, None),
+ }
+ }
+}
+
/// The log level to use when printing diagnostic log messages, warnings,
/// or errors in the worker.
///
@@ -175,6 +202,10 @@ struct BootstrapV8<'a>(
u16,
// serve host
Option<&'a str>,
+ // serve is main
+ Option<bool>,
+ // serve worker count
+ Option<usize>,
);
impl BootstrapOptions {
@@ -186,6 +217,7 @@ impl BootstrapOptions {
let scope = RefCell::new(scope);
let ser = deno_core::serde_v8::Serializer::new(&scope);
+ let (serve_is_main, serve_worker_count) = self.mode.serve_info();
let bootstrap = BootstrapV8(
self.location.as_ref().map(|l| l.as_str()),
self.unstable,
@@ -198,9 +230,11 @@ impl BootstrapOptions {
self.disable_deprecated_api_warning,
self.verbose_deprecated_api_warning,
self.future,
- self.mode as u8 as _,
+ self.mode.discriminant() as _,
self.serve_port.unwrap_or_default(),
self.serve_host.as_deref(),
+ serve_is_main,
+ serve_worker_count,
);
bootstrap.serialize(ser).unwrap()
diff --git a/tests/integration/serve_tests.rs b/tests/integration/serve_tests.rs
index cfe7e4d6a..f3d887ec2 100644
--- a/tests/integration/serve_tests.rs
+++ b/tests/integration/serve_tests.rs
@@ -1,93 +1,244 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+use std::cell::RefCell;
+use std::collections::HashMap;
use std::io::Read;
+use std::time::Duration;
use pretty_assertions::assert_eq;
use regex::Regex;
+use reqwest::RequestBuilder;
use test_util as util;
+use test_util::DenoChild;
+use tokio::time::timeout;
+
+struct ServeClient {
+ child: RefCell<DenoChild>,
+ client: reqwest::Client,
+ output_buf: RefCell<Vec<u8>>,
+ endpoint: RefCell<Option<String>>,
+}
+
+impl Drop for ServeClient {
+ fn drop(&mut self) {
+ let mut child = self.child.borrow_mut();
+ child.kill().unwrap();
+ child.wait().unwrap();
+ }
+}
+
+struct ServeClientBuilder(util::TestCommandBuilder, Option<String>);
+
+impl ServeClientBuilder {
+ fn build(self) -> ServeClient {
+ let Some(entry_point) = self.1 else {
+ panic!("entry point required");
+ };
+ let cmd = self.0.arg(entry_point);
+ let child = cmd.spawn().unwrap();
+
+ ServeClient::with_child(child)
+ }
+ fn map(
+ self,
+ f: impl FnOnce(util::TestCommandBuilder) -> util::TestCommandBuilder,
+ ) -> Self {
+ Self(f(self.0), self.1)
+ }
+ fn entry_point(self, file: impl AsRef<str>) -> Self {
+ Self(self.0, Some(file.as_ref().into()))
+ }
+ fn worker_count(self, n: Option<u64>) -> Self {
+ self.map(|t| {
+ let t = t.arg("--parallel");
+ if let Some(n) = n {
+ t.env("DENO_JOBS", n.to_string())
+ } else {
+ t
+ }
+ })
+ }
+ fn new() -> Self {
+ Self(
+ util::deno_cmd()
+ .current_dir(util::testdata_path())
+ .arg("serve")
+ .arg("--port")
+ .arg("0")
+ .stdout_piped(),
+ None,
+ )
+ }
+}
+
+impl ServeClient {
+ fn builder() -> ServeClientBuilder {
+ ServeClientBuilder::new()
+ }
+ fn with_child(child: DenoChild) -> Self {
+ Self {
+ child: RefCell::new(child),
+ output_buf: Default::default(),
+ endpoint: Default::default(),
+ client: reqwest::Client::builder()
+ .add_root_certificate(
+ reqwest::Certificate::from_pem(include_bytes!(
+ "../testdata/tls/RootCA.crt"
+ ))
+ .unwrap(),
+ )
+ // disable connection pooling so we create a new connection per request
+ // which allows us to distribute requests across workers
+ .pool_max_idle_per_host(0)
+ .pool_idle_timeout(Duration::from_nanos(1))
+ .http2_prior_knowledge()
+ .build()
+ .unwrap(),
+ }
+ }
+
+ fn kill(self) {
+ let mut child = self.child.borrow_mut();
+ child.kill().unwrap();
+ child.wait().unwrap();
+ }
+
+ fn output(self) -> String {
+ let mut child = self.child.borrow_mut();
+ child.kill().unwrap();
+ let mut stdout = child.stdout.take().unwrap();
+ child.wait().unwrap();
+
+ let mut output_buf = self.output_buf.borrow_mut();
+
+ stdout.read_to_end(&mut output_buf).unwrap();
+
+ String::from_utf8(std::mem::take(&mut *output_buf)).unwrap()
+ }
+
+ fn get(&self) -> RequestBuilder {
+ let endpoint = self.endpoint();
+ self.client.get(&*endpoint)
+ }
+
+ fn endpoint(&self) -> String {
+ if let Some(e) = self.endpoint.borrow().as_ref() {
+ return e.to_string();
+ };
+ let mut buffer = self.output_buf.borrow_mut();
+ let mut temp_buf = [0u8; 64];
+ let mut child = self.child.borrow_mut();
+ let stdout = child.stdout.as_mut().unwrap();
+ let port_regex = regex::bytes::Regex::new(r":(\d+)").unwrap();
+
+ let start = std::time::Instant::now();
+ // try to find the port number in the output
+ // it may not be the first line, so we need to read the output in a loop
+ let port = loop {
+ if start.elapsed() > Duration::from_secs(5) {
+ panic!(
+ "timed out waiting for serve to start. serve output:\n{}",
+ String::from_utf8_lossy(&buffer)
+ );
+ }
+ let read = stdout.read(&mut temp_buf).unwrap();
+ buffer.extend_from_slice(&temp_buf[..read]);
+ if let Some(p) = port_regex
+ .captures(&buffer)
+ .and_then(|c| c.get(1))
+ .map(|v| std::str::from_utf8(v.as_bytes()).unwrap().to_owned())
+ {
+ break p;
+ }
+ // this is technically blocking, but it's just a test and
+ // I don't want to switch RefCell to Mutex just for this
+ std::thread::sleep(Duration::from_millis(10));
+ };
+ self
+ .endpoint
+ .replace(Some(format!("http://127.0.0.1:{port}")));
+
+ return self.endpoint.borrow().clone().unwrap();
+ }
+}
#[tokio::test]
async fn deno_serve_port_0() {
- let mut child = util::deno_cmd()
- .current_dir(util::testdata_path())
- .arg("serve")
- .arg("--port")
- .arg("0")
- .arg("./serve/port_0.ts")
- .stdout_piped()
- .spawn()
- .unwrap();
- let stdout = child.stdout.as_mut().unwrap();
- let mut buffer = [0; 52];
- let _read = stdout.read(&mut buffer).unwrap();
- let msg = std::str::from_utf8(&buffer).unwrap();
- let port_regex = Regex::new(r":(\d+)").unwrap();
- let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str();
-
- let cert = reqwest::Certificate::from_pem(include_bytes!(
- "../testdata/tls/RootCA.crt"
- ))
- .unwrap();
-
- let client = reqwest::Client::builder()
- .add_root_certificate(cert)
- .http2_prior_knowledge()
- .build()
- .unwrap();
-
- let res = client
- .get(&format!("http://127.0.0.1:{port}"))
- .send()
- .await
- .unwrap();
+ let client = ServeClient::builder()
+ .entry_point("./serve/port_0.ts")
+ .build();
+ let res = client.get().send().await.unwrap();
assert_eq!(200, res.status());
let body = res.text().await.unwrap();
assert_eq!(body, "deno serve --port 0 works!");
-
- child.kill().unwrap();
- child.wait().unwrap();
+ client.kill();
}
#[tokio::test]
async fn deno_serve_no_args() {
- let mut child = util::deno_cmd()
- .current_dir(util::testdata_path())
- .arg("serve")
- .arg("--port")
- .arg("0")
- .arg("./serve/no_args.ts")
- .stdout_piped()
- .spawn()
- .unwrap();
- let stdout = child.stdout.as_mut().unwrap();
- let mut buffer = [0; 52];
- let _read = stdout.read(&mut buffer).unwrap();
- let msg = std::str::from_utf8(&buffer).unwrap();
- let port_regex = Regex::new(r":(\d+)").unwrap();
- let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str();
-
- let cert = reqwest::Certificate::from_pem(include_bytes!(
- "../testdata/tls/RootCA.crt"
- ))
- .unwrap();
-
- let client = reqwest::Client::builder()
- .add_root_certificate(cert)
- .http2_prior_knowledge()
- .build()
- .unwrap();
-
- let res = client
- .get(&format!("http://127.0.0.1:{port}"))
- .send()
- .await
- .unwrap();
+ let client = ServeClient::builder()
+ .entry_point("./serve/no_args.ts")
+ .build();
+ let res = client.get().send().await.unwrap();
assert_eq!(200, res.status());
let body = res.text().await.unwrap();
assert_eq!(body, "deno serve with no args in fetch() works!");
+}
+
+#[tokio::test]
+async fn deno_serve_parallel() {
+ let client = ServeClient::builder()
+ .entry_point("./serve/parallel.ts")
+ .worker_count(Some(4))
+ .build();
+
+ let mut serve_counts = HashMap::<u32, u32>::new();
+
+ tokio::time::sleep(Duration::from_millis(1000)).await;
+
+ let serve_regex =
+ Regex::new(r"\[serve\-worker\-(\d+)\s*\] serving request").unwrap();
+
+ for _ in 0..100 {
+ let response = timeout(Duration::from_secs(2), client.get().send())
+ .await
+ .unwrap()
+ .unwrap();
+ assert_eq!(200, response.status());
+ let body = response.text().await.unwrap();
+ assert_eq!(body, "deno serve parallel");
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+
+ let output = client.output();
+
+ let listening_regex =
+ Regex::new(r"Listening on http[\w:/\.]+ with (\d+) threads").unwrap();
+
+ eprintln!("serve output:\n{output}");
+ assert_eq!(
+ listening_regex
+ .captures(&output)
+ .unwrap()
+ .get(1)
+ .unwrap()
+ .as_str()
+ .trim(),
+ "4"
+ );
+
+ for capture in serve_regex.captures_iter(&output) {
+ if let Some(worker_number) =
+ capture.get(1).and_then(|m| m.as_str().parse::<u32>().ok())
+ {
+ *serve_counts.entry(worker_number).or_default() += 1;
+ }
+ }
- child.kill().unwrap();
- child.wait().unwrap();
+ assert!(
+ serve_counts.values().filter(|&&n| n > 2).count() >= 2,
+ "bad {serve_counts:?}"
+ );
}
diff --git a/tests/testdata/serve/parallel.ts b/tests/testdata/serve/parallel.ts
new file mode 100644
index 000000000..f1f118c71
--- /dev/null
+++ b/tests/testdata/serve/parallel.ts
@@ -0,0 +1,7 @@
+console.log("starting serve");
+export default {
+ fetch(_req: Request) {
+ console.log("serving request");
+ return new Response("deno serve parallel");
+ },
+};