diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-08-17 07:52:37 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-17 07:52:37 -0600 |
commit | 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f (patch) | |
tree | 1521ffd2ac5e803224546cb349b3905925b9b5ff /ext/web/06_streams.js | |
parent | 0960e895da1275792c1f38999f6a185c864edb3f (diff) |
feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work.
This is a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources, and remove special paths in
both `fetch` and `serve`.
Performance with a ReadableStream response yields ~18% improvement:
```
return new Response(new ReadableStream({
start(controller) {
controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
controller.close();
}
})
```
This patch:
```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 99.96us 100.03us 6.65ms 98.84%
Req/Sec 47.73k 2.43k 51.02k 89.11%
959308 requests in 10.10s, 117.10MB read
Requests/sec: 94978.71
Transfer/sec: 11.59MB
```
main:
```
Running 10s test @ http://localhost:8080
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 163.03us 685.51us 19.73ms 99.27%
Req/Sec 39.50k 3.98k 66.11k 95.52%
789582 requests in 10.10s, 82.83MB read
Requests/sec: 78182.65
Transfer/sec: 8.20MB
```
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 83 |
1 files changed, 80 insertions, 3 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 01f84aa2c..0849d221d 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -1,4 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +// deno-lint-ignore-file camelcase // @ts-check /// <reference path="../webidl/internal.d.ts" /> @@ -7,7 +8,17 @@ /// <reference lib="esnext" /> const core = globalThis.Deno.core; -const ops = core.ops; +const internals = globalThis.__bootstrap.internals; +const { + op_arraybuffer_was_detached, + op_transfer_arraybuffer, + op_readable_stream_resource_allocate, + op_readable_stream_resource_get_sink, + op_readable_stream_resource_write_error, + op_readable_stream_resource_write_buf, + op_readable_stream_resource_close, + op_readable_stream_resource_await_close, +} = core.ensureFastOps(); import * as webidl from "ext:deno_webidl/00_webidl.js"; import { structuredClone } from "ext:deno_web/02_structured_clone.js"; import { @@ -61,6 +72,7 @@ const { SafeWeakMap, // TODO(lucacasonato): add SharedArrayBuffer to primordials // SharedArrayBufferPrototype, + String, Symbol, SymbolAsyncIterator, SymbolIterator, @@ -218,7 +230,7 @@ function isDetachedBuffer(O) { return false; } return ArrayBufferPrototypeGetByteLength(O) === 0 && - ops.op_arraybuffer_was_detached(O); + op_arraybuffer_was_detached(O); } /** @@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) { * @returns {ArrayBufferLike} */ function transferArrayBuffer(O) { - return ops.op_transfer_arraybuffer(O); + return op_transfer_arraybuffer(O); } /** @@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) { return stream[_disturbed]; } +/** + * Create a new resource that wraps a ReadableStream. The resource will support + * read operations, and those read operations will be fed by the output of the + * ReadableStream source. + * @param {ReadableStream<Uint8Array>} stream + * @returns {number} + */ +function resourceForReadableStream(stream) { + const reader = acquireReadableStreamDefaultReader(stream); + + // Allocate the resource + const rid = op_readable_stream_resource_allocate(); + + // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors + PromisePrototypeCatch( + PromisePrototypeThen( + op_readable_stream_resource_await_close(rid), + () => reader.cancel(), + ), + () => {}, + ); + + // The ops here look like op_write_all/op_close, but we're not actually writing to a + // real resource. + (async () => { + try { + // This allocation is freed in the finally block below, guaranteeing it won't leak + const sink = op_readable_stream_resource_get_sink(rid); + try { + while (true) { + let value; + try { + const read = await reader.read(); + value = read.value; + if (read.done) { + break; + } + } catch (err) { + const message = err.message; + if (message) { + await op_readable_stream_resource_write_error(sink, err.message); + } else { + await op_readable_stream_resource_write_error(sink, String(err)); + } + break; + } + // If the chunk has non-zero length, write it + if (value.length > 0) { + await op_readable_stream_resource_write_buf(sink, value); + } + } + } finally { + op_readable_stream_resource_close(sink); + } + } catch (err) { + // Something went terribly wrong with this stream -- log and continue + console.error("Unexpected internal error on stream", err); + } + })(); + return rid; +} + const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB // A finalization registry to clean up underlying resources that are GC'ed. @@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl { key: "signal", converter: webidl.converters.AbortSignal }, ]); +internals.resourceForReadableStream = resourceForReadableStream; + export { // Non-Public _state, @@ -6482,6 +6558,7 @@ export { ReadableStreamPrototype, readableStreamTee, readableStreamThrowIfErrored, + resourceForReadableStream, TransformStream, TransformStreamDefaultController, WritableStream, |