diff options
author | Matt Mastracci <matthew@mastracci.com> | 2023-09-23 08:55:28 -0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-23 14:55:28 +0000 |
commit | 06297d952d61af180e214f7d6923e6820202740a (patch) | |
tree | acf87c53d030f3de2a5484b1516b1c0f6f571d43 /ext/web/06_streams.js | |
parent | b1ca67ac01278198eada8da0c61b74b55dea4a77 (diff) |
feat(ext/web): use readableStreamDefaultReaderRead in resourceForReadableStream (#20622)
We can go one level down in abstraction and avoid using the public
`ReadableStream` APIs.
This patch ~5% perf boost on small ReadableStream:
```
Running 10s test @ http://localhost:8080/
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 148.32us 108.95us 3.88ms 95.71%
Req/Sec 33.24k 2.68k 37.94k 73.76%
668188 requests in 10.10s, 77.74MB read
Requests/sec: 66162.91
Transfer/sec: 7.70MB
```
main:
```
Running 10s test @ http://localhost:8080/
2 threads and 10 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 150.23us 67.61us 4.39ms 94.80%
Req/Sec 31.81k 1.55k 35.56k 83.17%
639078 requests in 10.10s, 74.36MB read
Requests/sec: 63273.72
Transfer/sec: 7.36MB
```
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r-- | ext/web/06_streams.js | 169 |
1 files changed, 124 insertions, 45 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 6d2a55232..7f43d3fc2 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -15,6 +15,7 @@ const { op_readable_stream_resource_get_sink, op_readable_stream_resource_write_error, op_readable_stream_resource_write_buf, + op_readable_stream_resource_write_sync, op_readable_stream_resource_close, op_readable_stream_resource_await_close, } = core.ensureFastOps(); @@ -705,6 +706,121 @@ function isReadableStreamDisturbed(stream) { } /** + * @param {Error | string | undefined} error + * @returns {string} + */ +function extractStringErrorFromError(error) { + if (typeof error == "string") { + return error; + } + const message = error?.message; + const stringMessage = typeof message == "string" ? message : String(message); + return stringMessage; +} + +// We don't want to leak resources associated with our sink, even if something bad happens +const READABLE_STREAM_SOURCE_REGISTRY = new SafeFinalizationRegistry( + (external) => { + op_readable_stream_resource_close(external); + }, +); + +class ResourceStreamResourceSink { + external; + constructor(external) { + this.external = external; + READABLE_STREAM_SOURCE_REGISTRY.register(this, external, this); + } + close() { + if (this.external === undefined) { + return; + } + READABLE_STREAM_SOURCE_REGISTRY.unregister(this); + op_readable_stream_resource_close(this.external); + this.external = undefined; + } +} + +/** + * @param {ReadableStreamDefaultReader<Uint8Array>} reader + * @param {any} sink + * @param {Uint8Array} chunk + */ +function readableStreamWriteChunkFn(reader, sink, chunk) { + // Empty chunk. Re-read. + if (chunk.length == 0) { + readableStreamReadFn(reader, sink); + return; + } + + const res = op_readable_stream_resource_write_sync(sink.external, chunk); + if (res == 0) { + // Closed + reader.cancel("resource closed"); + sink.close(); + } else if (res == 1) { + // Successfully written (synchronous). Re-read. + readableStreamReadFn(reader, sink); + } else if (res == 2) { + // Full. If the channel is full, we perform an async await until we can write, and then return + // to a synchronous loop. + (async () => { + if ( + await op_readable_stream_resource_write_buf( + sink.external, + chunk, + ) + ) { + readableStreamReadFn(reader, sink); + } else { + reader.cancel("resource closed"); + sink.close(); + } + })(); + } +} + +/** + * @param {ReadableStreamDefaultReader<Uint8Array>} reader + * @param {any} sink + */ +function readableStreamReadFn(reader, sink) { + // The ops here look like op_write_all/op_close, but we're not actually writing to a + // real resource. + let reentrant = true; + let gotChunk = undefined; + readableStreamDefaultReaderRead(reader, { + chunkSteps(chunk) { + // If the chunk has non-zero length, write it + if (reentrant) { + gotChunk = chunk; + } else { + readableStreamWriteChunkFn(reader, sink, chunk); + } + }, + closeSteps() { + sink.close(); + }, + errorSteps(error) { + const success = op_readable_stream_resource_write_error( + sink.external, + extractStringErrorFromError(error), + ); + // We don't cancel the reader if there was an error reading. We'll let the downstream + // consumer close the resource after it receives the error. + if (!success) { + reader.cancel("resource closed"); + } + sink.close(); + }, + }); + reentrant = false; + if (gotChunk) { + readableStreamWriteChunkFn(reader, sink, gotChunk); + } +} + +/** * Create a new resource that wraps a ReadableStream. The resource will support * read operations, and those read operations will be fed by the output of the * ReadableStream source. @@ -726,51 +842,14 @@ function resourceForReadableStream(stream) { () => {}, ); - // The ops here look like op_write_all/op_close, but we're not actually writing to a - // real resource. - (async () => { - try { - // This allocation is freed in the finally block below, guaranteeing it won't leak - const sink = op_readable_stream_resource_get_sink(rid); - try { - while (true) { - let value; - try { - const read = await reader.read(); - value = read.value; - if (read.done) { - break; - } - } catch (err) { - const message = err?.message; - const success = (message && (typeof message == "string")) - ? await op_readable_stream_resource_write_error(sink, message) - : await op_readable_stream_resource_write_error( - sink, - String(err), - ); - // We don't cancel the reader if there was an error reading. We'll let the downstream - // consumer close the resource after it receives the error. - if (!success) { - reader.cancel("resource closed"); - } - break; - } - // If the chunk has non-zero length, write it - if (value.length > 0) { - if (!await op_readable_stream_resource_write_buf(sink, value)) { - reader.cancel("resource closed"); - } - } - } - } finally { - op_readable_stream_resource_close(sink); - } - } catch (err) { - // Something went terribly wrong with this stream -- log and continue - console.error("Unexpected internal error on stream", err); - } - })(); + // This allocation is freed when readableStreamReadFn is completed + const sink = new ResourceStreamResourceSink( + op_readable_stream_resource_get_sink(rid), + ); + + // Trigger the first read + readableStreamReadFn(reader, sink); + return rid; } |