diff options
author | Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> | 2024-08-14 15:26:21 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-14 22:26:21 +0000 |
commit | e92a05b5518e5fd30559c96c5990b08657bbc3e4 (patch) | |
tree | 037cad394db9097d8f695810426a2de9ba03d825 /tests/integration/serve_tests.rs | |
parent | 875ee618d318ea748e38641108d906eff34a9f86 (diff) |
feat(serve): Opt-in parallelism for `deno serve` (#24920)
Adds a `parallel` flag to `deno serve`. When present, we spawn multiple
workers to parallelize serving requests.
```bash
deno serve --parallel main.ts
```
Currently on linux we use `SO_REUSEPORT` and rely on the fact that the
kernel will distribute connections in a round-robin manner.
On mac and windows, we sort of emulate this by cloning the underlying
file descriptor and passing a handle to each worker. The connections
will not be guaranteed to be fairly distributed (and in practice almost
certainly won't be), but the distribution is still spread enough to
provide a significant performance increase.
---
(Run on an Macbook Pro with an M3 Max, serving `deno.com`
baseline::
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
2 threads and 125 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 239.78ms 13.56ms 330.54ms 79.12%
Req/Sec 258.58 35.56 360.00 70.64%
Latency Distribution
50% 236.72ms
75% 248.46ms
90% 256.84ms
99% 268.23ms
15458 requests in 30.02s, 2.47GB read
Requests/sec: 514.89
Transfer/sec: 84.33MB
```
this PR (`with --parallel` flag)
```
❯ wrk -d 30s -c 125 --latency http://127.0.0.1:8000
Running 30s test @ http://127.0.0.1:8000
2 threads and 125 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 117.40ms 142.84ms 590.45ms 79.07%
Req/Sec 1.33k 175.19 1.77k 69.00%
Latency Distribution
50% 22.34ms
75% 223.67ms
90% 357.32ms
99% 460.50ms
79636 requests in 30.07s, 12.74GB read
Requests/sec: 2647.96
Transfer/sec: 433.71MB
```
Diffstat (limited to 'tests/integration/serve_tests.rs')
-rw-r--r-- | tests/integration/serve_tests.rs | 289 |
1 files changed, 220 insertions, 69 deletions
diff --git a/tests/integration/serve_tests.rs b/tests/integration/serve_tests.rs index cfe7e4d6a..f3d887ec2 100644 --- a/tests/integration/serve_tests.rs +++ b/tests/integration/serve_tests.rs @@ -1,93 +1,244 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use std::cell::RefCell; +use std::collections::HashMap; use std::io::Read; +use std::time::Duration; use pretty_assertions::assert_eq; use regex::Regex; +use reqwest::RequestBuilder; use test_util as util; +use test_util::DenoChild; +use tokio::time::timeout; + +struct ServeClient { + child: RefCell<DenoChild>, + client: reqwest::Client, + output_buf: RefCell<Vec<u8>>, + endpoint: RefCell<Option<String>>, +} + +impl Drop for ServeClient { + fn drop(&mut self) { + let mut child = self.child.borrow_mut(); + child.kill().unwrap(); + child.wait().unwrap(); + } +} + +struct ServeClientBuilder(util::TestCommandBuilder, Option<String>); + +impl ServeClientBuilder { + fn build(self) -> ServeClient { + let Some(entry_point) = self.1 else { + panic!("entry point required"); + }; + let cmd = self.0.arg(entry_point); + let child = cmd.spawn().unwrap(); + + ServeClient::with_child(child) + } + fn map( + self, + f: impl FnOnce(util::TestCommandBuilder) -> util::TestCommandBuilder, + ) -> Self { + Self(f(self.0), self.1) + } + fn entry_point(self, file: impl AsRef<str>) -> Self { + Self(self.0, Some(file.as_ref().into())) + } + fn worker_count(self, n: Option<u64>) -> Self { + self.map(|t| { + let t = t.arg("--parallel"); + if let Some(n) = n { + t.env("DENO_JOBS", n.to_string()) + } else { + t + } + }) + } + fn new() -> Self { + Self( + util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("serve") + .arg("--port") + .arg("0") + .stdout_piped(), + None, + ) + } +} + +impl ServeClient { + fn builder() -> ServeClientBuilder { + ServeClientBuilder::new() + } + fn with_child(child: DenoChild) -> Self { + Self { + child: RefCell::new(child), + output_buf: Default::default(), + endpoint: Default::default(), + client: reqwest::Client::builder() + .add_root_certificate( + reqwest::Certificate::from_pem(include_bytes!( + "../testdata/tls/RootCA.crt" + )) + .unwrap(), + ) + // disable connection pooling so we create a new connection per request + // which allows us to distribute requests across workers + .pool_max_idle_per_host(0) + .pool_idle_timeout(Duration::from_nanos(1)) + .http2_prior_knowledge() + .build() + .unwrap(), + } + } + + fn kill(self) { + let mut child = self.child.borrow_mut(); + child.kill().unwrap(); + child.wait().unwrap(); + } + + fn output(self) -> String { + let mut child = self.child.borrow_mut(); + child.kill().unwrap(); + let mut stdout = child.stdout.take().unwrap(); + child.wait().unwrap(); + + let mut output_buf = self.output_buf.borrow_mut(); + + stdout.read_to_end(&mut output_buf).unwrap(); + + String::from_utf8(std::mem::take(&mut *output_buf)).unwrap() + } + + fn get(&self) -> RequestBuilder { + let endpoint = self.endpoint(); + self.client.get(&*endpoint) + } + + fn endpoint(&self) -> String { + if let Some(e) = self.endpoint.borrow().as_ref() { + return e.to_string(); + }; + let mut buffer = self.output_buf.borrow_mut(); + let mut temp_buf = [0u8; 64]; + let mut child = self.child.borrow_mut(); + let stdout = child.stdout.as_mut().unwrap(); + let port_regex = regex::bytes::Regex::new(r":(\d+)").unwrap(); + + let start = std::time::Instant::now(); + // try to find the port number in the output + // it may not be the first line, so we need to read the output in a loop + let port = loop { + if start.elapsed() > Duration::from_secs(5) { + panic!( + "timed out waiting for serve to start. serve output:\n{}", + String::from_utf8_lossy(&buffer) + ); + } + let read = stdout.read(&mut temp_buf).unwrap(); + buffer.extend_from_slice(&temp_buf[..read]); + if let Some(p) = port_regex + .captures(&buffer) + .and_then(|c| c.get(1)) + .map(|v| std::str::from_utf8(v.as_bytes()).unwrap().to_owned()) + { + break p; + } + // this is technically blocking, but it's just a test and + // I don't want to switch RefCell to Mutex just for this + std::thread::sleep(Duration::from_millis(10)); + }; + self + .endpoint + .replace(Some(format!("http://127.0.0.1:{port}"))); + + return self.endpoint.borrow().clone().unwrap(); + } +} #[tokio::test] async fn deno_serve_port_0() { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("serve") - .arg("--port") - .arg("0") - .arg("./serve/port_0.ts") - .stdout_piped() - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut buffer = [0; 52]; - let _read = stdout.read(&mut buffer).unwrap(); - let msg = std::str::from_utf8(&buffer).unwrap(); - let port_regex = Regex::new(r":(\d+)").unwrap(); - let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str(); - - let cert = reqwest::Certificate::from_pem(include_bytes!( - "../testdata/tls/RootCA.crt" - )) - .unwrap(); - - let client = reqwest::Client::builder() - .add_root_certificate(cert) - .http2_prior_knowledge() - .build() - .unwrap(); - - let res = client - .get(&format!("http://127.0.0.1:{port}")) - .send() - .await - .unwrap(); + let client = ServeClient::builder() + .entry_point("./serve/port_0.ts") + .build(); + let res = client.get().send().await.unwrap(); assert_eq!(200, res.status()); let body = res.text().await.unwrap(); assert_eq!(body, "deno serve --port 0 works!"); - - child.kill().unwrap(); - child.wait().unwrap(); + client.kill(); } #[tokio::test] async fn deno_serve_no_args() { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("serve") - .arg("--port") - .arg("0") - .arg("./serve/no_args.ts") - .stdout_piped() - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut buffer = [0; 52]; - let _read = stdout.read(&mut buffer).unwrap(); - let msg = std::str::from_utf8(&buffer).unwrap(); - let port_regex = Regex::new(r":(\d+)").unwrap(); - let port = port_regex.captures(msg).unwrap().get(1).unwrap().as_str(); - - let cert = reqwest::Certificate::from_pem(include_bytes!( - "../testdata/tls/RootCA.crt" - )) - .unwrap(); - - let client = reqwest::Client::builder() - .add_root_certificate(cert) - .http2_prior_knowledge() - .build() - .unwrap(); - - let res = client - .get(&format!("http://127.0.0.1:{port}")) - .send() - .await - .unwrap(); + let client = ServeClient::builder() + .entry_point("./serve/no_args.ts") + .build(); + let res = client.get().send().await.unwrap(); assert_eq!(200, res.status()); let body = res.text().await.unwrap(); assert_eq!(body, "deno serve with no args in fetch() works!"); +} + +#[tokio::test] +async fn deno_serve_parallel() { + let client = ServeClient::builder() + .entry_point("./serve/parallel.ts") + .worker_count(Some(4)) + .build(); + + let mut serve_counts = HashMap::<u32, u32>::new(); + + tokio::time::sleep(Duration::from_millis(1000)).await; + + let serve_regex = + Regex::new(r"\[serve\-worker\-(\d+)\s*\] serving request").unwrap(); + + for _ in 0..100 { + let response = timeout(Duration::from_secs(2), client.get().send()) + .await + .unwrap() + .unwrap(); + assert_eq!(200, response.status()); + let body = response.text().await.unwrap(); + assert_eq!(body, "deno serve parallel"); + tokio::time::sleep(Duration::from_millis(1)).await; + } + + let output = client.output(); + + let listening_regex = + Regex::new(r"Listening on http[\w:/\.]+ with (\d+) threads").unwrap(); + + eprintln!("serve output:\n{output}"); + assert_eq!( + listening_regex + .captures(&output) + .unwrap() + .get(1) + .unwrap() + .as_str() + .trim(), + "4" + ); + + for capture in serve_regex.captures_iter(&output) { + if let Some(worker_number) = + capture.get(1).and_then(|m| m.as_str().parse::<u32>().ok()) + { + *serve_counts.entry(worker_number).or_default() += 1; + } + } - child.kill().unwrap(); - child.wait().unwrap(); + assert!( + serve_counts.values().filter(|&&n| n > 2).count() >= 2, + "bad {serve_counts:?}" + ); } |