diff options
| author | Matt Mastracci <matthew@mastracci.com> | 2023-05-14 15:40:01 -0600 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-05-14 15:40:01 -0600 |
| commit | 9845361153f35f6a68a82eb3a13845fddbeab026 (patch) | |
| tree | 307b58f09cac9e681255dac74f487f8da70d76d2 /ext/http | |
| parent | b99159bf14d15418a7dbb22e9ce78b15d52971cc (diff) | |
refactor(core): bake single-thread assumptions into spawn/spawn_blocking (#19056)
Partially supersedes #19016.
This migrates `spawn` and `spawn_blocking` to `deno_core`, and removes
the requirement for `spawn` tasks to be `Send` given our single-threaded
executor.
While we technically don't need to do anything with `spawn_blocking`, this
allows us to have a single `JoinHandle` type that works for both cases,
and allows us to more easily experiment with alternative
`spawn_blocking` implementations that do not require tokio (i.e., rayon).
Async ops (+~35%):
Before:
```
time 1310 ms rate 763358
time 1267 ms rate 789265
time 1259 ms rate 794281
time 1266 ms rate 789889
```
After:
```
time 956 ms rate 1046025
time 954 ms rate 1048218
time 924 ms rate 1082251
time 920 ms rate 1086956
```
HTTP serve (+~4.4%):
Before:
```
Running 10s test @ http://localhost:4500
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 68.78us 19.77us 1.43ms 86.84%
Req/Sec 68.78k 5.00k 73.84k 91.58%
1381833 requests in 10.10s, 167.36MB read
Requests/sec: 136823.29
Transfer/sec: 16.57MB
```
After:
```
Running 10s test @ http://localhost:4500
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 63.12us 17.43us 1.11ms 85.13%
Req/Sec 71.82k 3.71k 77.02k 79.21%
1443195 requests in 10.10s, 174.79MB read
Requests/sec: 142921.99
Transfer/sec: 17.31MB
```
Suggested-By: alice@ryhl.io
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
Diffstat (limited to 'ext/http')
| -rw-r--r-- | ext/http/http_next.rs | 19 | ||||
| -rw-r--r-- | ext/http/lib.rs | 8 |
2 files changed, 12 insertions, 15 deletions
diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 12db29b1b..8b2f91be0 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -17,6 +17,8 @@ use cache_control::CacheControl; use deno_core::error::AnyError; use deno_core::futures::TryFutureExt; use deno_core::op; +use deno_core::task::spawn; +use deno_core::task::JoinHandle; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::ByteString; @@ -68,9 +70,6 @@ use std::rc::Rc; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; -use tokio::task::spawn_local; -use tokio::task::JoinHandle; - type Request = hyper1::Request<Incoming>; type Response = hyper1::Response<ResponseBytes>; @@ -262,7 +261,7 @@ pub fn op_http_upgrade_raw( let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); - spawn_local(async move { + spawn(async move { let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default(); // Stage 2: Extract the Upgraded connection @@ -285,7 +284,7 @@ pub fn op_http_upgrade_raw( // Stage 3: Pump the data let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); - spawn_local(async move { + spawn(async move { let mut buf = [0; 1024]; loop { let read = upgraded_rx.read(&mut buf).await?; @@ -296,7 +295,7 @@ pub fn op_http_upgrade_raw( } Ok::<_, AnyError>(()) }); - spawn_local(async move { + spawn(async move { let mut buf = [0; 1024]; loop { let read = write_rx.read(&mut buf).await?; @@ -792,11 +791,10 @@ fn serve_https( cancel: Rc<CancelHandle>, tx: tokio::sync::mpsc::Sender<u32>, ) -> JoinHandle<Result<(), AnyError>> { - // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local( + spawn( async { io.handshake().await?; // If the client specifically negotiates a protocol, we will use it. 
If not, we'll auto-detect @@ -820,11 +818,10 @@ fn serve_http( cancel: Rc<CancelHandle>, tx: tokio::sync::mpsc::Sender<u32>, ) -> JoinHandle<Result<(), AnyError>> { - // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) + spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) } fn serve_http_on<HTTP>( @@ -916,7 +913,7 @@ where let cancel_clone = resource.cancel_handle(); let listen_properties_clone: HttpListenProperties = listen_properties.clone(); - let handle = spawn_local(async move { + let handle = spawn(async move { loop { let conn = listener .accept() diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 21d3dc651..7a1a93f80 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -20,6 +20,7 @@ use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::op; +use deno_core::task::spawn; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; @@ -68,7 +69,6 @@ use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use tokio::task::spawn_local; use crate::network_buffered_stream::NetworkBufferedStream; use crate::reader_stream::ExternallyAbortableReaderStream; @@ -184,7 +184,7 @@ impl HttpConnResource { }; let (task_fut, closed_fut) = task_fut.remote_handle(); let closed_fut = closed_fut.shared(); - spawn_local(task_fut); + spawn(task_fut); Self { addr, @@ -1005,7 +1005,7 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); + deno_core::task::spawn(fut); } } @@ -1015,7 +1015,7 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); + deno_core::task::spawn(fut); } } |
