summaryrefslogtreecommitdiff
path: root/ext/web/06_streams.js
diff options
context:
space:
mode:
authorMatt Mastracci <matthew@mastracci.com>2023-09-23 08:55:28 -0600
committerGitHub <noreply@github.com>2023-09-23 14:55:28 +0000
commit06297d952d61af180e214f7d6923e6820202740a (patch)
treeacf87c53d030f3de2a5484b1516b1c0f6f571d43 /ext/web/06_streams.js
parentb1ca67ac01278198eada8da0c61b74b55dea4a77 (diff)
feat(ext/web): use readableStreamDefaultReaderRead in resourceForReadableStream (#20622)
We can go one level down in abstraction and avoid using the public `ReadableStream` APIs. This patch ~5% perf boost on small ReadableStream: ``` Running 10s test @ http://localhost:8080/ 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 148.32us 108.95us 3.88ms 95.71% Req/Sec 33.24k 2.68k 37.94k 73.76% 668188 requests in 10.10s, 77.74MB read Requests/sec: 66162.91 Transfer/sec: 7.70MB ``` main: ``` Running 10s test @ http://localhost:8080/ 2 threads and 10 connections Thread Stats Avg Stdev Max +/- Stdev Latency 150.23us 67.61us 4.39ms 94.80% Req/Sec 31.81k 1.55k 35.56k 83.17% 639078 requests in 10.10s, 74.36MB read Requests/sec: 63273.72 Transfer/sec: 7.36MB ```
Diffstat (limited to 'ext/web/06_streams.js')
-rw-r--r--ext/web/06_streams.js169
1 files changed, 124 insertions, 45 deletions
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index 6d2a55232..7f43d3fc2 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -15,6 +15,7 @@ const {
op_readable_stream_resource_get_sink,
op_readable_stream_resource_write_error,
op_readable_stream_resource_write_buf,
+ op_readable_stream_resource_write_sync,
op_readable_stream_resource_close,
op_readable_stream_resource_await_close,
} = core.ensureFastOps();
@@ -705,6 +706,121 @@ function isReadableStreamDisturbed(stream) {
}
/**
+ * @param {Error | string | undefined} error
+ * @returns {string}
+ */
+function extractStringErrorFromError(error) {
+ if (typeof error == "string") {
+ return error;
+ }
+ const message = error?.message;
+ const stringMessage = typeof message == "string" ? message : String(message);
+ return stringMessage;
+}
+
+// We don't want to leak resources associated with our sink, even if something bad happens
+const READABLE_STREAM_SOURCE_REGISTRY = new SafeFinalizationRegistry(
+ (external) => {
+ op_readable_stream_resource_close(external);
+ },
+);
+
+class ResourceStreamResourceSink {
+ external;
+ constructor(external) {
+ this.external = external;
+ READABLE_STREAM_SOURCE_REGISTRY.register(this, external, this);
+ }
+ close() {
+ if (this.external === undefined) {
+ return;
+ }
+ READABLE_STREAM_SOURCE_REGISTRY.unregister(this);
+ op_readable_stream_resource_close(this.external);
+ this.external = undefined;
+ }
+}
+
+/**
+ * @param {ReadableStreamDefaultReader<Uint8Array>} reader
+ * @param {any} sink
+ * @param {Uint8Array} chunk
+ */
+function readableStreamWriteChunkFn(reader, sink, chunk) {
+ // Empty chunk. Re-read.
+ if (chunk.length == 0) {
+ readableStreamReadFn(reader, sink);
+ return;
+ }
+
+ const res = op_readable_stream_resource_write_sync(sink.external, chunk);
+ if (res == 0) {
+ // Closed
+ reader.cancel("resource closed");
+ sink.close();
+ } else if (res == 1) {
+ // Successfully written (synchronous). Re-read.
+ readableStreamReadFn(reader, sink);
+ } else if (res == 2) {
+ // Full. If the channel is full, we perform an async await until we can write, and then return
+ // to a synchronous loop.
+ (async () => {
+ if (
+ await op_readable_stream_resource_write_buf(
+ sink.external,
+ chunk,
+ )
+ ) {
+ readableStreamReadFn(reader, sink);
+ } else {
+ reader.cancel("resource closed");
+ sink.close();
+ }
+ })();
+ }
+}
+
+/**
+ * @param {ReadableStreamDefaultReader<Uint8Array>} reader
+ * @param {any} sink
+ */
+function readableStreamReadFn(reader, sink) {
+ // The ops here look like op_write_all/op_close, but we're not actually writing to a
+ // real resource.
+ let reentrant = true;
+ let gotChunk = undefined;
+ readableStreamDefaultReaderRead(reader, {
+ chunkSteps(chunk) {
+ // If the chunk has non-zero length, write it
+ if (reentrant) {
+ gotChunk = chunk;
+ } else {
+ readableStreamWriteChunkFn(reader, sink, chunk);
+ }
+ },
+ closeSteps() {
+ sink.close();
+ },
+ errorSteps(error) {
+ const success = op_readable_stream_resource_write_error(
+ sink.external,
+ extractStringErrorFromError(error),
+ );
+ // We don't cancel the reader if there was an error reading. We'll let the downstream
+ // consumer close the resource after it receives the error.
+ if (!success) {
+ reader.cancel("resource closed");
+ }
+ sink.close();
+ },
+ });
+ reentrant = false;
+ if (gotChunk) {
+ readableStreamWriteChunkFn(reader, sink, gotChunk);
+ }
+}
+
+/**
* Create a new resource that wraps a ReadableStream. The resource will support
* read operations, and those read operations will be fed by the output of the
* ReadableStream source.
@@ -726,51 +842,14 @@ function resourceForReadableStream(stream) {
() => {},
);
- // The ops here look like op_write_all/op_close, but we're not actually writing to a
- // real resource.
- (async () => {
- try {
- // This allocation is freed in the finally block below, guaranteeing it won't leak
- const sink = op_readable_stream_resource_get_sink(rid);
- try {
- while (true) {
- let value;
- try {
- const read = await reader.read();
- value = read.value;
- if (read.done) {
- break;
- }
- } catch (err) {
- const message = err?.message;
- const success = (message && (typeof message == "string"))
- ? await op_readable_stream_resource_write_error(sink, message)
- : await op_readable_stream_resource_write_error(
- sink,
- String(err),
- );
- // We don't cancel the reader if there was an error reading. We'll let the downstream
- // consumer close the resource after it receives the error.
- if (!success) {
- reader.cancel("resource closed");
- }
- break;
- }
- // If the chunk has non-zero length, write it
- if (value.length > 0) {
- if (!await op_readable_stream_resource_write_buf(sink, value)) {
- reader.cancel("resource closed");
- }
- }
- }
- } finally {
- op_readable_stream_resource_close(sink);
- }
- } catch (err) {
- // Something went terribly wrong with this stream -- log and continue
- console.error("Unexpected internal error on stream", err);
- }
- })();
+ // This allocation is freed when readableStreamReadFn is completed
+ const sink = new ResourceStreamResourceSink(
+ op_readable_stream_resource_get_sink(rid),
+ );
+
+ // Trigger the first read
+ readableStreamReadFn(reader, sink);
+
return rid;
}