From 9845361153f35f6a68a82eb3a13845fddbeab026 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Sun, 14 May 2023 15:40:01 -0600 Subject: refactor(core): bake single-thread assumptions into spawn/spawn_blocking (#19056) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Partially supersedes #19016. This migrates `spawn` and `spawn_blocking` to `deno_core`, and removes the requirement for `spawn` tasks to be `Send` given our single-threaded executor. While we don't need to technically do anything w/`spawn_blocking`, this allows us to have a single `JoinHandle` type that works for both cases, and allows us to more easily experiment with alternative `spawn_blocking` implementations that do not require tokio (ie: rayon). Async ops (+~35%): Before: ``` time 1310 ms rate 763358 time 1267 ms rate 789265 time 1259 ms rate 794281 time 1266 ms rate 789889 ``` After: ``` time 956 ms rate 1046025 time 954 ms rate 1048218 time 924 ms rate 1082251 time 920 ms rate 1086956 ``` HTTP serve (+~4.4%): Before: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 68.78us 19.77us 1.43ms 86.84% Req/Sec 68.78k 5.00k 73.84k 91.58% 1381833 requests in 10.10s, 167.36MB read Requests/sec: 136823.29 Transfer/sec: 16.57MB ``` After: ``` Running 10s test @ http://localhost:4500 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 63.12us 17.43us 1.11ms 85.13% Req/Sec 71.82k 3.71k 77.02k 79.21% 1443195 requests in 10.10s, 174.79MB read Requests/sec: 142921.99 Transfer/sec: 17.31MB ``` Suggested-By: alice@ryhl.io Co-authored-by: Bartek IwaƄczuk --- ext/http/http_next.rs | 19 ++++++++----------- ext/http/lib.rs | 8 ++++---- 2 files changed, 12 insertions(+), 15 deletions(-) (limited to 'ext/http') diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 12db29b1b..8b2f91be0 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -17,6 +17,8 @@ use cache_control::CacheControl; use deno_core::error::AnyError; use deno_core::futures::TryFutureExt; use deno_core::op; +use deno_core::task::spawn; +use deno_core::task::JoinHandle; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::ByteString; @@ -68,9 +70,6 @@ use std::rc::Rc; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::task::spawn_local; -use tokio::task::JoinHandle; - type Request = hyper1::Request; type Response = hyper1::Response; @@ -262,7 +261,7 @@ pub fn op_http_upgrade_raw( let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); - spawn_local(async move { + spawn(async move { let mut upgrade_stream = WebSocketUpgrade::::default(); // Stage 2: Extract the Upgraded connection @@ -285,7 +284,7 @@ pub fn op_http_upgrade_raw( // Stage 3: Pump the data let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); - spawn_local(async move { + spawn(async move { let mut buf = [0; 1024]; loop { let read = upgraded_rx.read(&mut buf).await?; @@ -296,7 +295,7 @@ pub fn op_http_upgrade_raw( } Ok::<_, AnyError>(()) }); - spawn_local(async move { + spawn(async move { let mut buf = [0; 1024]; loop { let read = write_rx.read(&mut buf).await?; @@ -792,11 +791,10 @@ fn serve_https( cancel: Rc, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { - // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local( + spawn( async { io.handshake().await?; // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect @@ -820,11 +818,10 @@ fn serve_http( cancel: Rc, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { - // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) + spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) } fn serve_http_on( @@ -916,7 +913,7 @@ where let cancel_clone = resource.cancel_handle(); let listen_properties_clone: HttpListenProperties = listen_properties.clone(); - let handle = spawn_local(async move { + let handle = spawn(async move { loop { let conn = listener .accept() diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 21d3dc651..7a1a93f80 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -20,6 +20,7 @@ use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::op; +use deno_core::task::spawn; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; @@ -68,7 +69,6 @@ use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use tokio::task::spawn_local; use crate::network_buffered_stream::NetworkBufferedStream; use crate::reader_stream::ExternallyAbortableReaderStream; @@ -184,7 +184,7 @@ impl HttpConnResource { }; let (task_fut, closed_fut) = task_fut.remote_handle(); let closed_fut = closed_fut.shared(); - spawn_local(task_fut); + spawn(task_fut); Self { addr, @@ -1005,7 +1005,7 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); + deno_core::task::spawn(fut); } } @@ -1015,7 +1015,7 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); + deno_core::task::spawn(fut); } } -- cgit v1.2.3