summaryrefslogtreecommitdiff
path: root/tests/integration/serve_tests.rs
diff options
context:
space:
mode:
authorNathan Whitaker <17734409+nathanwhit@users.noreply.github.com>2024-08-14 15:26:21 -0700
committerGitHub <noreply@github.com>2024-08-14 22:26:21 +0000
commite92a05b5518e5fd30559c96c5990b08657bbc3e4 (patch)
tree037cad394db9097d8f695810426a2de9ba03d825 /tests/integration/serve_tests.rs
parent875ee618d318ea748e38641108d906eff34a9f86 (diff)
feat(serve): Opt-in parallelism for `deno serve` (#24920)
Adds a `parallel` flag to `deno serve`. When present, we spawn multiple workers to parallelize serving requests. ```bash deno serve --parallel main.ts ``` Currently on linux we use `SO_REUSEPORT` and rely on the fact that the kernel will distribute connections in a round-robin manner. On mac and windows, we sort of emulate this by cloning the underlying file descriptor and passing a handle to each worker. The connections will not be guaranteed to be fairly distributed (and in practice almost certainly won't be), but the distribution is still spread enough to provide a significant performance increase. --- (Run on an Macbook Pro with an M3 Max, serving `deno.com` baseline:: ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 239.78ms 13.56ms 330.54ms 79.12% Req/Sec 258.58 35.56 360.00 70.64% Latency Distribution 50% 236.72ms 75% 248.46ms 90% 256.84ms 99% 268.23ms 15458 requests in 30.02s, 2.47GB read Requests/sec: 514.89 Transfer/sec: 84.33MB ``` this PR (`with --parallel` flag) ``` ❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000 Running 30s test @ http://127.0.0.1:8000 2 threads and 125 connections Thread Stats Avg Stdev Max +/- Stdev Latency 117.40ms 142.84ms 590.45ms 79.07% Req/Sec 1.33k 175.19 1.77k 69.00% Latency Distribution 50% 22.34ms 75% 223.67ms 90% 357.32ms 99% 460.50ms 79636 requests in 30.07s, 12.74GB read Requests/sec: 2647.96 Transfer/sec: 433.71MB ```
Diffstat (limited to 'tests/integration/serve_tests.rs')
-rw-r--r--tests/integration/serve_tests.rs289
1 files changed, 220 insertions, 69 deletions
diff --git a/tests/integration/serve_tests.rs b/tests/integration/serve_tests.rs
index cfe7e4d6a..f3d887ec2 100644
--- a/tests/integration/serve_tests.rs
+++ b/tests/integration/serve_tests.rs
@@ -1,93 +1,244 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
+use std::cell::RefCell;
+use std::collections::HashMap;
use std::io::Read;
+use std::time::Duration;
use pretty_assertions::assert_eq;
use regex::Regex;
+use reqwest::RequestBuilder;
use test_util as util;
+use test_util::DenoChild;
+use tokio::time::timeout;
+
+struct ServeClient {
+ child: RefCell<DenoChild>,
+ client: reqwest::Client,
+ output_buf: RefCell<Vec<u8>>,
+ endpoint: RefCell<Option<String>>,
+}
+
+impl Drop for ServeClient {
+ fn drop(&mut self) {
+ let mut child = self.child.borrow_mut();
+ child.kill().unwrap();
+ child.wait().unwrap();
+ }
+}
+
+struct ServeClientBuilder(util::TestCommandBuilder, Option<String>);
+
+impl ServeClientBuilder {
+ fn build(self) -> ServeClient {
+ let Some(entry_point) = self.1 else {
+ panic!("entry point required");
+ };
+ let cmd = self.0.arg(entry_point);
+ let child = cmd.spawn().unwrap();
+
+ ServeClient::with_child(child)
+ }
+ fn map(
+ self,
+ f: impl FnOnce(util::TestCommandBuilder) -> util::TestCommandBuilder,
+ ) -> Self {
+ Self(f(self.0), self.1)
+ }
+ fn entry_point(self, file: impl AsRef<str>) -> Self {
+ Self(self.0, Some(file.as_ref().into()))
+ }
+ fn worker_count(self, n: Option<u64>) -> Self {
+ self.map(|t| {
+ let t = t.arg("--parallel");
+ if let Some(n) = n {
+ t.env("DENO_JOBS", n.to_string())
+ } else {
+ t
+ }
+ })
+ }
+ fn new() -> Self {
+ Self(
+ util::deno_cmd()
+ .current_dir(util::testdata_path())
+ .arg("serve")
+ .arg("--port")
+ .arg("0")
+ .stdout_piped(),
+ None,
+ )
+ }
+}
+
+impl ServeClient {
+ fn builder() -> ServeClientBuilder {
+ ServeClientBuilder::new()
+ }
+ fn with_child(child: DenoChild) -> Self {
+ Self {
+ child: RefCell::new(child),
+ output_buf: Default::default(),
+ endpoint: Default::default(),
+ client: reqwest::Client::builder()
+ .add_root_certificate(
+ reqwest::Certificate::from_pem(include_bytes!(
+ "../testdata/tls/RootCA.crt"
+ ))
+ .unwrap(),
+ )
+ // disable connection pooling so we create a new connection per request
+ // which allows us to distribute requests across workers
+ .pool_max_idle_per_host(0)
+ .pool_idle_timeout(Duration::from_nanos(1))
+ .http2_prior_knowledge()
+ .build()
+ .unwrap(),
+ }
+ }
+
+ fn kill(self) {
+ let mut child = self.child.borrow_mut();
+ child.kill().unwrap();
+ child.wait().unwrap();
+ }
+
+ fn output(self) -> String {
+ let mut child = self.child.borrow_mut();
+ child.kill().unwrap();
+ let mut stdout = child.stdout.take().unwrap();
+ child.wait().unwrap();
+
+ let mut output_buf = self.output_buf.borrow_mut();
+
+ stdout.read_to_end(&mut output_buf).unwrap();
+
+ String::from_utf8(std::mem::take(&mut *output_buf)).unwrap()
+ }
+
+ fn get(&self) -> RequestBuilder {
+ let endpoint = self.endpoint();
+ self.client.get(&*endpoint)
+ }
+
+ fn endpoint(&self) -> String {
+ if let Some(e) = self.endpoint.borrow().as_ref() {
+ return e.to_string();
+ };
+ let mut buffer = self.output_buf.borrow_mut();
+ let mut temp_buf = [0u8; 64];
+ let mut child = self.child.borrow_mut();
+ let stdout = child.stdout.as_mut().unwrap();
+ let port_regex = regex::bytes::Regex::new(r":(\d+)").unwrap();
+
+ let start = std::time::Instant::now();
+ // try to find the port number in the output
+ // it may not be the first line, so we need to read the output in a loop
+ let port = loop {
+ if start.elapsed() > Duration::from_secs(5) {
+ panic!(
+ "timed out waiting for serve to start. serve output:\n{}",
+ String::from_utf8_lossy(&buffer)
+ );
+ }
+ let read = stdout.read(&mut temp_buf).unwrap();
+ buffer.extend_from_slice(&temp_buf[..read]);
+ if let Some(p) = port_regex
+ .captures(&buffer)
+ .and_then(|c| c.get(1))
+ .map(|v| std::str::from_utf8(v.as_bytes()).unwrap().to_owned())
+ {
+ break p;
+ }
+ // this is technically blocking, but it's just a test and
+ // I don't want to switch RefCell to Mutex just for this
+ std::thread::sleep(Duration::from_millis(10));
+ };
+ self
+ .endpoint
+ .replace(Some(format!("http://127.0.0.1:{port}")));
+
+ return self.endpoint.borrow().clone().unwrap();
+ }
+}
#[tokio::test]
async fn deno_serve_port_0() {
- let mut child = util::deno_cmd()
- .current_dir(util::testdata_path())
- .arg("serve")
- .arg("--port")
- .arg("0")
- .arg("./serve/port_0.ts")
- .stdout_piped()
- .spawn()
- .unwrap();
- let stdout = child.stdout.as_mut().unwrap();
- let mut buffer = [0; 52];
- let _read = stdout.read(&mut buffer).unwrap();
- let msg = std::str::from_utf8(&buffer).unwrap();
- let port_regex = Regex::new(r":(\d+)").unwrap();
- let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str();
-
- let cert = reqwest::Certificate::from_pem(include_bytes!(
- "../testdata/tls/RootCA.crt"
- ))
- .unwrap();
-
- let client = reqwest::Client::builder()
- .add_root_certificate(cert)
- .http2_prior_knowledge()
- .build()
- .unwrap();
-
- let res = client
- .get(&format!("http://127.0.0.1:{port}"))
- .send()
- .await
- .unwrap();
+ let client = ServeClient::builder()
+ .entry_point("./serve/port_0.ts")
+ .build();
+ let res = client.get().send().await.unwrap();
assert_eq!(200, res.status());
let body = res.text().await.unwrap();
assert_eq!(body, "deno serve --port 0 works!");
-
- child.kill().unwrap();
- child.wait().unwrap();
+ client.kill();
}
#[tokio::test]
async fn deno_serve_no_args() {
- let mut child = util::deno_cmd()
- .current_dir(util::testdata_path())
- .arg("serve")
- .arg("--port")
- .arg("0")
- .arg("./serve/no_args.ts")
- .stdout_piped()
- .spawn()
- .unwrap();
- let stdout = child.stdout.as_mut().unwrap();
- let mut buffer = [0; 52];
- let _read = stdout.read(&mut buffer).unwrap();
- let msg = std::str::from_utf8(&buffer).unwrap();
- let port_regex = Regex::new(r":(\d+)").unwrap();
- let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str();
-
- let cert = reqwest::Certificate::from_pem(include_bytes!(
- "../testdata/tls/RootCA.crt"
- ))
- .unwrap();
-
- let client = reqwest::Client::builder()
- .add_root_certificate(cert)
- .http2_prior_knowledge()
- .build()
- .unwrap();
-
- let res = client
- .get(&format!("http://127.0.0.1:{port}"))
- .send()
- .await
- .unwrap();
+ let client = ServeClient::builder()
+ .entry_point("./serve/no_args.ts")
+ .build();
+ let res = client.get().send().await.unwrap();
assert_eq!(200, res.status());
let body = res.text().await.unwrap();
assert_eq!(body, "deno serve with no args in fetch() works!");
+}
+
+#[tokio::test]
+async fn deno_serve_parallel() {
+ let client = ServeClient::builder()
+ .entry_point("./serve/parallel.ts")
+ .worker_count(Some(4))
+ .build();
+
+ let mut serve_counts = HashMap::<u32, u32>::new();
+
+ tokio::time::sleep(Duration::from_millis(1000)).await;
+
+ let serve_regex =
+ Regex::new(r"\[serve\-worker\-(\d+)\s*\] serving request").unwrap();
+
+ for _ in 0..100 {
+ let response = timeout(Duration::from_secs(2), client.get().send())
+ .await
+ .unwrap()
+ .unwrap();
+ assert_eq!(200, response.status());
+ let body = response.text().await.unwrap();
+ assert_eq!(body, "deno serve parallel");
+ tokio::time::sleep(Duration::from_millis(1)).await;
+ }
+
+ let output = client.output();
+
+ let listening_regex =
+ Regex::new(r"Listening on http[\w:/\.]+ with (\d+) threads").unwrap();
+
+ eprintln!("serve output:\n{output}");
+ assert_eq!(
+ listening_regex
+ .captures(&output)
+ .unwrap()
+ .get(1)
+ .unwrap()
+ .as_str()
+ .trim(),
+ "4"
+ );
+
+ for capture in serve_regex.captures_iter(&output) {
+ if let Some(worker_number) =
+ capture.get(1).and_then(|m| m.as_str().parse::<u32>().ok())
+ {
+ *serve_counts.entry(worker_number).or_default() += 1;
+ }
+ }
- child.kill().unwrap();
- child.wait().unwrap();
+ assert!(
+ serve_counts.values().filter(|&&n| n > 2).count() >= 2,
+ "bad {serve_counts:?}"
+ );
}