From 9845361153f35f6a68a82eb3a13845fddbeab026 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Sun, 14 May 2023 15:40:01 -0600 Subject: refactor(core): bake single-thread assumptions into spawn/spawn_blocking (#19056) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Partially supersedes #19016. This migrates `spawn` and `spawn_blocking` to `deno_core`, and removes the requirement for `spawn` tasks to be `Send` given our single-threaded executor. While we don't need to technically do anything w/`spawn_blocking`, this allows us to have a single `JoinHandle` type that works for both cases, and allows us to more easily experiment with alternative `spawn_blocking` implementations that do not require tokio (ie: rayon). Async ops (+~35%): Before: ``` time 1310 ms rate 763358 time 1267 ms rate 789265 time 1259 ms rate 794281 time 1266 ms rate 789889 ``` After: ``` time 956 ms rate 1046025 time 954 ms rate 1048218 time 924 ms rate 1082251 time 920 ms rate 1086956 ``` HTTP serve (+~4.4%): Before: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 68.78us 19.77us 1.43ms 86.84% Req/Sec 68.78k 5.00k 73.84k 91.58% 1381833 requests in 10.10s, 167.36MB read Requests/sec: 136823.29 Transfer/sec: 16.57MB ``` After: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 63.12us 17.43us 1.11ms 85.13% Req/Sec 71.82k 3.71k 77.02k 79.21% 1443195 requests in 10.10s, 174.79MB read Requests/sec: 142921.99 Transfer/sec: 17.31MB ``` Suggested-By: alice@ryhl.io Co-authored-by: Bartek IwaƄczuk --- cli/tests/integration/cert_tests.rs | 197 ++++++++++++++----------------- cli/tests/integration/inspector_tests.rs | 2 +- cli/tests/integration/run_tests.rs | 77 ++++++------ 3 files changed, 127 insertions(+), 149 deletions(-) (limited to 'cli/tests') diff --git a/cli/tests/integration/cert_tests.rs b/cli/tests/integration/cert_tests.rs index d3da6d75a..b04f2d35e 100644 --- a/cli/tests/integration/cert_tests.rs +++ b/cli/tests/integration/cert_tests.rs @@ -11,7 +11,6 @@ use std::process::Command; use std::sync::Arc; use test_util as util; use test_util::TempDir; -use tokio::task::LocalSet; use util::TestContext; itest_flaky!(cafile_url_imports { @@ -219,113 +218,99 @@ fn cafile_bundle_remote_exports() { #[tokio::test] async fn listen_tls_alpn() { - // TLS streams require the presence of an ambient local task set to gracefully - // close dropped connections in the background. - LocalSet::new() - .run_until(async { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("run") - .arg("--unstable") - .arg("--quiet") - .arg("--allow-net") - .arg("--allow-read") - .arg("./cert/listen_tls_alpn.ts") - .arg("4504") - .stdout(std::process::Stdio::piped()) - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut msg = [0; 5]; - let read = stdout.read(&mut msg).unwrap(); - assert_eq!(read, 5); - assert_eq!(&msg, b"READY"); - - let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( - "../testdata/tls/RootCA.crt" - ))); - let certs = rustls_pemfile::certs(&mut reader).unwrap(); - let mut root_store = rustls::RootCertStore::empty(); - root_store.add_parsable_certificates(&certs); - let mut cfg = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_store) - .with_no_client_auth(); - cfg.alpn_protocols.push(b"foobar".to_vec()); - let cfg = Arc::new(cfg); - - let hostname = rustls::ServerName::try_from("localhost").unwrap(); - - let tcp_stream = tokio::net::TcpStream::connect("localhost:4504") - .await - .unwrap(); - let mut tls_stream = - TlsStream::new_client_side(tcp_stream, cfg, hostname); - - tls_stream.handshake().await.unwrap(); - - let (_, rustls_connection) = tls_stream.get_ref(); - let alpn = rustls_connection.alpn_protocol().unwrap(); - assert_eq!(alpn, b"foobar"); - - let status = child.wait().unwrap(); - assert!(status.success()); - }) - .await; + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--unstable") + .arg("--quiet") + .arg("--allow-net") + .arg("--allow-read") + .arg("./cert/listen_tls_alpn.ts") + .arg("4504") + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + let stdout = child.stdout.as_mut().unwrap(); + let mut msg = [0; 5]; + let read = stdout.read(&mut msg).unwrap(); + assert_eq!(read, 5); + assert_eq!(&msg, b"READY"); + + let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( + "../testdata/tls/RootCA.crt" + ))); + let certs = rustls_pemfile::certs(&mut reader).unwrap(); + let mut root_store = rustls::RootCertStore::empty(); + root_store.add_parsable_certificates(&certs); + let mut cfg = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); + cfg.alpn_protocols.push(b"foobar".to_vec()); + let cfg = Arc::new(cfg); + + let hostname = rustls::ServerName::try_from("localhost").unwrap(); + + let tcp_stream = tokio::net::TcpStream::connect("localhost:4504") + .await + .unwrap(); + let mut tls_stream = TlsStream::new_client_side(tcp_stream, cfg, hostname); + + tls_stream.handshake().await.unwrap(); + + let (_, rustls_connection) = tls_stream.get_ref(); + let alpn = rustls_connection.alpn_protocol().unwrap(); + assert_eq!(alpn, b"foobar"); + + let status = child.wait().unwrap(); + assert!(status.success()); } #[tokio::test] async fn listen_tls_alpn_fail() { - // TLS streams require the presence of an ambient local task set to gracefully - // close dropped connections in the background. - LocalSet::new() - .run_until(async { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("run") - .arg("--unstable") - .arg("--quiet") - .arg("--allow-net") - .arg("--allow-read") - .arg("./cert/listen_tls_alpn_fail.ts") - .arg("4505") - .stdout(std::process::Stdio::piped()) - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut msg = [0; 5]; - let read = stdout.read(&mut msg).unwrap(); - assert_eq!(read, 5); - assert_eq!(&msg, b"READY"); - - let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( - "../testdata/tls/RootCA.crt" - ))); - let certs = rustls_pemfile::certs(&mut reader).unwrap(); - let mut root_store = rustls::RootCertStore::empty(); - root_store.add_parsable_certificates(&certs); - let mut cfg = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_store) - .with_no_client_auth(); - cfg.alpn_protocols.push(b"boofar".to_vec()); - let cfg = Arc::new(cfg); - - let hostname = rustls::ServerName::try_from("localhost").unwrap(); - - let tcp_stream = tokio::net::TcpStream::connect("localhost:4505") - .await - .unwrap(); - let mut tls_stream = - TlsStream::new_client_side(tcp_stream, cfg, hostname); - - tls_stream.handshake().await.unwrap_err(); - - let (_, rustls_connection) = tls_stream.get_ref(); - assert!(rustls_connection.alpn_protocol().is_none()); - - let status = child.wait().unwrap(); - assert!(status.success()); - }) - .await; + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--unstable") + .arg("--quiet") + .arg("--allow-net") + .arg("--allow-read") + .arg("./cert/listen_tls_alpn_fail.ts") + .arg("4505") + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + let stdout = child.stdout.as_mut().unwrap(); + let mut msg = [0; 5]; + let read = stdout.read(&mut msg).unwrap(); + assert_eq!(read, 5); + assert_eq!(&msg, b"READY"); + + let mut reader = &mut BufReader::new(Cursor::new(include_bytes!( + "../testdata/tls/RootCA.crt" + ))); + let certs = rustls_pemfile::certs(&mut reader).unwrap(); + let mut root_store = rustls::RootCertStore::empty(); + root_store.add_parsable_certificates(&certs); + let mut cfg = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); + cfg.alpn_protocols.push(b"boofar".to_vec()); + let cfg = Arc::new(cfg); + + let hostname = rustls::ServerName::try_from("localhost").unwrap(); + + let tcp_stream = tokio::net::TcpStream::connect("localhost:4505") + .await + .unwrap(); + let mut tls_stream = TlsStream::new_client_side(tcp_stream, cfg, hostname); + + tls_stream.handshake().await.unwrap_err(); + + let (_, rustls_connection) = tls_stream.get_ref(); + assert!(rustls_connection.alpn_protocol().is_none()); + + let status = child.wait().unwrap(); + assert!(status.success()); } diff --git a/cli/tests/integration/inspector_tests.rs b/cli/tests/integration/inspector_tests.rs index cf66c4adc..8fa9ec85c 100644 --- a/cli/tests/integration/inspector_tests.rs +++ b/cli/tests/integration/inspector_tests.rs @@ -29,7 +29,7 @@ where Fut::Output: Send + 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn(fut); + deno_core::task::spawn(fut); } } diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index e6ea85da4..bc717351a 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -9,7 +9,6 @@ use std::process::Stdio; use std::time::Duration; use test_util as util; use test_util::TempDir; -use tokio::task::LocalSet; use trust_dns_client::serialize::txt::Lexer; use trust_dns_client::serialize::txt::Parser; use util::assert_contains; @@ -3886,50 +3885,44 @@ async fn test_resolve_dns() { #[tokio::test] async fn http2_request_url() { - // TLS streams require the presence of an ambient local task set to gracefully - // close dropped connections in the background. - LocalSet::new() - .run_until(async { - let mut child = util::deno_cmd() - .current_dir(util::testdata_path()) - .arg("run") - .arg("--unstable") - .arg("--quiet") - .arg("--allow-net") - .arg("--allow-read") - .arg("./run/http2_request_url.ts") - .arg("4506") - .stdout(std::process::Stdio::piped()) - .spawn() - .unwrap(); - let stdout = child.stdout.as_mut().unwrap(); - let mut buffer = [0; 5]; - let read = stdout.read(&mut buffer).unwrap(); - assert_eq!(read, 5); - let msg = std::str::from_utf8(&buffer).unwrap(); - assert_eq!(msg, "READY"); - - let cert = reqwest::Certificate::from_pem(include_bytes!( - "../testdata/tls/RootCA.crt" - )) - .unwrap(); + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--unstable") + .arg("--quiet") + .arg("--allow-net") + .arg("--allow-read") + .arg("./run/http2_request_url.ts") + .arg("4506") + .stdout(std::process::Stdio::piped()) + .spawn() + .unwrap(); + let stdout = child.stdout.as_mut().unwrap(); + let mut buffer = [0; 5]; + let read = stdout.read(&mut buffer).unwrap(); + assert_eq!(read, 5); + let msg = std::str::from_utf8(&buffer).unwrap(); + assert_eq!(msg, "READY"); - let client = reqwest::Client::builder() - .add_root_certificate(cert) - .http2_prior_knowledge() - .build() - .unwrap(); + let cert = reqwest::Certificate::from_pem(include_bytes!( + "../testdata/tls/RootCA.crt" + )) + .unwrap(); + + let client = reqwest::Client::builder() + .add_root_certificate(cert) + .http2_prior_knowledge() + .build() + .unwrap(); - let res = client.get("http://127.0.0.1:4506").send().await.unwrap(); - assert_eq!(200, res.status()); + let res = client.get("http://127.0.0.1:4506").send().await.unwrap(); + assert_eq!(200, res.status()); - let body = res.text().await.unwrap(); - assert_eq!(body, "http://127.0.0.1:4506/"); + let body = res.text().await.unwrap(); + assert_eq!(body, "http://127.0.0.1:4506/"); - child.kill().unwrap(); - child.wait().unwrap(); - }) - .await; + child.kill().unwrap(); + child.wait().unwrap(); } #[cfg(not(windows))] @@ -4173,7 +4166,7 @@ where Fut::Output: Send + 'static, { fn execute(&self, fut: Fut) { - tokio::task::spawn(fut); + deno_core::task::spawn(fut); } } -- cgit v1.2.3